Source code for core.training_manager

"""
Cerebrum Forex - Training Manager V2
Orchestrates model training with KPI-based labeling, walk-forward validation,
and Congress Engine integration.

Key features:
- KPI-based labeling (Noble Safe Range)
- Quantile feature normalization
- Walk-forward cross-validation
- Weighted loss (NEUTRAL penalized)
- Drift detection for conditional retraining
"""

import logging
import threading
import time
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Callable, Dict, List, Tuple

import numpy as np
import pandas as pd

from config.settings import TIMEFRAMES, MODELS_DIR, TIMEFRAME_NAMES
from core.mt5_connector import MT5Connector
from core.feature_engine import FeatureEngine
from core.labeling_engine import LabelingEngine, get_labeling_engine
from core.feature_normalizer import FeatureNormalizer
from core.walk_forward import WalkForwardValidator, get_walk_forward_validator
from core.drift_detector import DriftDetector, get_drift_detector
from database import get_database
from models.xgboost_model import XGBoostModel
from models.lightgbm_model import LightGBMModel
from models.randomforest_model import RandomForestModel
from models.catboost_model import CatBoostModel
from models.stacking_model import StackingModel
from models.meta_model import MetaModel

from core.regime_detector import RegimeDetector
from core.audit_cache import get_audit_cache
from core.debug_manager import get_debug_manager
from core.model_validator import ModelValidator
from core.sentinel import get_sentinel

logger = logging.getLogger(__name__)

# --- GLOBAL RANDOM SEED FOR REPRODUCIBILITY ---
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)


class TrainingLogBridge(logging.Handler):
    """Bridge for standard logging -> Training Manager UI callbacks"""

    def __init__(self, tm_callback):
        super().__init__()
        self.tm_callback = tm_callback
        self.setFormatter(logging.Formatter('%(message)s'))

    def emit(self, record):
        if not self.tm_callback:
            return
        msg = self.format(record)
        level = "info"
        if record.levelno >= logging.ERROR:
            level = "error"
        elif record.levelno >= logging.WARNING:
            level = "warning"
        try:
            self.tm_callback("log", {"level": level, "message": msg})
        except Exception:
            pass


class RecoveryRequired(Exception):
    """Internal signal to restart timeframe training in Global mode"""
    pass


class TrainingManager:
    """
    Manages training of all models across all timeframes.

    V2 features:
    - KPI-based labeling using Noble Safe Range
    - Walk-forward validation (no shuffle)
    - Quantile normalization for features
    - Weighted loss penalizing NEUTRAL
    - Drift detection for conditional retraining
    """

    def __init__(self, mt5_connector: MT5Connector):
        self.mt5 = mt5_connector
        self.feature_engine = FeatureEngine()
        self.labeling_engine = get_labeling_engine()
        self.drift_detector = get_drift_detector()
        self.validator = ModelValidator(min_accuracy=0.0)

        self.models: Dict[str, Dict[str, object]] = {}

        # Feature normalizers per timeframe
        self.normalizers: Dict[str, FeatureNormalizer] = {}

        # Training state
        self.is_training = False
        self.current_training_tf: Optional[str] = None
        self.training_thread: Optional[threading.Thread] = None
        self._stop_training = threading.Event()

        # LRU model cache (timeframe based)
        self._loaded_tfs: List[str] = []
        self._max_loaded_tfs = 2  # Standard
        self._lock = threading.Lock()

        # Callbacks
        self._callbacks: List[Callable] = []

        # Logging bridge
        self._log_handler = TrainingLogBridge(self._notify)
        self._log_handler.setLevel(logging.INFO)
        # Filter to only catch relevant modules, or just everything during training.
        # We'll attach it to the parent logger or specific ones on demand.
        self._bridge_active = False

        # Initialize models for each timeframe
        self._init_models()

    def _init_models(self):
        """Initialize models for all timeframes"""
        MODELS_DIR.mkdir(parents=True, exist_ok=True)
        for tf_name in TIMEFRAME_NAMES:
            self.models[tf_name] = {
                'xgboost': XGBoostModel(tf_name, MODELS_DIR),
                'lightgbm': LightGBMModel(tf_name, MODELS_DIR),
                'randomforest': RandomForestModel(tf_name, MODELS_DIR),
                'catboost': CatBoostModel(tf_name, MODELS_DIR),
                'stacking': StackingModel(tf_name, MODELS_DIR),
            }
            self.normalizers[tf_name] = FeatureNormalizer()

    def add_callback(self, callback: Callable):
        """Add callback for training status updates"""
        self._callbacks.append(callback)

    def _notify(self, event: str, data: dict):
        """Notify all callbacks"""
        for callback in self._callbacks:
            try:
                callback(event, data)
            except Exception as e:
                # Don't use logger here if the bridge is active, to avoid recursion
                if not self._bridge_active:
                    logger.error(f"Callback error: {e}")

    def _enable_log_bridge(self, enabled: bool = True):
        """Attach/detach the log bridge to global logging"""
        root_logger = logging.getLogger()
        if enabled and not self._bridge_active:
            root_logger.addHandler(self._log_handler)
            self._bridge_active = True
            logger.info("📡 Live Log Bridge: ACTIVE")
        elif not enabled and self._bridge_active:
            root_logger.removeHandler(self._log_handler)
            self._bridge_active = False
            logger.info("📡 Live Log Bridge: DEACTIVATED")

    def _validate_and_log(self, timeframe: str, model_type: str, accuracy: float, is_incremental: bool) -> bool:
        """
        Validate a model and log the appropriate messages.

        Returns True if recovery (global retrain) is required.
        """
        model = self.get_model(timeframe, model_type)
        if not model:
            return False

        # Accuracy is passed as a parameter; the validator also checks file existence/size.
        path = model.model_path
        healthy, reason = self.validator.is_healthy(path, model_type, accuracy)

        if not healthy:
            # ONLY block on integrity/file issues; accuracy is unconstrained (min_accuracy=0.0).
            msg = f"⚠️ Integrity issue detected: {reason}. Rebuilding via Global training..."
            if is_incremental:
                logger.warning(f"[{timeframe}] {model_type.upper()} {msg}")
                self._notify("log", {"level": "warning", "message": f"[{timeframe}] {msg}"})
                return True  # Trigger recovery for integrity issues
            else:
                # NOTE: We do NOT call Sentinel here to avoid an infinite loop.
                # The caller (train_timeframe) handles the failure gracefully.
                fail_msg = f"⚠️ {model_type.upper()} integrity check failed. Continuing with fresh training..."
                logger.warning(f"[{timeframe}] {fail_msg}")
                self._notify("log", {"level": "warning", "message": f"[{timeframe}] {fail_msg}"})
                # Just delete the bad model file if it exists
                try:
                    if path.exists():
                        path.unlink()
                except Exception:
                    pass
                return False  # Continue training, don't loop

        # Healthy (accuracy always passes since min_accuracy=0.0)
        logger.info(f"[{timeframe}] {model_type.upper()} Validation: OK (Accuracy: {accuracy:.1%})")
        if not is_incremental:
            success_msg = f"✅ Model Ready: {model_type.upper()} accuracy reported as {accuracy:.1%}."
            self._notify("log", {"level": "success", "message": f"[{timeframe}] {success_msg}"})
        return False

    def get_model(self, timeframe: str, model_type: str):
        """
        Get a specific model, with LRU loading/unloading logic.
        """
        from config.settings import default_settings

        # Determine max loaded TFs based on the performance profile
        profile = getattr(default_settings, "performance_profile", "BALANCED")
        if profile == "ECO":
            self._max_loaded_tfs = 1
        elif profile == "PRO":
            self._max_loaded_tfs = 6
        else:
            self._max_loaded_tfs = 2

        with self._lock:
            # Check if this timeframe is already considered "active"
            if timeframe in self._loaded_tfs:
                # Move to end of LRU (most recent)
                self._loaded_tfs.remove(timeframe)
                self._loaded_tfs.append(timeframe)
            else:
                # New timeframe requested
                if len(self._loaded_tfs) >= self._max_loaded_tfs:
                    # Evict the least recently used timeframe
                    old_tf = self._loaded_tfs.pop(0)
                    logger.info(f"LRU EVICTION: Unloading {old_tf} models to save RAM (Profile: {profile})")
                    for m_name in self.models.get(old_tf, {}):
                        m_obj = self.models[old_tf][m_name]
                        if hasattr(m_obj, 'unload'):
                            m_obj.unload()
                # Add the current timeframe to the LRU
                self._loaded_tfs.append(timeframe)

        model = self.models.get(timeframe, {}).get(model_type)

        # Auto-load if needed (PredictionEngine also does this, but kept here for safety)
        if model and not model.is_trained:
            if model.model_path.exists():
                model.load()

        return model

    def get_normalizer(self, timeframe: str) -> FeatureNormalizer:
        """
        Get the feature normalizer for a timeframe.
        Auto-loads from disk if not already in memory/fitted.
        """
        normalizer = self.normalizers.get(timeframe)
        if normalizer is None:
            normalizer = FeatureNormalizer()
            self.normalizers[timeframe] = normalizer

        # If not fitted, try to load from disk
        if not normalizer.is_fitted:
            norm_path = MODELS_DIR / f"normalizer_{timeframe}.pkl"
            if norm_path.exists():
                if normalizer.load(norm_path):
                    logger.debug(f"[{timeframe}] Auto-loaded normalizer from disk")
                else:
                    logger.warning(f"[{timeframe}] Failed to load existing normalizer")
            else:
                # Expected if never trained, but problematic for prediction
                pass

        return normalizer

    def get_training_status(self, timeframe: str) -> dict:
        """Get training status for a timeframe"""
        db = get_database()
        models_info = {}
        for model_type in ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']:
            model = self.get_model(timeframe, model_type)
            last_training = db.get_last_training(timeframe, model_type)
            models_info[model_type] = {
                'is_trained': model.is_trained if model else False,
                'accuracy': model.accuracy if model else 0.0,
            }
        return {
            'models': models_info,
            'is_training': self.current_training_tf == timeframe,
        }

    def should_train_incremental(self, timeframe: str) -> Tuple[bool, Optional[datetime]]:
        """
        Determine whether training should be incremental.

        If any model has a previous completed training, returns
        (True, oldest_last_training_date) so that only data from that date onward
        is used. Otherwise returns (False, None) for full (global) training.

        Returns:
            Tuple of (is_incremental, train_from_date)
        """
        db = get_database()

        # Check last training for any model
        last_dates = []
        for model_type in ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']:
            last_training = db.get_last_training(timeframe, model_type)
            if last_training and last_training.completed_at:
                completed = last_training.completed_at
                # Handle string dates from SQLite
                if isinstance(completed, str):
                    try:
                        completed = datetime.fromisoformat(completed.replace('Z', '+00:00'))
                    except ValueError:
                        try:
                            completed = datetime.strptime(completed, "%Y-%m-%d %H:%M:%S.%f")
                        except ValueError:
                            completed = datetime.strptime(completed, "%Y-%m-%d %H:%M:%S")
                # Remove timezone info for comparison with naive datetimes
                if hasattr(completed, 'replace') and completed.tzinfo is not None:
                    completed = completed.replace(tzinfo=None)
                last_dates.append(completed)

        if not last_dates:
            # No previous training - do global training
            return False, None

        # Use the oldest last-training date as the incremental baseline.
        # The original "trained within 6 hours" check is relaxed to allow manual
        # incremental triggers; incremental simply implies a baseline exists.
        oldest_training = min(last_dates)
        return True, oldest_training

    def prepare_training_data(
        self,
        timeframe: str,
        from_date: Optional[datetime] = None,
        use_kpi_labels: bool = True
    ) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], int, List[str], Optional[pd.DataFrame]]:
        """
        Prepare features and labels for training.
        """
        logger.info(f"─── DATA PREPARATION SECTION: {timeframe} ───")
        self._notify("phase_update", {"phase": "data", "status": "active", "detail": f"Loading {timeframe}..."})

        # Load OHLC data
        logger.info(f"[{timeframe}] Loading OHLC from disk...")
        try:
            df = self.mt5.load_ohlc(timeframe)
        except Exception as e:
            logger.error(f"[{timeframe}] Failed to load OHLC: {e}")
            df = None

        if df is None:
            logger.warning(f"[{timeframe}] df is None after load")
        else:
            logger.debug(f"[{timeframe}] df shape: {df.shape}")

        # --- GENERAL STALENESS CHECK ---
        # Ensure data is fresh regardless of incremental settings
        if df is not None and not df.empty:
            try:
                # Ensure the time column is datetime
                if not pd.api.types.is_datetime64_any_dtype(df['time']):
                    df['time'] = pd.to_datetime(df['time'])

                last_local_date = df['time'].iloc[-1]
                now = datetime.now()

                if pd.isna(last_local_date):
                    logger.warning(f"[{timeframe}] Last date is NaT! Skipping staleness check.")
                else:
                    # Calculate the stale threshold based on the timeframe
                    from config.settings import get_timeframe_by_name
                    tf_conf = get_timeframe_by_name(timeframe)
                    minutes_per_candle = tf_conf.minutes if tf_conf else 60

                    is_stale = False
                    age = now - last_local_date
                    if age.total_seconds() > 24 * 3600:  # Older than 24h
                        is_stale = True
                    elif age.total_seconds() > (minutes_per_candle * 60 * 5):  # Missing > 5 candles
                        is_stale = True

                    if is_stale:
                        logger.warning(f"[{timeframe}] Data appears stale (Last: {last_local_date}, Now: {now}).")
                        self._notify("phase_update", {"phase": "data", "status": "warning", "detail": "Stale Data"})
            except Exception as e:
                logger.error(f"Staleness check error: {e}")

        # SMART UPDATE CHECK (incremental specifics)
        if from_date and df is not None and not df.empty:
            pass

        # HISTORICAL DATA CHECK
        from config.settings import default_settings
        req_start_year = default_settings.extract_from_year
        req_start_date = pd.Timestamp(datetime(req_start_year, 1, 1))
        if df is not None and not df.empty:
            first_date = df['time'].iloc[0]
            if first_date > (req_start_date + timedelta(days=30)):
                logger.warning(f"[{timeframe}] History starts at {first_date}, expected {req_start_date}.")

        if df is None or df.empty:
            logger.warning(f"[{timeframe}] No OHLC data found")
            self._notify("phase_update", {"phase": "data", "status": "error", "detail": "No Data"})
            return None, None, 0, [], None

        logger.info(f"[{timeframe}] Loaded {len(df)} OHLC candles (from {df['time'].min()} to {df['time'].max()})")
        self._notify("phase_update", {"phase": "data", "status": "complete", "detail": f"{len(df):,}"})

        # Store the incremental filter date (applied AFTER feature calculation)
        incremental_from_date = None
        if from_date:
            if isinstance(from_date, str):
                incremental_from_date = pd.to_datetime(from_date)
            else:
                incremental_from_date = pd.Timestamp(from_date)
            logger.info(f"[{timeframe}] Incremental training mode: will filter samples from {incremental_from_date} after feature calculation")

        # CHECK TOTAL DATA SUFFICIENCY (before filtering for incremental)
        if len(df) < 200:
            logger.warning(f"[{timeframe}] Not enough historical data for indicators: {len(df)} (need 200+)")
            return None, None, 0, [], None

        # Reset index for proper alignment
        df = df.reset_index(drop=True)

        # CREATE LABELS on the FULL dataset (before dropna)
        if use_kpi_labels:
            self._notify("phase_update", {"phase": "labels", "status": "active", "detail": "Labeling..."})
            logger.info(f"[{timeframe}] Creating KPI-based labels using Noble Safe Range...")
            labels = self.labeling_engine.create_labels(df, timeframe)

            # Label stats
            buy, sell, neutral = int(sum(labels == 2)), int(sum(labels == 0)), int(sum(labels == 1))
            self._notify("phase_update", {"phase": "labels", "status": "complete", "detail": f"B:{buy:,}"})
        else:
            labels = XGBoostModel(timeframe, MODELS_DIR).prepare_labels(df)

        # Store the original length for alignment
        original_len = len(df)

        # Calculate features on FULL data (includes dropna which removes some rows)
        logger.info(f"[{timeframe}] Calculating features (all indicators + Noble KPI)...")
        self._notify("phase_update", {"phase": "features", "status": "active", "detail": "Calculating..."})

        def progress_log(msg):
            self._notify("log", {"level": "info", "message": msg})

        try:
            # Calculate features (bypass the 5000-row limit for training)
            df = self.feature_engine.calculate_all_features(
                df, timeframe=timeframe, progress_callback=progress_log, is_training=True
            )
        except Exception as e:
            logger.error(f"[{timeframe}] Feature calculation failed: {e}. Triggering FE-200 Recovery...")
            # Index FE-200: Feature Cache Recovery
            get_sentinel(tm=self).repair("FE-200", {"timeframe": timeframe})
            # One last try after the cache wipe
            df = self.feature_engine.calculate_all_features(
                df, timeframe=timeframe, progress_callback=progress_log, is_training=True
            )

        if df.empty:
            logger.warning(f"[{timeframe}] No features calculated")
            return None, None, 0, [], None

        # Get valid indices after dropna
        valid_indices = df.index if len(df) < original_len else np.arange(len(df))

        # Get feature columns
        exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
        all_cols = [c for c in df.columns if c not in exclude]

        # --- USER CONFIG FILTERING ---
        # Respect the "Enabled" checkboxes in the UI
        try:
            db = get_database()
            enabled_features = db.get_enabled_indicators()

            # Fallback if the list is empty (user disabled everything by mistake)
            if not enabled_features or len(enabled_features) < 3:
                logger.warning(f"[{timeframe}] User disabled almost all indicators. Using ALL defaults for safety.")
                feature_cols = all_cols
            else:
                # Keep features explicitly enabled in the config table. Internal/derived
                # columns (lags, returns, candle stats, volatility) are not tracked in the
                # indicator config, so they are kept to avoid breaking custom logic.
                # db.init_default_indicators() populates every UI indicator, so an
                # indicator missing from enabled_features is treated as disabled.
                feature_cols = []
                for col in all_cols:
                    if col in enabled_features:
                        feature_cols.append(col)
                    elif any(x in col for x in ['lag', 'return', 'candle', 'volatility']):
                        # Internal/derived feature not listed in the config table: keep it
                        feature_cols.append(col)
                    # Otherwise it is an indicator the user disabled: drop it

            logger.info(f"[{timeframe}] Applied User Filter: {len(feature_cols)} features enabled (out of {len(all_cols)})")
        except Exception as e:
            logger.warning(f"Failed to apply indicator config: {e}")
            feature_cols = all_cols

        # --- DATA LEAKAGE FIX: Do NOT normalize here ---
        # We return raw features; splitting and normalization happen in train_timeframe.

        # Get X from raw features
        X = df[feature_cols].values

        # Align labels with valid indices (after dropna).
        # FIX: Use precise index-based alignment. FeatureEngine now preserves indices,
        # which handles rows dropped in the middle (not just the head).
        labels = np.array(labels)
        try:
            y = labels[df.index]
        except IndexError:
            # Fallback if indices are out of bounds (should not happen if df was reset before)
            logger.error(f"[{timeframe}] Index alignment failed. Fallback to tail slicing (RISKY).")
            y = labels[-len(X):]

        # --- FEATURE SELECTION (Agile Mode) ---
        # REMOVED from here to avoid DATA LEAKAGE.
        # Feature selection must be done on X_train ONLY, inside train_timeframe.

        logger.info(f"[{timeframe}] Generated {len(feature_cols)} features for {len(X)} samples (RAW)")
        self._notify("phase_update", {"phase": "features", "status": "complete", "detail": f"{len(feature_cols)} feats"})

        # Remove rows with NaN/Inf in features
        valid_mask = ~np.isnan(X).any(axis=1) & ~np.isinf(X).any(axis=1)
        X = X[valid_mask]
        y = y[valid_mask]
        df = df.iloc[valid_mask].reset_index(drop=True)  # Keep df in sync for the time filter

        # --- WINSORIZATION (Outlier Treatment) ---
        # User feedback: "No winsorization / outlier clipping".
        # We only clip at the 0.1/99.9 percentiles to remove extreme noise while keeping signal.
        if len(X) > 100:
            for i in range(X.shape[1]):
                try:
                    lower = np.percentile(X[:, i], 0.1)
                    upper = np.percentile(X[:, i], 99.9)
                    X[:, i] = np.clip(X[:, i], lower, upper)
                except Exception:
                    continue

        # Replace remaining Infs (if any) with 0
        X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)

        if len(X) != len(y):
            logger.error(f"[{timeframe}] CRITICAL: X/y size mismatch: X={len(X)}, y={len(y)}")
            min_len = min(len(X), len(y))
            X = X[:min_len]
            y = y[:min_len]

        # APPLY INCREMENTAL FILTER (after all features are ready)
        if incremental_from_date is not None:
            time_col = df['time']
            mask = time_col >= incremental_from_date
            X = X[mask.values] if hasattr(mask, 'values') else X[mask]
            y = y[mask.values] if hasattr(mask, 'values') else y[mask]
            df = df[mask].reset_index(drop=True)
            logger.info(f"[{timeframe}] Incremental training: sliced to {len(X)} samples from {incremental_from_date}")

            # Check for class diversity
            unique_classes = np.unique(y)
            if len(unique_classes) < 2:
                msg = f"Incremental slice contains too few classes ({len(unique_classes)}). Skipping incremental training."
                logger.warning(f"[{timeframe}] {msg}")
                return None, None, 0, [], None

            if len(X) < 10:
                logger.info(f"[{timeframe}] Not enough new samples for incremental training: {len(X)} (need 10+)")
                return None, None, 0, [], None

        # --- FINAL RAM OPTIMIZATION ---
        logger.info(f"[{timeframe}] 🧹 Data preparation complete. Features: {len(feature_cols)}, Samples: {len(X)}")
        self._notify("phase_update", {"phase": "features", "status": "complete", "detail": f"{len(feature_cols)} feats"})

        return X, y, len(X), feature_cols, df

    def _filter_history_by_limit(self, df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
        """
        Enforce strict historical data limits for TurboTrain efficiency:
            1m     -> user defined (default 6 months)
            5m     -> 1 year
            15m    -> 3 years
            others -> 5 years
        """
        try:
            from config.settings import default_settings

            cutoff_months = 60  # Default 5 years
            if timeframe == '1m':
                cutoff_months = getattr(default_settings, 'history_1m_months', 6)
            elif timeframe == '5m':
                cutoff_months = 12
            elif timeframe == '15m':
                cutoff_months = 36

            cutoff_date = pd.Timestamp.now() - pd.DateOffset(months=cutoff_months)

            # Check if the index is datetime
            if isinstance(df.index, pd.DatetimeIndex):
                if df.index[0] < cutoff_date:
                    return df[df.index >= cutoff_date]
            elif 'time' in df.columns:
                # A time column exists, but the DataFrame passed here is expected
                # to be sliceable by its index, so no action is taken.
                pass

            return df
        except Exception as e:
            logger.warning(f"History filter failed: {e}")
            return df

    def train_timeframe(self, timeframe: str, force_global: bool = False, precomputed_data: tuple = None,
                        _recovery_depth: int = 0, use_walk_forward: bool = False) -> dict:
        """
        Train all models for a single timeframe (automatic recovery with depth limit).
        Accepts optional precomputed_data to support the Hybrid Parallel flow.

        Args:
            use_walk_forward: If True, run Walk-Forward CV before normal training
                to log per-fold metrics.
        """
        # RECURSION PROTECTION: max 1 retry to prevent infinite loops
        MAX_RECOVERY_DEPTH = 1
        if _recovery_depth > MAX_RECOVERY_DEPTH:
            logger.error(f"[{timeframe}] Max recovery depth reached. Aborting training.")
            return {"error": "Max recovery depth exceeded"}

        logger.info(f"════════════════════════════════════════════════════════════")
        logger.info(f" TRAINING CYCLE START: {timeframe}")
        logger.info(f"════════════════════════════════════════════════════════════")

        db = get_database()
        results = {}

        # RECOVERY WRAPPER
        try:
            self.current_training_tf = timeframe
            self._notify("training_start", {"timeframe": timeframe})

            # Check if the models actually exist
            from config.settings import MODELS_DIR
            model_types = ["xgboost", "lightgbm", "randomforest", "catboost", "stacking"]
            models_exist = all(
                (MODELS_DIR / f"{mt}_{timeframe}.pkl").exists() for mt in model_types
            )
            if not models_exist and not force_global:
                logger.info(f"[{timeframe}] Models missing -> Forcing Global Training")
                force_global = True

            # Determine training mode
            if force_global:
                is_incremental, from_date = False, None
            else:
                is_incremental, from_date = self.should_train_incremental(timeframe)

            self._notify("training_mode", {
                "timeframe": timeframe,
                "is_incremental": is_incremental,
                "from_date": from_date,
            })

            # Prepare data (RAW)
            if precomputed_data:
                X, y, sample_count, feature_names, df = precomputed_data
                from_date = None  # Not needed, data is already sliced
            else:
                self._notify("log", {"level": "info", "message": f"[{timeframe}] 📥 Preparing training data..."})
                X, y, sample_count, feature_names, df = self.prepare_training_data(
                    timeframe, from_date, use_kpi_labels=True
                )

            if X is None:
                if is_incremental:
                    logger.info(f"[{timeframe}] Incremental slice insufficient. Falling back to GLOBAL...")
                    is_incremental = False
                    from_date = None
                    X, y, sample_count, feature_names, df = self.prepare_training_data(timeframe, None, use_kpi_labels=True)

                if X is None:
                    logger.info(f"[{timeframe}] Training cycle skipped.")
                    self._notify("phase_update", {"phase": "train_xgb", "status": "complete", "detail": "Up to date"})
                    return {"status": "skipped"}

            # --- OPTIONAL: WALK-FORWARD CROSS-VALIDATION ---
            if use_walk_forward and df is not None:
                logger.info(f"[{timeframe}] 🔄 Running Walk-Forward Cross-Validation...")
                self._notify("phase_update", {"phase": "wf_cv", "status": "active", "detail": "WF-CV..."})
                try:
                    from core.walk_forward import get_walk_forward_validator
                    wf_validator = get_walk_forward_validator(train_months=6, val_months=1)
                    wf_results = []

                    for train_idx, val_idx, fold_info in wf_validator.split(df, 'time'):
                        X_train_fold, y_train_fold = X[train_idx], y[train_idx]
                        X_val_fold, y_val_fold = X[val_idx], y[val_idx]

                        # Quick evaluation with XGBoost only (fastest)
                        from models.xgboost_model import XGBoostModel
                        from config.settings import MODELS_DIR
                        fold_model = XGBoostModel(timeframe, MODELS_DIR)
                        fold_acc = fold_model.train(X_train_fold, y_train_fold, X_val_fold, y_val_fold)

                        wf_results.append({
                            'fold': fold_info['fold'],
                            'train_samples': fold_info['train_samples'],
                            'val_samples': fold_info['val_samples'],
                            'accuracy': fold_acc,
                        })
                        logger.info(f"[{timeframe}] WF Fold {fold_info['fold']}: Accuracy={fold_acc:.2%}")

                    if wf_results:
                        avg_wf_acc = np.mean([r['accuracy'] for r in wf_results])
                        logger.info(f"[{timeframe}] ✅ Walk-Forward CV Complete: {len(wf_results)} folds, Avg={avg_wf_acc:.2%}")
                        results['walk_forward'] = {'folds': len(wf_results), 'avg_accuracy': avg_wf_acc, 'details': wf_results}
                        self._notify("phase_update", {"phase": "wf_cv", "status": "complete", "detail": f"{avg_wf_acc:.1%}"})
                except Exception as wf_err:
                    logger.warning(f"[{timeframe}] Walk-Forward CV failed: {wf_err}")
                    self._notify("phase_update", {"phase": "wf_cv", "status": "error", "detail": "Failed"})

            # --- DATA SPLIT (3-WAY: Train/Val/Test) ---
            # 70% Train / 15% Validation / 15% Test (TRUE OUT-OF-SAMPLE HOLDOUT)
            n = len(X)
            train_end = int(n * 0.70)
            val_end = int(n * 0.85)

            # SPLIT RAW DATA FIRST (before normalization/selection, to prevent leakage)
            X_train_raw, y_train = X[:train_end], y[:train_end]
            X_val_raw, y_val = X[train_end:val_end], y[train_end:val_end]
            X_test_raw, y_test = X[val_end:], y[val_end:]

            # --- LEAK-PROOF FEATURE SELECTION (on X_train only) ---
            # Select the top 15 features using ONLY training data
            logger.info(f"[{timeframe}] 🛡️ Running Leak-Proof Feature Selection (Top 15)...")
            try:
                # Convert back to a DataFrame for the feature engine (needs column names)
                df_train = pd.DataFrame(X_train_raw, columns=feature_names)
                selected_cols = self.feature_engine.select_best_features(
                    df_train, y_train, n_features=15  # Slightly more permissive than 10
                )

                if selected_cols and len(selected_cols) >= 5:
                    # Filter all sets down to the selected columns
                    indices = [feature_names.index(c) for c in selected_cols]
                    X_train = X_train_raw[:, indices]
                    X_val = X_val_raw[:, indices]
                    X_test = X_test_raw[:, indices]
                    feature_names = selected_cols  # Update feature names
                    logger.info(f"[{timeframe}] ✅ Selected {len(feature_names)} features: {feature_names}")
                else:
                    X_train, X_val, X_test = X_train_raw, X_val_raw, X_test_raw
                    logger.warning(f"[{timeframe}] Feature selection failed/insufficient. Using all features.")
            except Exception as fs_err:
                logger.error(f"[{timeframe}] Feature selection error: {fs_err}. Using all features.")
                X_train, X_val, X_test = X_train_raw, X_val_raw, X_test_raw

            logger.info(f"[{timeframe}] 🔀 3-WAY SPLIT: Train={len(X_train)}, Val={len(X_val)}, Test={len(X_test)}")

            # --- NORMALIZATION (Quantile-based) ---
            logger.info(f"[{timeframe}] 📊 Fitting normalizer on training data...")
            normalizer = self.normalizers[timeframe]

            # Create a temp DataFrame for fit (training data ONLY, to avoid data leakage)
            X_train_df = pd.DataFrame(X_train, columns=feature_names)
            normalizer.fit(X_train_df, feature_names)

            # Transform all sets using the normalizer fitted on training data
            X_train = normalizer.transform_array(X_train, feature_names)
            X_val = normalizer.transform_array(X_val, feature_names)
            X_test = normalizer.transform_array(X_test, feature_names)

            # Save the normalizer for inference
            norm_path = MODELS_DIR / f"normalizer_{timeframe}.pkl"
            normalizer.save(norm_path)
            self._notify("phase_update", {"phase": "normalize", "status": "complete", "detail": "Fitted"})

            # --- CLASS WEIGHTS (balanced based on distribution) ---
            from sklearn.utils.class_weight import compute_class_weight
            try:
                unique_classes = np.unique(y_train)
                weights = compute_class_weight('balanced', classes=unique_classes, y=y_train)
                class_weight_dict = dict(zip(unique_classes, weights))

                # Reduce the NEUTRAL (class 1) weight further to discourage trivial predictions
                if 1 in class_weight_dict:
                    class_weight_dict[1] *= 0.3  # Penalize NEUTRAL predictions

                class_weights = np.array([class_weight_dict.get(yi, 1.0) for yi in y_train])
                logger.info(f"[{timeframe}] Class weights: {class_weight_dict}")
            except Exception as cw_err:
                logger.warning(f"[{timeframe}] Class weight computation failed: {cw_err}. Using uniform weights.")
                class_weights = np.ones(len(y_train))

            # --- TEMPORAL SAMPLE WEIGHTS (decay) ---
            # Give more weight to recent data (linear ramp from 0.5 to 1.0)
            try:
                sample_count = len(y_train)
                # Linear increase from 0.5 (oldest) to 1.0 (newest)
                time_weights = np.linspace(0.5, 1.0, sample_count)

                # Combine class weights and time weights
                # (class_weights is already an array of size sample_count, mapped from y_train)
                combined_weights = class_weights * time_weights

                # Normalize to keep the sum roughly constant (optional, but good for stability)
                combined_weights = combined_weights * (len(combined_weights) / combined_weights.sum())
                class_weights = combined_weights
                logger.info(f"[{timeframe}] ⏳ Applied Temporal Sample Weighting (Old=0.5x -> New=1.0x)")
            except Exception as tw_err:
                logger.warning(f"[{timeframe}] Temporal weighting failed: {tw_err}")

            # --- PHASE 2: MODEL TRAINING ---
            for model_type in model_types:
                if self._stop_training.is_set():
                    break

                # --- UNIFIED LOGIC FOR ALL MODELS (Trees + Stacking) ---
                model = self.get_model(timeframe, model_type)
                if not model:
                    continue

                # Assign feature names to the model for future alignment
                model.feature_names = feature_names

                log_id = db.log_training_start(timeframe, model_type, is_incremental)
                phase_key = f"train_{'xgb' if model_type=='xgboost' else 'lgb' if model_type=='lightgbm' else 'rf' if model_type=='randomforest' else 'cat' if model_type=='catboost' else 'stk'}"
                self._notify("phase_update", {"phase": phase_key, "status": "active", "detail": "Training..."})

                try:
                    acc = model.train(X_train, y_train, X_val, y_val, class_weights)

                    # --- OUT-OF-SAMPLE EVALUATION (TRUE HOLDOUT) ---
                    test_acc = None
                    try:
                        # Use the internal model for raw class prediction:
                        # the wrapper's .predict() returns (Signal, Confidence), we need classes [0, 1, 2]
                        if hasattr(model, 'model') and hasattr(model.model, 'predict'):
                            test_preds = model.model.predict(X_test)
                        else:
                            # Fallback if the internal model is not accessible (should not happen with our wrappers)
                            logger.warning(f"[{timeframe}] Cannot access internal model for {model_type}. Skipping test eval.")
                            test_preds = None

                        if test_preds is not None:
                            # Handle the case where predict returns probabilities instead of classes
                            if hasattr(test_preds, 'ndim') and test_preds.ndim == 2:
                                test_preds = np.argmax(test_preds, axis=1)
                            test_acc = float((test_preds == y_test).mean())
                            logger.info(f"[{timeframe}] 📊 {model_type.upper()} OUT-OF-SAMPLE Accuracy: {test_acc:.1%}")
                            results[model_type] = {"accuracy": acc, "test_accuracy": test_acc, "is_incremental": is_incremental}
                        else:
                            results[model_type] = {"accuracy": acc, "is_incremental": is_incremental}
                    except Exception as eval_err:
                        logger.warning(f"[{timeframe}] Test evaluation failed: {eval_err}")
                        results[model_type] = {"accuracy": acc, "is_incremental": is_incremental}

                    # Log to the database WITH test_accuracy
                    db.log_training_complete(log_id, sample_count, acc, test_acc)
                    self._notify("phase_update", {"phase": phase_key, "status": "complete", "detail": f"{acc:.1%}"})

                except RecoveryRequired:
                    raise
                except Exception as e:
                    logger.error(f"[{timeframe}] {model_type.upper()} Failed: {e}")
                    self._notify("phase_update", {"phase": phase_key, "status": "error", "detail": "Failed"})

            return results

        except RecoveryRequired:
            logger.info(f"[{timeframe}] Recovery Triggered: Restarting in GLOBAL mode (depth={_recovery_depth + 1})...")
            return self.train_timeframe(timeframe, force_global=True, _recovery_depth=_recovery_depth + 1)
        except Exception as e:
            logger.error(f"Training failed for {timeframe}: {e}", exc_info=True)
            return {"error": str(e)}
        finally:
            self.current_training_tf = None
            self._notify("training_complete", {"timeframe": timeframe, "results": results})

    def train_all(self, force_global: bool = False):
        """
        HyperSafe Training (V4): sequential, memory-safe batch training.

        Refactored to avoid OOM (out-of-memory) crashes by processing timeframes
        one by one and releasing memory immediately after each cycle.
        """
        import gc
        from config.settings import TIMEFRAME_NAMES

        logger.info(f"🚀 [HyperSafe V4] STARTING RAM-OPTIMIZED TRAINING (ForceGlobal={force_global})")
        self._enable_log_bridge(True)
        results = {}

        try:
            for tf in TIMEFRAME_NAMES:
                if self._stop_training.is_set():
                    logger.warning(f"⏹ Training interrupted by user at timeframe {tf}")
                    break

                logger.info(f"🧵 [HyperSafe] Processing Timeframe: {tf}...")
                try:
                    # Execute training for this timeframe.
                    # Data preparation and model fitting happen sequentially inside train_timeframe.
                    res = self.train_timeframe(tf, force_global)
                    results[tf] = res

                    if "error" in res:
                        logger.error(f"❌ [{tf}] Training resulted in error: {res['error']}")
                    else:
                        logger.info(f"✅ [{tf}] Training completed successfully.")
                except Exception as e:
                    logger.error(f"❌ [HyperSafe] Timeframe {tf} Failed: {e}", exc_info=True)
                    results[tf] = {"error": str(e)}
                finally:
                    # CRITICAL: force garbage collection after EACH timeframe to free RAM
                    gc.collect()
                    time.sleep(0.5)  # Give the OS time to breathe

            # --- RESULTS ASSEMBLY ---
            total_success = sum(
                1 for tf in results
                if "error" not in results[tf] and results[tf].get("status") != "skipped"
            )
            logger.info(f"🏁 [HyperSafe V4] COMPLETED. Success: {total_success}/{len(TIMEFRAME_NAMES)} timeframes.")

            # Cleanup features (delete temporary cache if configured)
            self._cleanup_features()
        except Exception as global_err:
            logger.error(f"❌ [HyperSafe] Global training loop failed: {global_err}")
        finally:
            self._enable_log_bridge(False)
            gc.collect()

        return results

    def train_with_drift_check(self, timeframe: str) -> dict:
        """
        Train only if drift is detected.

        Args:
            timeframe: Timeframe to check and potentially train

        Returns:
            Dict with drift status and training results
        """
        # Get current features for the drift check
        df = self.mt5.load_ohlc(timeframe)
        if df is None or df.empty:
            return {"error": "No data for drift check"}

        df = self.feature_engine.calculate_all_features(df, timeframe=timeframe)
        exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
        feature_cols = [c for c in df.columns if c not in exclude]

        # Check for drift
        drift_report = self.drift_detector.detect_drift(df[feature_cols])

        result = {
            "drift_detected": drift_report.drift_detected,
            "should_retrain": drift_report.should_retrain,
            "psi_score": drift_report.psi_score,
            "reasons": drift_report.reasons,
        }

        if drift_report.should_retrain:
            logger.info(f"[{timeframe}] Drift detected, triggering retraining: {drift_report.reasons}")
            result["training_results"] = self.train_timeframe(timeframe, force_global=True)
        else:
            logger.info(f"[{timeframe}] No significant drift detected, skipping retraining")

        return result

    def start_scheduled_training(self, interval_hours: int = 12):
        """Start scheduled training in a background thread"""
        if self.training_thread and self.training_thread.is_alive():
            logger.warning("Training already running")
            return

        self._stop_training.clear()

        def training_loop():
            while not self._stop_training.is_set():
                self.is_training = True
                self._notify("scheduled_training_start", {"interval_hours": interval_hours})
                try:
                    self.train_all()
                except Exception as e:
                    logger.error(f"Scheduled training failed: {e}")
                self.is_training = False

                # Wait for the next interval
                for _ in range(interval_hours * 3600):
                    if self._stop_training.is_set():
                        break
                    time.sleep(1)

        self.training_thread = threading.Thread(target=training_loop, daemon=True)
        self.training_thread.start()
        logger.info(f"Started scheduled training every {interval_hours} hours")

    def stop_scheduled_training(self):
        """Stop scheduled training"""
        self._stop_training.set()
        self.is_training = False
        logger.info("Stopped scheduled training")

    def load_all_models(self):
        """Load all saved models from disk"""
        for tf_name in TIMEFRAME_NAMES:
            # Load normalizer
            normalizer_path = MODELS_DIR / f"normalizer_{tf_name}.pkl"
            if normalizer_path.exists():
                self.normalizers[tf_name].load(normalizer_path)

            # Load models
            for model_type in ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']:
                model = self.get_model(tf_name, model_type)
                if model:
                    model.load()

    def _cleanup_features(self):
        """Clean up temporary feature parquet files after training to free disk space"""
        from config.settings import FEATURES_DIR
        try:
            if not FEATURES_DIR.exists():
                return

            # Count files before cleanup
            parquet_files = list(FEATURES_DIR.glob("*.parquet"))
            if not parquet_files:
                logger.info("[Cleanup] No feature files to clean up")
                return

            total_size_mb = sum(f.stat().st_size for f in parquet_files) / (1024 * 1024)
            file_count = len(parquet_files)

            # Delete all parquet files
            for f in parquet_files:
                try:
                    f.unlink()
                except Exception:
                    pass

            logger.info(f"🧹 [Cleanup] Deleted {file_count} feature files ({total_size_mb:.1f} MB freed)")
        except Exception as e:
            logger.warning(f"[Cleanup] Failed to clean features: {e}")
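

# Illustrative smoke-test entry point (a minimal sketch, not part of the original
# module): wires a console callback and runs a drift-gated training pass for one
# timeframe. It assumes MT5Connector() can be constructed without arguments and
# that "1h" exists in TIMEFRAME_NAMES; adapt both to your configuration.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    tm = TrainingManager(MT5Connector())
    tm.add_callback(lambda event, data: print(f"[{event}] {data}"))  # console hook

    # Retrain only when feature drift is detected; for an unconditional full pass,
    # use tm.train_timeframe("1h", force_global=True) or tm.train_all(force_global=True).
    report = tm.train_with_drift_check("1h")
    print(report)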