Source code for core.model_scorer


import json
import logging
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional

from config.settings import DATA_DIR

logger = logging.getLogger(__name__)

@dataclass
class PredictionOutcome:
    """Single prediction outcome for tracking"""
    model_name: str
    timeframe: str
    predicted_signal: str  # BUY, SELL, NEUTRAL
    actual_outcome: str    # PROFIT, LOSS, BREAKEVEN
    confidence: float
    timestamp: datetime = field(default_factory=datetime.now)


class ModelScorer:
    """
    Dynamic model performance scorer.

    Tracks prediction accuracy and adjusts weights based on recent performance.
    Uses an exponential moving average for smooth weight transitions.
    """

    # Configuration
    HISTORY_SIZE = 100    # Track last N predictions per model
    MIN_SAMPLES = 10      # Minimum samples before adjusting weights
    DYNAMIC_WEIGHT = 0.3  # Blend: 30% dynamic, 70% static
    EMA_ALPHA = 0.1       # EMA smoothing factor

    def __init__(self, data_dir: Optional[Path] = None):
        self.data_dir = data_dir or (DATA_DIR / "scoring")
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Model performance history: {model_name: deque of outcomes}
        self.history: Dict[str, deque] = {}

        # Current performance scores: {model_name: rolling_accuracy}
        self.scores: Dict[str, float] = {}

        # EMA scores for smooth transitions
        self.ema_scores: Dict[str, float] = {}

        # Default models
        self.models = ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']

        # Initialize history for each model
        for model in self.models:
            self.history[model] = deque(maxlen=self.HISTORY_SIZE)
            self.scores[model] = 0.5  # Start at 50% (neutral)
            self.ema_scores[model] = 0.5

        # Load persisted scores
        self._load_scores()

        logger.info(f"[ModelScorer] Initialized with {len(self.models)} models")

    def record_outcome(
        self,
        model_name: str,
        timeframe: str,
        predicted_signal: str,
        actual_outcome: str,
        confidence: float = 0.5
    ):
        """
        Record a prediction outcome for a model.

        Args:
            model_name: xgboost, lightgbm, randomforest, catboost, stacking
            timeframe: 1m, 5m, 15m, 30m, 1h, 4h
            predicted_signal: BUY, SELL, NEUTRAL
            actual_outcome: PROFIT, LOSS, BREAKEVEN
            confidence: Model's confidence [0, 1]
        """
        if model_name not in self.history:
            self.history[model_name] = deque(maxlen=self.HISTORY_SIZE)
            self.scores[model_name] = 0.5
            self.ema_scores[model_name] = 0.5

        outcome = PredictionOutcome(
            model_name=model_name,
            timeframe=timeframe,
            predicted_signal=predicted_signal,
            actual_outcome=actual_outcome,
            confidence=confidence,
            timestamp=datetime.now()
        )
        self.history[model_name].append(outcome)

        # Recalculate score
        self._update_score(model_name)

        # Persist periodically (every 10 outcomes)
        total_outcomes = sum(len(h) for h in self.history.values())
        if total_outcomes % 10 == 0:
            self._save_scores()

        logger.debug(
            f"[ModelScorer] Recorded {model_name} {timeframe}: "
            f"{predicted_signal} → {actual_outcome}"
        )

    def _update_score(self, model_name: str):
        """Update rolling accuracy score for a model"""
        history = self.history.get(model_name, [])
        if len(history) < self.MIN_SAMPLES:
            return  # Not enough data

        # Calculate accuracy (PROFIT = correct, LOSS = incorrect)
        correct = sum(1 for o in history if o.actual_outcome == 'PROFIT')
        total = len(history)
        raw_accuracy = correct / total if total > 0 else 0.5

        # Update EMA score
        old_ema = self.ema_scores.get(model_name, 0.5)
        new_ema = self.EMA_ALPHA * raw_accuracy + (1 - self.EMA_ALPHA) * old_ema

        self.scores[model_name] = raw_accuracy
        self.ema_scores[model_name] = new_ema

        logger.debug(f"[ModelScorer] {model_name}: raw={raw_accuracy:.2%}, ema={new_ema:.2%}")

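    # Illustrative EMA step (hypothetical numbers, not from the source): with
    # EMA_ALPHA = 0.1, a previous EMA of 0.50 and a new rolling accuracy of 0.80
    # give new_ema = 0.1 * 0.80 + 0.9 * 0.50 = 0.53, so a single hot or cold
    # streak shifts the smoothed score only gradually.
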
    def get_dynamic_weights(
        self,
        base_weights: Dict[str, float],
        model_role_map: Optional[Dict[str, str]] = None
    ) -> Dict[str, float]:
        """
        Get dynamically adjusted weights based on recent performance.

        Blends static weights (70%) with dynamic performance (30%).

        Args:
            base_weights: Static weights from config {role: weight}
            model_role_map: Map model name to role {xgboost: quant, ...}

        Returns:
            Adjusted weights {role: weight}
        """
        if model_role_map is None:
            model_role_map = {
                'xgboost': 'quant',
                'lightgbm': 'archivist',
                'randomforest': 'futurist',
                'catboost': 'guardian',
                'stacking': 'leader'
            }

        # Check if we have enough data
        total_samples = sum(len(h) for h in self.history.values())
        if total_samples < self.MIN_SAMPLES * len(self.models):
            logger.debug("[ModelScorer] Not enough samples, using static weights")
            return base_weights.copy()

        # Calculate performance-based weight adjustments
        adjusted = {}
        for model, role in model_role_map.items():
            base_w = base_weights.get(role, 0.25)

            # Get EMA score (0.5 = neutral, >0.5 = good, <0.5 = poor)
            ema = self.ema_scores.get(model, 0.5)

            # Calculate adjustment factor (ema 0.5 → 1.0, 0.7 → 1.2, 0.3 → 0.8)
            adjustment = 0.5 + ema  # Range: [0.5, 1.5]

            # Blend: 70% static + 30% dynamic
            dynamic_w = base_w * adjustment
            final_w = (1 - self.DYNAMIC_WEIGHT) * base_w + self.DYNAMIC_WEIGHT * dynamic_w

            adjusted[role] = final_w

        # Normalize to sum to 1
        total = sum(adjusted.values())
        if total > 0:
            adjusted = {k: v / total for k, v in adjusted.items()}

        # Log significant changes
        for role in adjusted:
            if role in base_weights:
                diff = adjusted[role] - base_weights[role]
                if abs(diff) > 0.05:
                    logger.info(
                        f"[ModelScorer] {role}: {base_weights[role]:.1%} → "
                        f"{adjusted[role]:.1%} ({diff:+.1%})"
                    )

        return adjusted

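    # Worked blend (hypothetical numbers, not from the source): for a role with
    # base_w = 0.25 and a model EMA of 0.7, adjustment = 0.5 + 0.7 = 1.2,
    # dynamic_w = 0.25 * 1.2 = 0.30, and
    # final_w = 0.7 * 0.25 + 0.3 * 0.30 = 0.265 before normalization across roles.
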
    def get_model_stats(self) -> Dict[str, Dict]:
        """Get performance statistics for all models"""
        stats = {}
        for model in self.models:
            history = self.history.get(model, [])

            if len(history) == 0:
                stats[model] = {
                    'samples': 0,
                    'accuracy': None,
                    'ema_accuracy': 0.5,
                    'last_update': None
                }
                continue

            correct = sum(1 for o in history if o.actual_outcome == 'PROFIT')
            stats[model] = {
                'samples': len(history),
                'accuracy': correct / len(history),
                'ema_accuracy': self.ema_scores.get(model, 0.5),
                'last_update': history[-1].timestamp.isoformat() if history else None
            }

        return stats

    def _save_scores(self):
        """Persist scores to disk"""
        try:
            data = {
                'scores': self.scores,
                'ema_scores': self.ema_scores,
                'updated_at': datetime.now().isoformat()
            }
            path = self.data_dir / 'model_scores.json'
            with open(path, 'w') as f:
                json.dump(data, f, indent=2)
            logger.debug(f"[ModelScorer] Scores saved to {path}")
        except Exception as e:
            logger.warning(f"[ModelScorer] Failed to save scores: {e}")

    def _load_scores(self):
        """Load persisted scores from disk"""
        try:
            path = self.data_dir / 'model_scores.json'
            if path.exists():
                with open(path, 'r') as f:
                    data = json.load(f)
                self.scores = data.get('scores', self.scores)
                self.ema_scores = data.get('ema_scores', self.ema_scores)
                logger.info(f"[ModelScorer] Loaded scores from {path}")
        except Exception as e:
            logger.warning(f"[ModelScorer] Failed to load scores: {e}")


# Singleton instance
_scorer_instance: Optional[ModelScorer] = None


def get_model_scorer() -> ModelScorer:
    """Get singleton ModelScorer instance"""
    global _scorer_instance
    if _scorer_instance is None:
        _scorer_instance = ModelScorer()
    return _scorer_instance
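
A minimal usage sketch of the API above, assuming the module is importable as core.model_scorer and that the caller supplies trade outcomes and static role weights; the base_weights values below are illustrative, not values taken from the project config.

from core.model_scorer import get_model_scorer

scorer = get_model_scorer()

# Record a closed trade's outcome for the model that produced the signal
scorer.record_outcome(
    model_name='xgboost',
    timeframe='15m',
    predicted_signal='BUY',
    actual_outcome='PROFIT',
    confidence=0.72,
)

# Blend static role weights (illustrative values) with recent performance
base_weights = {'quant': 0.25, 'archivist': 0.20, 'futurist': 0.20,
                'guardian': 0.15, 'leader': 0.20}
weights = scorer.get_dynamic_weights(base_weights)

# Inspect per-model sample counts, rolling accuracy, and EMA scores
print(scorer.get_model_stats())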