import json
import logging
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

from config.settings import DATA_DIR

logger = logging.getLogger(__name__)

@dataclass
class PredictionOutcome:
    """Single prediction outcome for tracking"""
    model_name: str
    timeframe: str
    predicted_signal: str    # BUY, SELL, NEUTRAL
    actual_outcome: str      # PROFIT, LOSS, BREAKEVEN
    confidence: float
    timestamp: datetime = field(default_factory=datetime.now)
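
# Illustration (hypothetical values): a single tracked outcome might look like
#   PredictionOutcome(model_name="xgboost", timeframe="5m",
#                     predicted_signal="BUY", actual_outcome="PROFIT",
#                     confidence=0.72)
# The timestamp defaults to datetime.now() when omitted.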

class ModelScorer:
    """
    Dynamic model performance scorer.

    Tracks prediction accuracy and adjusts weights based on recent performance.
    Uses an exponential moving average (EMA) for smooth weight transitions.
    """

    # Configuration
    HISTORY_SIZE = 100    # Track last N predictions per model
    MIN_SAMPLES = 10      # Minimum samples before adjusting weights
    DYNAMIC_WEIGHT = 0.3  # Blend: 30% dynamic, 70% static
    EMA_ALPHA = 0.1       # EMA smoothing factor

    def __init__(self, data_dir: Optional[Path] = None):
        self.data_dir = data_dir or (DATA_DIR / "scoring")
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Model performance history: {model_name: deque of outcomes}
        self.history: Dict[str, deque] = {}
        # Current performance scores: {model_name: rolling_accuracy}
        self.scores: Dict[str, float] = {}
        # EMA scores for smooth transitions
        self.ema_scores: Dict[str, float] = {}
        # Running count of recorded outcomes, used for periodic persistence
        self._outcome_count = 0

        # Default models
        self.models = ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']

        # Initialize history for each model
        for model in self.models:
            self.history[model] = deque(maxlen=self.HISTORY_SIZE)
            self.scores[model] = 0.5  # Start at 50% (neutral)
            self.ema_scores[model] = 0.5

        # Load persisted scores
        self._load_scores()
        logger.info(f"[ModelScorer] Initialized with {len(self.models)} models")
    def record_outcome(
        self,
        model_name: str,
        timeframe: str,
        predicted_signal: str,
        actual_outcome: str,
        confidence: float = 0.5
    ):
        """
        Record a prediction outcome for a model.

        Args:
            model_name: xgboost, lightgbm, randomforest, catboost, stacking
            timeframe: 1m, 5m, 15m, 30m, 1h, 4h
            predicted_signal: BUY, SELL, NEUTRAL
            actual_outcome: PROFIT, LOSS, BREAKEVEN
            confidence: Model's confidence in [0, 1]
        """
        if model_name not in self.history:
            self.history[model_name] = deque(maxlen=self.HISTORY_SIZE)
            self.scores[model_name] = 0.5
            self.ema_scores[model_name] = 0.5

        outcome = PredictionOutcome(
            model_name=model_name,
            timeframe=timeframe,
            predicted_signal=predicted_signal,
            actual_outcome=actual_outcome,
            confidence=confidence,
            timestamp=datetime.now()
        )
        self.history[model_name].append(outcome)

        # Recalculate score
        self._update_score(model_name)

        # Persist periodically (every 10 outcomes); a running counter is used
        # because the bounded deques stop growing once they reach HISTORY_SIZE
        self._outcome_count += 1
        if self._outcome_count % 10 == 0:
            self._save_scores()

        logger.debug(f"[ModelScorer] Recorded {model_name} {timeframe}: {predicted_signal} → {actual_outcome}")
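
    # Example call (hypothetical values):
    #   scorer.record_outcome("lightgbm", "15m", "SELL", "LOSS", confidence=0.61)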
    def _update_score(self, model_name: str):
        """Update rolling accuracy score for a model"""
        history = self.history.get(model_name, [])
        if len(history) < self.MIN_SAMPLES:
            return  # Not enough data

        # Calculate accuracy (PROFIT counts as correct; LOSS and BREAKEVEN do not)
        correct = sum(1 for o in history if o.actual_outcome == 'PROFIT')
        total = len(history)
        raw_accuracy = correct / total if total > 0 else 0.5

        # Update EMA score
        old_ema = self.ema_scores.get(model_name, 0.5)
        new_ema = self.EMA_ALPHA * raw_accuracy + (1 - self.EMA_ALPHA) * old_ema

        self.scores[model_name] = raw_accuracy
        self.ema_scores[model_name] = new_ema

        logger.debug(f"[ModelScorer] {model_name}: raw={raw_accuracy:.2%}, ema={new_ema:.2%}")
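
    # Worked example of the EMA update above (illustrative numbers): with
    # EMA_ALPHA = 0.1, a previous ema of 0.50 and a raw rolling accuracy of
    # 0.80 give new_ema = 0.1 * 0.80 + 0.9 * 0.50 = 0.53, so a burst of good
    # or bad results shifts the score gradually rather than all at once.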
    def get_dynamic_weights(
        self,
        base_weights: Dict[str, float],
        model_role_map: Optional[Dict[str, str]] = None
    ) -> Dict[str, float]:
        """
        Get dynamically adjusted weights based on recent performance.

        Blends static weights (70%) with dynamic performance (30%).

        Args:
            base_weights: Static weights from config {role: weight}
            model_role_map: Map model name to role {xgboost: quant, ...}

        Returns:
            Adjusted weights {role: weight}
        """
        if model_role_map is None:
            model_role_map = {
                'xgboost': 'quant',
                'lightgbm': 'archivist',
                'randomforest': 'futurist',
                'catboost': 'guardian',
                'stacking': 'leader'
            }

        # Check if we have enough data
        total_samples = sum(len(h) for h in self.history.values())
        if total_samples < self.MIN_SAMPLES * len(self.models):
            logger.debug("[ModelScorer] Not enough samples, using static weights")
            return base_weights.copy()

        # Calculate performance-based weight adjustments
        adjusted = {}
        for model, role in model_role_map.items():
            base_w = base_weights.get(role, 0.25)

            # Get EMA score (0.5 = neutral, >0.5 = good, <0.5 = poor)
            ema = self.ema_scores.get(model, 0.5)

            # Calculate adjustment factor (ema 0.5 → 1.0, 0.7 → 1.2, 0.3 → 0.8)
            adjustment = 0.5 + ema  # Range: [0.5, 1.5]

            # Blend: 70% static + 30% dynamic
            dynamic_w = base_w * adjustment
            final_w = (1 - self.DYNAMIC_WEIGHT) * base_w + self.DYNAMIC_WEIGHT * dynamic_w
            adjusted[role] = final_w

        # Normalize to sum to 1
        total = sum(adjusted.values())
        if total > 0:
            adjusted = {k: v / total for k, v in adjusted.items()}

        # Log significant changes
        for role in adjusted:
            if role in base_weights:
                diff = adjusted[role] - base_weights[role]
                if abs(diff) > 0.05:
                    logger.info(f"[ModelScorer] {role}: {base_weights[role]:.1%} → {adjusted[role]:.1%} ({diff:+.1%})")

        return adjusted
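
    # Worked example of the blend above (illustrative numbers): with a static
    # weight base_w = 0.25 and ema = 0.70, adjustment = 0.5 + 0.70 = 1.20,
    # dynamic_w = 0.25 * 1.20 = 0.30, and
    # final_w = 0.7 * 0.25 + 0.3 * 0.30 = 0.265 before normalization, i.e. a
    # consistently strong model gains a modest rather than runaway increase.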
    def get_model_stats(self) -> Dict[str, Dict]:
        """Get performance statistics for all models"""
        stats = {}
        for model in self.models:
            history = self.history.get(model, [])
            if len(history) == 0:
                stats[model] = {
                    'samples': 0,
                    'accuracy': None,
                    'ema_accuracy': 0.5,
                    'last_update': None
                }
                continue

            correct = sum(1 for o in history if o.actual_outcome == 'PROFIT')
            stats[model] = {
                'samples': len(history),
                'accuracy': correct / len(history),
                'ema_accuracy': self.ema_scores.get(model, 0.5),
                'last_update': history[-1].timestamp.isoformat() if history else None
            }
        return stats
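
    # Example return shape for one model with history (values illustrative):
    #   {'xgboost': {'samples': 42, 'accuracy': 0.57, 'ema_accuracy': 0.55,
    #                'last_update': '2024-01-01T12:00:00'}, ...}
    # Models with no recorded history report samples=0 and accuracy=None.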
    def _save_scores(self):
        """Persist scores to disk"""
        try:
            data = {
                'scores': self.scores,
                'ema_scores': self.ema_scores,
                'updated_at': datetime.now().isoformat()
            }
            path = self.data_dir / 'model_scores.json'
            with open(path, 'w') as f:
                json.dump(data, f, indent=2)
            logger.debug(f"[ModelScorer] Scores saved to {path}")
        except Exception as e:
            logger.warning(f"[ModelScorer] Failed to save scores: {e}")

    def _load_scores(self):
        """Load persisted scores from disk"""
        try:
            path = self.data_dir / 'model_scores.json'
            if path.exists():
                with open(path, 'r') as f:
                    data = json.load(f)
                self.scores = data.get('scores', self.scores)
                self.ema_scores = data.get('ema_scores', self.ema_scores)
                logger.info(f"[ModelScorer] Loaded scores from {path}")
        except Exception as e:
            logger.warning(f"[ModelScorer] Failed to load scores: {e}")
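
    # The persisted model_scores.json written by _save_scores looks roughly
    # like this (values illustrative):
    #   {"scores": {"xgboost": 0.57, ...},
    #    "ema_scores": {"xgboost": 0.55, ...},
    #    "updated_at": "2024-01-01T12:00:00"}
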
# Singleton instance
_scorer_instance: Optional[ModelScorer] = None

def get_model_scorer() -> ModelScorer:
    """Get singleton ModelScorer instance"""
    global _scorer_instance
    if _scorer_instance is None:
        _scorer_instance = ModelScorer()
    return _scorer_instance
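
# Minimal usage sketch with made-up outcomes and hypothetical role weights;
# it only illustrates the public API of this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    scorer = get_model_scorer()

    # Feed a few fabricated outcomes for one model
    for i in range(20):
        scorer.record_outcome(
            model_name="xgboost",
            timeframe="5m",
            predicted_signal="BUY",
            actual_outcome="PROFIT" if i % 3 else "LOSS",
            confidence=0.6,
        )
    print(scorer.get_model_stats()["xgboost"])

    # Hypothetical static role weights; real values come from configuration
    base = {"quant": 0.25, "archivist": 0.20, "futurist": 0.20,
            "guardian": 0.20, "leader": 0.15}
    print(scorer.get_dynamic_weights(base))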