Source code for core.congress_engine

"""
Cerebrum Forex - Congress Engine
Weighted ensemble aggregation with regime detection.

The Congress Engine combines predictions from multiple specialized models
using adaptive weights based on market regime and timeframe.

Key features:
- Weighted aggregation with adaptive threshold
- Regime detection (trend vs range)
- Full audit trail for explainability
- Dynamic weight adjustment based on model performance
"""

import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple

from core.noble_kpi import calculate_atr, detect_regime, MarketRegime
from core.model_scorer import get_model_scorer

logger = logging.getLogger(__name__)


class AladdinPersona(Enum):
    """Aladdin 'Council of Experts' Personas"""
    QUANT = "quant"          # The Mathematician (Microstructure/Timing)
    ARCHIVIST = "archivist"  # The Financial Historian (Technical Indicators)
    FUTURIST = "futurist"    # The Pattern Recognizer (Context/Macro)
    GUARDIAN = "guardian"    # The Risk Manager (Volatility/Regime)
    LEADER = "leader"        # The Ensemble Leader (Stacking)
    SENTINEL = "sentinel"    # The Watcher (CatBoost/Robustness)

@dataclass
class ModelPrediction:
    """Standardized model output"""
    model_name: str
    model_role: AladdinPersona
    score: float          # [-1, +1] directional score
    confidence: float     # [0, 1] prediction confidence
    regime: str           # 'trend' or 'range'
    timeframe: str
    timestamp: datetime = field(default_factory=datetime.now)

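# Example (illustrative sketch, not part of the original module): constructing a
# standardized prediction the way a specialist model is expected to emit it.
# All values below are invented for illustration.
#
#   >>> pred = ModelPrediction(
#   ...     model_name="xgboost",
#   ...     model_role=AladdinPersona.QUANT,
#   ...     score=0.55,        # bullish lean in [-1, +1]
#   ...     confidence=0.72,   # model confidence in [0, 1]
#   ...     regime="trend",
#   ...     timeframe="1h",
#   ... )
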
@dataclass
class CongressDecision:
    """Final decision with full audit trail"""
    final_signal: str              # BUY, SELL, NEUTRAL
    final_score: float             # [-1, +1] weighted aggregate
    final_confidence: float        # [0, 1] aggregate confidence
    detected_regime: str           # Market regime used
    certainty_boost: float = 0.0   # Added certainty bonus
    flux_boost: float = 0.0        # Physics-based acceleration boost
    trend_certainty: float = 0.0   # TF/5 Trend Certainty [0, 1]
    duration_tf5: int = 0          # Validity window in minutes (TF/5)
    threshold: float = 0.0         # Threshold used for this decision

    # Audit trail
    model_predictions: List[ModelPrediction] = field(default_factory=list)
    flux_details: Dict = field(default_factory=dict)  # Physics metrics
    weights_applied: Dict[str, float] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    override_type: Optional[str] = None  # e.g. 'majority_buy'
    def to_dict(self) -> dict:
        """Convert to dictionary for storage"""
        return {
            'final_signal': self.final_signal,
            'final_score': self.final_score,
            'final_confidence': self.final_confidence,
            'detected_regime': self.detected_regime,
            'threshold': self.threshold,
            'certainty_boost': self.certainty_boost,
            'trend_certainty': self.trend_certainty,
            'duration_tf5': self.duration_tf5,
            'model_predictions': [
                {
                    'model_name': p.model_name,
                    'model_role': p.model_role.value if isinstance(p.model_role, AladdinPersona) else str(p.model_role),
                    'score': p.score,
                    'confidence': p.confidence,
                    'regime': p.regime,
                }
                for p in self.model_predictions
            ],
            'weights_applied': self.weights_applied,
            'timestamp': self.timestamp.isoformat(),
            'override_type': self.override_type,
            'details': self.flux_details,  # Expose details including Veto info
        }

class CongressEngine:
    """
    Weighted ensemble aggregation with Aladdin 'Council of Experts'.

    The Congress Engine combines model predictions using:

        FinalScore = Σ( w_i(regime, TF) × score_i × confidence_i ) + CertaintyBoost

    Weights are adaptive based on:
    - Market regime (trend vs range)
    - Aladdin Persona (Expert Specialization)
    """

    # Default weights loaded from config
    try:
        from config.settings import CONGRESS_WEIGHTS
        DEFAULT_WEIGHTS = CONGRESS_WEIGHTS
    except ImportError:
        # Aladdin Weighted Mix (LEADER has highest weight, SENTINEL is robust backup)
        DEFAULT_WEIGHTS = {
            'trend': {'quant': 0.15, 'archivist': 0.15, 'futurist': 0.15, 'leader': 0.35, 'sentinel': 0.20},
            'range': {'quant': 0.15, 'archivist': 0.15, 'futurist': 0.20, 'leader': 0.30, 'sentinel': 0.20},
        }

    # Base decision threshold
    BASE_THRESHOLD = 0.07  # Stricter for "Certainty"

    # Bayesian Fusion Constants
    LAMBDA = 1.8            # Higher activation slope for decisive signals
    KPI_GAMMA = 0.5         # Stronger Noble KPI influence
    CERTAINTY_BONUS = 0.15  # Bonus if all experts agree
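    # Worked example (illustrative, using the constants above): with a normalized
    # core signal of 0.20, a Noble KPI bias of +0.50 (KPI_GAMMA * 1.0) and a
    # unanimity bonus of 0.15, the master equation gives
    #   Psi = tanh(LAMBDA * (0.20 + 0.50 + 0.15)) = tanh(1.8 * 0.85) ≈ 0.91,
    # i.e. a strong BUY well above the adaptive threshold range [0.2, 0.5].
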
    def __init__(self, custom_weights: Optional[Dict] = None):
        """
        Initialize Congress Engine.

        Args:
            custom_weights: Optional custom weight configuration
        """
        self.weights = custom_weights or self.DEFAULT_WEIGHTS
        self.decision_history: List[CongressDecision] = []
        self.model_scorer = get_model_scorer()

        # Model to role mapping for dynamic weights
        self.model_role_map = {
            'xgboost': 'quant',
            'lightgbm': 'archivist',
            'randomforest': 'futurist',
            'stacking': 'leader',
            'catboost': 'sentinel',
        }

    def detect_market_regime(
        self,
        df: pd.DataFrame,
        adx_threshold: float = 20.0
    ) -> str:
        """
        Detect market regime using ADX and volatility.

        Args:
            df: DataFrame with OHLC data
            adx_threshold: ADX threshold for trend detection

        Returns:
            'trend' or 'range'
        """
        if df.empty or len(df) < 20:
            return 'range'

        try:
            # Calculate ADX
            high = df['high']
            low = df['low']
            close = df['close']

            # True Range
            tr1 = high - low
            tr2 = abs(high - close.shift())
            tr3 = abs(low - close.shift())
            tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)

            # +DM and -DM
            plus_dm = high.diff()
            minus_dm = -low.diff()
            plus_dm[plus_dm < 0] = 0
            minus_dm[minus_dm < 0] = 0
            plus_dm[(plus_dm < minus_dm)] = 0
            minus_dm[(minus_dm < plus_dm)] = 0

            # Smoothed values
            period = 14
            atr = tr.rolling(period).mean()
            plus_di = 100 * (plus_dm.rolling(period).mean() / atr)
            minus_di = 100 * (minus_dm.rolling(period).mean() / atr)

            # ADX
            dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di + 1e-10)
            adx = dx.rolling(period).mean()
            current_adx = adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 0

            # Also check Noble regime
            noble_regime = detect_regime(df)

            # Combine ADX with volatility regime
            if current_adx > adx_threshold:
                return 'trend'
            elif noble_regime in [MarketRegime.HIGH_VOL, MarketRegime.CRISIS]:
                return 'trend'  # High volatility often means trending
            else:
                return 'range'

        except Exception as e:
            logger.warning(f"Regime detection error: {e}")
            return 'range'
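    # Usage sketch (illustrative; `ohlc_df` is any DataFrame with at least 20 rows
    # and high/low/close columns). A 14-period ADX above `adx_threshold`, or a
    # HIGH_VOL/CRISIS Noble regime, yields 'trend'; everything else falls back to
    # 'range':
    #
    #   >>> CongressEngine().detect_market_regime(ohlc_df, adx_threshold=20.0)
    #   'trend'
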
    def calculate_adaptive_threshold(
        self,
        df: pd.DataFrame,
        timeframe: str
    ) -> float:
        """
        Calculate adaptive decision threshold based on volatility.

        Higher volatility = higher threshold (more confident signals only).

        Args:
            df: DataFrame with OHLC data
            timeframe: Current timeframe

        Returns:
            Threshold θ in [0.2, 0.5]
        """
        if df.empty or len(df) < 20:
            return self.BASE_THRESHOLD

        # Get base threshold from config if available
        base_threshold = self.BASE_THRESHOLD
        if hasattr(self, 'congress_config'):
            base_threshold = self.congress_config.get('thresholds', {}).get('range_base', self.BASE_THRESHOLD)

        try:
            atr = calculate_atr(df)
            current_atr = atr.iloc[-1]
            avg_atr = atr.mean()

            if pd.isna(current_atr) or pd.isna(avg_atr) or avg_atr == 0:
                return base_threshold

            # Volatility ratio
            vol_ratio = current_atr / avg_atr

            # Scale threshold: higher vol = higher threshold
            threshold = base_threshold * (0.8 + 0.4 * min(vol_ratio, 2.0))

            return np.clip(threshold, 0.2, 0.5)

        except Exception as e:
            logger.warning(f"Threshold calculation error: {e}")
            return base_threshold
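    # Worked example (illustrative): with base_threshold = 0.30 from config and the
    # current ATR at 1.5x its average (vol_ratio = 1.5), the scaled threshold is
    #   0.30 * (0.8 + 0.4 * 1.5) = 0.30 * 1.4 = 0.42,
    # which stays inside the clip range [0.2, 0.5]. With the fallback
    # BASE_THRESHOLD = 0.07 the same ratio gives 0.098, which is clipped up to 0.2.
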
    def get_weights(
        self,
        regime: str,
        timeframe: str
    ) -> Dict[str, float]:
        """
        Get weights for current regime and timeframe.

        Returns a mapping of persona value (str) to weight, normalized to sum to 1.
        """
        # DYNAMIC RELOAD: Refresh weights from config/DB on every call
        try:
            from config.settings import get_congress_weights, get_congress_config
            self.weights = get_congress_weights()
            self.congress_config = get_congress_config()  # Cache full config for rules
        except ImportError:
            pass  # Use existing self.weights

        base_weights_config = self.weights.get(regime, self.weights.get('range', {}))

        # Map string keys from config to AladdinPersona enums
        # Default config keys: 'quant', 'archivist', 'futurist'
        base_weights = {}

        # Ensure we have defaults if config is missing keys
        defaults = {
            AladdinPersona.QUANT: 0.25,
            AladdinPersona.ARCHIVIST: 0.25,
            AladdinPersona.FUTURIST: 0.15,
            AladdinPersona.LEADER: 0.35,
        }

        # Config Map: "quant" -> AladdinPersona.QUANT
        config_map = {
            "quant": AladdinPersona.QUANT,
            "archivist": AladdinPersona.ARCHIVIST,
            "futurist": AladdinPersona.FUTURIST,
            "guardian": AladdinPersona.GUARDIAN,
            "leader": AladdinPersona.LEADER,
        }

        # Load from config or defaults
        for key, persona in config_map.items():
            val = base_weights_config.get(key)
            if val is not None:
                # Store as value (str) for easier lookup in aggregate()
                base_weights[persona.value] = val
            else:
                # Legacy keys ('microstructure', etc.) are not mapped here
                pass

        # If empty (likely due to migration), use hardcoded defaults based on regime
        if not base_weights:
            if regime == 'trend':
                base_weights = {'quant': 0.20, 'archivist': 0.20, 'futurist': 0.15, 'leader': 0.45}
            else:
                base_weights = {'quant': 0.20, 'archivist': 0.20, 'futurist': 0.25, 'leader': 0.35}

        # Timeframe adjustments
        tf_adjustments = {}

        # Short timeframes: increase QUANT (Math/Microstructure) weight
        if timeframe in ['1m', '5m', '15m']:
            tf_adjustments = {
                AladdinPersona.QUANT.value: 1.2,
                AladdinPersona.ARCHIVIST.value: 0.9,
                AladdinPersona.FUTURIST.value: 0.8,
            }
        # Long timeframes: increase FUTURIST (Context/Macro) weight
        elif timeframe in ['1d', '1w', '1u']:  # '1u' kept for legacy weekly labels
            tf_adjustments = {
                AladdinPersona.QUANT.value: 0.7,
                AladdinPersona.ARCHIVIST.value: 1.0,
                AladdinPersona.FUTURIST.value: 1.3,
            }

        # Apply adjustments
        adjusted = {}
        # Iterate over known personas
        known_personas = [p.value for p in AladdinPersona]
        for role in known_personas:
            # Get base weight (default 0 if not set; normalized later)
            w = base_weights.get(role, 0.0)
            if w == 0 and role in ['quant', 'archivist', 'futurist']:
                w = 0.33  # Safety
            adj = tf_adjustments.get(role, 1.0)
            adjusted[role] = w * adj

        # Normalize to sum to 1
        total = sum(adjusted.values())
        if total > 0:
            adjusted = {role: w / total for role, w in adjusted.items()}

        # === DYNAMIC WEIGHT ADJUSTMENT ===
        # Blend static weights with performance-based dynamic weights
        try:
            dynamic_weights = self.model_scorer.get_dynamic_weights(
                adjusted,
                self.model_role_map
            )
            return dynamic_weights
        except Exception as e:
            logger.debug(f"[Congress] Dynamic weights fallback: {e}")
            return adjusted
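    # Worked example (illustrative): in a 'range' regime on '5m' with the fallback
    # base weights quant=0.20, archivist=0.20, futurist=0.25, leader=0.35, the
    # short-timeframe multipliers (1.2 / 0.9 / 0.8 / 1.0) give 0.24, 0.18, 0.20,
    # 0.35 before normalization; dividing by the sum 0.97 yields roughly
    # quant≈0.247, archivist≈0.186, futurist≈0.206, leader≈0.361 (summing to 1),
    # before any performance-based blending by the model scorer.
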
    def aggregate(
        self,
        predictions: List[ModelPrediction],
        df: pd.DataFrame,
        timeframe: str,
        smc_signals: Optional[Dict[str, bool]] = None,
        flux_boost: float = 0.0,
        flux_details: Optional[Dict] = None
    ) -> CongressDecision:
        """
        Aggregate model predictions into final signal.

        With Flux Boost (Physics) integration.
        """
        logger.info(f"─── CONGRESS AGGREGATION: {timeframe} ───")

        if not predictions:
            return CongressDecision(
                final_signal='NEUTRAL',
                final_score=0.0,
                final_confidence=0.0,
                detected_regime='range',
                threshold=self.BASE_THRESHOLD,
            )

        # Detect regime
        regime = self.detect_market_regime(df)

        # Get adaptive threshold
        threshold = self.calculate_adaptive_threshold(df, timeframe)

        # Get weights
        weights = self.get_weights(regime, timeframe)

        # === ALADDIN FUSION ENGINE (Certainty Upgrade) ===
        # Equation: Psi = tanh( lambda * [ Sum(w*S*C) + gamma*K + UnanimityBonus + FluxBoost ] )

        # 1. Calculate Core Weighted Signal
        core_signal = 0.0
        weights_sum = 0.0
        confidence_sum = 0.0
        details = {}
        if flux_details:
            details.update(flux_details)  # Keep externally computed physics metrics in the audit trail
        weights_applied = {}

        # Vote tracking for Certainty
        vote_buy = 0
        vote_sell = 0
        vote_neutral = 0

        for pred in predictions:
            # Handle string vs Enum role (robustness)
            role_key = pred.model_role.value if isinstance(pred.model_role, AladdinPersona) else str(pred.model_role)
            w = weights.get(role_key, 0.33)

            # Use confidence as uncertainty proxy (odds-like weighting)
            # confidence^2 gives exponentially more weight to sure models
            weighted_score = pred.score * (pred.confidence ** 2)

            core_signal += w * weighted_score
            weights_sum += w
            confidence_sum += pred.confidence
            weights_applied[pred.model_name] = w

            # Tally Votes for Certainty Check (strict > 0.1)
            if pred.score > 0.1:
                vote_buy += 1
            elif pred.score < -0.1:
                vote_sell += 1
            else:
                vote_neutral += 1

        # Normalize core signal
        if weights_sum > 0:
            core_signal = core_signal / weights_sum

        # 2. Integrate Noble KPI Bias (The "K" term - Guardian Input)
        noble_bias = 0.0
        smc_boost = 0.0  # Initialized before the try-block so it always exists
        try:
            if not df.empty and 'kpi_safe_high' in df.columns:
                last_row = df.iloc[-1]
                close = last_row['close']
                safe_high = last_row['kpi_safe_high']
                safe_low = last_row['kpi_safe_low']

                if close > safe_high:
                    noble_bias = self.KPI_GAMMA * 1.0   # Breakout Up -> Push UP
                elif close < safe_low:
                    noble_bias = self.KPI_GAMMA * -1.0  # Breakout Down -> Push DOWN

            # 3. MAGIC LOGIC: Smart Money Overrides (The "Forbidden" Boost)
            if smc_signals:
                # Bullish Context: FVG + Bullish OB -> Institutional Entry
                if smc_signals.get('fvg_bull', False) and smc_signals.get('ob_bull', False):
                    smc_boost += 0.30  # Massive Boost (Golden Entry)
                # Bearish Context
                if smc_signals.get('fvg_bear', False) and smc_signals.get('ob_bear', False):
                    smc_boost -= 0.30  # Massive Drop
                # Liquidity Sweeps
                if smc_signals.get('sweep_bear', False):
                    smc_boost -= 0.20
                if smc_signals.get('sweep_bull', False):
                    smc_boost += 0.20
        except Exception:
            pass

        # 3b. FLUX BOOST (Physics of Price)
        # Directly added to the raw input of the tanh function

        # 4. SENTINEL VETO (CatBoost Safeguard)
        # If Sentinel strongly disagrees with the core signal, kill it.
        # This enforces the "No Loss" objective.
        sentinel_veto = False
        sentinel_signal = "NEUTRAL"
        for pred in predictions:
            key = str(pred.model_role).lower()
            if "sentinel" in key or pred.model_name == "catboost":
                # Check for strong disagreement
                # Logic: If Core is BUY (>0.2) and Sentinel is SELL (score < -0.3) -> VETO
                if core_signal > 0.2 and pred.score < -0.3:
                    sentinel_veto = True
                    sentinel_signal = "SELL"
                elif core_signal < -0.2 and pred.score > 0.3:
                    sentinel_veto = True
                    sentinel_signal = "BUY"

        if sentinel_veto:
            logger.warning(f"🛡️ SENTINEL VETO APPLIED: Core={core_signal:.2f}, Sentinel={sentinel_signal}")
            details['veto'] = 'sentinel_disagreement'
            details['sentinel_signal'] = sentinel_signal  # Store what Sentinel saw
            details['core_signal_val'] = core_signal      # Store original core signal value
            core_signal = 0.0  # Force Neutral

        # 5. Calculate Certainty Boost (Unanimity)
        certainty_boost = 0.0
        total_votes = len(predictions)
        if total_votes >= 2:
            if vote_buy == total_votes:
                certainty_boost = self.CERTAINTY_BONUS   # All Experts Agree BUY
                details['consensus'] = 'unanimous_buy'
            elif vote_sell == total_votes:
                certainty_boost = -self.CERTAINTY_BONUS  # All Experts Agree SELL
                details['consensus'] = 'unanimous_sell'

        # === MASTER EQUATION ===
        # Psi = tanh( lambda * ( weighted_sum + noble_bias + certainty_boost + smc_boost + flux_boost ) )
        import math
        raw_input = (core_signal + noble_bias + certainty_boost + smc_boost + flux_boost)
        final_score = math.tanh(self.LAMBDA * raw_input)
        final_confidence = confidence_sum / len(predictions) if predictions else 0.0

        # --- ROI OPTIMIZATION: ADAPTIVE THRESHOLDS ---
        # Stricter thresholds for "Only Gain" policy
        # TUNING: Relaxed to 0.30 (from 0.40) to catch big moves while keeping safety
        adj_threshold = max(0.30, threshold)
        if regime == 'trend':
            adj_threshold = max(0.25, threshold - 0.05)
        elif regime == 'panic':
            adj_threshold = 0.60  # Extremely strict

        # Decision Logic
        if final_score > adj_threshold:
            final_signal = 'BUY'
        elif final_score < -adj_threshold:
            final_signal = 'SELL'
        else:
            final_signal = 'NEUTRAL'

        # ... (Panic Protocol & Overrides omitted for brevity, logic remains the same)
        # Re-implementing simplified override checks for safety
        rules = getattr(self, 'congress_config', {}).get('rules', {})
        override_active = rules.get('democratic_override', False)

        if override_active and regime == 'range' and final_signal == 'NEUTRAL':
            if vote_buy >= 2 and vote_sell == 0 and not sentinel_veto:
                final_signal = 'BUY'
                final_score = threshold + 0.02
                details['override'] = 'democratic_override_buy'
                certainty_boost += 0.1
            elif vote_sell >= 2 and vote_buy == 0 and not sentinel_veto:
                final_signal = 'SELL'
                final_score = -threshold - 0.02
                details['override'] = 'democratic_override_sell'
                certainty_boost -= 0.1

        # --- TF/5 TREND CERTAINTY CALCULATION ---

        # 1. Consensus (how many experts agree with the final signal)
        agreeing_votes = vote_buy if final_signal == 'BUY' else vote_sell if final_signal == 'SELL' else vote_neutral
        consensus_ratio = agreeing_votes / len(predictions) if predictions else 0.0

        # 2. Stability & Exhaustion (Trend Health)
        try:
            # RSI Analysis (FeatureEngine name: 'rsi' fallback)
            rsi_val = 50.0
            if 'rsi' in df.columns:
                rsi_val = df['rsi'].iloc[-1]
            elif 'trend_rsi' in df.columns:
                rsi_val = df['trend_rsi'].iloc[-1]

            overbought = max(0, rsi_val - 70) if final_signal == 'BUY' else 0
            oversold = max(0, 30 - rsi_val) if final_signal == 'SELL' else 0
            exhaustion_penalty = (overbought + oversold) / 30.0

            # Volatility
            atr_val = 0.0010
            if 'atr' in df.columns:
                atr_val = df['atr'].iloc[-1]
            elif 'vol_atr' in df.columns:
                atr_val = df['vol_atr'].iloc[-1]

            close = df['close'].iloc[-1]
            open_p = df['open'].iloc[-1]
            current_bar_move = abs(close - open_p)
            volatility_penalty = min(0.5, current_bar_move / (3 * atr_val)) if atr_val > 0 else 0

            # ADX
            adx_val = 25.0
            if 'adx' in df.columns:
                adx_val = df['adx'].iloc[-1]
            elif 'trend_adx' in df.columns:
                adx_val = df['trend_adx'].iloc[-1]

            stability = min(1.0, adx_val / 50.0)
            penalty = (0.6 * exhaustion_penalty) + (0.4 * volatility_penalty)

        except Exception as e:
            logger.warning(f"Exhaustion analysis failed: {e}")
            stability = 0.5
            penalty = 0.0

        # Formula: (Consensus*0.4) + (Confidence*0.3) + (Stability*0.3) - (Penalty*0.2)
        trend_certainty = (0.4 * consensus_ratio) + (0.3 * final_confidence) + (0.3 * stability) - (0.2 * penalty)

        if final_signal == 'NEUTRAL':
            trend_certainty = min(trend_certainty, 0.4)

        trend_certainty = max(0.0, min(1.0, trend_certainty))

        # 3. Dynamic Duration: boost if very certain
        tf_minutes = {
            '1m': 1, '5m': 5, '15m': 15, '30m': 30,
            '1h': 60, '4h': 240, '1d': 1440, '1w': 10080
        }.get(timeframe, 60)

        if trend_certainty > 0.85:
            duration_tf5 = int(tf_minutes / 4.0)  # TF/4 boost
        else:
            duration_tf5 = int(tf_minutes / 5.0)  # Standard TF/5

        duration_tf5 = max(1, duration_tf5)

        decision = CongressDecision(
            final_signal=final_signal,
            final_score=float(final_score),
            final_confidence=float(final_confidence),
            detected_regime=regime,
            threshold=adj_threshold,
            certainty_boost=float(certainty_boost),
            trend_certainty=round(float(trend_certainty), 3),
            duration_tf5=int(duration_tf5),
            model_predictions=predictions,
            weights_applied=weights_applied,
            override_type=details.get('override') or details.get('consensus') or details.get('veto'),
            flux_details=details  # Pass full details dict
        )

        # Store in history
        self.decision_history.append(decision)

        # Keep history manageable
        if len(self.decision_history) > 1000:
            self.decision_history = self.decision_history[-500:]

        logger.info("════════════════════════════════════════════════════════════")
        logger.info(f" ALADDIN DECISION: {final_signal}")
        logger.info(f" Score: {final_score:.4f} | Conf: {final_confidence:.4f} | Certainty: {certainty_boost:.4f} | Veto: {sentinel_veto}")
        logger.info("════════════════════════════════════════════════════════════")

        return decision
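    # Worked example (illustrative numbers): three experts vote BUY with scores
    # 0.4 / 0.5 / 0.6 at confidence ~0.7, giving a normalized core signal near 0.25.
    # With no KPI breakout (noble_bias=0), no SMC setup (smc_boost=0), no flux boost
    # and unanimous votes (certainty_boost=+0.15), the master equation gives
    #   final_score = tanh(1.8 * (0.25 + 0.15)) = tanh(0.72) ≈ 0.62,
    # which clears the 0.30 floor (0.25 in a trend regime) -> BUY.
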
    def get_decision_history(self, limit: int = 100) -> List[dict]:
        """Get recent decision history as dicts"""
        return [d.to_dict() for d in self.decision_history[-limit:]]

    def simple_aggregate(
        self,
        predictions: List[Dict],
        timeframe: str = "1h"
    ) -> Dict:
        """
        Simple weighted aggregation using config.json weights.

        This is a simpler interface for the prediction engine that uses model
        names (xgboost, lightgbm, randomforest, stacking) instead of
        AladdinPersona roles.

        Args:
            predictions: List of {model, signal, confidence}
            timeframe: For timeframe-specific weight profiles

        Returns:
            {signal, confidence, score, details}
        """
        from config.settings import get_model_weights, get_signal_thresholds

        if not predictions:
            return {
                "signal": "NEUTRAL",
                "confidence": 0.0,
                "score": 0.0,
                "details": []
            }

        # Get weights from config
        weights = get_model_weights(timeframe)
        thresholds = get_signal_thresholds()

        # Signal mapping
        signal_map = {"SELL": -1, "NEUTRAL": 0, "BUY": 1}

        total_score = 0.0
        total_weight = 0.0
        details = []

        for pred in predictions:
            model = pred.get("model", "unknown")
            signal_str = pred.get("signal", "NEUTRAL")
            signal_num = signal_map.get(signal_str, 0)
            confidence = pred.get("confidence", 0.5)
            weight = weights.get(model, 0.33)

            contribution = weight * signal_num * confidence
            total_score += contribution
            total_weight += weight

            details.append({
                "model": model,
                "signal": signal_str,
                "confidence": confidence,
                "weight": weight,
                "contribution": round(contribution, 4)
            })

        # Normalize score
        if total_weight > 0:
            normalized_score = total_score / total_weight
        else:
            normalized_score = 0.0

        # Apply thresholds
        buy_threshold = thresholds.get("buy", 0.30)
        sell_threshold = thresholds.get("sell", -0.30)

        if normalized_score >= buy_threshold:
            final_signal = "BUY"
        elif normalized_score <= sell_threshold:
            final_signal = "SELL"
        else:
            final_signal = "NEUTRAL"

        final_confidence = min(abs(normalized_score), 1.0)

        logger.info(
            f"Congress Simple: {final_signal} (score={normalized_score:.3f}, "
            f"conf={final_confidence:.3f}, tf={timeframe})"
        )

        return {
            "signal": final_signal,
            "confidence": final_confidence,
            "score": round(normalized_score, 4),
            "details": details,
            "timeframe": timeframe,
            "thresholds": {"buy": buy_threshold, "sell": sell_threshold}
        }
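    # Usage sketch (hypothetical weights and thresholds from config.json):
    #
    #   >>> engine = get_congress_engine()
    #   >>> engine.simple_aggregate(
    #   ...     [{"model": "xgboost", "signal": "BUY", "confidence": 0.8},
    #   ...      {"model": "lightgbm", "signal": "BUY", "confidence": 0.6},
    #   ...      {"model": "randomforest", "signal": "NEUTRAL", "confidence": 0.5}],
    #   ...     timeframe="1h",
    #   ... )
    #   {'signal': 'BUY', 'confidence': ..., 'score': ..., 'details': [...], ...}
    #
    # With equal weights the normalized score would be (0.8 + 0.6 + 0.0) / 3 ≈ 0.47,
    # which clears a 0.30 buy threshold.
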
    def make_decision_batch(
        self,
        predictions_df: pd.DataFrame,
        regime_series: pd.Series,
        atr_series: pd.Series,
        timeframe: str,
        noble_bias_series: Optional[np.ndarray] = None
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Vectorized version of make_decision for backtesting.

        Args:
            predictions_df: DataFrame where cols are model names, rows are samples
            regime_series: Series of 'trend' or 'range' strings
            atr_series: Series of ATR values for adaptive threshold
            timeframe: Timeframe string

        Returns:
            (signals, scores, confidences, thresholds) as numpy arrays
        """
        n_samples = len(predictions_df)
        if n_samples == 0:
            return np.array([]), np.array([]), np.array([]), np.array([])

        # 1. Initialize accumulators
        weighted_scores = np.zeros(n_samples)
        total_weights = np.zeros(n_samples)

        # 2. Iterate models (vectorized per model)
        for model_name in predictions_df.columns:
            if model_name not in self.model_role_map:
                continue

            persona = self.model_role_map[model_name]
            raw_scores = predictions_df[model_name].values

            # Mask for regimes
            mask_trend = (regime_series.values == 'trend')
            mask_range = (regime_series.values == 'range')

            # Get weights for this persona
            w_trend = self.weights.get('trend', {}).get(persona, 0.25)
            w_range = self.weights.get('range', {}).get(persona, 0.25)

            # Apply weights
            weights = np.zeros(n_samples)
            weights[mask_trend] = w_trend
            weights[mask_range] = w_range

            # Confidence weighting (score^2)
            confidences = np.abs(raw_scores)
            dynamic_weights = weights * (confidences ** 2)  # Penalize low confidence heavily

            weighted_scores += raw_scores * dynamic_weights
            total_weights += dynamic_weights

        # 3. Final Aggregation
        final_scores = np.zeros(n_samples)
        mask_valid = total_weights > 0
        final_scores[mask_valid] = weighted_scores[mask_valid] / total_weights[mask_valid]

        # --- FEATURE PARITY UPGRADE ---
        # 3b. Apply Noble Bias (if provided)
        if noble_bias_series is not None:
            final_scores += noble_bias_series

        # 3c. Unanimity Boost (Vectorized)
        # If all distinct models agree on sign, add boost
        # Exclude sentinel from the unanimity vote (it acts as a vetoer)
        voting_cols = [c for c in predictions_df.columns if 'sentinel' not in c.lower() and c != 'catboost']
        if len(voting_cols) >= 2:
            votes = predictions_df[voting_cols].values
            # Check if all > 0
            all_buy = np.all(votes > 0, axis=1)
            # Check if all < 0
            all_sell = np.all(votes < 0, axis=1)

            final_scores[all_buy] += self.CERTAINTY_BONUS
            final_scores[all_sell] -= self.CERTAINTY_BONUS

        # 4. Adaptive Thresholds
        # Calculate volatility ratio (vectorized)
        avg_atr = atr_series.mean()
        if avg_atr > 0:
            vol_ratios = atr_series.values / avg_atr
        else:
            vol_ratios = np.ones(n_samples)

        base_threshold = self.BASE_THRESHOLD
        if hasattr(self, 'congress_config'):
            base_threshold = self.congress_config.get('thresholds', {}).get('range_base', self.BASE_THRESHOLD)

        thresholds = base_threshold * (0.8 + 0.4 * np.clip(vol_ratios, 0, 2.0))
        # STRICT MODE: Minimum 0.30 threshold for backtests to match the Live Engine (tuned down from 0.40)
        thresholds = np.clip(thresholds, 0.30, 0.60)

        # 5. Sentinel Veto (Vectorized)
        # Filter out signals where CatBoost strongly disagrees
        if 'catboost' in predictions_df.columns:
            cat_scores = predictions_df['catboost'].values
            # Veto Logic: Core BUY (>0.2) + CatBoost SELL (<-0.3)
            #          OR Core SELL (<-0.2) + CatBoost BUY (>0.3)
            veto_buy = (final_scores > 0.2) & (cat_scores < -0.3)
            veto_sell = (final_scores < -0.2) & (cat_scores > 0.3)
            veto_mask = veto_buy | veto_sell

            # Apply Veto (zero out scores)
            final_scores[veto_mask] = 0.0

        # 6. Signal Generation
        signals = np.full(n_samples, "NEUTRAL", dtype=object)

        # Buy Logic
        mask_buy = final_scores > thresholds
        signals[mask_buy] = "BUY"

        # Sell Logic
        mask_sell = final_scores < -thresholds
        signals[mask_sell] = "SELL"

        # --- DEMOCRATIC OVERRIDE (Feature Parity) ---
        # If Range Regime + Neutral + 2 Votes -> Force Trade
        rules = getattr(self, 'congress_config', {}).get('rules', {})
        if rules.get('democratic_override', False):
            # Identify Range Regime rows
            is_range = (regime_series.values == 'range')
            is_neutral = (signals == "NEUTRAL")

            if len(voting_cols) >= 2:
                # Count votes > 0 and < 0
                vote_counts_buy = np.sum(votes > 0, axis=1)
                vote_counts_sell = np.sum(votes < 0, axis=1)

                # Override Buy: Range + Neutral + 2 Buys + 0 Sells + No Veto
                mask_override_buy = (
                    is_range & is_neutral &
                    (vote_counts_buy >= 2) & (vote_counts_sell == 0) &
                    (~veto_mask if 'veto_mask' in locals() else True)
                )
                signals[mask_override_buy] = "BUY"
                # Boost score slightly to reflect override
                final_scores[mask_override_buy] = thresholds[mask_override_buy] + 0.02

                # Override Sell: Range + Neutral + 2 Sells + 0 Buys + No Veto
                mask_override_sell = (
                    is_range & is_neutral &
                    (vote_counts_sell >= 2) & (vote_counts_buy == 0) &
                    (~veto_mask if 'veto_mask' in locals() else True)
                )
                signals[mask_override_sell] = "SELL"
                final_scores[mask_override_sell] = -thresholds[mask_override_sell] - 0.02

        # Calculate final confidence
        final_confidences = np.abs(final_scores)

        # 7. Certainty Boost (bonus if strong agreement)
        # Vectorized boost application: if the final score is strong (>0.7), add boost
        mask_boost = final_confidences > 0.7
        final_scores[mask_boost] *= 1.15  # 15% boost

        # Re-clip to [-1, 1]
        final_scores = np.clip(final_scores, -1.0, 1.0)
        final_confidences = np.abs(final_scores)

        return signals, final_scores, final_confidences, thresholds
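    # Usage sketch (hypothetical backtest arrays; model columns must match the keys
    # of self.model_role_map):
    #
    #   >>> preds = pd.DataFrame({
    #   ...     "xgboost":  [0.6, -0.4],
    #   ...     "lightgbm": [0.5, -0.3],
    #   ...     "catboost": [0.4,  0.5],   # second row: sentinel disagreement -> veto
    #   ... })
    #   >>> regimes = pd.Series(["trend", "range"])
    #   >>> atrs = pd.Series([0.0010, 0.0012])
    #   >>> signals, scores, confs, thr = engine.make_decision_batch(preds, regimes, atrs, "1h")
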
    def aggregate_with_regime(
        self,
        predictions: List[Dict],
        df: pd.DataFrame,
        timeframe: str = "1h"
    ) -> Dict:
        """
        Weighted aggregation with regime classification.

        Combines simple_aggregate with RegimeClassifier output.

        Args:
            predictions: List of {model, signal, confidence}
            df: OHLC DataFrame for regime detection
            timeframe: Current timeframe

        Returns:
            Dict with signal, regime, and full audit trail
        """
        from core.regime_classifier import get_regime_classifier

        # Get base aggregation
        result = self.simple_aggregate(predictions, timeframe)

        # Add regime classification
        try:
            classifier = get_regime_classifier()
            regime_result = classifier.classify(df)

            result["regime"] = regime_result.regime.value
            result["regime_confidence"] = regime_result.confidence
            result["regime_details"] = {
                "adx": regime_result.adx,
                "atr_ratio": regime_result.atr_ratio,
                "sma_alignment": regime_result.sma_alignment
            }

            logger.info(
                f"Congress+Regime: {result['signal']} | "
                f"Regime={regime_result.regime.value} (conf={regime_result.confidence:.2f})"
            )

        except Exception as e:
            logger.warning(f"Regime classification failed: {e}")
            result["regime"] = "unknown"
            result["regime_confidence"] = 0.0
            result["regime_details"] = {"error": str(e)}

        return result
    def full_aggregate(
        self,
        predictions: List[Dict],
        df: pd.DataFrame,
        timeframe: str = "1h",
        current_price: Optional[float] = None,
        safe_low: Optional[float] = None,
        safe_high: Optional[float] = None
    ) -> Dict:
        """
        Complete prediction pipeline with all features.

        Combines:
        1. Weighted aggregation (configurable weights)
        2. Regime classification
        3. Risk filter validation (KPI bounds, volatility, momentum)

        Args:
            predictions: List of {model, signal, confidence}
            df: OHLC DataFrame
            timeframe: Current timeframe
            current_price: Current market price (for KPI check)
            safe_low: KPI Safe Low bound
            safe_high: KPI Safe High bound

        Returns:
            Complete prediction dict with all audit info
        """
        import math

        # Step 1: Get aggregation with regime
        result = self.aggregate_with_regime(predictions, df, timeframe)

        # === FUSION STEP 2: NOBLE KPI BIAS INTEGRATION ===
        # Psi_Final = tanh( Psi_Technical + Gamma * KPI_Bias )
        if current_price is not None and safe_low is not None and safe_high is not None:
            # Calculate KPI Zone
            kpi_bias = 0.0

            # ALIGNMENT FIX:
            # Training maps Breakout > SafeHigh as BUY (Trend Following)
            # Training maps Breakout < SafeLow as SELL (Trend Following)
            # We must match this logic here. Previously the bias was negative
            # (Mean Reversion), which fought the models.
            if current_price > safe_high:
                kpi_bias = 1.0   # Breakout Up -> Push UP (BUY)
            elif current_price < safe_low:
                kpi_bias = -1.0  # Breakout Down -> Push DOWN (SELL)

            # result['score'] is already Tanh(Technical). Since tanh(a+b) != tanh(a)+b,
            # we cannot add the bias inside the original activation; instead we apply a
            # second activation pass (ResNet style) as a stable approximation of
            #   Psi_Final = tanh( arctanh(score) + Gamma * K )
            psi_final = math.tanh(result['score'] + (self.KPI_GAMMA * kpi_bias))

            result['score'] = psi_final
            result['noble_bias_applied'] = kpi_bias

            # Re-evaluate Signal based on new Score
            threshold = result.get('threshold', 0.3)
            new_signal = "NEUTRAL"
            if psi_final > threshold:
                new_signal = "BUY"
            elif psi_final < -threshold:
                new_signal = "SELL"

            if new_signal != result['signal']:
                logger.info(f"Noble KPI Adjusted Signal: {result['signal']} -> {new_signal}")
                result['signal'] = new_signal

        # Step 2: Apply risk filter if we have price data
        if current_price is not None:
            from core.risk_filter import get_risk_filter
            try:
                risk_filter = get_risk_filter()
                result = risk_filter.apply_to_congress_result(
                    result,
                    current_price=current_price,
                    df=df,
                    safe_low=safe_low,
                    safe_high=safe_high
                )
            except Exception as e:
                logger.warning(f"Risk filter failed: {e}")
                result["risk_check"] = {"error": str(e)}

        # Add timestamp
        result["timestamp"] = datetime.now().isoformat()

        logger.info(
            f"Full Congress: {result['signal']} | "
            f"Score={result['score']:.3f} | "
            f"Regime={result.get('regime', 'N/A')} | "
            f"Risk={'PASS' if result.get('risk_check', {}).get('is_valid', True) else 'FAIL'}"
        )

        return result
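    # Worked example (illustrative): if the technical score is 0.10 and price breaks
    # above the KPI Safe High (kpi_bias = +1.0), the second activation pass gives
    #   Psi_Final = tanh(0.10 + 0.5 * 1.0) = tanh(0.60) ≈ 0.54,
    # lifting a borderline NEUTRAL into BUY territory against a 0.30 threshold.
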
    def save_prediction_json(
        self,
        result: Dict,
        symbol: str = "EURUSD",
        output_dir: Optional[str] = None
    ) -> str:
        """
        Save prediction result to JSON file.

        Args:
            result: Prediction result from full_aggregate
            symbol: Trading symbol
            output_dir: Output directory (default: data/signals)

        Returns:
            Path to saved file
        """
        import json
        from pathlib import Path
        from config.settings import SIGNALS_DIR

        # Determine output path
        if output_dir:
            out_path = Path(output_dir)
        else:
            out_path = SIGNALS_DIR
        out_path.mkdir(parents=True, exist_ok=True)

        # Create filename with timestamp
        timeframe = result.get("timeframe", "unknown")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{symbol}_{timeframe}_{timestamp}.json"
        filepath = out_path / filename

        # Add metadata
        output = {
            "symbol": symbol,
            "generated_at": datetime.now().isoformat(),
            "version": "2.0",
            **result
        }

        # Save
        with open(filepath, 'w') as f:
            json.dump(output, f, indent=2, default=str)

        logger.info(f"Saved prediction to {filepath}")

        # Also save a "latest" file for easy access
        latest_path = out_path / f"{symbol}_{timeframe}_latest.json"
        with open(latest_path, 'w') as f:
            json.dump(output, f, indent=2, default=str)

        return str(filepath)

# Singleton instance
_congress_engine = None

def get_congress_engine() -> CongressEngine:
    """Get or create Congress Engine singleton"""
    global _congress_engine
    if _congress_engine is None:
        _congress_engine = CongressEngine()
    return _congress_engine
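
# Minimal usage sketch (assumes the surrounding Cerebrum Forex project, including
# config.settings and core.model_scorer, is importable; the sample predictions
# below are invented for illustration).
if __name__ == "__main__":
    engine = get_congress_engine()
    sample = [
        {"model": "xgboost", "signal": "BUY", "confidence": 0.75},
        {"model": "lightgbm", "signal": "BUY", "confidence": 0.60},
        {"model": "randomforest", "signal": "NEUTRAL", "confidence": 0.50},
    ]
    result = engine.simple_aggregate(sample, timeframe="1h")
    print(result["signal"], result["score"])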