Source code for core.app_controller

"""
Cerebrum Forex - Application Controller
Central controller connecting all components.
"""

import logging
import threading
from pathlib import Path
from typing import Optional, Dict, Any

from config.settings import ensure_directories, default_settings, TIMEFRAME_NAMES, DATA_DIR
from database import get_database

logger = logging.getLogger(__name__)

from core.profiler import profile


[docs] class AppController: """Application controller connecting all components""" def __init__(self): # Ensure directories exist ensure_directories() # Lazy init components to speed up startup self._mt5 = None self._training_manager = None self._prediction_engine = None self._scheduler = None self.db = get_database() # State self.is_active = False self.mt5_connected = False self.is_training = False self.is_predicting = False self.is_extracting = False # Cache for UI (avoid blocking reads) self._signal_cache: Dict[str, dict] = {} self._ohlc_status_cache: Dict[str, dict] = {} self._training_status_cache: Dict[str, dict] = {} # Account Info Cache (prevent micro-stutters) self._account_info_cache = None self._last_account_update = 0 self._is_updating_account = False # Scheduler State (Init defaults) self.scheduler_enabled = False self.scheduler_cycle = "4h" self.next_run_time = None # Performance Auto-Detection (First Run) try: self._auto_detect_performance_profile() except: pass # Load from DB (Safe Sync) self._sync_settings_to_memory() # Recalculate scheduler after sync self._calculate_next_run() self.is_training = False self.is_predicting = False self.current_training_source = "SYSTEM" # [AUTO, MANUAL, SYSTEM] # Performance Auto-Detection (First Run) self._auto_detect_performance_profile() @property def mt5(self): # Connections/State if self._mt5 is None: from core.mt5_connector import MT5Connector self._mt5 = MT5Connector(default_settings.symbol) # Register with Sentinel from core.sentinel import get_sentinel get_sentinel(mt5=self._mt5) return self._mt5 @property def training_manager(self): if self._training_manager is None: from core.training_manager import TrainingManager self._training_manager = TrainingManager(self.mt5) # Register with Sentinel from core.sentinel import get_sentinel get_sentinel(tm=self._training_manager) return self._training_manager @property def prediction_engine(self): if self._prediction_engine is None: from core.prediction_engine import PredictionEngine self._prediction_engine = PredictionEngine(self.training_manager, self.mt5) # Register with Sentinel from core.sentinel import get_sentinel get_sentinel(pe=self._prediction_engine) return self._prediction_engine @property def scheduler(self): if self._scheduler is None: from core.scheduler import Scheduler self._scheduler = Scheduler(self.prediction_engine, self.mt5) return self._scheduler
[docs] def start(self): """Start the application - minimal startup, just connect MT5""" self.is_active = True # 0. Migration: Force 2015 default for extraction (User repeated request) try: current_from = self.db.get_config("from_year") if not current_from or current_from == "2020": logger.info("Migrating from_year default: 2020 -> 2015") self.db.set_config("from_year", "2015") except Exception as e: logger.error(f"Migration error: {e}") # Launch EVERYTHING in thread to avoid blocking UI # MT5 initialization can take time, and model loading takes time. def start_background(): try: logger.info("Connecting to MT5 in background...") self.mt5_connected = self.mt5.connect() if self.mt5_connected: logger.info("════════════════════════════════════════════════════════════") logger.info(" MT5 CONNECTION ESTABLISHED") logger.info("════════════════════════════════════════════════════════════") # WARMUP IMPORTS (Background) try: logger.info("Warming up ML modules...") from core.training_manager import TrainingManager from core.prediction_engine import PredictionEngine logger.info("ML modules warmed up.") except Exception as e: logger.warning(f"Module warmup failed: {e}") if not self.mt5_connected: logger.warning("MT5 Connection failed/offline.") if self.prediction_engine: # MANUAL MODE: Engine ready for triggers pass logger.info("Prediction Engine ready (Manual Mode)") # Start Account Info Worker (Keep this AUTO) threading.Thread(target=self._account_info_worker, daemon=True).start() # Start AI Scheduler Worker threading.Thread(target=self._scheduler_worker, daemon=True).start() # SYNC EA CONFIG (Ensure EA has latest settings) if self.mt5_connected: threading.Thread(target=self.sync_ea_config, daemon=True).start() self.log_training_event("SYSTEM", "STARTUP", "Application started") logger.info("════════════════════════════════════════════════════════════") logger.info(" APPLICATION ENGINE READY") logger.info("════════════════════════════════════════════════════════════") except Exception as e: logger.error(f"CRITICAL: Background startup error: {e}") import traceback logger.error(traceback.format_exc()) threading.Thread(target=start_background, daemon=True).start() # NOTE: Extraction and Training still require manual triggers # or specific scheduling logic if not self.mt5_connected: # Try once more with detailed feedback success, message = self.mt5.check_health() self.mt5_connected = success if success: logger.info("MT5 Connection recovered via health check") else: logger.error(f"MT5 Initial connect failed: {message}") logger.info("Application started")
[docs] def stop(self): """Stop the application (deactivate)""" self.is_active = False # Stop all processes if self._mt5: self.mt5.disconnect() if self._training_manager: self.training_manager.stop_scheduled_training() if self._scheduler: self.scheduler.stop() self.mt5_connected = False self.mt5_connected = False logger.info("════════════════════════════════════════════════════════════") logger.info(" APPLICATION SHUTDOWN COMPLETE") logger.info("════════════════════════════════════════════════════════════")
[docs] def extract_all(self, is_update: bool = True): """Extract OHLC data for all timeframes""" def run(): self.is_extracting = True try: self.mt5.extract_all(is_update=is_update) except Exception as e: logger.error(f"Extraction error: {e}") self.is_extracting = False logger.info("═══ END SECTION: SMART EXTRACT ═══") thread = threading.Thread(target=run, daemon=True) thread.start()
[docs] @profile(threshold_ms=100.0) def predict_manual(self, timeframe: str) -> dict: """ Execute a MANUAL prediction for a specific timeframe. No scheduling, no background loop. Direct execution. """ if not self.prediction_engine: logger.error("Prediction Engine not loaded") return {"error": "Engine not loaded"} logger.info("═══ START SECTION: MANUAL PREDICTION ═══") logger.info(f"Context: Timeframe={timeframe}, Request=UserClick") # 1. Ensure Data Freshness (Fast Fetch) try: # OPTIMIZATION: IN-MEMORY PIPELINE # Fetch buffer directly from MT5 (6000 rows for safety) # This avoids reading 100MB+ CSV files from disk. df_buffer = None if self.mt5_connected: df_buffer = self.mt5.get_buffer(timeframe, n=6000) # If buffer is None (offline), prediction_engine will fallback to disk. # Run prediction result = self.prediction_engine.predict_timeframe(timeframe, df=df_buffer) return result except Exception as e: logger.error(f"Manual prediction failed: {e}") return {"error": str(e)}
[docs] def train_all(self, force_global: bool = False, source: str = "MANUAL"): """Train all models for all timeframes""" def run(): self.is_training = True self.current_training_source = source try: self.log_training_event("MANUAL", "START", "Starting all timeframes") self.training_manager.train_all(force_global=force_global) self.log_training_event("MANUAL", "SUCCESS", "Finished training all") except Exception as e: self.log_training_event("MANUAL", "FAILED", str(e)) logger.error(f"Training error: {e}") self.is_training = False logger.info("═══ END SECTION: SMART TRAIN ═══") thread = threading.Thread(target=run, daemon=True) thread.start()
[docs] def predict_all(self): """Generate predictions for all timeframes""" def run(): self.is_predicting = True try: self.prediction_engine.predict_all() # Update cache for tf in TIMEFRAME_NAMES: sig = self.prediction_engine.load_signal(tf) if sig: self._signal_cache[tf] = sig except Exception as e: logger.error(f"Prediction error: {e}") self.is_predicting = False thread = threading.Thread(target=run, daemon=True) thread.start()
[docs] def get_signal(self, timeframe: str) -> Optional[dict]: """Get current signal for a timeframe (from cache)""" return self._signal_cache.get(timeframe)
[docs] def get_combined_signal(self) -> dict: """Get combined signal from all timeframes""" try: return self.prediction_engine.get_combined_signal() except Exception: return {"signal": "NEUTRAL", "confidence": 0, "votes": {}}
[docs] def get_ohlc_status(self, timeframe: str) -> dict: """Get OHLC file status for a timeframe""" return self._ohlc_status_cache.get(timeframe, {"exists": False})
[docs] def get_training_status(self, timeframe: str) -> dict: """Get training status for a timeframe""" return self._training_status_cache.get(timeframe, {"models": {}})
[docs] def get_model(self, timeframe: str, model_type: str): """Get a specific model""" try: return self.training_manager.get_model(timeframe, model_type) except Exception: return None
[docs] def get_settings(self) -> dict: """Get current settings""" import json # Parse nested JSON configs from DB def parse_json_config(key, defaults): raw = self.db.get_config(key) if raw: try: return json.loads(raw.replace("'", '"')) except: pass return defaults return { "mt5_path": self.db.get_config("mt5_path") or "", "symbol": self.db.get_config("symbol") or "EURUSD", # Trading & System "magic_number": int(self.db.get_config("magic_number") or 234000), "max_slippage": int(self.db.get_config("max_slippage") or 3), "max_spread": int(self.db.get_config("max_spread") or 20), "audio_alerts": str(self.db.get_config("audio_alerts")).lower() == "true", "auto_connect": str(self.db.get_config("auto_connect")).lower() == "true", "ohlc_interval": int(self.db.get_config("ohlc_interval") or 12), "from_year": int(self.db.get_config("from_year") or 2015), "history_1m_months": int(self.db.get_config("history_1m_months") or 6), # AI & Signals "min_consensus": int(self.db.get_config("min_consensus") or 2), "confidence_threshold": float(self.db.get_config("confidence_threshold") or 0.65), "validation_ratio": float(self.db.get_config("validation_ratio") or 0.20), "hyper_optimization": self.db.get_config("hyper_optimization") == "True", "incremental_training": self.db.get_config("incremental_training") != "False", # Default True "cpu_threads": int(self.db.get_config("cpu_threads") or 4), "performance_profile": self.db.get_config("performance_profile") or "BALANCED", "financial_guard_policy": self.db.get_config("financial_guard_policy") or "Advisory (Notify Only)", # Nested Configs (Scalper Defaults) "trailing_config": parse_json_config("trailing_config", { "adaptive_enabled": True, "step": 5, "adaptive_start": 20, "adaptive_deep": 200, "adaptive_min": 10 }), "prop_firm_config": parse_json_config("prop_firm_config", { "enabled": True, "max_daily_loss_pct": 5.0, "max_total_drawdown_pct": 10.0, "risk_per_trade_pct": 1.0, "profit_target_pct": 10.0, "news_filter_enabled": True, "neutral_stop_pts": 50 }), }
[docs] @profile(threshold_ms=1.0) def get_cached_account_info(self) -> Optional[dict]: """Get account info from cache (Instant)""" return self._account_info_cache
def _account_info_worker(self): """Dedicated background worker for account info""" import time while self.is_active: try: if self.mt5_connected: info = self.mt5.get_account_info() if info: self._account_info_cache = info self._last_account_update = time.time() # INTELLIGENT THROTTLING: Adaptive sleep based on Profile profile = default_settings.performance_profile sleep_time = 3 # PRO if profile == "ECO": sleep_time = 30 # Slow updates for Low RAM/CPU elif profile == "BALANCED": sleep_time = 10 time.sleep(sleep_time) except Exception as e: logger.error(f"Account worker error: {e}") time.sleep(10) # Check stopping condition more frequently if not self.is_active: break
[docs] def save_settings(self, settings: dict): """Save settings to database and sync to memory""" import json for key, value in settings.items(): # Serialize nested dicts as JSON if isinstance(value, dict): self.db.set_config(key, json.dumps(value)) else: self.db.set_config(key, str(value)) # Update symbol in active connector if key == "symbol" and self._mt5: # MT5Connector uses self.symbol for extraction self._mt5.symbol = str(value) # Immediate sync to in-memory defaults (for TrainingManager/MT5/etc) self._sync_settings_to_memory() logger.info("Settings saved and synchronized.")
[docs] def get_congress_config(self) -> dict: """Get Congress AI configuration""" from config.settings import get_congress_config return get_congress_config()
[docs] def save_congress_config(self, config: dict): """Save Congress AI configuration to JSON""" import json from config.settings import MODEL_CONFIG_PATH, load_model_config try: # Load existing full config to preserve other keys full_config = load_model_config() # Update congress_config section full_config["congress_config"] = config # Also update base weights/thresholds for backward compatibility # (Use Range weights as default since it's the tricky one) weights = config.get("weights", {}).get("range", {}) full_config["model_weights"] = weights thresh = config.get("thresholds", {}) full_config["thresholds"] = { "buy": 0.30, # Keep default or dynamic? "sell": -0.30 } if "range_base" in thresh: base = thresh["range_base"] full_config["thresholds"]["range_buy"] = base full_config["thresholds"]["range_sell"] = -base # Write to file with open(MODEL_CONFIG_PATH, 'w') as f: json.dump(full_config, f, indent=4) logger.info("Congress AI Logic configuration saved.") except Exception as e: logger.error(f"Failed to save Congress config: {e}")
def _sync_settings_to_memory(self): """Synchronize DB settings with global default_settings object""" try: conf = self.db.get_all_config() if not conf: return # MT5/Trading if conf.get("symbol"): default_settings.symbol = conf["symbol"] if conf.get("mt5_path"): default_settings.mt5_path = conf["mt5_path"] try: if conf.get("magic_number"): default_settings.magic_number = int(conf["magic_number"]) except: pass # Data try: if conf.get("from_year"): default_settings.extract_from_year = int(conf["from_year"]) if conf.get("history_1m_months"): default_settings.history_1m_months = int(conf["history_1m_months"]) except: pass # AI try: if conf.get("confidence_threshold"): default_settings.confidence_threshold = float(conf["confidence_threshold"]) if conf.get("min_consensus"): default_settings.min_consensus = int(conf["min_consensus"]) if conf.get("cpu_threads"): default_settings.cpu_threads = int(conf["cpu_threads"]) except: pass if conf.get("performance_profile"): default_settings.performance_profile = conf["performance_profile"] if conf.get("incremental_training"): default_settings.incremental_training = conf["incremental_training"] == "True" if conf.get("hyper_optimization"): default_settings.hyper_optimization = conf["hyper_optimization"] == "True" # Scheduler Sync if conf.get("scheduler_enabled"): self.scheduler_enabled = conf["scheduler_enabled"] == "True" if conf.get("scheduler_cycle"): self.scheduler_cycle = conf["scheduler_cycle"] # Update staleness if needed try: if conf.get("ohlc_interval"): default_settings.ohlc_interval_hours = int(conf["ohlc_interval"]) except: pass logger.info(f"✅ Configuration Synced: profile={default_settings.performance_profile}, scheduler={self.scheduler_enabled}") except Exception as e: logger.error(f"Failed to sync settings: {e}") def _auto_detect_performance_profile(self): """Detect system hardware and set optimal performance profile if not already set""" try: # Check if already set in DB existing = self.db.get_config("performance_profile") if existing: return # User or system already set it import psutil ram_gb = psutil.virtual_memory().total / (1024**3) cpu_cores = psutil.cpu_count(logical=True) profile = "BALANCED" if ram_gb < 7.5: # Allow slight margin for 8GB systems profile = "ECO" elif ram_gb >= 15.5 and cpu_cores >= 8: profile = "PRO" logger.info(f"🚀 HARDWARE AUTO-DISCOVERY: RAM={ram_gb:.1f}GB, Cores={cpu_cores} -> Profile: {profile}") self.db.set_config("performance_profile", profile) # Sync to memory default_settings.performance_profile = profile except Exception as e: logger.warning(f"Hardware detection failed: {e}") # Fallback to current memory value (BALANCED) # ════════════════════════════════════════════════════════════════════ # SCHEDULER LOGIC # ════════════════════════════════════════════════════════════════════
[docs] def update_scheduler_config(self, enabled: bool, cycle: str): """Update scheduler configuration and reset timer""" self.scheduler_enabled = enabled self.scheduler_cycle = cycle self._calculate_next_run() # Save to DB for persistence self.db.set_config("scheduler_enabled", str(enabled)) self.db.set_config("scheduler_cycle", cycle) logger.info(f"Scheduler updated: Enabled={enabled}, Cycle={cycle}, Next Run={self.next_run_time}")
def _calculate_next_run(self): """Calculate next run time based on cycle""" try: from datetime import datetime, timedelta now = datetime.now() if self.scheduler_cycle == "4h": # Round to next 4h block (00, 04, 08, 12, 16, 20) next_hour = (now.hour // 4 + 1) * 4 if next_hour >= 24: self.next_run_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) else: self.next_run_time = now.replace(hour=next_hour, minute=0, second=0, microsecond=0) elif self.scheduler_cycle == "12h": # Round to next 12h block (00, 12) next_hour = (now.hour // 12 + 1) * 12 if next_hour >= 24: self.next_run_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) else: self.next_run_time = now.replace(hour=next_hour, minute=0, second=0, microsecond=0) elif self.scheduler_cycle == "daily": # Next day at 00:00 self.next_run_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) elif self.scheduler_cycle == "weekly": # Next Sunday at 00:00 days_ahead = 6 - now.weekday() # Sunday is 6 if days_ahead <= 0: # If today is Sunday, move to next Sunday days_ahead += 7 next_run = now + timedelta(days=days_ahead) self.next_run_time = next_run.replace(hour=0, minute=0, second=0, microsecond=0) else: # Default fallback 4h self.next_run_time = now + timedelta(hours=4) logger.info(f"DEBUG: _calculate_next_run finished. New next_run_time: {self.next_run_time}") except Exception as e: logger.error(f"DEBUG: _calculate_next_run FALIED: {e}", exc_info=True) def _scheduler_worker(self): """Background worker for AI Scheduler""" import time from datetime import datetime # Init state from DB if possible try: saved_enabled = self.db.get_config("scheduler_enabled") == "True" saved_cycle = self.db.get_config("scheduler_cycle") or "4h" self.scheduler_enabled = saved_enabled self.scheduler_cycle = saved_cycle self._calculate_next_run() except: self.scheduler_enabled = False # Default to FALSE (Manual First) self.scheduler_cycle = "4h" self.next_run_time = None while self.is_active: try: if self.scheduler_enabled and self.next_run_time: now = datetime.now() if now >= self.next_run_time: logger.info(f"⏰ Scheduler Triggered! (Now={now} >= Next={self.next_run_time})") # 1. Update Data if self.mt5_connected: logger.info("DEBUG: Scheduler step 1: Updating OHLC...") self.mt5.extract_all(is_update=True) logger.info("DEBUG: Scheduler step 1: Updating OHLC DONE") # 2. Train Models logger.info("DEBUG: Scheduler step 2: Training Models...") self.train_all(force_global=False, source="AUTO") logger.info("DEBUG: Scheduler step 2: Training Models DONE") # 3. Predict logger.info("DEBUG: Scheduler step 3: Running Predictions...") self.predict_all() logger.info("DEBUG: Scheduler step 3: Running Predictions DONE") # 3. Recalculate next run logger.info("DEBUG: Scheduler step 4: Recalculating next run...") self._calculate_next_run() logger.info(f"Scheduler: Next run set to {self.next_run_time}") time.sleep(10) # Check every 10 seconds except Exception as e: logger.error(f"Scheduler worker CRASHED: {e}", exc_info=True) time.sleep(30)
[docs] def sync_ea_config(self): """Headless synchronization of EA settings to MT5 folder""" try: config_path = DATA_DIR / "ea_config.json" if not config_path.exists(): logger.info("No EA config found to sync.") return import json with open(config_path, 'r') as f: settings = json.load(f) # Target directory detection target_dir = None if self.mt5_connected and self.mt5.data_path: target_dir = Path(self.mt5.data_path) / "MQL5" / "Files" / "Cerebrum" if not target_dir: from config.settings import SIGNALS_DIR target_dir = SIGNALS_DIR.parent target_dir.mkdir(parents=True, exist_ok=True) target_file = target_dir / "ea_config.json" # Flattening logic (Keep in sync with EAManagerWidget) flat = {} o = settings.get("order", {}) flat["fixed_lots"] = o.get("volume", 0.1) flat["atr_mult_sl"] = 1.5 flat["atr_mult_tp"] = 3.0 flat["inp_order_type"] = o.get("type", "MARKET") flat["inp_direction"] = o.get("direction", "AUTO") flat["inp_price"] = o.get("price", 0.0) flat["inp_sl_pts"] = o.get("sl", 0) flat["inp_tp_pts"] = o.get("tp", 20) flat["inp_fixed_sl"] = o.get("fixed_sl", 10) # Default 1 pip flat["inp_trailing"] = o.get("trailing", 10) # Default 1 pip flat["inp_max_spread"] = o.get("max_spread", 20) flat["inp_comment"] = o.get("comment", "Cerebrum Trend Capture") flat["inp_magic"] = o.get("magic", 123456) p = settings.get("position", {}) flat["inp_allow_modif"] = p.get("allow_modif", True) flat["inp_partial_close"] = p.get("partial_close", 0) flat["inp_breakeven"] = p.get("auto_be", 15) # Default 1.5 pips flat["inp_breakeven_lock"] = p.get("be_lock", 10) # Default 1 pip flat["inp_reverse"] = p.get("reverse", False) flat["inp_multi_pos"] = p.get("multi_pos", False) r = settings.get("risk", {}) flat["inp_risk_per_trade"] = r.get("risk_per_trade", 1.0) flat["inp_dynamic_lot"] = r.get("dynamic_lot", False) flat["inp_max_dd"] = r.get("max_dd", 5.0) flat["inp_stop_loss_count"] = r.get("stop_after_loss", 3) flat["inp_equity_protect"] = r.get("equity_protect", 0) flat["inp_max_pos"] = r.get("max_pos", 1) flat["inp_max_vol"] = r.get("max_vol", 10.0) flat["session_filter_enabled"] = r.get("session_filter_enabled", True) flat["session_start_hr"] = 7 flat["session_end_hr"] = 20 with open(target_file, 'w') as f: json.dump(flat, f, indent=4) logger.info(f"EA Auto-Sync Complete: {target_file}") except Exception as e: logger.warning(f"EA Auto-Sync failed: {e}")
[docs] def log_training_event(self, source: str, status: str, details: str): """Log a training event to history file with source [AUTO|MANUAL|SYSTEM]""" try: from config.settings import DATA_DIR import json from datetime import datetime history_file = DATA_DIR / "training_history.json" event = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "source": source, "status": status, "details": details } history = [] if history_file.exists(): try: with open(history_file, 'r') as f: history = json.load(f) except: history = [] history.insert(0, event) # Newest first # Keep last 100 events history = history[:100] with open(history_file, 'w') as f: json.dump(history, f, indent=4) logger.info(f"Training Event logged: [{source}] {status} - {details}") except Exception as e: logger.warning(f"Failed to log training event: {e}")
# (End of file)