Source code for core.prediction_engine

"""
Cerebrum Forex - Prediction Engine V2
Generate signals using Congress Engine weighted ensemble.

Key features:
- Weighted aggregation via Congress Engine
- Regime-adaptive weights
- Full audit trail
- Standardized model outputs
"""

import logging
from models.xgboost_model import XGBoostModel
from models.lightgbm_model import LightGBMModel
from models.catboost_model import CatBoostModel
from models.stacking_model import StackingModel
from typing import Optional, Callable, Dict, List, Tuple
from models.meta_model import MetaModel
import threading
import time
import json
from pathlib import Path
from datetime import datetime

import pandas as pd
import numpy as np

from config.settings import TIMEFRAMES, SIGNALS_DIR, TIMEFRAME_NAMES, DATA_DIR
from core.mt5_connector import MT5Connector
from core.feature_engine import FeatureEngine
from core.training_manager import TrainingManager
from core.model_scorer import ModelScorer
from core.noble_kpi import calculate_atr, detect_regime, noble_safe_range
from core.prop_firm_guardian import get_guardian
from core.risk_filter import get_risk_filter
from core.congress_engine import (
    CongressEngine, get_congress_engine, 
    ModelPrediction, AladdinPersona, CongressDecision
)
from core.audit_cache import get_audit_cache
from core.drift_detector import get_drift_detector
from core.regime_detector import RegimeDetector
from config.settings import MODELS_DIR, get_timeframe_by_name
from database import get_database
from core.exceptions import (
    DataMissingError, DataStaleError, ModelNotTrainedError,
    FeatureCalculationError, InsufficientModelsError, PredictionError
)
from core.debug_manager import get_debug_manager
from core.sentinel import get_sentinel

logger = logging.getLogger(__name__)


class PredictionEngine:
    """
    Generate predictions using Congress Engine weighted ensemble.

    V2 features:
    - Congress Engine for weighted aggregation
    - Regime-aware weight adjustment
    - Full audit trail for explainability
    - Drift detection monitoring
    """

    def __init__(self, training_manager: TrainingManager, mt5_connector: MT5Connector):
        self.training_manager = training_manager
        self.mt5 = mt5_connector
        self.symbol = mt5_connector.symbol  # Required for Guardian checks
        self.feature_engine = FeatureEngine()
        self.congress = get_congress_engine()
        self.drift_detector = get_drift_detector()

        # EXPERIMENTAL: Forbidden Logic
        from core.smart_money import SmartMoneyEngine
        self.smc_engine = SmartMoneyEngine()

        # Prediction state
        self.is_predicting = False
        self._callbacks: List[Callable] = []

        # OPTIMIZATION: Feature cache to avoid recalculating when data unchanged
        # Key: (timeframe, last_candle_time) -> Value: (X_normalized, feature_cols)
        self._feature_cache: dict = {}
        self._feature_cache_max_size = 8  # One per timeframe
        self._cache_lock = threading.Lock()

        # Ensure signals directory exists
        SIGNALS_DIR.mkdir(parents=True, exist_ok=True)
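    # Note on the feature cache (descriptive comment): entries are keyed by
    # (timeframe, last_candle_time), e.g. ("1h", "2024-01-02 15:00:00"), where the
    # timestamp is purely illustrative. A new candle produces a new key, so stale
    # entries simply stop being hit and are evicted FIFO once the cache exceeds
    # _feature_cache_max_size (one slot per supported timeframe).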
    def add_callback(self, callback: Callable):
        """Add callback for prediction status updates"""
        self._callbacks.append(callback)
    def remove_callback(self, callback: Callable):
        """Remove callback"""
        if callback in self._callbacks:
            self._callbacks.remove(callback)
    def _notify(self, event: str, data: dict):
        """Notify all callbacks"""
        for callback in self._callbacks:
            try:
                callback(event, data)
            except Exception as e:
                logger.error(f"Callback error: {e}")
    def get_signal_path(self, timeframe: str) -> Path:
        """Get path to signal CSV file"""
        try:
            db = get_database()
            custom_dir = db.get_config("signals_dir")
            if custom_dir:
                return Path(custom_dir) / f"signal_{timeframe}.csv"
        except Exception:
            pass
        return SIGNALS_DIR / f"signal_{timeframe}.csv"
    def _get_default_lot_size(self) -> float:
        """Read fixed_lots from ea_config.json as fallback"""
        try:
            config_path = DATA_DIR / "ea_config.json"
            if config_path.exists():
                with open(config_path, 'r') as f:
                    config = json.load(f)
                # Support both nested and flat structures
                if "order" in config:
                    return float(config["order"].get("volume", 0.1))
                return float(config.get("fixed_lots", 0.1))
        except Exception as e:
            logger.warning(f"Could not read default lot from ea_config.json: {e}")
        return 0.1

    def _get_model_role(self, model_type: str, timeframe: str) -> AladdinPersona:
        """
        Determine model role (Aladdin Persona) based on type.
        """
        # Map current models to Aladdin Experts:
        #   XGBoost      -> The Quant (Microstructure)
        #   LightGBM     -> The Archivist (Technical)
        #   RandomForest -> The Futurist (Context)
        #   CatBoost     -> The Sentinel
        #   Stacking     -> The Leader (Ensemble)
        role_map = {
            'xgboost': AladdinPersona.QUANT,
            'lightgbm': AladdinPersona.ARCHIVIST,
            'randomforest': AladdinPersona.FUTURIST,
            'catboost': AladdinPersona.SENTINEL,
            'stacking': AladdinPersona.LEADER,
        }
        return role_map.get(model_type, AladdinPersona.ARCHIVIST)

    def _check_financial_viability(self, current_price: float, signal: str, atr: float,
                                   timeframe: str = '1h') -> Tuple[bool, str]:
        """
        ALADDIN FINANCIAL GUARD (Expert Tuned):
        Ensure the trade is viable relative to timeframe-specific expectations.
        """
        if signal == 'NEUTRAL':
            return True, "Neutral"

        # Standard constants (assuming EURUSD-like 5-decimal quotes)
        POINT = 0.00001

        # Expert Trader ROI Tuning:
        # Scalping (M1/M5) needs lower friction thresholds to function.
        if timeframe in ['1m', '5m']:
            # Scalping Mode: fast exits, lower R:R requirement for entry
            MIN_PROFIT_PTS = 20   # 2 pips (consistent with user request)
            REQ_ATR_MULT = 1.2    # Lowered slightly for high-frequency signals
        elif timeframe in ['15m', '30m']:
            # Day Trading Mode
            MIN_PROFIT_PTS = 40   # 4 pips
            REQ_ATR_MULT = 1.8
        else:
            # Swing Mode (H1+)
            MIN_PROFIT_PTS = 80   # 8 pips
            REQ_ATR_MULT = 2.2

        # Estimate potential move
        potential_move_pts = (atr * REQ_ATR_MULT) / POINT

        # Strict Aladdin check
        if potential_move_pts < MIN_PROFIT_PTS:
            return False, f"Insufficient Potential: {potential_move_pts:.0f} pts < {MIN_PROFIT_PTS} pts ({timeframe} req)"

        return True, "Viable"

    def _is_market_open(self) -> Tuple[bool, str]:
        """
        Check if the Forex market is currently open.

        Forex market hours:
        - Open: Sunday 22:00 UTC to Friday 22:00 UTC
        - Closed: Friday 22:00 to Sunday 22:00 (weekend)
        - Closed: January 1 (New Year's Day)
        - Closed: December 25 (Christmas Day)

        Returns:
            (is_open, reason) tuple
        """
        from datetime import timezone

        now_utc = datetime.now(timezone.utc)
        weekday = now_utc.weekday()  # Monday=0, Sunday=6
        hour = now_utc.hour
        month = now_utc.month
        day = now_utc.day

        # Check major Forex holidays
        if month == 1 and day == 1:
            return False, "New Year's Day"
        if month == 12 and day == 25:
            return False, "Christmas Day"

        # Check weekend closure
        if weekday == 4 and hour >= 22:    # Friday after 22:00 UTC
            return False, "Weekend (Friday close)"
        elif weekday == 5:                 # Saturday
            return False, "Weekend (Saturday)"
        elif weekday == 6 and hour < 22:   # Sunday before 22:00 UTC
            return False, "Weekend (Sunday pre-open)"

        return True, "Market Open"
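    # Worked example for _check_financial_viability (illustrative numbers only;
    # the POINT constant assumes EURUSD-style 5-decimal pricing):
    #   timeframe='1h', atr=0.00050 -> potential_move_pts = 0.00050 * 2.2 / 0.00001 = 110
    #                                  110 >= 80 (MIN_PROFIT_PTS)  -> viable
    #   timeframe='1h', atr=0.00030 -> potential_move_pts = 66 < 80 -> "Insufficient Potential"
    #   timeframe='5m', atr=0.00020 -> potential_move_pts = 0.00020 * 1.2 / 0.00001 = 24 >= 20 -> viable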
    def get_enabled_models(self) -> List[str]:
        """Get list of enabled model types from settings"""
        from config.settings import default_settings

        # Ensure we return valid model types that exist in the system
        valid_types = ["xgboost", "lightgbm", "randomforest", "catboost", "stacking"]
        enabled = getattr(default_settings, "models_enabled", valid_types)

        # Helper: intersection of enabled and valid
        return [m for m in enabled if m in valid_types]
    def predict_timeframe(self, timeframe: str, df: Optional[pd.DataFrame] = None) -> Optional[dict]:
        """
        Generate prediction for a single timeframe using Congress Engine.
        """
        logger.info("════════════════════════════════════════════════════════════")
        logger.info(f" PREDICTION START: {timeframe}")
        logger.info("════════════════════════════════════════════════════════════")
        self._notify("prediction_start", {"timeframe": timeframe})

        # === MARKET CLOSED CHECK ===
        # Skip predictions when the Forex market is closed (saves resources)
        is_open, reason = self._is_market_open()
        if not is_open:
            logger.info(f"[{timeframe}] ⏸️ Market closed ({reason}). Skipping prediction.")
            self._notify("prediction_skipped", {"timeframe": timeframe, "reason": reason})
            return {
                "timeframe": timeframe,
                "signal": "NEUTRAL",
                "confidence": 0.0,
                "final_score": 0.0,
                "status": "market_closed",
                "reason": reason,
                "timestamp": datetime.now().isoformat(),
            }

        try:
            # 1. LOAD DATA (OHLC)
            self._notify("step_update", {"step": "data", "status": "active"})
            t_start = datetime.now()

            # If df provided (In-Memory Pipeline), use it. Otherwise load FAST buffer.
            if df is None:
                # OPTIMIZATION: Use get_buffer(6000) instead of load_ohlc().
                # load_ohlc reads the full CSV (100MB+), which is too slow (2s+) for real-time.
                # get_buffer fetches recent data from the MT5 API (ms).
                df = self.mt5.get_buffer(timeframe, n=6000)

                # Fallback to disk if the API fails but we have data
                if df is None or df.empty:
                    logger.warning(f"[{timeframe}] Buffer empty, falling back to disk load...")
                    # OPTIMIZATION: Use load_ohlc_buffer(2000) instead of full load
                    df = self.mt5.load_ohlc_buffer(timeframe, n=2000)

            t_data = datetime.now()

            if df is None or df.empty:
                raise DataMissingError(
                    f"OHLC Data missing for {timeframe}. Please Extract Data first.",
                    timeframe=timeframe
                )

            # 2. APPLY OPTIMIZED ROW LIMITS FOR INFERENCE
            self._notify("step_update", {"step": "data", "status": "done",
                                         "elapsed": (datetime.now() - t_start).total_seconds()})
            self._notify("step_update", {"step": "optimize", "status": "active"})
            t_opt = datetime.now()

            # CRITICAL OPTIMIZATION:
            # We only need the last ~2000 candles to correctly calculate indicators (SMA200, etc).
            # Processing 5000+ rows wastes CPU.
            inference_limit = 2000
            if len(df) > inference_limit:
                df = df.tail(inference_limit).reset_index(drop=True)

            self._notify("step_update", {"step": "optimize", "status": "done",
                                         "elapsed": (datetime.now() - t_opt).total_seconds()})

            # --- FRESHNESS CHECK ---
            last_time = df['time'].iloc[-1]
            t_feat_start = datetime.now()  # Placeholder; refined before feature calculation

            logger.info("─── STEP 1/5: DATA PREPARATION ───")
            logger.info(f"Stopwatch: Data Load={(t_data - t_start).total_seconds() * 1000:.1f}ms (Rows: {len(df)})")

            tf_config = get_timeframe_by_name(timeframe)
            data_status = "fresh"
            data_age_str = "0m"

            if tf_config:
                from config.settings import default_settings

                # --- SIMPLIFIED TIMEZONE HANDLING ---
                # MT5 servers are often in GMT+2/3; the local machine may differ.
                # Instead of complex heuristics, we use a generous allowed offset.
                # This prevents false "Stale" warnings due to timezone differences.
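                # Illustrative walk-through of the heuristic below (comment only,
                # numbers are hypothetical): with a broker on GMT+2 and a local clock
                # on GMT+0, a candle that is really 4 minutes old shows an apparent
                # age of ~2h04m (7440s). Since 7440 > 3300, the residual is
                # 7440 % 3600 = 240s; 240 < 300, so the adjusted age becomes
                # min(240, 3360) = 240s (~4m) and the data is treated as fresh.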
                now = datetime.now()
                time_diff = now - last_time
                age_seconds = abs(time_diff.total_seconds())  # Use abs() for both directions

                # Allow up to 2 hours of timezone offset (covers most cases: GMT+0 to GMT+3)
                TIMEZONE_OFFSET_TOLERANCE_SECONDS = 2 * 3600  # 2 hours

                # If age is suspiciously close to exact hours (within 5 min), assume TZ offset
                if age_seconds > 3300:  # > 55 minutes
                    residual = age_seconds % 3600  # Remainder after removing full hours
                    if residual < 300 or residual > 3300:  # Within 5 min of an exact hour
                        # Likely TZ offset, use residual as the "true" age
                        adjusted_seconds = min(residual, 3600 - residual)
                    else:
                        adjusted_seconds = age_seconds
                else:
                    adjusted_seconds = age_seconds

                minutes_old = int(adjusted_seconds / 60)

                # Display string
                if minutes_old < 60:
                    data_age_str = f"{minutes_old}m"
                else:
                    data_age_str = f"{minutes_old // 60}h {minutes_old % 60}m"

                max_age_minutes = tf_config.minutes * default_settings.staleness_multiplier

                # Use ADJUSTED seconds for the staleness check (more lenient)
                is_truly_stale = adjusted_seconds > (max_age_minutes * 60)

                if is_truly_stale:
                    logger.warning(f"[{timeframe}] Data STALE (Age: {data_age_str}). Triggering update...")
                    self._notify("data_update", {"timeframe": timeframe, "status": "refreshing"})
                    try:
                        update_df = self.mt5.extract_ohlc(timeframe, is_update=True)
                        if update_df is None or update_df.empty:
                            logger.warning(f"[{timeframe}] Update returned no new data. Proceeding with existing data.")
                            data_status = "stale_usable"
                            self._notify("data_update", {"timeframe": timeframe, "status": "stale_warning", "age": data_age_str})
                        else:
                            df = self.mt5.load_ohlc_buffer(timeframe, n=2000)
                            if df is None or df.empty:
                                raise ValueError(f"CRITICAL: Failed to reload data after update for {timeframe}.")
                            logger.info(f"[{timeframe}] Data refreshed. New length: {len(df)}")
                            data_status = "fresh"
                            last_time = df['time'].iloc[-1]
                            time_diff = datetime.now() - last_time
                            minutes_old = int(time_diff.total_seconds() / 60)
                            if minutes_old < 60:
                                data_age_str = f"{minutes_old}m"
                            else:
                                data_age_str = f"{minutes_old // 60}h {minutes_old % 60}m"
                    except Exception as e:
                        logger.error(f"Auto-update failed: {e}")
                        data_status = "stale_error"

            # --- WARMUP CHECK ---
            if len(df) < 500:
                raise DataMissingError(
                    f"Insufficient history ({len(df)} candles). Need > 500.",
                    timeframe=timeframe
                )

            # 2. CHECK MODELS (with graceful degradation)
            required_models = ['xgboost', 'lightgbm', 'randomforest', 'stacking']
            available_models = []
            missing_models = []
            for m_type in required_models:
                m_path = MODELS_DIR / f"{m_type}_{timeframe}.pkl"
                if m_path.exists():
                    available_models.append(m_type)
                else:
                    missing_models.append(m_type)
                    logger.warning(f"[{timeframe}] Model missing: {m_type}")

            # Need at least 2 models for reliable prediction
            if len(available_models) < 2:
                raise InsufficientModelsError(
                    f"Only {len(available_models)} model(s) available for {timeframe}. Need at least 2.",
                    available_models=len(available_models),
                    required_models=2
                )
            # 3. FEATURE ENGINEERING (Indicators + Features)
            self._notify("feature_calc", {"status": "calculating"})
            self._notify("step_update", {"step": "features", "status": "active"})

            # Ensure 'spread' column exists
            if 'spread' not in df.columns:
                df['spread'] = 0

            # OPTIMIZATION: Feature Cache Check
            last_time = df['time'].iloc[-1]
            cache_key = (timeframe, str(last_time))

            # --- FEATURE CALCULATION STOPWATCH ---
            t_feat_start = datetime.now()

            cached = self._feature_cache.get(cache_key)
            if cached:
                X_final, feature_cols = cached
                logger.debug(f"[{timeframe}] CACHE HIT: Reusing pre-computed features")
            else:
                try:
                    # Calculate all features
                    df = self.feature_engine.calculate_all_features(df, timeframe=timeframe)
                except Exception as feat_err:
                    logger.error(f"[{timeframe}] Feature calculation failed in prediction: {feat_err}")
                    # Index FE-200: Feature Cache recovery
                    get_sentinel(tm=self.training_manager).repair("FE-200", {"timeframe": timeframe})
                    raise FeatureCalculationError(
                        f"Feature calculation failed: {feat_err}",
                        timeframe=timeframe
                    )

                if df is None or df.empty:
                    raise FeatureCalculationError(
                        "Feature calculation returned empty data",
                        timeframe=timeframe
                    )

                # If we get here, resources are valid. Proceed.

                # --- FEATURE ALIGNMENT ---
                # Load training schema to align columns EXACTLY.
                # Priority: 1. Normalizer's fitted columns, 2. JSON schema, 3. All columns (fallback)
                feature_cols = []

                # First, try to get features from the fitted normalizer
                normalizer = self.training_manager.get_normalizer(timeframe)
                if normalizer and normalizer.is_fitted and normalizer.quantile_stats:
                    feature_cols = list(normalizer.quantile_stats.keys())
                    logger.debug(f"[{timeframe}] Using normalizer features: {len(feature_cols)} features")

                # Fallback to JSON schema file
                if not feature_cols:
                    schema_path = MODELS_DIR / f"features_{timeframe}.json"
                    if schema_path.exists():
                        try:
                            with open(schema_path, 'r') as f:
                                feature_cols = json.load(f)
                            logger.debug(f"[{timeframe}] Loaded feature schema: {len(feature_cols)} features")
                        except Exception as e:
                            logger.warning(f"Failed to load feature schema: {e}")

                if not feature_cols:
                    # Last resort fallback (risky!)
                    exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
                    feature_cols = [c for c in df.columns if c not in exclude]
                    logger.warning(
                        f"[{timeframe}] No schema found! (Models likely outdated). "
                        f"Please RETRAIN models to fix capabilities. "
                        f"Using {len(feature_cols)} features as fallback."
                    )

                # Align DataFrame to schema
                # 1. Add missing columns with 0
                missing_cols = set(feature_cols) - set(df.columns)
                if missing_cols:
                    logger.warning(f"[{timeframe}] Missing {len(missing_cols)} features (filling 0): {list(missing_cols)[:5]}...")
                    for c in missing_cols:
                        df[c] = 0.0

                # 2. Select only schema columns in correct order
                X_df = df[feature_cols]

                # Apply normalization
                normalizer = self.training_manager.get_normalizer(timeframe)
                if normalizer and normalizer.is_fitted:
                    X_normalized = normalizer.transform(X_df, feature_cols)
                    X_final = X_normalized
                else:
                    logger.warning(f"[{timeframe}] Normalizer NOT FITTED! Using raw features (Models will likely fail)")
                    X_final = X_df

                # Store in cache
                with self._cache_lock:
                    self._feature_cache[cache_key] = (X_final, feature_cols)
                    # Evict old entries if the cache is full
                    if len(self._feature_cache) > self._feature_cache_max_size:
                        oldest_key = next(iter(self._feature_cache))
                        del self._feature_cache[oldest_key]
                logger.debug(f"[{timeframe}] CACHE MISS: Computed and stored features")

            t_feat_end = datetime.now()
            self._notify("step_update", {"step": "features", "status": "done",
                                         "elapsed": (t_feat_end - t_feat_start).total_seconds()})

            # === DEBUG: FEATURE SNAPSHOT ===
            if get_debug_manager().is_enabled("PREDICTION"):
                if X_final.shape[1] > 0:
                    # Use .iloc[-1] for pandas DataFrame to get the last row
                    row_snapshot = X_final.iloc[-1] if hasattr(X_final, 'iloc') else X_final[-1]
                    get_debug_manager().log("PREDICTION", f"[{timeframe}] Feature Input (Last Row)", row_snapshot)
                    get_debug_manager().log("PREDICTION", f"[{timeframe}] Feature Cols", feature_cols)

            # --- REGIME DETECTION (AI Core 3.0) ---
            # Use new GMM Regime Detector
            detector = RegimeDetector()
            regime = "RANGE"  # Default
            regime_conf = 0.5
            if detector.load(timeframe):
                try:
                    # Detect on recent data.
                    # Predict returns a value for the whole DF; take the last one.
                    pad_idx = detector.predict(df)
                    last_r_id = pad_idx[-1]
                    regime = {0: "RANGE", 1: "TREND", 2: "PANIC"}.get(last_r_id, "RANGE")
                    regime_conf = 0.8  # GMM usually high confidence if fitted
                except Exception as e:
                    logger.error(f"Regime detection failed: {e}")
            else:
                logger.warning(f"[{timeframe}] Regime Detector not trained. Defaulting to RANGE.")

            logger.info("─── STEP 2/5: REGIME DETECTION ───")
            logger.info(f"Detected Regime: {regime.upper()} (Confidence: {regime_conf:.2f})")

            # --- MAGIC LOGIC: SMART MONEY CALCULATIONS ---
            smc_signals = {}
            try:
                # Calculate ATR for dynamic threshold (V2 Logic)
                from core.noble_kpi import calculate_atr
                atr_series = calculate_atr(df, period=14)

                # Calculate footprints on the full DF with ATR filter
                fvg_df = self.smc_engine.detect_fair_value_gaps(df, atr=atr_series)
                ob_df = self.smc_engine.detect_order_blocks(df)
                sweep_df = self.smc_engine.detect_liquidity_sweeps(df)

                # Get signals from the last candle. We use -1 (latest available data:
                # usually the open candle if live, or a closed candle for history)
                # to stay aligned with what the models see.
                last_idx = -1
                smc_signals = {
                    'fvg_bull': bool(fvg_df['fvg_bull'].iloc[last_idx]),
                    'fvg_bear': bool(fvg_df['fvg_bear'].iloc[last_idx]),
                    'ob_bull': bool(ob_df['ob_bull'].iloc[last_idx]),
                    'ob_bear': bool(ob_df['ob_bear'].iloc[last_idx]),
                    'sweep_bull': bool(sweep_df['sweep_bull'].iloc[last_idx]),
                    'sweep_bear': bool(sweep_df['sweep_bear'].iloc[last_idx])
                }

                # Log if any magic is detected
                if any(smc_signals.values()):
                    logger.info(f"[{timeframe}] 🔮 SMC FOOTPRINTS DETECTED: {smc_signals}")
            except Exception as e:
                logger.warning(f"SMC Calculation failed: {e}")

            # Get predictions from each model as ModelPrediction objects
            logger.info("─── STEP 3/5: MULTI-MODEL INFERENCE ───")
            model_signals_legacy = {}   # For backward compatibility
            model_predictions = []      # List of ModelPrediction objects for Congress

            # --- OPTIMIZATION: PRE-WARM MODELS (Avoid Lock Contention) ---
            # We load all models sequentially first so that the parallel threads
            # don't fight over the TrainingManager LRU lock.
            active_models = []
            model_types = self.get_enabled_models()
            for mt in model_types:
                m = self.training_manager.get_model(timeframe, mt)
                if m:
                    if not m.is_trained:
                        m.load()  # Force load if needed
                    if m.is_trained:
                        active_models.append(mt)

            # OPTIMIZATION: Parallel model inference using ThreadPoolExecutor.
            # Now fully non-blocking since models are pre-loaded in memory.
            from concurrent.futures import ThreadPoolExecutor, as_completed

            def _run_model_prediction(model_type: str):
                """Run single model prediction - thread-safe for inference."""
                try:
                    # Model is GUARANTEED to be loaded by the pre-warming step above,
                    # but we use get_model to retrieve the reference safely
                    # (lock duration is minimal since it just returns the existing object).
                    model = self.training_manager.get_model(timeframe, model_type)
                    if model is None or not model.is_trained:
                        return None

                    # Get standardized prediction (all models use the last row)
                    input_data = X_final.iloc[-1:]
                    std_pred = model.predict_standard(input_data, regime=regime)

                    # === DEBUG: MODEL PROBE ===
                    if get_debug_manager().is_enabled("PREDICTION"):
                        get_debug_manager().log("PREDICTION", f"[{timeframe}] {model_type.upper()} Raw", {
                            "Score": std_pred.score,
                            "Conf": std_pred.confidence,
                            "Regime": regime
                        })

                    # Create ModelPrediction for Congress
                    model_pred = ModelPrediction(
                        model_name=model_type,
                        model_role=self._get_model_role(model_type, timeframe),
                        score=std_pred.score,
                        confidence=std_pred.confidence,
                        regime=regime,
                        timeframe=timeframe,
                    )
                    return (model_type, model_pred, std_pred)
                except Exception as e:
                    logger.warning(f"[{timeframe}] Model {model_type} prediction failed (graceful skip): {e}")
                    return None

            # Run all enabled, pre-warmed models in parallel
            with ThreadPoolExecutor(max_workers=5) as executor:
                futures = {executor.submit(_run_model_prediction, mt): mt for mt in active_models}
                for future in as_completed(futures):
                    result = future.result()
                    if result:
                        model_type, model_pred, std_pred = result
                        model_predictions.append(model_pred)
                        model_signals_legacy[model_type] = {
                            "signal": std_pred.signal,
                            "confidence": std_pred.confidence
                        }

                        # Notify specific model done
                        self._notify("step_update", {"step": model_type, "status": "done",
                                                     "elapsed": 0.5})  # Elapsed is tricky in parallel
                        self._notify("model_prediction", {
                            "timeframe": timeframe,
                            "model_type": model_type,
                            "signal": std_pred.signal,
                            "confidence": std_pred.confidence,
                            "score": std_pred.score,
                        })

                        logger.info(f"[{timeframe}] {model_type} -> Signal={std_pred.signal}, "
                                    f"Score={std_pred.score:.4f}, Probs={std_pred.probabilities}")
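            # Design note (descriptive comment only): pre-warming above serialises the
            # lock-protected model loads, so the worker threads in the pool only touch
            # models that are already resident in memory; max_workers=5 matches the
            # maximum number of model types, letting every enabled model run concurrently.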
            # 4. CONGRESS CONSENSUS (with Continuum Flux)
            logger.info("─── STEP 4/5: CONGRESS CONSENSUS ───")
            self._notify("step_update", {"step": "congress", "status": "active"})
            t_cong = datetime.now()

            # --- CONTINUUM FLUX PHYSICS V2 ---
            flux_boost = 0.0
            flux_details = {}
            try:
                from core.continuum_engine import get_continuum_engine
                cont_engine = get_continuum_engine(self.mt5)
                # Pass primary_df for resistance context (optional future expansion)
                f_boost, f_details = cont_engine.analyze_flux_breakout(timeframe, primary_df=df)
                flux_boost = f_boost
                flux_details = f_details

                if flux_boost != 0:
                    self._notify("flux_alert", {
                        "timeframe": timeframe,
                        "boost": flux_boost,
                        "type": flux_details.get('type', 'kinetic_action')
                    })
            except Exception as e:
                logger.warning(f"Continuum Flux failed: {e}")

            logger.info(f"[{timeframe}] ⚛️ FLUX INPUT: {flux_boost} (Type: {flux_details.get('type', 'none')})")

            # Defaults for Prop Firm / result construction
            suggested_lots = self._get_default_lot_size()
            guardian_alert = ""

            if model_predictions:
                decision = self.congress.aggregate(
                    predictions=model_predictions,
                    df=df,
                    timeframe=timeframe,
                    smc_signals=smc_signals,
                    flux_boost=flux_boost,
                    flux_details=flux_details
                )
                final_signal = decision.final_signal
                final_confidence = decision.final_confidence
                final_score = decision.final_score

                # --- TECHNICAL VALIDATION (RiskFilter) ---
                if final_signal != "NEUTRAL":
                    try:
                        # 1. Calc Noble Safe Range (if not already done).
                        # Need ATR and mean. Use current ATR if available, or approximate.
                        last_row_atr = df.iloc[-1]
                        current_close = float(last_row_atr['close'])
                        current_atr = float(last_row_atr.get('atr', 0.0010))

                        # Calculate Safe Range (standard deviation bounds)
                        safe_low, safe_high = noble_safe_range(
                            current_close, current_atr, timeframe,
                            regime={"RANGE": 0, "TREND": 1, "PANIC": 2}.get(regime, 0)
                        )

                        # 2. Apply Risk Filter
                        risk_filter = get_risk_filter()

                        # Convert Decision to dict for processing
                        congress_dict = {
                            "signal": final_signal,
                            "confidence": final_confidence,
                            "score": final_score
                        }
                        congress_dict = risk_filter.apply_to_congress_result(
                            congress_dict, current_close, df, safe_low, safe_high
                        )

                        # 3. Update decision with risk adjustments
                        final_signal = congress_dict["signal"]
                        final_confidence = congress_dict["confidence"]

                        # Add note if risk adjusted
                        if "risk_check" in congress_dict:
                            chk = congress_dict["risk_check"]
                            if not chk["is_valid"]:
                                logger.warning(f"[{timeframe}] 🛡️ RiskFilter Adjusted: {chk['original_signal']} -> {final_signal} ({chk['reason']})")
                                if decision:
                                    decision.override_type = f"risk_filter_{chk['reason'][:10]}"
                    except Exception as e:
                        logger.error(f"RiskFilter error: {e}")

                # --- ALADDIN FINANCIAL GUARD (+1 USD Protection) ---
                if final_signal != "NEUTRAL":
                    try:
                        # Extract metrics
                        last_row_atr = df.iloc[-1]
                        current_close = float(last_row_atr['close'])
                        current_atr = float(last_row_atr.get('atr', 0.0010))

                        # === PROP FIRM GUARDIAN CHECK ===
                        guardian = get_guardian()
                        guardian_alert = ""
                        suggested_lots = 0.01

                        if guardian.enabled and self.mt5:
                            # 1. Fetch equity (real-time)
                            try:
                                acc_info = self.mt5.get_account_info()  # Assuming this method (or similar) exists
                                if acc_info:
                                    equity = acc_info.get('equity', 0.0)

                                    # 2. Check allowed
                                    is_allowed, g_reason = guardian.check_trade_allowed(equity)
                                    if not is_allowed:
                                        logger.error(f"⛔ PROP FIRM BLOCK: {g_reason}")
                                        return {
                                            "signal": "NEUTRAL",
                                            "confidence": 0.0,
                                            "reason": f"PROP_BLOCK: {g_reason}",
                                            "timeframe": timeframe
                                        }

                                    # 3. Calculate risk lot size.
                                    # Estimate Safe Range width as a Stop Loss substitute if a standard SL is not available,
                                    # or use ATR * 1.5 (scalp) / 2.5 (swing).
                                    sl_pips = (current_atr * 1.5 * 10000) if timeframe in ['1m', '5m'] else (current_atr * 2.5 * 10000)
                                    suggested_lots = guardian.calculate_lot_size(
                                        equity, sl_pips, self.symbol,
                                        default_lot=self._get_default_lot_size()
                                    )
                                    guardian_alert = f"Risk Lot: {suggested_lots} ({g_reason})"
                            except Exception as e:
                                logger.warning(f"Guardian check warning: {e}")

                        # === ALADDIN FINANCIAL GUARD ===
                        is_viable, reason = self._check_financial_viability(
                            current_close, final_signal, current_atr, timeframe=timeframe
                        )

                        if not is_viable:
                            # CRITICAL: Check Financial Guard Policy from DB
                            db = get_database()
                            guard_policy = db.get_config("financial_guard_policy", "Advisory (Notify Only)")
                            is_strict = "Strict" in guard_policy

                            if is_strict:
                                logger.warning(f"[ALADDIN GUARD] BLOCKED: Signal {final_signal} cancelled (Profit < $1). Policy=STRICT.")
                                final_signal = "NEUTRAL"
                                decision.override_type = "aladdin_strict_block"
                            else:
                                logger.info(f"[ALADDIN GUARD] Advisory: Signal {final_signal} has Low Potential ({reason}). Proceeding anyway (Policy=Advisory).")
                                decision.override_type = "aladdin_low_vol"
                    except Exception as e:
                        logger.warning(f"Aladdin Guard Check Failed: {e}")

                # --- DEBUG LOGGING FOR USER ---
                logger.info(f"[DECISION] {timeframe} | Signal={final_signal} | Score={final_score:.4f} | Conf={final_confidence:.4f}")
                logger.info(f"  Breakdown: {decision.model_predictions}")

                # --- PHASE 3: META-LABELING GUARD ---
                try:
                    meta_guard = MetaModel(timeframe, MODELS_DIR)
                    if meta_guard.load():
                        # We need: Signal, Conf, Regime, Volatility, Hour.
                        # Volatility is approximated from the last 10 candles
                        # ('current_atr' from the Aladdin check above is another option).
                        vol_est = df['close'].pct_change().tail(10).std() if len(df) > 10 else 0.001
                        hour_now = datetime.now().hour

                        viability = meta_guard.predict_viability(
                            signal=final_signal,
                            conf=final_confidence,
                            regime={"RANGE": 0, "TREND": 1, "PANIC": 2}.get(regime, 0),
                            volatility=vol_est,
                            hour=hour_now
                        )
                        logger.info(f"[{timeframe}] 🛡️ Meta-Guard Viability: {viability:.2%}")

                        if viability < 0.55:  # Strict filter
                            logger.warning(f"[{timeframe}] 🛡️ META-VETO: Signal {final_signal} rejected (Low Viability {viability:.2%})")
                            final_signal = "NEUTRAL"
                            final_score = 0.0
                            # Add override note to decision if possible
                            if decision:
                                decision.override_type = "meta_veto"
                    else:
                        # Meta model not ready yet (allow signal)
                        pass
                except Exception as e:
                    logger.error(f"Meta-Guard failed: {e}")
                # ------------------------------
            else:
                # Fallback to NEUTRAL if no models available
                final_signal = "NEUTRAL"
                final_confidence = 0.0
                final_score = 0.0
                decision = None

            logger.info("─── STEP 5/5: SIGNAL FINALIZATION ───")

            # Stopwatch logging for production diagnostics
            self._notify("step_update", {"step": "congress", "status": "done",
                                         "elapsed": (datetime.now() - t_cong).total_seconds()})
            t_model_end = datetime.now()
            feat_ms = (t_feat_end - t_feat_start).total_seconds() * 1000
            model_ms = (t_model_end - t_feat_end).total_seconds() * 1000
            total_ms = (t_model_end - t_start).total_seconds() * 1000
            logger.info(f"STOPWATCH [{timeframe}]: Features={feat_ms:.1f}ms, Models={model_ms:.1f}ms, TOTAL={total_ms:.1f}ms")

            # Create result with full details
            timestamp = datetime.now()

            # Extract model details from the Congress decision
            model_details = []
            if decision and decision.model_predictions:
                for pred in decision.model_predictions:
                    signal = "BUY" if pred.score > 0.3 else ("SELL" if pred.score < -0.3 else "NEUTRAL")
                    model_details.append({
                        "model": pred.model_name,
                        "signal": signal,
                        "confidence": round(pred.confidence, 4),
                        "score": round(pred.score, 4),
                        "weight": decision.weights_applied.get(pred.model_name, 0.33),
                        "contribution": round(pred.score * pred.confidence * decision.weights_applied.get(pred.model_name, 0.33), 4)
                    })

            # Extract critical features for the EA (ATR, etc)
            ea_features = {}
            if not df.empty:
                last_row = df.iloc[-1]
                for feat in ['close', 'atr', 'rsi', 'volume']:
                    if feat in df.columns:
                        try:
                            ea_features[feat] = float(last_row[feat])
                        except Exception:
                            ea_features[feat] = 0.0
                    else:
                        ea_features[feat] = 0.0

            # 8. PREPARE FINAL RESULT DICT
            # Initialize strictly to ensure validity
            final_result = {
                "timestamp": timestamp,
                "asset": "EURUSD",
                "timeframe": timeframe,
                "signal": final_signal,
                "confidence": final_confidence,
                "score": final_score,
                "trend_certainty": decision.trend_certainty if decision else 0.0,
                "duration_tf5": decision.duration_tf5 if decision else 0,
                "regime": regime,
                "regime_confidence": decision.final_confidence if decision else 0,
                "model_signals": model_signals_legacy,
                "model_details": model_details,
                "congress_decision": decision.to_dict() if decision else None,
                "features": ea_features,
                "data_status": data_status,
                "data_age": data_age_str,
                "suggested_lots": suggested_lots,   # Prop Firm risk lot
                "guardian_alert": guardian_alert,   # Safety message
            }

            # 9. SAVE SIGNAL (JSON & DB)
            # This returns the path, which we add to the result
            saved_path = self._save_signal(final_result)

            # Finalize
            final_result["file_path"] = str(saved_path)

            # Track for drift detection
            self.drift_detector.add_prediction({
                'signal': final_signal,
                'confidence': final_confidence,
            })

            # AUDIT CACHE
            try:
                get_audit_cache().log_prediction_event(final_result)
            except Exception:
                pass

            self._notify("prediction_complete", {
                "timeframe": timeframe,
                "signal": final_signal,
                "confidence": final_confidence,
                "score": final_score,
                "regime": regime,
            })

            logger.info(
                f"Prediction {timeframe}: {final_signal} "
                f"(score={final_score:.3f}, conf={final_confidence:.2%}, regime={regime})"
            )

            return final_result

        except PredictionError as pe:
            # Handle custom prediction errors with a structured response
            error_type = type(pe).__name__
            is_recoverable = getattr(pe, 'recoverable', False)

            logger.warning(f"[{timeframe}] {error_type}: {pe}")
            self._notify("prediction_error", {
                "timeframe": timeframe,
                "error": str(pe),
                "error_type": error_type,
                "recoverable": is_recoverable,
            })

            # Return a partial error result instead of None
            return {
                "timestamp": datetime.now(),
                "asset": "EURUSD",
                "timeframe": timeframe,
                "signal": "ERROR",
                "confidence": 0.0,
                "score": 0.0,
                "error": str(pe),
                "error_type": error_type,
                "recoverable": is_recoverable,
            }

        except Exception as e:
            # Unexpected errors
            logger.error(f"[{timeframe}] Unexpected prediction error: {e}", exc_info=True)
            # Index PR-400: Prediction Engine Recovery
            get_sentinel(tm=self.training_manager).repair("PR-400", {"timeframe": timeframe})
            self._notify("prediction_error", {
                "timeframe": timeframe,
                "error": str(e),
                "error_type": "UnexpectedError",
                "recoverable": False,
            })
            return None
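    # Illustrative usage of predict_timeframe (descriptive comment only; the caller
    # names are hypothetical, not part of this module):
    #
    #     engine = PredictionEngine(training_manager, mt5_connector)
    #     result = engine.predict_timeframe("1h")
    #     if result and result.get("signal") not in (None, "ERROR"):
    #         handle_signal(result["signal"], result["confidence"])
    #
    # Error results carry "error_type" and "recoverable", so a scheduler can decide
    # whether a retry is worthwhile.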
    def _save_signal(self, result: dict):
        """Save signal to JSON and Database"""
        # === JSON (new format with full details) ===
        from config.settings import SIGNALS_DIR

        # Format timestamp for MT5 (YYYY.MM.DD HH:MM:SS) - remove 'T'
        ts_str = (result["timestamp"].strftime("%Y.%m.%d %H:%M:%S")
                  if hasattr(result["timestamp"], "strftime")
                  else str(result["timestamp"]).replace("T", " ").split(".")[0])
        gen_str = datetime.now().strftime("%Y.%m.%d %H:%M:%S")

        # CRITICAL: Order matters for the MQL5 naive parser.
        # Put simple keys FIRST. Complex lists/dicts LAST.
        #
        # timestamp_unix: the EA compares against TimeGMT() (seconds since 1970 UTC).
        # Python's datetime.timestamp() on a naive local datetime also yields the UTC
        # epoch, so the two should match as long as PC and server clocks are correct.
        json_output = {
            "signal": result["signal"],
            "confidence": result["confidence"],
            "timestamp": ts_str,
            "timestamp_unix": int(result["timestamp"].timestamp()),
            "generated_at": gen_str,
            "symbol": result.get("asset", "EURUSD"),
            "timeframe": result["timeframe"],
            "score": result.get("score", 0),
            "status": "ACTIVE",
            "risk_check": result.get("risk_check", {"is_valid": True}),
            "features": result.get("features", {}),
            "regime": result.get("regime", "unknown"),
            "regime_confidence": result.get("regime_confidence", 0),
            "suggested_lots": result.get("suggested_lots", 0.1),
            "guardian_alert": result.get("guardian_alert", ""),
            "version": "2.1.1",
            # Complex objects LAST to avoid 'StringFind' hitting them first
            "model_details": result.get("model_details", []),
            "regime_details": result.get("regime_details", {}),
            "congress_decision": result.get("congress_decision", {}),
            "override_type": (result.get("congress_decision") or {}).get("override_type"),
        }

        # Save ONLY the latest version (history is in the DB)
        tf = result["timeframe"]
        symbol = result.get("asset", "EURUSD")

        # Determine output directory (dynamic from settings)
        output_dir = SIGNALS_DIR
        try:
            db = get_database()
            custom_dir = db.get_config("signals_dir")
            if custom_dir:
                output_dir = Path(custom_dir)
            else:
                # FALLBACK: Try to get the path dynamically from MT5 (instance specific)
                try:
                    # self.mt5 gives direct access to the connector
                    if self.mt5 and self.mt5.connected:
                        term_info = self.mt5.terminal_info()
                        if term_info and term_info.data_path:
                            dynamic_path = Path(term_info.data_path) / "MQL5" / "Files" / "Cerebrum" / "Signals"
                            output_dir = dynamic_path
                            logger.info(f"Using dynamic instance path: {output_dir}")
                        else:
                            # If we can't get terminal info, fall back to the configured dir
                            output_dir = SIGNALS_DIR
                    else:
                        output_dir = SIGNALS_DIR
                except Exception as e:
                    logger.warning(f"Could not resolve dynamic MT5 path: {e}")
                    output_dir = SIGNALS_DIR
        except Exception as e:
            logger.error(f"Failed to get custom signals_dir from DB: {e}")
            output_dir = SIGNALS_DIR

        # Ensure directory exists
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.error(f"Failed to create signal directory {output_dir}: {e}")
            # Fallback to local data dir on permission errors
            output_dir = SIGNALS_DIR
            output_dir.mkdir(parents=True, exist_ok=True)

        latest_path = output_dir / f"{symbol}_{tf}_latest.json"
        logger.info(f"[IO] Saving SIGNAL to: {latest_path.absolute()}")
        logger.info(f"Signal File Path = {latest_path.absolute()}")

        # Retry logic for robust writing (handle MT5 locking)
        max_retries = 3
        for attempt in range(max_retries):
            try:
                with open(latest_path, 'w') as f:
                    json.dump(json_output, f, indent=4)
                logger.info(f"✅ JSON signal successfully saved to {latest_path}")
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    time.sleep(0.1)
                else:
                    logger.error(f"❌ Failed to save signal JSON to {latest_path} after retries: {e}")

        try:
            self._save_to_db(json_output, model_details=result["model_details"])
        except Exception as e:
            logger.error(f"Failed to save to DB: {e}")

        return latest_path

    def _save_to_db(self, data, model_details):
        """Save prediction result to the SQLite database"""
        db = get_database()  # Singleton

        # Convert the model_details list to the legacy dict format for the DB.
        # DB expects: {'xgboost': (sig, conf), 'lightgbm': ...}
        model_signals_legacy = {}
        if model_details:
            for d in model_details:
                model = d.get('model', '').lower()
                if model in ['xgboost', 'lightgbm', 'stacking']:
                    val = d.get('signal', 'NEUTRAL')
                    conf = d.get('confidence', 0.0)
                    model_signals_legacy[model] = (val, conf)

        # Parse timestamp safely
        ts = data.get("timestamp")
        if isinstance(ts, str):
            try:
                ts = datetime.strptime(ts, "%Y.%m.%d %H:%M:%S")
            except Exception:
                ts = datetime.now()

        # Call legacy method (until the DB schema is updated)
        db.add_signal(
            timestamp=ts,
            timeframe=data.get("timeframe"),
            signal=data.get("signal"),
            confidence=data.get("confidence", 0.0),
            model_signals=model_signals_legacy
        )
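    # Example of the JSON layout written by _save_signal (values are illustrative,
    # not real output). Simple scalar keys come first so the EA's naive
    # StringFind-based parser reaches them before the nested structures:
    #
    #     {
    #         "signal": "BUY",
    #         "confidence": 0.72,
    #         "timestamp": "2024.01.02 15:00:00",
    #         "timestamp_unix": 1704207600,
    #         ...
    #         "model_details": [...],
    #         "congress_decision": {...}
    #     }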
    def load_signal(self, timeframe: str) -> Optional[dict]:
        """Load current signal from CSV file"""
        path = self.get_signal_path(timeframe)

        if not path.exists():
            return None

        try:
            df = pd.read_csv(path, sep=";")
            if df.empty:
                return None

            row = df.iloc[0]
            return {
                "timestamp": row["timestamp"],
                "asset": row["asset"],
                "timeframe": row["timeframe"],
                "signal": row["signal"],
                "confidence": float(row["confidence"]),
                "score": float(row.get("score", 0)),
                "regime": row.get("regime", "unknown"),
            }
        except Exception as e:
            logger.error(f"Failed to load signal for {timeframe}: {e}")
            return None
    def reset_engine(self):
        """
        [PR-400] Reset the prediction engine state.
        Flushes all caches and buffers.
        """
        logger.warning("[PredictionEngine] Engine reset triggered. Flushing all caches...")
        try:
            with self._cache_lock:
                self._feature_cache.clear()
            return True
        except Exception as e:
            logger.error(f"Engine reset failed: {e}")
            return False
    def predict_all(self) -> Dict[str, dict]:
        """Generate predictions for all timeframes in parallel (Turbo Prediction)"""
        self.is_predicting = True
        results = {}

        from concurrent.futures import ThreadPoolExecutor, as_completed

        logger.info(f"🚀 Launching Turbo Prediction for {len(TIMEFRAME_NAMES)} timeframes...")

        # Parallel execution: limit workers to 4 to balance CPU load
        with ThreadPoolExecutor(max_workers=4) as executor:
            future_to_tf = {executor.submit(self.predict_timeframe, tf): tf for tf in TIMEFRAME_NAMES}
            for future in as_completed(future_to_tf):
                tf = future_to_tf[future]
                try:
                    res = future.result()
                    if res:
                        results[tf] = res
                except Exception as e:
                    logger.error(f"Parallel prediction failed for {tf}: {e}")
                    results[tf] = {"error": str(e), "signal": "ERROR"}

        self.is_predicting = False
        return results
    def get_prediction_status(self, timeframe: str) -> dict:
        """Get prediction status for a timeframe"""
        signal = self.load_signal(timeframe)
        return {
            "timeframe": timeframe,
            "has_signal": signal is not None,
            "signal": signal,
            "is_predicting": self.is_predicting,
        }
    def get_combined_signal(self) -> dict:
        """
        Get overall signal combining all timeframes using Congress Engine.
        This aggregates signals across timeframes with timeframe-aware weighting.
        """
        signals = []
        model_predictions: List[ModelPrediction] = []

        for tf in TIMEFRAME_NAMES:
            signal = self.load_signal(tf)
            if signal:
                signals.append(signal)

                # Create pseudo-ModelPrediction for cross-timeframe Congress
                model_predictions.append(ModelPrediction(
                    model_name=f"ensemble_{tf}",
                    model_role=AladdinPersona.ARCHIVIST,  # All TF ensembles treated equally
                    score=signal.get('score', 0),
                    confidence=signal.get('confidence', 0),
                    regime=signal.get('regime', 'range'),
                    timeframe=tf,
                ))

        if not signals:
            return {
                "signal": "NEUTRAL",
                "confidence": 0.0,
                "score": 0.0,
                "active_timeframes": 0,
            }

        # Use Congress-style weighted aggregation if we have predictions
        if model_predictions:
            # Regime context (use the most recent signal's regime)
            regime = signals[-1].get('regime', 'range')

            # Weight by timeframe (longer TF = more weight)
            tf_weights = {
                '1m': 0.5, '5m': 0.6, '15m': 0.7, '30m': 0.8,
                '1h': 1.0, '4h': 1.2, '1d': 1.5, '1w': 1.8
            }

            weighted_sum = 0.0
            weight_total = 0.0

            for pred in model_predictions:
                w = tf_weights.get(pred.timeframe, 1.0)
                weighted_sum += w * pred.score * pred.confidence
                weight_total += w * pred.confidence

            final_score = weighted_sum / (weight_total + 1e-8)
            final_confidence = weight_total / len(model_predictions)

            # Determine signal
            threshold = 0.3
            if final_score > threshold:
                final_signal = "BUY"
            elif final_score < -threshold:
                final_signal = "SELL"
            else:
                final_signal = "NEUTRAL"
        else:
            # Fallback to simple voting
            votes = {"BUY": 0, "SELL": 0, "NEUTRAL": 0}
            for s in signals:
                votes[s["signal"]] += 1

            max_votes = max(votes.values())
            final_signal = [k for k, v in votes.items() if v == max_votes][0]

            confidences = [s["confidence"] for s in signals if s["signal"] == final_signal]
            final_confidence = np.mean(confidences) if confidences else 0.0
            final_score = 0.0

        result = {
            "signal": final_signal,
            "confidence": float(final_confidence),
            "score": float(final_score),
            "active_timeframes": len(signals),
            "votes": {s["signal"]: sum(1 for x in signals if x["signal"] == s["signal"]) for s in signals},
        }

        # --- PERSIST MULTI-TF SIGNAL TO DATABASE ---
        try:
            db = get_database()
            db.log_prediction(
                timeframe="MULTI",  # Special marker for combined signal
                signal=final_signal,
                confidence=float(final_confidence),
                score=float(final_score),
                model_details={"active_tf": len(signals), "votes": result["votes"]}
            )
        except Exception as e:
            logger.warning(f"Failed to persist multi-TF signal: {e}")

        return result
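    # Worked example for the cross-timeframe weighting above (illustrative numbers):
    #   1h signal: score=0.6,  conf=0.8, weight=1.0 -> contributes 0.48, weight 0.80
    #   4h signal: score=-0.2, conf=0.5, weight=1.2 -> contributes -0.12, weight 0.60
    #   final_score      = (0.48 - 0.12) / (0.80 + 0.60) ≈ 0.257 -> below 0.3 -> NEUTRAL
    #   final_confidence = (0.80 + 0.60) / 2 = 0.70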
    def get_congress_history(self, limit: int = 50) -> List[dict]:
        """Get recent Congress decision history"""
        return self.congress.get_decision_history(limit)