"""
Cerebrum Forex - Prediction Engine V2
Generate signals using Congress Engine weighted ensemble.
Key features:
- Weighted aggregation via Congress Engine
- Regime-adaptive weights
- Full audit trail
- Standardized model outputs
"""
import logging
from models.xgboost_model import XGBoostModel
from models.lightgbm_model import LightGBMModel
from models.catboost_model import CatBoostModel
from models.stacking_model import StackingModel
from typing import Optional, Callable, Dict, List, Tuple
from models.meta_model import MetaModel
import threading
import time
import json
from pathlib import Path
from datetime import datetime
import pandas as pd
import numpy as np
from config.settings import TIMEFRAMES, SIGNALS_DIR, TIMEFRAME_NAMES, DATA_DIR
from core.mt5_connector import MT5Connector
from core.feature_engine import FeatureEngine
from core.training_manager import TrainingManager
from core.model_scorer import ModelScorer
from core.noble_kpi import calculate_atr, detect_regime, noble_safe_range
from core.prop_firm_guardian import get_guardian
from core.risk_filter import get_risk_filter
from core.congress_engine import (
CongressEngine, get_congress_engine,
ModelPrediction, AladdinPersona, CongressDecision
)
from core.audit_cache import get_audit_cache
from core.drift_detector import get_drift_detector
from core.regime_detector import RegimeDetector
from config.settings import MODELS_DIR, get_timeframe_by_name
from database import get_database
from core.exceptions import (
DataMissingError, DataStaleError, ModelNotTrainedError,
FeatureCalculationError, InsufficientModelsError, PredictionError
)
from core.debug_manager import get_debug_manager
from core.sentinel import get_sentinel
logger = logging.getLogger(__name__)
class PredictionEngine:
"""
Generate predictions using Congress Engine weighted ensemble.
V2 features:
- Congress Engine for weighted aggregation
- Regime-aware weight adjustment
- Full audit trail for explainability
- Drift detection monitoring
"""
def __init__(self, training_manager: TrainingManager, mt5_connector: MT5Connector):
self.training_manager = training_manager
self.mt5 = mt5_connector
self.symbol = mt5_connector.symbol # Required for Guardian checks
self.feature_engine = FeatureEngine()
self.congress = get_congress_engine()
self.drift_detector = get_drift_detector()
# EXPERIMENTAL: Forbidden Logic
from core.smart_money import SmartMoneyEngine
self.smc_engine = SmartMoneyEngine()
# Prediction state
self.is_predicting = False
self._callbacks: List[Callable] = []
# OPTIMIZATION: Feature cache to avoid recalculating when data unchanged
# Key: (timeframe, last_candle_time) -> Value: (X_normalized, feature_cols)
self._feature_cache: dict = {}
self._feature_cache_max_size = 8 # One per timeframe
self._cache_lock = threading.Lock()
# Ensure signals directory exists
SIGNALS_DIR.mkdir(parents=True, exist_ok=True)
def add_callback(self, callback: Callable):
"""Add callback for prediction status updates"""
self._callbacks.append(callback)
def remove_callback(self, callback: Callable):
"""Remove callback"""
if callback in self._callbacks:
self._callbacks.remove(callback)
def _notify(self, event: str, data: dict):
"""Notify all callbacks"""
for callback in self._callbacks:
try:
callback(event, data)
except Exception as e:
logger.error(f"Callback error: {e}")
def get_signal_path(self, timeframe: str) -> Path:
"""Get path to signal CSV file"""
try:
db = get_database()
custom_dir = db.get_config("signals_dir")
if custom_dir:
return Path(custom_dir) / f"signal_{timeframe}.csv"
except Exception:
pass
return SIGNALS_DIR / f"signal_{timeframe}.csv"
def _get_default_lot_size(self) -> float:
"""Read fixed_lots from ea_config.json as fallback"""
try:
config_path = DATA_DIR / "ea_config.json"
if config_path.exists():
with open(config_path, 'r') as f:
config = json.load(f)
# Support both nested and flat structures
if "order" in config:
return float(config["order"].get("volume", 0.1))
return float(config.get("fixed_lots", 0.1))
except Exception as e:
logger.warning(f"Could not read default lot from ea_config.json: {e}")
return 0.1
def _get_model_role(self, model_type: str, timeframe: str) -> AladdinPersona:
"""
Determine model role (Aladdin Persona) based on type.
"""
# Map current models to Aladdin Experts:
# XGBoost -> The Quant (Microstructure)
# LightGBM -> The Archivist (Technical)
# RandomForest -> The Futurist (Context)
# CatBoost -> The Sentinel
# Stacking -> The Leader (Ensemble)
role_map = {
'xgboost': AladdinPersona.QUANT,
'lightgbm': AladdinPersona.ARCHIVIST,
'randomforest': AladdinPersona.FUTURIST,
'catboost': AladdinPersona.SENTINEL,
'stacking': AladdinPersona.LEADER,
}
return role_map.get(model_type, AladdinPersona.ARCHIVIST)
def _check_financial_viability(self, current_price: float, signal: str, atr: float, timeframe: str = '1h') -> Tuple[bool, str]:
"""
ALADDIN FINANCIAL GUARD (Expert Tuned):
Ensure trade is viable relative to timeframe-specific expectations.
"""
if signal == 'NEUTRAL':
return True, "Neutral"
# Standard constants (Assuming EURUSD-like 5 decimals)
POINT = 0.00001
# Expert Trader ROI Tuning:
# Scalping (M1/M5) needs lower friction thresholds to function.
if timeframe in ['1m', '5m']:
# Scalping Mode: Fast exits, lower R:R requirement for entry
MIN_PROFIT_PTS = 20 # 2 pips (consistent with user request)
REQ_ATR_MULT = 1.2 # Lowered slightly for high-frequency signals
elif timeframe in ['15m', '30m']:
# Day Trading Mode
MIN_PROFIT_PTS = 40 # 4 pips
REQ_ATR_MULT = 1.8
else:
# Swing Mode (H1+)
MIN_PROFIT_PTS = 80 # 8 pips
REQ_ATR_MULT = 2.2
# Estimate Potential Move
potential_move_pts = (atr * REQ_ATR_MULT) / POINT
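# Worked example: on 1h (Swing Mode) with ATR = 0.0009,
# potential_move_pts = (0.0009 * 2.2) / 0.00001 = 198 >= 80 -> viable.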
# Strict Aladdin Check
if potential_move_pts < MIN_PROFIT_PTS:
return False, f"Insufficient Potential: {potential_move_pts:.0f} pts < {MIN_PROFIT_PTS} pts ({timeframe} req)"
return True, "Viable"
def _is_market_open(self) -> Tuple[bool, str]:
"""
Check if Forex market is currently open.
Forex market hours:
- Open: Sunday 22:00 UTC to Friday 22:00 UTC
- Closed: Friday 22:00 to Sunday 22:00 (Weekend)
- Closed: January 1 (New Year's Day)
- Closed: December 25 (Christmas Day)
Returns:
(is_open, reason) tuple
"""
from datetime import timezone
now_utc = datetime.now(timezone.utc)
weekday = now_utc.weekday() # Monday=0, Sunday=6
hour = now_utc.hour
month = now_utc.month
day = now_utc.day
# Check major Forex holidays
if month == 1 and day == 1:
return False, "New Year's Day"
if month == 12 and day == 25:
return False, "Christmas Day"
# Check weekend closure
if weekday == 4 and hour >= 22: # Friday after 22:00 UTC
return False, "Weekend (Friday close)"
elif weekday == 5: # Saturday
return False, "Weekend (Saturday)"
elif weekday == 6 and hour < 22: # Sunday before 22:00 UTC
return False, "Weekend (Sunday pre-open)"
return True, "Market Open"
def get_enabled_models(self) -> List[str]:
"""Get list of enabled model types from settings"""
from config.settings import default_settings
# Ensure we return valid model types that exist in the system
valid_types = ["xgboost", "lightgbm", "randomforest", "catboost", "stacking"]
enabled = getattr(default_settings, "models_enabled", valid_types)
# Helper: Intersection of enabled and valid
return [m for m in enabled if m in valid_types]
def predict_timeframe(self, timeframe: str, df: Optional[pd.DataFrame] = None) -> Optional[dict]:
"""
Generate prediction for a single timeframe using Congress Engine.
"""
logger.info(f"════════════════════════════════════════════════════════════")
logger.info(f" PREDICTION START: {timeframe}")
logger.info(f"════════════════════════════════════════════════════════════")
self._notify("prediction_start", {"timeframe": timeframe})
# === MARKET CLOSED CHECK ===
# Skip predictions when Forex market is closed (saves resources)
is_open, reason = self._is_market_open()
if not is_open:
logger.info(f"[{timeframe}] ⏸️ Market closed ({reason}). Skipping prediction.")
self._notify("prediction_skipped", {"timeframe": timeframe, "reason": reason})
return {
"timeframe": timeframe,
"signal": "NEUTRAL",
"confidence": 0.0,
"final_score": 0.0,
"status": "market_closed",
"reason": reason,
"timestamp": datetime.now().isoformat(),
}
try:
# 1. LOAD DATA (OHLC)
self._notify("step_update", {"step": "data", "status": "active"})
t_start = datetime.now()
# If df provided (In-Memory Pipeline), use it. Otherwise load FAST buffer.
if df is None:
# OPTIMIZATION: Use get_buffer(6000) instead of load_ohlc()
# load_ohlc reads the full CSV (100MB+), which is too slow (2s+) for real-time.
# get_buffer fetches recent data from MT5 API (ms).
df = self.mt5.get_buffer(timeframe, n=6000)
# Fallback to disk if API fails but we have data
if df is None or df.empty:
logger.warning(f"[{timeframe}] Buffer empty, falling back to disk load...")
# OPTIMIZATION: Use load_ohlc_buffer(2000) instead of full load
df = self.mt5.load_ohlc_buffer(timeframe, n=2000)
t_data = datetime.now()
if df is None or df.empty:
raise DataMissingError(
f"OHLC Data missing for {timeframe}. Please Extract Data first.",
timeframe=timeframe
)
# 2. APPLY OPTIMIZED ROW LIMITS FOR INFERENCE
self._notify("step_update", {"step": "data", "status": "done", "elapsed": (datetime.now()-t_start).total_seconds()})
self._notify("step_update", {"step": "optimize", "status": "active"})
t_opt = datetime.now()
# CRITICAL OPTIMIZATION:
# We only need the last ~2000 candles to correctly calculate indicators (SMA200, etc).
# Processing 5000+ rows wastes CPU.
inference_limit = 2000
if len(df) > inference_limit:
df = df.tail(inference_limit).reset_index(drop=True)
self._notify("step_update", {"step": "optimize", "status": "done", "elapsed": (datetime.now()-t_opt).total_seconds()})
# 3. FRESHNESS CHECK
last_time = df['time'].iloc[-1]
t_feat_start = datetime.now()  # Placeholder; reset below before feature calculation
logger.info("─── STEP 1/5: DATA PREPARATION ───")
logger.info(f"Stopwatch: Data Load={(t_data-t_start).total_seconds()*1000:.1f}ms (Rows: {len(df)})")
tf_config = get_timeframe_by_name(timeframe)
data_status = "fresh"
data_age_str = "0m"
if tf_config:
from config.settings import default_settings
# --- SIMPLIFIED TIMEZONE HANDLING ---
# MT5 servers are often in GMT+2/3, local machine may differ.
# Instead of complex heuristics, we use a generous allowed offset.
# This prevents false "Stale" warnings due to timezone differences.
now = datetime.now()
time_diff = now - last_time
age_seconds = abs(time_diff.total_seconds()) # Use abs() for both directions
# Allow up to 2 hours of timezone offset (covers most cases: GMT+0 to GMT+3)
TIMEZONE_OFFSET_TOLERANCE_SECONDS = 2 * 3600 # 2 hours
# If age is suspiciously close to exact hours (within 5 min), assume TZ offset
if age_seconds > 3300: # > 55 minutes
residual = age_seconds % 3600 # Remainder after removing full hours
if residual < 300 or residual > 3300: # Within 5 min of exact hour
# Likely TZ offset, use residual as "true" age
adjusted_seconds = min(residual, 3600 - residual)
else:
adjusted_seconds = age_seconds
else:
adjusted_seconds = age_seconds
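# Worked example: candles stamped in GMT+2 with a GMT+0 local clock show
# age_seconds ≈ 7380 (2h 03m); residual = 7380 % 3600 = 180 < 300,
# so adjusted_seconds = min(180, 3420) = 180 -> treated as 3 minutes old.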
minutes_old = int(adjusted_seconds / 60)
# Display string
if minutes_old < 60:
data_age_str = f"{minutes_old}m"
else:
data_age_str = f"{minutes_old // 60}h {minutes_old % 60}m"
max_age_minutes = tf_config.minutes * default_settings.staleness_multiplier
# Use ADJUSTED seconds for the staleness check (more lenient)
is_truly_stale = adjusted_seconds > (max_age_minutes * 60)
if is_truly_stale:
logger.warning(f"[{timeframe}] Data STALE (Age: {data_age_str}). Triggering update...")
self._notify("data_update", {"timeframe": timeframe, "status": "refreshing"})
try:
update_df = self.mt5.extract_ohlc(timeframe, is_update=True)
if update_df is None or update_df.empty:
logger.warning(f"[{timeframe}] Update returned no new data. Proceeding with existing data.")
data_status = "stale_usable"
self._notify("data_update", {"timeframe": timeframe, "status": "stale_warning", "age": data_age_str})
else:
df = self.mt5.load_ohlc_buffer(timeframe, n=2000)
if df is None or df.empty:
raise ValueError(f"CRITICAL: Failed to reload data after update for {timeframe}.")
logger.info(f"[{timeframe}] Data refreshed. New length: {len(df)}")
data_status = "fresh"
last_time = df['time'].iloc[-1]
time_diff = datetime.now() - last_time
minutes_old = int(time_diff.total_seconds() / 60)
if minutes_old < 60: data_age_str = f"{minutes_old}m"
else: data_age_str = f"{minutes_old // 60}h {minutes_old % 60}m"
except Exception as e:
logger.error(f"Auto-update failed: {e}")
data_status = "stale_error"
# --- WARMUP CHECK ---
if len(df) < 500:
raise DataMissingError(
f"Insufficient history ({len(df)} candles). Need > 500.",
timeframe=timeframe
)
# 4. CHECK MODELS (with graceful degradation)
required_models = ['xgboost', 'lightgbm', 'randomforest', 'stacking']
available_models = []
missing_models = []
for m_type in required_models:
m_path = MODELS_DIR / f"{m_type}_{timeframe}.pkl"
if m_path.exists():
available_models.append(m_type)
else:
missing_models.append(m_type)
logger.warning(f"[{timeframe}] Model missing: {m_type}")
# Need at least 2 models for reliable prediction
if len(available_models) < 2:
raise InsufficientModelsError(
f"Only {len(available_models)} model(s) available for {timeframe}. Need at least 2.",
available_models=len(available_models),
required_models=2
)
# 5. FEATURE ENGINEERING (Indicators + Features)
self._notify("feature_calc", {"status": "calculating"})
self._notify("step_update", {"step": "features", "status": "active"})
# Ensure 'spread' column exists
if 'spread' not in df.columns:
df['spread'] = 0
# OPTIMIZATION: Feature Cache Check
last_time = df['time'].iloc[-1]
cache_key = (timeframe, str(last_time))
# --- FEATURE CALCULATION STOPWATCH ---
t_feat_start = datetime.now()
cached = self._feature_cache.get(cache_key)
if cached:
X_final, feature_cols = cached
logger.debug(f"[{timeframe}] CACHE HIT: Reusing pre-computed features")
else:
try:
# Calculate all features
df = self.feature_engine.calculate_all_features(df, timeframe=timeframe)
except Exception as feat_err:
logger.error(f"[{timeframe}] Feature calculation failed in prediction: {feat_err}")
# Index FE-200: Feature Cache recovery
get_sentinel(tm=self.training_manager).repair("FE-200", {"timeframe": timeframe})
raise FeatureCalculationError(
f"Feature calculation failed: {feat_err}",
timeframe=timeframe
)
if df is None or df.empty:
raise FeatureCalculationError(
"Feature calculation returned empty data",
timeframe=timeframe
)
# If we get here, resources are valid. Proceed.
# --- FEATURE ALIGNMENT ---
# Load training schema to align columns EXACTLY
# Priority: 1. Normalizer's fitted columns, 2. JSON schema, 3. All columns (fallback)
feature_cols = []
# First, try to get features from the fitted normalizer
normalizer = self.training_manager.get_normalizer(timeframe)
if normalizer and normalizer.is_fitted and normalizer.quantile_stats:
feature_cols = list(normalizer.quantile_stats.keys())
logger.debug(f"[{timeframe}] Using normalizer features: {len(feature_cols)} features")
# Fallback to JSON schema file
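# The schema file is expected to be a flat JSON list of feature column names
# (presumably written at training time), e.g. (illustrative):
#     ["rsi_14", "sma_200", "atr_14", ...]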
if not feature_cols:
schema_path = MODELS_DIR / f"features_{timeframe}.json"
if schema_path.exists():
try:
with open(schema_path, 'r') as f:
feature_cols = json.load(f)
logger.debug(f"[{timeframe}] Loaded feature schema: {len(feature_cols)} features")
except Exception as e:
logger.warning(f"Failed to load feature schema: {e}")
if not feature_cols:
# Last resort fallback (Risky!)
exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
feature_cols = [c for c in df.columns if c not in exclude]
logger.warning(f"[{timeframe}] No schema found! (Models likely outdated). Please RETRAIN models to fix capabilities. Using {len(feature_cols)} features as fallback.")
# Align DataFrame to schema
# 1. Add missing columns with 0
missing_cols = set(feature_cols) - set(df.columns)
if missing_cols:
logger.warning(f"[{timeframe}] Missing {len(missing_cols)} features (filling 0): {list(missing_cols)[:5]}...")
for c in missing_cols:
df[c] = 0.0
# 2. Select only schema columns in correct order
X_df = df[feature_cols]
# Apply normalization
normalizer = self.training_manager.get_normalizer(timeframe)
if normalizer.is_fitted:
X_normalized = normalizer.transform(X_df, feature_cols)
X_final = X_normalized
else:
logger.warning(f"[{timeframe}] Normalizer NOT FITTED! Using raw features (Models will likely fail)")
X_final = X_df
# Store in cache
with self._cache_lock:
self._feature_cache[cache_key] = (X_final, feature_cols)
# Evict old entries if cache is full
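# (dicts preserve insertion order in Python 3.7+, so the first key is the oldest -> FIFO eviction)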
if len(self._feature_cache) > self._feature_cache_max_size:
oldest_key = next(iter(self._feature_cache))
del self._feature_cache[oldest_key]
logger.debug(f"[{timeframe}] CACHE MISS: Computed and stored features")
t_feat_end = datetime.now()
self._notify("step_update", {"step": "features", "status": "done", "elapsed": (t_feat_end - t_feat_start).total_seconds()})
# === DEBUG: FEATURE SNAPSHOT ===
if get_debug_manager().is_enabled("PREDICTION"):
if X_final.shape[1] > 0:
# Use .iloc[-1] for pandas DataFrame to get last row
row_snapshot = X_final.iloc[-1] if hasattr(X_final, 'iloc') else X_final[-1]
get_debug_manager().log("PREDICTION", f"[{timeframe}] Feature Input (Last Row)", row_snapshot)
get_debug_manager().log("PREDICTION", f"[{timeframe}] Feature Cols", feature_cols)
# --- REGIME DETECTION (AI Core 3.0) ---
# Use new GMM Regime Detector
detector = RegimeDetector()
regime = "RANGE" # Default
regime_conf = 0.5
if detector.load(timeframe):
try:
# Detect on recent data
# Predict returns value for whole DF, take last one
pad_idx = detector.predict(df)
last_r_id = pad_idx[-1]
regime = {0: "RANGE", 1: "TREND", 2: "PANIC"}.get(last_r_id, "RANGE")
regime_conf = 0.8 # GMM usually high confidence if fitted
except Exception as e:
logger.error(f"Regime detection failed: {e}")
else:
logger.warning(f"[{timeframe}] Regime Detector not trained. Defaulting to RANGE.")
logger.info("─── STEP 2/5: REGIME DETECTION ───")
logger.info(f"Detected Regime: {regime.upper()} (Confidence: {regime_conf:.2f})")
# --- MAGIC LOGIC: SMART MONEY CALCULATIONS ---
smc_signals = {}
try:
# Calculate ATR for dynamic threshold (V2 Logic)
atr_series = calculate_atr(df, period=14)
# Calculate footprints on full DF with ATR filter
fvg_df = self.smc_engine.detect_fair_value_gaps(df, atr=atr_series)
ob_df = self.smc_engine.detect_order_blocks(df)
sweep_df = self.smc_engine.detect_liquidity_sweeps(df)
# Use index -1 (the latest available candle) so SMC signals align with the model inputs.
# NOTE: if df includes the currently forming candle, these signals are based on
# incomplete data; a stricter variant would read index -2 (the last closed candle).
last_idx = -1
smc_signals = {
'fvg_bull': bool(fvg_df['fvg_bull'].iloc[last_idx]),
'fvg_bear': bool(fvg_df['fvg_bear'].iloc[last_idx]),
'ob_bull': bool(ob_df['ob_bull'].iloc[last_idx]),
'ob_bear': bool(ob_df['ob_bear'].iloc[last_idx]),
'sweep_bull': bool(sweep_df['sweep_bull'].iloc[last_idx]),
'sweep_bear': bool(sweep_df['sweep_bear'].iloc[last_idx])
}
# Log if any magic is detected
if any(smc_signals.values()):
logger.info(f"[{timeframe}] 🔮 SMC FOOTPRINTS DETECTED: {smc_signals}")
except Exception as e:
logger.warning(f"SMC Calculation failed: {e}")
# Get predictions from each model as ModelPrediction objects
logger.info("─── STEP 3/5: MULTI-MODEL INFERENCE ───")
model_signals_legacy = {} # For backward compatibility
model_predictions = [] # List of ModelPrediction objects for Congress
# --- OPTIMIZATION: PRE-WARM MODELS (Avoid Lock Contention) ---
# We load all models sequentially first so that the parallel threads
# don't fight over the TrainingManager LRU lock.
active_models = []
model_types = self.get_enabled_models()
for mt in model_types:
m = self.training_manager.get_model(timeframe, mt)
if m:
if not m.is_trained: m.load() # Force load if needed
if m.is_trained:
active_models.append(mt)
# OPTIMIZATION: Parallel model inference using ThreadPoolExecutor
# Now fully non-blocking since models are pre-loaded in memory
from concurrent.futures import ThreadPoolExecutor, as_completed
def _run_model_prediction(model_type: str):
"""Run single model prediction - thread-safe for inference."""
try:
# Model is GUARANTEED to be loaded by pre-warming step above
# But we use get_model to retrieve the reference safely
# (Lock duration is minimal since it just returns existing obj)
model = self.training_manager.get_model(timeframe, model_type)
if model is None or not model.is_trained:
return None
# Get standardized prediction (all models use last row)
input_data = X_final.iloc[-1:]
std_pred = model.predict_standard(input_data, regime=regime)
# === DEBUG: MODEL PROBE ===
if get_debug_manager().is_enabled("PREDICTION"):
get_debug_manager().log("PREDICTION", f"[{timeframe}] {model_type.upper()} Raw", {
"Score": std_pred.score,
"Conf": std_pred.confidence,
"Regime": regime
})
# Create ModelPrediction for Congress
model_pred = ModelPrediction(
model_name=model_type,
model_role=self._get_model_role(model_type, timeframe),
score=std_pred.score,
confidence=std_pred.confidence,
regime=regime,
timeframe=timeframe,
)
return (model_type, model_pred, std_pred)
except Exception as e:
logger.warning(f"[{timeframe}] Model {model_type} prediction failed (graceful skip): {e}")
return None
# Run the pre-warmed models in parallel (respects enabled-model settings)
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(_run_model_prediction, mt): mt
for mt in active_models}
for future in as_completed(futures):
result = future.result()
if result:
model_type, model_pred, std_pred = result
model_predictions.append(model_pred)
model_signals_legacy[model_type] = {
"signal": std_pred.signal,
"confidence": std_pred.confidence
}
# Notify specific model done
self._notify("step_update", {"step": model_type, "status": "done", "elapsed": 0.5}) # Elapsed is tricky in parallel
self._notify("model_prediction", {
"timeframe": timeframe,
"model_type": model_type,
"signal": std_pred.signal,
"confidence": std_pred.confidence,
"score": std_pred.score,
})
logger.info(f"[{timeframe}] {model_type} -> Signal={std_pred.signal}, Score={std_pred.score:.4f}, Probs={std_pred.probabilities}")
# 6. CONGRESS CONSENSUS (With Continuum Flux)
logger.info("─── STEP 4/5: CONGRESS CONSENSUS ───")
self._notify("step_update", {"step": "congress", "status": "active"})
t_cong = datetime.now()
# --- CONTINUUM FLUX PHYSICS V2 ---
flux_boost = 0.0
flux_details = {}
try:
from core.continuum_engine import get_continuum_engine
cont_engine = get_continuum_engine(self.mt5)
# Pass primary_df for resistance context (optional future expansion)
f_boost, f_details = cont_engine.analyze_flux_breakout(timeframe, primary_df=df)
flux_boost = f_boost
flux_details = f_details
if flux_boost != 0:
self._notify("flux_alert", {
"timeframe": timeframe,
"boost": flux_boost,
"type": flux_details.get('type', 'kinetic_action')
})
except Exception as e:
logger.warning(f"Continuum Flux failed: {e}")
logger.info(f"[{timeframe}] ⚛️ FLUX INPUT: {flux_boost} (Type: {flux_details.get('type','none')})")
# Defaults for Prop Firm / Result construction
suggested_lots = self._get_default_lot_size()
guardian_alert = ""
if model_predictions:
decision = self.congress.aggregate(
predictions=model_predictions,
df=df,
timeframe=timeframe,
smc_signals=smc_signals,
flux_boost=flux_boost,
flux_details=flux_details
)
final_signal = decision.final_signal
final_confidence = decision.final_confidence
final_score = decision.final_score
# --- TECHNICAL VALIDATION (RiskFilter) ---
if final_signal != "NEUTRAL":
try:
# 1. Calc Noble Safe Range (if not already done)
# Need ATR and Mean. Use current_atr if avail or approx.
last_row_atr = df.iloc[-1]
current_close = float(last_row_atr['close'])
current_atr = float(last_row_atr.get('atr', 0.0010))
# Calculate Safe Range (Standard Deviation Bounds)
safe_low, safe_high = noble_safe_range(
current_close, current_atr,
timeframe,
regime={ "RANGE":0, "TREND":1, "PANIC":2 }.get(regime, 0)
)
# 2. Apply Risk Filter
risk_filter = get_risk_filter()
# Convert Decision to Dict for processing
congress_dict = {
"signal": final_signal,
"confidence": final_confidence,
"score": final_score
}
congress_dict = risk_filter.apply_to_congress_result(
congress_dict, current_close, df, safe_low, safe_high
)
# 3. Update Decision with Risk Adjustments
final_signal = congress_dict["signal"]
final_confidence = congress_dict["confidence"]
# Add Note if risk adjusted
if "risk_check" in congress_dict:
chk = congress_dict["risk_check"]
if not chk["is_valid"]:
logger.warning(f"[{timeframe}] 🛡️ RiskFilter Adjusted: {chk['original_signal']} -> {final_signal} ({chk['reason']})")
if decision: decision.override_type = f"risk_filter_{chk['reason'][:10]}"
except Exception as e:
logger.error(f"RiskFilter error: {e}")
# --- ALADDIN FINANCIAL GUARD (+1 USD Protection) ---
if final_signal != "NEUTRAL":
try:
# Extract metrics
last_row_atr = df.iloc[-1]
current_close = float(last_row_atr['close'])
current_atr = float(last_row_atr.get('atr', 0.0010))
# === PROP FIRM GUARDIAN CHECK ===
guardian = get_guardian()
guardian_alert = ""
suggested_lots = 0.01
if guardian.enabled and self.mt5:
# 1. Fetch Equity (Real-time)
try:
acc_info = self.mt5.get_account_info() # Assuming this method exists or similar
if acc_info:
equity = acc_info.get('equity', 0.0)
# 2. Check Allowed
is_allowed, g_reason = guardian.check_trade_allowed(equity)
if not is_allowed:
logger.error(f"⛔ PROP FIRM BLOCK: {g_reason}")
return {
"signal": "NEUTRAL",
"confidence": 0.0,
"reason": f"PROP_BLOCK: {g_reason}",
"timeframe": timeframe
}
# 3. Calculate Risk Lot Size
# Estimate Safe Range width as Stop Loss substitute if standard SL not avail
# Or use ATR * 1.5 (Scalp) / 2.5 (Swing)
sl_pips = (current_atr * 1.5 * 10000) if timeframe in ['1m','5m'] else (current_atr * 2.5 * 10000)
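# e.g. ATR = 0.0008 on 5m -> sl_pips = 0.0008 * 1.5 * 10000 = 12 pips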
suggested_lots = guardian.calculate_lot_size(equity, sl_pips, self.symbol, default_lot=self._get_default_lot_size())
guardian_alert = f"Risk Lot: {suggested_lots} ({g_reason})"
except Exception as e:
logger.warning(f"Guardian check warning: {e}")
# === ALADDIN FINANCIAL GUARD ===
is_viable, reason = self._check_financial_viability(
current_close, final_signal, current_atr, timeframe=timeframe
)
if not is_viable:
# CRITICAL: Check Financial Guard Policy from DB
db = get_database()
guard_policy = db.get_config("financial_guard_policy", "Advisory (Notify Only)")
is_strict = "Strict" in guard_policy
if is_strict:
logger.warning(f"[ALADDIN GUARD] BLOCKED: Signal {final_signal} cancelled (Profit < $1). Policy=STRICT.")
final_signal = "NEUTRAL"
decision.override_type = "aladdin_strict_block"
else:
logger.info(f"[ALADDIN GUARD] Advisory: Signal {final_signal} has Low Potential ({reason}). Proceeding anyway (Policy=Advisory).")
decision.override_type = "aladdin_low_vol"
except Exception as e:
logger.warning(f"Aladdin Guard Check Failed: {e}")
# --- DEBUG LOGGING FOR USER ---
logger.info(f"[DECISION] {timeframe} | Signal={final_signal} | Score={final_score:.4f} | Conf={final_confidence:.4f}")
logger.info(f" Breakdown: {decision.model_predictions}")
# --- PHASE 3: META-LABELING GUARD ---
try:
meta_guard = MetaModel(timeframe, MODELS_DIR)
if meta_guard.load():
# We need: Signal, Conf, Regime, Volatility, Hour
# Volatility calc (approx from last 10 candles if possible, or just use ATR)
# We have 'current_atr' from Aladdin check above.
vol_est = df['close'].pct_change().tail(10).std() if len(df) > 10 else 0.001
hour_now = datetime.now().hour
viability = meta_guard.predict_viability(
signal=final_signal,
conf=final_confidence,
regime={ "RANGE":0, "TREND":1, "PANIC":2 }.get(regime, 0),
volatility=vol_est,
hour=hour_now
)
logger.info(f"[{timeframe}] 🛡️ Meta-Guard Viability: {viability:.2%}")
if viability < 0.55: # Strict filter
logger.warning(f"[{timeframe}] 🛡️ META-VETO: Signal {final_signal} rejected (Low Viability {viability:.2%})")
final_signal = "NEUTRAL"
final_score = 0.0
# Add override note to decision if possible
if decision: decision.override_type = "meta_veto"
else:
# Meta model not ready yet (allow signal)
pass
except Exception as e:
logger.error(f"Meta-Guard failed: {e}")
# ------------------------------
else:
# Fallback to NEUTRAL if no models available
final_signal = "NEUTRAL"
final_confidence = 0.0
final_score = 0.0
decision = None
logger.info("─── STEP 5/5: SIGNAL FINALIZATION ───")
# Stopwatch logging for production diagnostics
self._notify("step_update", {"step": "congress", "status": "done", "elapsed": (datetime.now()-t_cong).total_seconds()})
t_model_end = datetime.now()
feat_ms = (t_feat_end - t_feat_start).total_seconds() * 1000
model_ms = (t_model_end - t_feat_end).total_seconds() * 1000
total_ms = (t_model_end - t_start).total_seconds() * 1000
logger.info(f"STOPWATCH [{timeframe}]: Features={feat_ms:.1f}ms, Models={model_ms:.1f}ms, TOTAL={total_ms:.1f}ms")
# Create result with full details
timestamp = datetime.now()
# Extract model details from congress decision
model_details = []
if decision and decision.model_predictions:
for pred in decision.model_predictions:
signal = "BUY" if pred.score > 0.3 else ("SELL" if pred.score < -0.3 else "NEUTRAL")
model_details.append({
"model": pred.model_name,
"signal": signal,
"confidence": round(pred.confidence, 4),
"score": round(pred.score, 4),
"weight": decision.weights_applied.get(pred.model_name, 0.33),
"contribution": round(pred.score * pred.confidence * decision.weights_applied.get(pred.model_name, 0.33), 4)
})
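# e.g. (illustrative numbers) score=0.6, confidence=0.8, weight=0.4
# -> contribution = 0.6 * 0.8 * 0.4 = 0.192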
# Extract critical features for EA (ATR, etc)
ea_features = {}
if not df.empty:
last_row = df.iloc[-1]
for feat in ['close', 'atr', 'rsi', 'volume']:
if feat in df.columns:
try:
ea_features[feat] = float(last_row[feat])
except Exception:
ea_features[feat] = 0.0
else:
ea_features[feat] = 0.0
# 7. PREPARE FINAL RESULT DICT
# Initialize strictly to ensure validity
final_result = {
"timestamp": timestamp,
"asset": "EURUSD",
"timeframe": timeframe,
"signal": final_signal,
"confidence": final_confidence,
"score": final_score,
"trend_certainty": decision.trend_certainty if decision else 0.0,
"duration_tf5": decision.duration_tf5 if decision else 0,
"regime": regime,
"regime_confidence": decision.final_confidence if decision else 0,
"model_signals": model_signals_legacy,
"model_details": model_details,
"congress_decision": decision.to_dict() if decision else None,
"features": ea_features,
"data_status": data_status,
"data_age": data_age_str,
"suggested_lots": suggested_lots, # Prop Firm Risk Lot
"guardian_alert": guardian_alert, # Safety message
}
# 8. SAVE SIGNAL (JSON & DB)
# This returns the path which we add to the result
saved_path = self._save_signal(final_result)
# Finalize
final_result["file_path"] = str(saved_path)
# Track for drift detection
self.drift_detector.add_prediction({
'signal': final_signal,
'confidence': final_confidence,
})
# AUDIT CACHE
try:
get_audit_cache().log_prediction_event(final_result)
except Exception:
pass  # Audit logging is best-effort; never block the signal result
self._notify("prediction_complete", {
"timeframe": timeframe,
"signal": final_signal,
"confidence": final_confidence,
"score": final_score,
"regime": regime,
})
logger.info(
f"Prediction {timeframe}: {final_signal} "
f"(score={final_score:.3f}, conf={final_confidence:.2%}, regime={regime})"
)
return final_result
except PredictionError as pe:
# Handle custom prediction errors with structured response
error_type = type(pe).__name__
is_recoverable = getattr(pe, 'recoverable', False)
logger.warning(f"[{timeframe}] {error_type}: {pe}")
self._notify("prediction_error", {
"timeframe": timeframe,
"error": str(pe),
"error_type": error_type,
"recoverable": is_recoverable,
})
# Return partial error result instead of None
return {
"timestamp": datetime.now(),
"asset": "EURUSD",
"timeframe": timeframe,
"signal": "ERROR",
"confidence": 0.0,
"score": 0.0,
"error": str(pe),
"error_type": error_type,
"recoverable": is_recoverable,
}
except Exception as e:
# Unexpected errors
logger.error(f"[{timeframe}] Unexpected prediction error: {e}", exc_info=True)
# Index PR-400: Prediction Engine Recovery
get_sentinel(tm=self.training_manager).repair("PR-400", {"timeframe": timeframe})
self._notify("prediction_error", {
"timeframe": timeframe,
"error": str(e),
"error_type": "UnexpectedError",
"recoverable": False,
})
return None
def _save_signal(self, result: dict):
"""Save signal to JSON and Database"""
# === JSON (new format with full details) ===
# Features are expected to be present in 'result' (populated in predict_timeframe).
# Format timestamp for MT5 (YYYY.MM.DD HH:MM:SS) - Remove 'T'
ts_str = result["timestamp"].strftime("%Y.%m.%d %H:%M:%S") if hasattr(result["timestamp"], "strftime") else str(result["timestamp"]).replace("T", " ").split(".")[0]
gen_str = datetime.now().strftime("%Y.%m.%d %H:%M:%S")
# CRITICAL: Order matters for MQL5 naive parser.
# Put simple keys FIRST. Complex lists/dicts LAST.
json_output = {
"signal": result["signal"],
"confidence": result["confidence"],
"timestamp": ts_str,
"timestamp": ts_str,
"timestamp_unix": int(result["timestamp"].replace(tzinfo=None).timestamp()), # Ensure naive -> local timestamp conversion is standard, or better:
# ACTUALLY, to match TimeGMT() in EA, we need to send UTC timestamp.
# If result['timestamp'] is local (likely), .timestamp() gives local epoch.
# TimeGMT() in MQL returns "seconds since 1970 UTC".
# Python .timestamp() returns "seconds since 1970 UTC" (if object is naive, it assumes it represents local time).
# So they SHOULD match if PC and Server are correct.
# But let's be explicit:
"timestamp_unix": int(datetime.now().timestamp()), # Always use current machine time as simplified 'now' reference? No, signal time.
# Best Practice: Use explicit UTC logic if possible, but for now stick to simple .timestamp() which yields UTC-equivalent for local time.
"timestamp_unix": int(result["timestamp"].timestamp()),
"generated_at": gen_str,
"symbol": result.get("asset", "EURUSD"),
"timeframe": result["timeframe"],
"score": result.get("score", 0),
"status": "ACTIVE",
"risk_check": result.get("risk_check", {"is_valid": True}),
"features": result.get("features", {}),
"regime": result.get("regime", "unknown"),
"regime_confidence": result.get("regime_confidence", 0),
"suggested_lots": result.get("suggested_lots", 0.1),
"guardian_alert": result.get("guardian_alert", ""),
"version": "2.1.1",
# Complex objects LAST to avoid 'StringFind' hitting them first
"model_details": result.get("model_details", []),
"regime_details": result.get("regime_details", {}),
"congress_decision": result.get("congress_decision", {}),
"override_type": (result.get("congress_decision") or {}).get("override_type"),
}
# Save ONLY latest version (History is in DB)
tf = result["timeframe"]
symbol = result.get("asset", "EURUSD")
# Determine output directory (Dynamic from Settings)
output_dir = SIGNALS_DIR
try:
db = get_database()
custom_dir = db.get_config("signals_dir")
if custom_dir:
output_dir = Path(custom_dir)
else:
# FALLBACK: Try to get path dynamically from MT5 (Instance Specific)
try:
# self.mt5 gives direct access to the connected terminal instance
if self.mt5 and self.mt5.connected:
term_info = self.mt5.terminal_info()
if term_info and term_info.data_path:
dynamic_path = Path(term_info.data_path) / "MQL5" / "Files" / "Cerebrum" / "Signals"
output_dir = dynamic_path
logger.info(f"Using dynamic instance path: {output_dir}")
else:
# If we can't get terminal info, fallback to configured
output_dir = SIGNALS_DIR
else:
output_dir = SIGNALS_DIR
except Exception as e:
logger.warning(f"Could not resolve dynamic MT5 path: {e}")
output_dir = SIGNALS_DIR
except Exception as e:
logger.error(f"Failed to get custom signals_dir from DB: {e}")
output_dir = SIGNALS_DIR
# Ensure directory exists
try:
output_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.error(f"Failed to create signal directory {output_dir}: {e}")
# Fallback to local data dir if system permission error
output_dir = SIGNALS_DIR
output_dir.mkdir(parents=True, exist_ok=True)
latest_path = output_dir / f"{symbol}_{tf}_latest.json"
logger.info(f"[IO] Saving SIGNAL to: {latest_path.absolute()}")
logger.info(f"Signal File Path = {latest_path.absolute()}")
# Retry logic for robust writing (handle MT5 locking)
max_retries = 3
for attempt in range(max_retries):
try:
with open(latest_path, 'w') as f:
json.dump(json_output, f, indent=4)
logger.info(f"✅ JSON signal successfully saved to {latest_path}")
break
except Exception as e:
if attempt < max_retries - 1:
time.sleep(0.1)
else:
logger.error(f"❌ Failed to save signal JSON to {latest_path} after retries: {e}")
try:
self._save_to_db(json_output, model_details=result["model_details"])
except Exception as e:
logger.error(f"Failed to save to DB: {e}")
return latest_path
def _save_to_db(self, data, model_details):
"""Save prediction result to SQLite database"""
db = get_database() # Singleton
# Convert model_details list to legacy dict format for DB
# DB expects: {'xgboost': (sig, conf), 'lightgbm': ...}
model_signals_legacy = {}
if model_details:
for d in model_details:
model = d.get('model', '').lower()
if model in ['xgboost', 'lightgbm', 'stacking']:
# Handle if we already have a dict or the legacy tuple
val = d.get('signal', 'NEUTRAL')
conf = d.get('confidence', 0.0)
model_signals_legacy[model] = (val, conf)
# Parse timestamp safely
ts = data.get("timestamp")
if isinstance(ts, str):
try:
ts = datetime.strptime(ts, "%Y.%m.%d %H:%M:%S")
except ValueError:
ts = datetime.now()
# Call legacy method (until DB schema is updated)
db.add_signal(
timestamp=ts,
timeframe=data.get("timeframe"),
signal=data.get("signal"),
confidence=data.get("confidence", 0.0),
model_signals=model_signals_legacy
)
def load_signal(self, timeframe: str) -> Optional[dict]:
"""Load current signal from CSV file"""
path = self.get_signal_path(timeframe)
if not path.exists():
return None
try:
df = pd.read_csv(path, sep=";")
if df.empty:
return None
row = df.iloc[0]
return {
"timestamp": row["timestamp"],
"asset": row["asset"],
"timeframe": row["timeframe"],
"signal": row["signal"],
"confidence": float(row["confidence"]),
"score": float(row.get("score", 0)),
"regime": row.get("regime", "unknown"),
}
except Exception as e:
logger.error(f"Failed to load signal for {timeframe}: {e}")
return None
def reset_engine(self):
"""
[PR-400] Reset the prediction engine state.
Flushes all caches and buffers.
"""
logger.warning("[PredictionEngine] Engine reset triggered. Flushing all caches...")
try:
with self._cache_lock:
self._feature_cache.clear()
return True
except Exception as e:
logger.error(f"Engine reset failed: {e}")
return False
def predict_all(self) -> Dict[str, dict]:
"""Generate predictions for all timeframes in parallel (Turbo Prediction)"""
self.is_predicting = True
results = {}
from concurrent.futures import ThreadPoolExecutor, as_completed
logger.info(f"🚀 Launching Turbo Prediction for {len(TIMEFRAME_NAMES)} timeframes...")
# Parallel execution: limit workers to 4 to balance CPU load
with ThreadPoolExecutor(max_workers=4) as executor:
future_to_tf = {executor.submit(self.predict_timeframe, tf): tf for tf in TIMEFRAME_NAMES}
for future in as_completed(future_to_tf):
tf = future_to_tf[future]
try:
res = future.result()
if res:
results[tf] = res
except Exception as e:
logger.error(f"Parallel prediction failed for {tf}: {e}")
results[tf] = {"error": str(e), "signal": "ERROR"}
self.is_predicting = False
return results
def get_prediction_status(self, timeframe: str) -> dict:
"""Get prediction status for a timeframe"""
signal = self.load_signal(timeframe)
return {
"timeframe": timeframe,
"has_signal": signal is not None,
"signal": signal,
"is_predicting": self.is_predicting,
}
def get_combined_signal(self) -> dict:
"""
Get overall signal combining all timeframes using Congress Engine.
This aggregates signals across timeframes with timeframe-aware weighting.
"""
signals = []
model_predictions: List[ModelPrediction] = []
for tf in TIMEFRAME_NAMES:
signal = self.load_signal(tf)
if signal:
signals.append(signal)
# Create pseudo-ModelPrediction for cross-timeframe Congress
model_predictions.append(ModelPrediction(
model_name=f"ensemble_{tf}",
model_role=AladdinPersona.ARCHIVIST,  # Technical persona; all TF ensembles treated equally
score=signal.get('score', 0),
confidence=signal.get('confidence', 0),
regime=signal.get('regime', 'range'),
timeframe=tf,
))
if not signals:
return {
"signal": "NEUTRAL",
"confidence": 0.0,
"score": 0.0,
"active_timeframes": 0,
}
# Use Congress for weighted aggregation if we have predictions
if model_predictions:
# No OHLC DataFrame at this level; take the regime from the most recent signal
regime = signals[-1].get('regime', 'range')
# Weight by timeframe (longer TF = more weight)
tf_weights = {
'1m': 0.5, '5m': 0.6, '15m': 0.7, '30m': 0.8,
'1h': 1.0, '4h': 1.2, '1d': 1.5, '1w': 1.8
}
weighted_sum = 0.0
weight_total = 0.0
for pred in model_predictions:
w = tf_weights.get(pred.timeframe, 1.0)
weighted_sum += w * pred.score * pred.confidence
weight_total += w * pred.confidence
final_score = weighted_sum / (weight_total + 1e-8)
final_confidence = weight_total / len(model_predictions)
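# Worked example (two timeframes): 1h score=+0.8, conf=0.9 (w=1.0) and
# 4h score=+0.5, conf=0.8 (w=1.2):
#     weighted_sum = 1.0*0.8*0.9 + 1.2*0.5*0.8 = 1.20
#     weight_total = 1.0*0.9     + 1.2*0.8     = 1.86
#     final_score  = 1.20 / 1.86 ≈ 0.645 > 0.3 -> BUY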
# Determine signal
threshold = 0.3
if final_score > threshold:
final_signal = "BUY"
elif final_score < -threshold:
final_signal = "SELL"
else:
final_signal = "NEUTRAL"
else:
# Fallback to simple voting
votes = {"BUY": 0, "SELL": 0, "NEUTRAL": 0}
for s in signals:
votes[s["signal"]] += 1
max_votes = max(votes.values())
final_signal = [k for k, v in votes.items() if v == max_votes][0]
confidences = [s["confidence"] for s in signals if s["signal"] == final_signal]
final_confidence = np.mean(confidences) if confidences else 0.0
final_score = 0.0
result = {
"signal": final_signal,
"confidence": float(final_confidence),
"score": float(final_score),
"active_timeframes": len(signals),
"votes": {s["signal"]: sum(1 for x in signals if x["signal"] == s["signal"]) for s in signals},
}
# --- PERSIST MULTI-TF SIGNAL TO DATABASE ---
try:
db = get_database()
db.log_prediction(
timeframe="MULTI", # Special marker for combined signal
signal=final_signal,
confidence=float(final_confidence),
score=float(final_score),
model_details={"active_tf": len(signals), "votes": result["votes"]}
)
except Exception as e:
logger.warning(f"Failed to persist multi-TF signal: {e}")
return result
def get_congress_history(self, limit: int = 50) -> List[dict]:
"""Get recent Congress decision history"""
return self.congress.get_decision_history(limit)