"""
Cerebrum Forex - Training Manager V2
Orchestrates model training with KPI-based labeling, walk-forward validation,
and Congress Engine integration.
Key features:
- KPI-based labeling (Noble Safe Range)
- Quantile feature normalization
- Walk-forward cross-validation
- Weighted loss (NEUTRAL penalized)
- Drift detection for conditional retraining
"""
import logging
import threading
import time
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Callable, Dict, List, Tuple
import numpy as np
import pandas as pd
from config.settings import TIMEFRAMES, MODELS_DIR, TIMEFRAME_NAMES
from core.mt5_connector import MT5Connector
from core.feature_engine import FeatureEngine
from core.labeling_engine import LabelingEngine, get_labeling_engine
from core.feature_normalizer import FeatureNormalizer
from core.walk_forward import WalkForwardValidator, get_walk_forward_validator
from core.drift_detector import DriftDetector, get_drift_detector
from database import get_database
from models.xgboost_model import XGBoostModel
from models.lightgbm_model import LightGBMModel
from models.randomforest_model import RandomForestModel
from models.catboost_model import CatBoostModel
from models.stacking_model import StackingModel
from models.meta_model import MetaModel
from core.regime_detector import RegimeDetector
from core.audit_cache import get_audit_cache
from core.debug_manager import get_debug_manager
from core.model_validator import ModelValidator
from core.sentinel import get_sentinel
logger = logging.getLogger(__name__)
# --- GLOBAL RANDOM SEED FOR REPRODUCIBILITY ---
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
class TrainingLogBridge(logging.Handler):
"""Bridge for standard logging -> Training Manager UI callbacks"""
def __init__(self, tm_callback):
super().__init__()
self.tm_callback = tm_callback
self.setFormatter(logging.Formatter('%(message)s'))
def emit(self, record):
if not self.tm_callback: return
msg = self.format(record)
level = "info"
if record.levelno >= logging.ERROR: level = "error"
elif record.levelno >= logging.WARNING: level = "warning"
try:
self.tm_callback("log", {"level": level, "message": msg})
except Exception:
pass
class RecoveryRequired(Exception):
"""Internal signal to restart timeframe training in Global mode"""
pass
class TrainingManager:
"""
Manages training of all models across all timeframes.
V2 features:
- KPI-based labeling using Noble Safe Range
- Walk-forward validation (no shuffle)
- Quantile normalization for features
- Weighted loss penalizing NEUTRAL
- Drift detection for conditional retraining
"""
def __init__(self, mt5_connector: MT5Connector):
self.mt5 = mt5_connector
self.feature_engine = FeatureEngine()
self.labeling_engine = get_labeling_engine()
self.drift_detector = get_drift_detector()
self.validator = ModelValidator(min_accuracy=0.0)
self.models: Dict[str, Dict[str, object]] = {}
# Feature normalizers per timeframe
self.normalizers: Dict[str, FeatureNormalizer] = {}
# Training state
self.is_training = False
self.current_training_tf: Optional[str] = None
self.training_thread: Optional[threading.Thread] = None
self._stop_training = threading.Event()
# LRU Model Cache (Timeframe based)
self._loaded_tfs: List[str] = []
self._max_loaded_tfs = 2 # Standard
self._lock = threading.Lock()
# Callbacks
self._callbacks: List[Callable] = []
# Logging bridge
self._log_handler = TrainingLogBridge(self._notify)
self._log_handler.setLevel(logging.INFO)
# The bridge captures everything during training; it is attached to the root
# logger on demand (see _enable_log_bridge) rather than filtered per module.
self._bridge_active = False
# Initialize models for each timeframe
self._init_models()
def _init_models(self):
"""Initialize models for all timeframes"""
MODELS_DIR.mkdir(parents=True, exist_ok=True)
for tf_name in TIMEFRAME_NAMES:
self.models[tf_name] = {
'xgboost': XGBoostModel(tf_name, MODELS_DIR),
'lightgbm': LightGBMModel(tf_name, MODELS_DIR),
'randomforest': RandomForestModel(tf_name, MODELS_DIR),
'catboost': CatBoostModel(tf_name, MODELS_DIR),
'stacking': StackingModel(tf_name, MODELS_DIR),
}
self.normalizers[tf_name] = FeatureNormalizer()
def add_callback(self, callback: Callable):
"""Add callback for training status updates"""
self._callbacks.append(callback)
def _notify(self, event: str, data: dict):
"""Notify all callbacks"""
for callback in self._callbacks:
try:
callback(event, data)
except Exception as e:
# Don't use logger here if bridge is active to avoid recursion
if not self._bridge_active:
logger.error(f"Callback error: {e}")
def _enable_log_bridge(self, enabled: bool = True):
"""Attach/Detach log bridge to global logging"""
root_logger = logging.getLogger()
if enabled and not self._bridge_active:
root_logger.addHandler(self._log_handler)
self._bridge_active = True
logger.info("📡 Live Log Bridge: ACTIVE")
elif not enabled and self._bridge_active:
root_logger.removeHandler(self._log_handler)
self._bridge_active = False
logger.info("📡 Live Log Bridge: DEACTIVATED")
def _validate_and_log(self, timeframe: str, model_type: str, accuracy: float, is_incremental: bool) -> bool:
"""
Validates model and logs appropriate English messages.
Returns True if RECOVERY (Global Retrain) is required.
"""
model = self.get_model(timeframe, model_type)
if not model: return False
# Accuracy is passed as a parameter; the validator also checks file existence/size.
path = model.model_path
healthy, reason = self.validator.is_healthy(path, model_type, accuracy)
if not healthy:
# ONLY block on integrity/file issues now. Accuracy is unconstrained (min_accuracy=0.0).
msg = f"⚠️ Integrity issue detected: {reason}. Rebuilding via Global training..."
if is_incremental:
logger.warning(f"[{timeframe}] {model_type.upper()} {msg}")
self._notify("log", {"level": "warning", "message": f"[{timeframe}] {msg}"})
return True # Trigger Recovery for integrity
else:
# NOTE: We do NOT call Sentinel here to avoid infinite loop.
# The caller (train_timeframe) will handle the failure gracefully.
fail_msg = f"⚠️ {model_type.upper()} integrity check failed. Continuing with fresh training..."
logger.warning(f"[{timeframe}] {fail_msg}")
self._notify("log", {"level": "warning", "message": f"[{timeframe}] {fail_msg}"})
# Just delete the bad model file if it exists
try:
if path.exists(): path.unlink()
except Exception: pass
return False # Continue training, don't loop
# Healthy (accuracy always passes since min_accuracy=0.0)
logger.info(f"[{timeframe}] {model_type.upper()} Validation: OK (Accuracy: {accuracy:.1%})")
if not is_incremental:
success_msg = f"✅ Model Ready: {model_type.upper()} accuracy reported as {accuracy:.1%}."
self._notify("log", {"level": "success", "message": f"[{timeframe}] {success_msg}"})
return False
def get_model(self, timeframe: str, model_type: str):
"""
Get a specific model with LRU loading/unloading logic.
"""
from config.settings import default_settings
# Determine max loaded TFs based on profile
profile = getattr(default_settings, "performance_profile", "BALANCED")
if profile == "ECO": self._max_loaded_tfs = 1
elif profile == "PRO": self._max_loaded_tfs = 6
else: self._max_loaded_tfs = 2
with self._lock:
# Check if this timeframe is already considered "active"
if timeframe in self._loaded_tfs:
# Move to end of LRU (Most Recent)
self._loaded_tfs.remove(timeframe)
self._loaded_tfs.append(timeframe)
else:
# New timeframe requested
if len(self._loaded_tfs) >= self._max_loaded_tfs:
# Evict least recently used timeframe
old_tf = self._loaded_tfs.pop(0)
logger.info(f"LRU EVICTION: Unloading {old_tf} models to save RAM (Profile: {profile})")
for m_name in self.models.get(old_tf, {}):
m_obj = self.models[old_tf][m_name]
if hasattr(m_obj, 'unload'):
m_obj.unload()
# Add current to LRU
self._loaded_tfs.append(timeframe)
model = self.models.get(timeframe, {}).get(model_type)
# Auto-load if needed (PredictionEngine also does this, but keeping it here for safety)
if model and not model.is_trained:
if model.model_path.exists():
model.load()
return model
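# LRU behaviour sketch (hypothetical call sequence, BALANCED profile => _max_loaded_tfs == 2):
#
#     tm.get_model('1m', 'xgboost')    # _loaded_tfs: ['1m']
#     tm.get_model('5m', 'xgboost')    # _loaded_tfs: ['1m', '5m']
#     tm.get_model('15m', 'xgboost')   # '1m' evicted, unload() called on its models
#     tm.get_model('5m', 'lightgbm')   # '5m' moves to most-recent: ['15m', '5m']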
def get_normalizer(self, timeframe: str) -> FeatureNormalizer:
"""
Get feature normalizer for a timeframe.
Auto-loads from disk if not already in memory/fitted.
"""
normalizer = self.normalizers.get(timeframe)
if normalizer is None:
normalizer = FeatureNormalizer()
self.normalizers[timeframe] = normalizer
# If not fitted, try to load from disk
if not normalizer.is_fitted:
norm_path = MODELS_DIR / f"normalizer_{timeframe}.pkl"
if norm_path.exists():
if normalizer.load(norm_path):
logger.debug(f"[{timeframe}] Auto-loaded normalizer from disk")
else:
logger.warning(f"[{timeframe}] Failed to load existing normalizer")
else:
# Missing normalizer is expected if this timeframe was never trained,
# but predictions will be unreliable without it.
pass
return normalizer
def get_training_status(self, timeframe: str) -> dict:
"""Get training status for a timeframe"""
db = get_database()
models_info = {}
for model_type in ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']:
model = self.get_model(timeframe, model_type)
last_training = db.get_last_training(timeframe, model_type)
models_info[model_type] = {
'is_trained': model.is_trained if model else False,
'accuracy': model.accuracy if model else 0.0
}
return {
'models': models_info,
'is_training': self.current_training_tf == timeframe,
}
def should_train_incremental(self, timeframe: str) -> Tuple[bool, Optional[datetime]]:
"""
Determine if training should be incremental.
If any previous completed training exists for this timeframe, returns
(True, oldest_last_training_date) so training covers only new data.
Otherwise returns (False, None) for full (global) training.
Returns:
Tuple of (is_incremental, train_from_date)
"""
db = get_database()
# Check last training for any model
last_dates = []
for model_type in ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']:
last_training = db.get_last_training(timeframe, model_type)
if last_training and last_training.completed_at:
completed = last_training.completed_at
# Handle string dates from SQLite
if isinstance(completed, str):
try:
completed = datetime.fromisoformat(completed.replace('Z', '+00:00'))
except ValueError:
try:
completed = datetime.strptime(completed, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
completed = datetime.strptime(completed, "%Y-%m-%d %H:%M:%S")
# Remove timezone info for comparison with naive datetime
if hasattr(completed, 'replace') and completed.tzinfo is not None:
completed = completed.replace(tzinfo=None)
last_dates.append(completed)
if not last_dates:
# No previous training - do global training
return False, None
# Use oldest last training date
oldest_training = min(last_dates)
# No strict 6-hour window is enforced here: any existing baseline allows
# incremental training (including manual incremental triggers).
return True, oldest_training
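# Decision sketch (hypothetical log states): with no completed training rows the
# method returns (False, None) and a global run is performed; if e.g. xgboost last
# completed at 2024-01-02 and lightgbm at 2024-01-01, it returns (True, 2024-01-01),
# i.e. incremental training from the oldest completed_at across all model types.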
def prepare_training_data(
self,
timeframe: str,
from_date: Optional[datetime] = None,
use_kpi_labels: bool = True
) -> Tuple[Optional[np.ndarray], Optional[np.ndarray], int, List[str], Optional[pd.DataFrame]]:
"""
Prepare features and labels for training.
"""
logger.info(f"─── DATA PREPARATION SECTION: {timeframe} ───")
self._notify("phase_update", {"phase": "data", "status": "active", "detail": f"Loading {timeframe}..."})
# Load OHLC data
logger.info(f"[{timeframe}] Loading OHLC from disk...")
try:
df = self.mt5.load_ohlc(timeframe)
except Exception as e:
logger.error(f"[{timeframe}] Failed to load OHLC: {e}")
df = None
if df is None:
logger.warning(f"[{timeframe}] df is None after load")
else:
logger.debug(f"[{timeframe}] df shape: {df.shape}")
# --- GENERAL STALENESS CHECK ---
# Ensure data is fresh regardless of incremental settings
if df is not None and not df.empty:
try:
# Ensure time column is datetime
if not pd.api.types.is_datetime64_any_dtype(df['time']):
df['time'] = pd.to_datetime(df['time'])
last_local_date = df['time'].iloc[-1]
now = datetime.now()
if pd.isna(last_local_date):
logger.warning(f"[{timeframe}] Last date is NaT! skipping staleness check.")
else:
# Calculate stale threshold based on timeframe
from config.settings import get_timeframe_by_name
tf_conf = get_timeframe_by_name(timeframe)
minutes_per_candle = tf_conf.minutes if tf_conf else 60
is_stale = False
age = now - last_local_date
if age.total_seconds() > 24 * 3600: # Older than 24h
is_stale = True
elif age.total_seconds() > (minutes_per_candle * 60 * 5): # Missing > 5 candles
is_stale = True
if is_stale:
logger.warning(f"[{timeframe}] Data appears stale (Last: {last_local_date}, Now: {now}).")
self._notify("phase_update", {"phase": "data", "status": "warning", "detail": "Stale Data"})
except Exception as e:
logger.error(f"Staleness check error: {e}")
# SMART UPDATE CHECK (incremental specifics): placeholder only; the actual
# incremental slicing is applied after feature calculation further below.
if from_date and df is not None and not df.empty:
pass
# HISTORICAL DATA CHECK:
from config.settings import default_settings
req_start_year = default_settings.extract_from_year
req_start_date = pd.Timestamp(datetime(req_start_year, 1, 1))
if df is not None and not df.empty:
first_date = df['time'].iloc[0]
if first_date > (req_start_date + timedelta(days=30)):
logger.warning(f"[{timeframe}] History starts at {first_date}, expected {req_start_date}.")
if df is None or df.empty:
logger.warning(f"[{timeframe}] No OHLC data found")
self._notify("phase_update", {"phase": "data", "status": "error", "detail": "No Data"})
return None, None, 0, [], None
logger.info(f"[{timeframe}] Loaded {len(df)} OHLC candles (from {df['time'].min()} to {df['time'].max()})")
self._notify("phase_update", {"phase": "data", "status": "complete", "detail": f"{len(df):,}"})
# Store incremental filter date (will apply AFTER feature calculation)
incremental_from_date = None
if from_date:
if isinstance(from_date, str):
incremental_from_date = pd.to_datetime(from_date)
else:
incremental_from_date = pd.Timestamp(from_date)
logger.info(f"[{timeframe}] Incremental training mode: will filter samples from {incremental_from_date} after feature calculation")
# CHECK TOTAL DATA SUFFICIENCY (before filtering for incremental)
if len(df) < 200:
logger.warning(f"[{timeframe}] Not enough historical data for indicators: {len(df)} (need 200+)")
return None, None, 0, [], None
# Reset index for proper alignment
df = df.reset_index(drop=True)
# CREATE LABELS on FULL dataset (before dropna)
if use_kpi_labels:
self._notify("phase_update", {"phase": "labels", "status": "active", "detail": "Labeling..."})
logger.info(f"[{timeframe}] Creating KPI-based labels using Noble Safe Range...")
labels = self.labeling_engine.create_labels(df, timeframe)
# Label stats
buy, sell, neutral = int(sum(labels == 2)), int(sum(labels == 0)), int(sum(labels == 1))
self._notify("phase_update", {"phase": "labels", "status": "complete", "detail": f"B:{buy:,}"})
else:
labels = XGBoostModel(timeframe, MODELS_DIR).prepare_labels(df)
# Store original indices for alignment
original_len = len(df)
# Calculate features on FULL data (includes dropna which removes some rows)
logger.info(f"[{timeframe}] Calculating features (all indicators + Noble KPI)...")
self._notify("phase_update", {"phase": "features", "status": "active", "detail": "Calculating..."})
def progress_log(msg):
self._notify("log", {"level": "info", "message": msg})
try:
# Calculate features (bypass 5000 row limit for training)
df = self.feature_engine.calculate_all_features(
df,
timeframe=timeframe,
progress_callback=progress_log,
is_training=True
)
except Exception as e:
logger.error(f"[{timeframe}] Feature calculation failed: {e}. Triggering FE-200 Recovery...")
# Index FE-200: Feature Cache Recovery
get_sentinel(tm=self).repair("FE-200", {"timeframe": timeframe})
# One last try after cache wipe
df = self.feature_engine.calculate_all_features(
df,
timeframe=timeframe,
progress_callback=progress_log,
is_training=True
)
if df.empty:
logger.warning(f"[{timeframe}] No features calculated")
return None, None, 0, [], None
# Get valid indices after dropna
valid_indices = df.index if len(df) < original_len else np.arange(len(df))
# Get feature columns
exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
all_cols = [c for c in df.columns if c not in exclude]
# --- USER CONFIG FILTERING ---
# Respect the "Enabled" checkboxes in the UI
try:
db = get_database()
enabled_features = db.get_enabled_indicators()
# Fallback if list is empty (user disabled everything by mistake)
if not enabled_features or len(enabled_features) < 3:
logger.warning(f"[{timeframe}] User disabled almost all indicators. Using ALL defaults for safety.")
feature_cols = all_cols
else:
# Keep only features explicitly enabled in the config table, plus internal
# engineered columns (lags, returns, candle/volatility stats) that never
# appear in the indicator config and must not be dropped by accident.
# Anything else missing from enabled_features is treated as a deliberately
# disabled indicator, since init_default_indicators() pre-populates the table.
feature_cols = []
for col in all_cols:
if col in enabled_features:
feature_cols.append(col)
elif any(x in col for x in ['lag', 'return', 'candle', 'volatility']):
# Internal/engineered feature not tracked in the indicator config: keep it
feature_cols.append(col)
logger.info(f"[{timeframe}] Applied User Filter: {len(feature_cols)} features enabled (out of {len(all_cols)})")
except Exception as e:
logger.warning(f"Failed to apply indicator config: {e}")
feature_cols = all_cols
# --- DATA LEAKAGE FIX: Do NOT normalize here ---
# We return raw features, splitting and normalization happens in train_timeframe
# Get X from raw features
X = df[feature_cols].values
# Align labels with valid indices (after dropna)
# FIX: Use precise index-based alignment. FeatureEngine now preserves indices.
# This handles cases where rows are dropped in the middle (not just head).
# Ensure labels are numpy array
labels = np.array(labels)
# Filter labels using the valid indices from the feature dataframe
try:
y = labels[df.index]
except IndexError:
# Fallback if indices are out of bounds (should not happen if df was reset before)
logger.error(f"[{timeframe}] Index alignment failed. Fallback to tail slicing (RISKY).")
y = labels[-len(X):]
# --- FEATURE SELECTION (Agile Mode) ---
# REMOVED from here to avoid DATA LEAKAGE.
# Feature selection must be done on X_train ONLY inside train_timeframe.
logger.info(f"[{timeframe}] Generated {len(feature_cols)} features for {len(X)} samples (RAW)")
self._notify("phase_update", {"phase": "features", "status": "complete", "detail": f"{len(feature_cols)} feats"})
# Remove rows with NaN/Inf in features
valid_mask = ~np.isnan(X).any(axis=1) & ~np.isinf(X).any(axis=1)
X = X[valid_mask]
y = y[valid_mask]
df = df.iloc[valid_mask].reset_index(drop=True) # Keep df in sync for time filter
# --- WINSORIZATION (Outlier Treatment) ---
# Note: user feedback requested "no winsorization / outlier clipping", but we
# still clip at the 0.1/99.9 percentiles to remove extreme noise while keeping signal.
if len(X) > 100:
for i in range(X.shape[1]):
try:
lower = np.percentile(X[:, i], 0.1)
upper = np.percentile(X[:, i], 99.9)
X[:, i] = np.clip(X[:, i], lower, upper)
except Exception: continue
# Replace remaining Infs (if any) with 0
X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
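# Worked example (hypothetical column): in 10_000 samples of a roughly normal feature,
# the 0.1/99.9 percentile bounds sit near the ~10 most extreme observations on each
# side, so a single corrupted spike (e.g. a bad tick printing 100x the usual range)
# is clamped to the 99.9th-percentile value instead of distorting the normalizer fit.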
if len(X) != len(y):
logger.error(f"[{timeframe}] CRITICAL: X/y size mismatch: X={len(X)}, y={len(y)}")
min_len = min(len(X), len(y))
X = X[:min_len]
y = y[:min_len]
# APPLY INCREMENTAL FILTER (after all features are ready)
if incremental_from_date is not None:
time_col = df['time']
mask = time_col >= incremental_from_date
X = X[mask.values] if hasattr(mask, 'values') else X[mask]
y = y[mask.values] if hasattr(mask, 'values') else y[mask]
df = df[mask].reset_index(drop=True)
logger.info(f"[{timeframe}] Incremental training: sliced to {len(X)} samples from {incremental_from_date}")
# Check for class diversity
unique_classes = np.unique(y)
if len(unique_classes) < 2:
msg = f"Incremental slice contains too few classes ({len(unique_classes)}). Skipping incremental training."
logger.warning(f"[{timeframe}] {msg}")
return None, None, 0, [], None
if len(X) < 10:
logger.info(f"[{timeframe}] Not enough new samples for incremental training: {len(X)} (need 10+)")
return None, None, 0, [], None
# --- FINAL SUMMARY ---
logger.info(f"[{timeframe}] 🧹 Data preparation complete. Features: {len(feature_cols)}, Samples: {len(X)}")
self._notify("phase_update", {"phase": "features", "status": "complete", "detail": f"{len(feature_cols)} feats"})
return X, y, len(X), feature_cols, df
def _filter_history_by_limit(self, df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
"""
Enforce strict historical data limits for TurboTrain efficiency:
1m -> User defined (default 6 months)
5m -> 1 year
15m -> 3 years
Others -> 5 years
"""
try:
from config.settings import default_settings
cutoff_months = 60 # Default 5 years
if timeframe == '1m': cutoff_months = getattr(default_settings, 'history_1m_months', 6)
elif timeframe == '5m': cutoff_months = 12
elif timeframe == '15m': cutoff_months = 36
cutoff_date = pd.Timestamp.now() - pd.DateOffset(months=cutoff_months)
# Check if index is datetime
if isinstance(df.index, pd.DatetimeIndex):
if df.index[0] < cutoff_date:
return df[df.index >= cutoff_date]
elif 'time' in df.columns:
# A 'time' column exists but the frame is not datetime-indexed;
# leave it untouched and let the caller handle slicing.
pass
return df
except Exception as e:
logger.warning(f"History filter failed: {e}")
return df
def train_timeframe(self, timeframe: str, force_global: bool = False, precomputed_data: tuple = None, _recovery_depth: int = 0, use_walk_forward: bool = False) -> dict:
"""
Train all models for a single timeframe. (Automatic Recovery with depth limit)
Accepts optional precomputed_data to support Hybrid Parallel flow.
Args:
use_walk_forward: If True, run Walk-Forward CV before normal training to log fold metrics.
"""
# RECURSION PROTECTION: Max 1 retry to prevent infinite loops
MAX_RECOVERY_DEPTH = 1
if _recovery_depth > MAX_RECOVERY_DEPTH:
logger.error(f"[{timeframe}] Max recovery depth reached. Aborting training.")
return {"error": "Max recovery depth exceeded"}
logger.info(f"════════════════════════════════════════════════════════════")
logger.info(f" TRAINING CYCLE START: {timeframe}")
logger.info(f"════════════════════════════════════════════════════════════")
db = get_database()
results = {}
# RECOVERY WRAPPER
try:
self.current_training_tf = timeframe
self._notify("training_start", {"timeframe": timeframe})
# Check if models actually exist
from config.settings import MODELS_DIR
model_types = ["xgboost", "lightgbm", "randomforest", "catboost", "stacking"]
models_exist = all(
(MODELS_DIR / f"{mt}_{timeframe}.pkl").exists()
for mt in model_types
)
if not models_exist and not force_global:
logger.info(f"[{timeframe}] Models missing -> Forcing Global Training")
force_global = True
# Determine training mode
if force_global:
is_incremental, from_date = False, None
else:
is_incremental, from_date = self.should_train_incremental(timeframe)
self._notify("training_mode", {
"timeframe": timeframe, "is_incremental": is_incremental, "from_date": from_date,
})
# Prepare data (RAW)
if precomputed_data:
X, y, sample_count, feature_names, df = precomputed_data
from_date = None # Not needed as data is already sliced
else:
self._notify("log", {"level": "info", "message": f"[{timeframe}] 📥 Preparing training data..."})
X, y, sample_count, feature_names, df = self.prepare_training_data(
timeframe, from_date, use_kpi_labels=True
)
if X is None:
if is_incremental:
logger.info(f"[{timeframe}] Incremental slice insufficient. Falling back to GLOBAL...")
is_incremental = False
from_date = None
X, y, sample_count, feature_names, df = self.prepare_training_data(timeframe, None, use_kpi_labels=True)
if X is None:
logger.info(f"[{timeframe}] Training cycle skipped.")
self._notify("phase_update", {"phase": "train_xgb", "status": "complete", "detail": "Up to date"})
return {"status": "skipped"}
# --- OPTIONAL: WALK-FORWARD CROSS-VALIDATION ---
if use_walk_forward and df is not None:
logger.info(f"[{timeframe}] 🔄 Running Walk-Forward Cross-Validation...")
self._notify("phase_update", {"phase": "wf_cv", "status": "active", "detail": "WF-CV..."})
try:
from core.walk_forward import get_walk_forward_validator
wf_validator = get_walk_forward_validator(train_months=6, val_months=1)
wf_results = []
for train_idx, val_idx, fold_info in wf_validator.split(df, 'time'):
X_train_fold, y_train_fold = X[train_idx], y[train_idx]
X_val_fold, y_val_fold = X[val_idx], y[val_idx]
# Quick evaluation with XGBoost only (fastest)
from models.xgboost_model import XGBoostModel
from config.settings import MODELS_DIR
fold_model = XGBoostModel(timeframe, MODELS_DIR)
fold_acc = fold_model.train(X_train_fold, y_train_fold, X_val_fold, y_val_fold)
wf_results.append({
'fold': fold_info['fold'],
'train_samples': fold_info['train_samples'],
'val_samples': fold_info['val_samples'],
'accuracy': fold_acc
})
logger.info(f"[{timeframe}] WF Fold {fold_info['fold']}: Accuracy={fold_acc:.2%}")
if wf_results:
avg_wf_acc = np.mean([r['accuracy'] for r in wf_results])
logger.info(f"[{timeframe}] ✅ Walk-Forward CV Complete: {len(wf_results)} folds, Avg={avg_wf_acc:.2%}")
results['walk_forward'] = {'folds': len(wf_results), 'avg_accuracy': avg_wf_acc, 'details': wf_results}
self._notify("phase_update", {"phase": "wf_cv", "status": "complete", "detail": f"{avg_wf_acc:.1%}"})
except Exception as wf_err:
logger.warning(f"[{timeframe}] Walk-Forward CV failed: {wf_err}")
self._notify("phase_update", {"phase": "wf_cv", "status": "error", "detail": "Failed"})
# --- DATA SPLIT (3-WAY: Train/Val/Test) ---
# 70% Train / 15% Validation / 15% Test (TRUE OUT-OF-SAMPLE HOLDOUT)
n = len(X)
train_end = int(n * 0.70)
val_end = int(n * 0.85)
# SPLIT RAW DATA FIRST (Before normalization/selection to prevent leakage)
X_train_raw, y_train = X[:train_end], y[:train_end]
X_val_raw, y_val = X[train_end:val_end], y[train_end:val_end]
X_test_raw, y_test = X[val_end:], y[val_end:]
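# Split arithmetic example (illustrative): with n = 10_000 samples,
# train_end = 7_000 and val_end = 8_500, giving X[:7000] for training,
# X[7000:8500] for validation and X[8500:] as the untouched out-of-sample test set.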
# --- LEAK-PROOF FEATURE SELECTION (On X_train only) ---
# Select Top 15 features using ONLY training data
logger.info(f"[{timeframe}] 🛡️ Running Leak-Proof Feature Selection (Top 15)...")
try:
# Convert back to DF for feature engine (needs column names)
df_train = pd.DataFrame(X_train_raw, columns=feature_names)
selected_cols = self.feature_engine.select_best_features(
df_train, y_train, n_features=15 # Slightly more permissive than 10
)
if selected_cols and len(selected_cols) >= 5:
# Filter All Sets to selected columns
indices = [feature_names.index(c) for c in selected_cols]
X_train = X_train_raw[:, indices]
X_val = X_val_raw[:, indices]
X_test = X_test_raw[:, indices]
feature_names = selected_cols # Update feature names
logger.info(f"[{timeframe}] ✅ Selected {len(feature_names)} features: {feature_names}")
else:
X_train, X_val, X_test = X_train_raw, X_val_raw, X_test_raw
logger.warning(f"[{timeframe}] Feature selection failed/insufficient. Using all features.")
except Exception as fs_err:
logger.error(f"[{timeframe}] Feature selection error: {fs_err}. Using all features.")
X_train, X_val, X_test = X_train_raw, X_val_raw, X_test_raw
logger.info(f"[{timeframe}] 🔀 3-WAY SPLIT: Train={len(X_train)}, Val={len(X_val)}, Test={len(X_test)}")
# --- NORMALIZATION (Quantile-based) ---
logger.info(f"[{timeframe}] 📊 Fitting normalizer on training data...")
normalizer = self.normalizers[timeframe]
# Create temp DataFrame for fit (using training data ONLY to avoid data leakage)
X_train_df = pd.DataFrame(X_train, columns=feature_names)
normalizer.fit(X_train_df, feature_names)
# Transform all sets using the normalizer fitted on training data
X_train = normalizer.transform_array(X_train, feature_names)
X_val = normalizer.transform_array(X_val, feature_names)
X_test = normalizer.transform_array(X_test, feature_names)
# Save normalizer for inference
norm_path = MODELS_DIR / f"normalizer_{timeframe}.pkl"
normalizer.save(norm_path)
self._notify("phase_update", {"phase": "normalize", "status": "complete", "detail": "Fitted"})
# --- CLASS WEIGHTS (Balanced based on distribution) ---
from sklearn.utils.class_weight import compute_class_weight
try:
unique_classes = np.unique(y_train)
weights = compute_class_weight('balanced', classes=unique_classes, y=y_train)
class_weight_dict = dict(zip(unique_classes, weights))
# Reduce NEUTRAL (class 1) weight further to discourage trivial predictions
if 1 in class_weight_dict:
class_weight_dict[1] *= 0.3 # Penalize NEUTRAL predictions
class_weights = np.array([class_weight_dict.get(label, 1.0) for label in y_train])
logger.info(f"[{timeframe}] Class weights: {class_weight_dict}")
except Exception as cw_err:
logger.warning(f"[{timeframe}] Class weight computation failed: {cw_err}. Using uniform weights.")
class_weights = np.ones(len(y_train))
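# Worked example (hypothetical distribution): with y_train counts
# {SELL(0): 2000, NEUTRAL(1): 6000, BUY(2): 2000} and n = 10_000,
# compute_class_weight('balanced') gives n / (3 * count):
#   SELL ~1.67, NEUTRAL ~0.56, BUY ~1.67
# and after the 0.3x penalty NEUTRAL drops to ~0.17, so the trees are strongly
# discouraged from defaulting to the NEUTRAL class.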
# --- TEMPORAL SAMPLE WEIGHTS (Decay) ---
# Give more weight to recent data (linear decay from 1.0 to 0.5)
try:
# Create a time decay factor over the training window
# (use a local count so we don't clobber sample_count, which is logged to the DB later)
n_train = len(y_train)
# Linear increase from 0.5 (oldest) to 1.0 (newest)
time_weights = np.linspace(0.5, 1.0, n_train)
# Combine class weights and time weights
# class_weights is already an array of length n_train (one weight per y_train label)
combined_weights = class_weights * time_weights
# Normalize to keep sum roughly same (optional, but good for stability)
combined_weights = combined_weights * (len(combined_weights) / combined_weights.sum())
class_weights = combined_weights
logger.info(f"[{timeframe}] ⏳ Applied Temporal Sample Weighting (Old=0.5x -> New=1.0x)")
except Exception as tw_err:
logger.warning(f"[{timeframe}] Temporal weighting failed: {tw_err}")
# --- PHASE 2: MODEL TRAINING ---
for model_type in model_types:
if self._stop_training.is_set(): break
# --- UNIFIED LOGIC FOR ALL MODELS (Trees + Stacking) ---
model = self.get_model(timeframe, model_type)
if not model: continue
# Assign feature names to the model for future alignment
model.feature_names = feature_names
log_id = db.log_training_start(timeframe, model_type, is_incremental)
phase_key = f"train_{'xgb' if model_type=='xgboost' else 'lgb' if model_type=='lightgbm' else 'rf' if model_type=='randomforest' else 'cat' if model_type=='catboost' else 'stk'}"
self._notify("phase_update", {"phase": phase_key, "status": "active", "detail": "Training..."})
try:
acc = model.train(X_train, y_train, X_val, y_val, class_weights)
# --- OUT-OF-SAMPLE EVALUATION (TRUE HOLDOUT) ---
test_acc = None
try:
# FIX: use the internal estimator for raw class prediction.
# The wrapper's .predict() returns (Signal, Confidence); we need classes [0, 1, 2].
if hasattr(model, 'model') and hasattr(model.model, 'predict'):
test_preds = model.model.predict(X_test)
else:
# Fallback if internal model not accessible (should not happen with our wrappers)
logger.warning(f"[{timeframe}] Cannot access internal model for {model_type}. Skipping test eval.")
test_preds = None
if test_preds is not None:
# Handle case where predict returns probabilities instead of classes
if hasattr(test_preds, 'ndim') and test_preds.ndim == 2:
test_preds = np.argmax(test_preds, axis=1)
test_acc = float((test_preds == y_test).mean())
logger.info(f"[{timeframe}] 📊 {model_type.upper()} OUT-OF-SAMPLE Accuracy: {test_acc:.1%}")
results[model_type] = {"accuracy": acc, "test_accuracy": test_acc, "is_incremental": is_incremental}
else:
results[model_type] = {"accuracy": acc, "is_incremental": is_incremental}
except Exception as eval_err:
logger.warning(f"[{timeframe}] Test evaluation failed: {eval_err}")
results[model_type] = {"accuracy": acc, "is_incremental": is_incremental}
# Log to database WITH test_accuracy
db.log_training_complete(log_id, sample_count, acc, test_acc)
self._notify("phase_update", {"phase": phase_key, "status": "complete", "detail": f"{acc:.1%}"})
except RecoveryRequired: raise
except Exception as e:
logger.error(f"[{timeframe}] {model_type.upper()} Failed: {e}")
self._notify("phase_update", {"phase": phase_key, "status": "error", "detail": "Failed"})
return results
except RecoveryRequired:
logger.info(f"[{timeframe}] Recovery Triggered: Restarting in GLOBAL mode (depth={_recovery_depth + 1})...")
return self.train_timeframe(timeframe, force_global=True, _recovery_depth=_recovery_depth + 1)
except Exception as e:
logger.error(f"Training failed for {timeframe}: {e}", exc_info=True)
return {"error": str(e)}
finally:
self.current_training_tf = None
self._notify("training_complete", {"timeframe": timeframe, "results": results})
def train_all(self, force_global: bool = False):
"""
HyperSafe Training (V4): Sequential, Memory-Safe Batch Training.
Refactored to avoid OOM (Out of Memory) crashes by processing timeframes
one-by-one and releasing memory immediately after each cycle.
"""
import gc
from config.settings import TIMEFRAME_NAMES
logger.info(f"🚀 [HyperSafe V4] STARTING RAM-OPTIMIZED TRAINING (ForceGlobal={force_global})")
self._enable_log_bridge(True)
results = {}
try:
for tf in TIMEFRAME_NAMES:
if self._stop_training.is_set():
logger.warning(f"⏹ Training interrupted by user at timeframe {tf}")
break
logger.info(f"🧵 [HyperSafe] Processing Timeframe: {tf}...")
try:
# Execute training for this timeframe
# DATA PREPARATION and MODEL FITTING happen inside sequential train_timeframe
res = self.train_timeframe(tf, force_global)
results[tf] = res
# LOG SUCCESS
if "error" in res:
logger.error(f"❌ [{tf}] Training resulted in error: {res['error']}")
else:
logger.info(f"✅ [{tf}] Training completed successfully.")
except Exception as e:
logger.error(f"❌ [HyperSafe] Timeframe {tf} Failed: {e}", exc_info=True)
results[tf] = {"error": str(e)}
finally:
# CRITICAL: Force garbage collection after EACH timeframe to free RAM
gc.collect()
time.sleep(0.5) # Give OS time to breathe
# --- RESULTS ASSEMBLY ---
total_success = sum(1 for tf in results if "error" not in results[tf] and results[tf].get("status") != "skipped")
logger.info(f"🏁 [HyperSafe V4] COMPLETED. Success: {total_success}/{len(TIMEFRAME_NAMES)} timeframes.")
# Cleanup features (delete temporary cache if configured)
self._cleanup_features()
except Exception as global_err:
logger.error(f"❌ [HyperSafe] Global training loop failed: {global_err}")
finally:
self._enable_log_bridge(False)
gc.collect()
return results
def train_with_drift_check(self, timeframe: str) -> dict:
"""
Train only if drift is detected.
Args:
timeframe: Timeframe to check and potentially train
Returns:
Dict with drift status and training results
"""
# Get current features for drift check
df = self.mt5.load_ohlc(timeframe)
if df is None or df.empty:
return {"error": "No data for drift check"}
df = self.feature_engine.calculate_all_features(df, timeframe=timeframe)
exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
feature_cols = [c for c in df.columns if c not in exclude]
# Check for drift
drift_report = self.drift_detector.detect_drift(df[feature_cols])
result = {
"drift_detected": drift_report.drift_detected,
"should_retrain": drift_report.should_retrain,
"psi_score": drift_report.psi_score,
"reasons": drift_report.reasons,
}
if drift_report.should_retrain:
logger.info(f"[{timeframe}] Drift detected, triggering retraining: {drift_report.reasons}")
result["training_results"] = self.train_timeframe(timeframe, force_global=True)
else:
logger.info(f"[{timeframe}] No significant drift detected, skipping retraining")
return result
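# Usage sketch ('1h' is a hypothetical timeframe key; the PSI thresholds below are the
# common industry rule of thumb, not necessarily the DriftDetector's own configuration):
# PSI < 0.1 is usually read as no significant shift, 0.1-0.25 as moderate drift, and
# > 0.25 as a major shift that justifies retraining.
#
#     report = tm.train_with_drift_check('1h')
#     if report.get("drift_detected"):
#         print(report["psi_score"], report["reasons"])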
def start_scheduled_training(self, interval_hours: int = 12):
"""Start scheduled training in background thread"""
if self.training_thread and self.training_thread.is_alive():
logger.warning("Training already running")
return
self._stop_training.clear()
def training_loop():
while not self._stop_training.is_set():
self.is_training = True
self._notify("scheduled_training_start", {"interval_hours": interval_hours})
try:
self.train_all()
except Exception as e:
logger.error(f"Scheduled training failed: {e}")
self.is_training = False
# Wait for next interval
for _ in range(interval_hours * 3600):
if self._stop_training.is_set():
break
time.sleep(1)
self.training_thread = threading.Thread(target=training_loop, daemon=True)
self.training_thread.start()
logger.info(f"Started scheduled training every {interval_hours} hours")
def stop_scheduled_training(self):
"""Stop scheduled training"""
self._stop_training.set()
self.is_training = False
logger.info("Stopped scheduled training")
def load_all_models(self):
"""Load all saved models from disk"""
for tf_name in TIMEFRAME_NAMES:
# Load normalizer
normalizer_path = MODELS_DIR / f"normalizer_{tf_name}.pkl"
if normalizer_path.exists():
self.normalizers[tf_name].load(normalizer_path)
# Load models
for model_type in ['xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking']:
model = self.get_model(tf_name, model_type)
if model:
model.load()
def _cleanup_features(self):
"""Clean up temporary feature parquet files after training to free disk space"""
from config.settings import FEATURES_DIR
try:
if not FEATURES_DIR.exists():
return
# Count files before cleanup
parquet_files = list(FEATURES_DIR.glob("*.parquet"))
if not parquet_files:
logger.info("[Cleanup] No feature files to clean up")
return
total_size_mb = sum(f.stat().st_size for f in parquet_files) / (1024 * 1024)
file_count = len(parquet_files)
# Delete all parquet files
for f in parquet_files:
try:
f.unlink()
except Exception:
pass
logger.info(f"🧹 [Cleanup] Deleted {file_count} feature files ({total_size_mb:.1f} MB freed)")
except Exception as e:
logger.warning(f"[Cleanup] Failed to clean features: {e}")