"""
Cerebrum Forex - Application Controller
Central controller connecting all components.
"""
import logging
import threading
from pathlib import Path
from typing import Optional, Dict, Any
from config.settings import ensure_directories, default_settings, TIMEFRAME_NAMES, DATA_DIR
from database import get_database
logger = logging.getLogger(__name__)
from core.profiler import profile
[docs]
class AppController:
"""Application controller connecting all components"""
def __init__(self):
# Ensure directories exist
ensure_directories()
# Lazy init components to speed up startup
self._mt5 = None
self._training_manager = None
self._prediction_engine = None
self._scheduler = None
self.db = get_database()
# State
self.is_active = False
self.mt5_connected = False
self.is_training = False
self.is_predicting = False
self.is_extracting = False
# Cache for UI (avoid blocking reads)
self._signal_cache: Dict[str, dict] = {}
self._ohlc_status_cache: Dict[str, dict] = {}
self._training_status_cache: Dict[str, dict] = {}
# Account Info Cache (prevent micro-stutters)
self._account_info_cache = None
self._last_account_update = 0
self._is_updating_account = False
# Scheduler State (Init defaults)
self.scheduler_enabled = False
self.scheduler_cycle = "4h"
self.next_run_time = None
# Performance Auto-Detection (First Run)
try:
self._auto_detect_performance_profile()
except:
pass
# Load from DB (Safe Sync)
self._sync_settings_to_memory()
# Recalculate scheduler after sync
self._calculate_next_run()
self.is_training = False
self.is_predicting = False
self.current_training_source = "SYSTEM" # [AUTO, MANUAL, SYSTEM]
# Performance Auto-Detection (First Run)
self._auto_detect_performance_profile()
@property
def mt5(self):
# Connections/State
if self._mt5 is None:
from core.mt5_connector import MT5Connector
self._mt5 = MT5Connector(default_settings.symbol)
# Register with Sentinel
from core.sentinel import get_sentinel
get_sentinel(mt5=self._mt5)
return self._mt5
@property
def training_manager(self):
if self._training_manager is None:
from core.training_manager import TrainingManager
self._training_manager = TrainingManager(self.mt5)
# Register with Sentinel
from core.sentinel import get_sentinel
get_sentinel(tm=self._training_manager)
return self._training_manager
@property
def prediction_engine(self):
if self._prediction_engine is None:
from core.prediction_engine import PredictionEngine
self._prediction_engine = PredictionEngine(self.training_manager, self.mt5)
# Register with Sentinel
from core.sentinel import get_sentinel
get_sentinel(pe=self._prediction_engine)
return self._prediction_engine
@property
def scheduler(self):
if self._scheduler is None:
from core.scheduler import Scheduler
self._scheduler = Scheduler(self.prediction_engine, self.mt5)
return self._scheduler
[docs]
def start(self):
"""Start the application - minimal startup, just connect MT5"""
self.is_active = True
# 0. Migration: Force 2015 default for extraction (User repeated request)
try:
current_from = self.db.get_config("from_year")
if not current_from or current_from == "2020":
logger.info("Migrating from_year default: 2020 -> 2015")
self.db.set_config("from_year", "2015")
except Exception as e:
logger.error(f"Migration error: {e}")
# Launch EVERYTHING in thread to avoid blocking UI
# MT5 initialization can take time, and model loading takes time.
def start_background():
try:
logger.info("Connecting to MT5 in background...")
self.mt5_connected = self.mt5.connect()
if self.mt5_connected:
logger.info("════════════════════════════════════════════════════════════")
logger.info(" MT5 CONNECTION ESTABLISHED")
logger.info("════════════════════════════════════════════════════════════")
# WARMUP IMPORTS (Background)
try:
logger.info("Warming up ML modules...")
from core.training_manager import TrainingManager
from core.prediction_engine import PredictionEngine
logger.info("ML modules warmed up.")
except Exception as e:
logger.warning(f"Module warmup failed: {e}")
if not self.mt5_connected:
logger.warning("MT5 Connection failed/offline.")
if self.prediction_engine:
# MANUAL MODE: Engine ready for triggers
pass
logger.info("Prediction Engine ready (Manual Mode)")
# Start Account Info Worker (Keep this AUTO)
threading.Thread(target=self._account_info_worker, daemon=True).start()
# Start AI Scheduler Worker
threading.Thread(target=self._scheduler_worker, daemon=True).start()
# SYNC EA CONFIG (Ensure EA has latest settings)
if self.mt5_connected:
threading.Thread(target=self.sync_ea_config, daemon=True).start()
self.log_training_event("SYSTEM", "STARTUP", "Application started")
logger.info("════════════════════════════════════════════════════════════")
logger.info(" APPLICATION ENGINE READY")
logger.info("════════════════════════════════════════════════════════════")
except Exception as e:
logger.error(f"CRITICAL: Background startup error: {e}")
import traceback
logger.error(traceback.format_exc())
threading.Thread(target=start_background, daemon=True).start()
# NOTE: Extraction and Training still require manual triggers
# or specific scheduling logic
if not self.mt5_connected:
# Try once more with detailed feedback
success, message = self.mt5.check_health()
self.mt5_connected = success
if success:
logger.info("MT5 Connection recovered via health check")
else:
logger.error(f"MT5 Initial connect failed: {message}")
logger.info("Application started")
[docs]
def stop(self):
"""Stop the application (deactivate)"""
self.is_active = False
# Stop all processes
if self._mt5:
self.mt5.disconnect()
if self._training_manager:
self.training_manager.stop_scheduled_training()
if self._scheduler:
self.scheduler.stop()
self.mt5_connected = False
self.mt5_connected = False
logger.info("════════════════════════════════════════════════════════════")
logger.info(" APPLICATION SHUTDOWN COMPLETE")
logger.info("════════════════════════════════════════════════════════════")
[docs]
@profile(threshold_ms=100.0)
def predict_manual(self, timeframe: str) -> dict:
"""
Execute a MANUAL prediction for a specific timeframe.
No scheduling, no background loop. Direct execution.
"""
if not self.prediction_engine:
logger.error("Prediction Engine not loaded")
return {"error": "Engine not loaded"}
logger.info("═══ START SECTION: MANUAL PREDICTION ═══")
logger.info(f"Context: Timeframe={timeframe}, Request=UserClick")
# 1. Ensure Data Freshness (Fast Fetch)
try:
# OPTIMIZATION: IN-MEMORY PIPELINE
# Fetch buffer directly from MT5 (6000 rows for safety)
# This avoids reading 100MB+ CSV files from disk.
df_buffer = None
if self.mt5_connected:
df_buffer = self.mt5.get_buffer(timeframe, n=6000)
# If buffer is None (offline), prediction_engine will fallback to disk.
# Run prediction
result = self.prediction_engine.predict_timeframe(timeframe, df=df_buffer)
return result
except Exception as e:
logger.error(f"Manual prediction failed: {e}")
return {"error": str(e)}
[docs]
def train_all(self, force_global: bool = False, source: str = "MANUAL"):
"""Train all models for all timeframes"""
def run():
self.is_training = True
self.current_training_source = source
try:
self.log_training_event("MANUAL", "START", "Starting all timeframes")
self.training_manager.train_all(force_global=force_global)
self.log_training_event("MANUAL", "SUCCESS", "Finished training all")
except Exception as e:
self.log_training_event("MANUAL", "FAILED", str(e))
logger.error(f"Training error: {e}")
self.is_training = False
logger.info("═══ END SECTION: SMART TRAIN ═══")
thread = threading.Thread(target=run, daemon=True)
thread.start()
[docs]
def predict_all(self):
"""Generate predictions for all timeframes"""
def run():
self.is_predicting = True
try:
self.prediction_engine.predict_all()
# Update cache
for tf in TIMEFRAME_NAMES:
sig = self.prediction_engine.load_signal(tf)
if sig:
self._signal_cache[tf] = sig
except Exception as e:
logger.error(f"Prediction error: {e}")
self.is_predicting = False
thread = threading.Thread(target=run, daemon=True)
thread.start()
[docs]
def get_signal(self, timeframe: str) -> Optional[dict]:
"""Get current signal for a timeframe (from cache)"""
return self._signal_cache.get(timeframe)
[docs]
def get_combined_signal(self) -> dict:
"""Get combined signal from all timeframes"""
try:
return self.prediction_engine.get_combined_signal()
except Exception:
return {"signal": "NEUTRAL", "confidence": 0, "votes": {}}
[docs]
def get_ohlc_status(self, timeframe: str) -> dict:
"""Get OHLC file status for a timeframe"""
return self._ohlc_status_cache.get(timeframe, {"exists": False})
[docs]
def get_training_status(self, timeframe: str) -> dict:
"""Get training status for a timeframe"""
return self._training_status_cache.get(timeframe, {"models": {}})
[docs]
def get_model(self, timeframe: str, model_type: str):
"""Get a specific model"""
try:
return self.training_manager.get_model(timeframe, model_type)
except Exception:
return None
[docs]
def get_settings(self) -> dict:
"""Get current settings"""
import json
# Parse nested JSON configs from DB
def parse_json_config(key, defaults):
raw = self.db.get_config(key)
if raw:
try:
return json.loads(raw.replace("'", '"'))
except:
pass
return defaults
return {
"mt5_path": self.db.get_config("mt5_path") or "",
"symbol": self.db.get_config("symbol") or "EURUSD",
# Trading & System
"magic_number": int(self.db.get_config("magic_number") or 234000),
"max_slippage": int(self.db.get_config("max_slippage") or 3),
"max_spread": int(self.db.get_config("max_spread") or 20),
"audio_alerts": str(self.db.get_config("audio_alerts")).lower() == "true",
"auto_connect": str(self.db.get_config("auto_connect")).lower() == "true",
"ohlc_interval": int(self.db.get_config("ohlc_interval") or 12),
"from_year": int(self.db.get_config("from_year") or 2015),
"history_1m_months": int(self.db.get_config("history_1m_months") or 6),
# AI & Signals
"min_consensus": int(self.db.get_config("min_consensus") or 2),
"confidence_threshold": float(self.db.get_config("confidence_threshold") or 0.65),
"validation_ratio": float(self.db.get_config("validation_ratio") or 0.20),
"hyper_optimization": self.db.get_config("hyper_optimization") == "True",
"incremental_training": self.db.get_config("incremental_training") != "False", # Default True
"cpu_threads": int(self.db.get_config("cpu_threads") or 4),
"performance_profile": self.db.get_config("performance_profile") or "BALANCED",
"financial_guard_policy": self.db.get_config("financial_guard_policy") or "Advisory (Notify Only)",
# Nested Configs (Scalper Defaults)
"trailing_config": parse_json_config("trailing_config", {
"adaptive_enabled": True,
"step": 5,
"adaptive_start": 20,
"adaptive_deep": 200,
"adaptive_min": 10
}),
"prop_firm_config": parse_json_config("prop_firm_config", {
"enabled": True,
"max_daily_loss_pct": 5.0,
"max_total_drawdown_pct": 10.0,
"risk_per_trade_pct": 1.0,
"profit_target_pct": 10.0,
"news_filter_enabled": True,
"neutral_stop_pts": 50
}),
}
[docs]
@profile(threshold_ms=1.0)
def get_cached_account_info(self) -> Optional[dict]:
"""Get account info from cache (Instant)"""
return self._account_info_cache
def _account_info_worker(self):
"""Dedicated background worker for account info"""
import time
while self.is_active:
try:
if self.mt5_connected:
info = self.mt5.get_account_info()
if info:
self._account_info_cache = info
self._last_account_update = time.time()
# INTELLIGENT THROTTLING: Adaptive sleep based on Profile
profile = default_settings.performance_profile
sleep_time = 3 # PRO
if profile == "ECO": sleep_time = 30 # Slow updates for Low RAM/CPU
elif profile == "BALANCED": sleep_time = 10
time.sleep(sleep_time)
except Exception as e:
logger.error(f"Account worker error: {e}")
time.sleep(10)
# Check stopping condition more frequently
if not self.is_active:
break
[docs]
def save_settings(self, settings: dict):
"""Save settings to database and sync to memory"""
import json
for key, value in settings.items():
# Serialize nested dicts as JSON
if isinstance(value, dict):
self.db.set_config(key, json.dumps(value))
else:
self.db.set_config(key, str(value))
# Update symbol in active connector
if key == "symbol" and self._mt5:
# MT5Connector uses self.symbol for extraction
self._mt5.symbol = str(value)
# Immediate sync to in-memory defaults (for TrainingManager/MT5/etc)
self._sync_settings_to_memory()
logger.info("Settings saved and synchronized.")
[docs]
def get_congress_config(self) -> dict:
"""Get Congress AI configuration"""
from config.settings import get_congress_config
return get_congress_config()
[docs]
def save_congress_config(self, config: dict):
"""Save Congress AI configuration to JSON"""
import json
from config.settings import MODEL_CONFIG_PATH, load_model_config
try:
# Load existing full config to preserve other keys
full_config = load_model_config()
# Update congress_config section
full_config["congress_config"] = config
# Also update base weights/thresholds for backward compatibility
# (Use Range weights as default since it's the tricky one)
weights = config.get("weights", {}).get("range", {})
full_config["model_weights"] = weights
thresh = config.get("thresholds", {})
full_config["thresholds"] = {
"buy": 0.30, # Keep default or dynamic?
"sell": -0.30
}
if "range_base" in thresh:
base = thresh["range_base"]
full_config["thresholds"]["range_buy"] = base
full_config["thresholds"]["range_sell"] = -base
# Write to file
with open(MODEL_CONFIG_PATH, 'w') as f:
json.dump(full_config, f, indent=4)
logger.info("Congress AI Logic configuration saved.")
except Exception as e:
logger.error(f"Failed to save Congress config: {e}")
def _sync_settings_to_memory(self):
"""Synchronize DB settings with global default_settings object"""
try:
conf = self.db.get_all_config()
if not conf: return
# MT5/Trading
if conf.get("symbol"): default_settings.symbol = conf["symbol"]
if conf.get("mt5_path"): default_settings.mt5_path = conf["mt5_path"]
try:
if conf.get("magic_number"): default_settings.magic_number = int(conf["magic_number"])
except: pass
# Data
try:
if conf.get("from_year"): default_settings.extract_from_year = int(conf["from_year"])
if conf.get("history_1m_months"): default_settings.history_1m_months = int(conf["history_1m_months"])
except: pass
# AI
try:
if conf.get("confidence_threshold"): default_settings.confidence_threshold = float(conf["confidence_threshold"])
if conf.get("min_consensus"): default_settings.min_consensus = int(conf["min_consensus"])
if conf.get("cpu_threads"): default_settings.cpu_threads = int(conf["cpu_threads"])
except: pass
if conf.get("performance_profile"): default_settings.performance_profile = conf["performance_profile"]
if conf.get("incremental_training"): default_settings.incremental_training = conf["incremental_training"] == "True"
if conf.get("hyper_optimization"): default_settings.hyper_optimization = conf["hyper_optimization"] == "True"
# Scheduler Sync
if conf.get("scheduler_enabled"): self.scheduler_enabled = conf["scheduler_enabled"] == "True"
if conf.get("scheduler_cycle"): self.scheduler_cycle = conf["scheduler_cycle"]
# Update staleness if needed
try:
if conf.get("ohlc_interval"): default_settings.ohlc_interval_hours = int(conf["ohlc_interval"])
except: pass
logger.info(f"✅ Configuration Synced: profile={default_settings.performance_profile}, scheduler={self.scheduler_enabled}")
except Exception as e:
logger.error(f"Failed to sync settings: {e}")
def _auto_detect_performance_profile(self):
"""Detect system hardware and set optimal performance profile if not already set"""
try:
# Check if already set in DB
existing = self.db.get_config("performance_profile")
if existing:
return # User or system already set it
import psutil
ram_gb = psutil.virtual_memory().total / (1024**3)
cpu_cores = psutil.cpu_count(logical=True)
profile = "BALANCED"
if ram_gb < 7.5: # Allow slight margin for 8GB systems
profile = "ECO"
elif ram_gb >= 15.5 and cpu_cores >= 8:
profile = "PRO"
logger.info(f"🚀 HARDWARE AUTO-DISCOVERY: RAM={ram_gb:.1f}GB, Cores={cpu_cores} -> Profile: {profile}")
self.db.set_config("performance_profile", profile)
# Sync to memory
default_settings.performance_profile = profile
except Exception as e:
logger.warning(f"Hardware detection failed: {e}")
# Fallback to current memory value (BALANCED)
# ════════════════════════════════════════════════════════════════════
# SCHEDULER LOGIC
# ════════════════════════════════════════════════════════════════════
[docs]
def update_scheduler_config(self, enabled: bool, cycle: str):
"""Update scheduler configuration and reset timer"""
self.scheduler_enabled = enabled
self.scheduler_cycle = cycle
self._calculate_next_run()
# Save to DB for persistence
self.db.set_config("scheduler_enabled", str(enabled))
self.db.set_config("scheduler_cycle", cycle)
logger.info(f"Scheduler updated: Enabled={enabled}, Cycle={cycle}, Next Run={self.next_run_time}")
def _calculate_next_run(self):
"""Calculate next run time based on cycle"""
try:
from datetime import datetime, timedelta
now = datetime.now()
if self.scheduler_cycle == "4h":
# Round to next 4h block (00, 04, 08, 12, 16, 20)
next_hour = (now.hour // 4 + 1) * 4
if next_hour >= 24:
self.next_run_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
else:
self.next_run_time = now.replace(hour=next_hour, minute=0, second=0, microsecond=0)
elif self.scheduler_cycle == "12h":
# Round to next 12h block (00, 12)
next_hour = (now.hour // 12 + 1) * 12
if next_hour >= 24:
self.next_run_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
else:
self.next_run_time = now.replace(hour=next_hour, minute=0, second=0, microsecond=0)
elif self.scheduler_cycle == "daily":
# Next day at 00:00
self.next_run_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)
elif self.scheduler_cycle == "weekly":
# Next Sunday at 00:00
days_ahead = 6 - now.weekday() # Sunday is 6
if days_ahead <= 0: # If today is Sunday, move to next Sunday
days_ahead += 7
next_run = now + timedelta(days=days_ahead)
self.next_run_time = next_run.replace(hour=0, minute=0, second=0, microsecond=0)
else:
# Default fallback 4h
self.next_run_time = now + timedelta(hours=4)
logger.info(f"DEBUG: _calculate_next_run finished. New next_run_time: {self.next_run_time}")
except Exception as e:
logger.error(f"DEBUG: _calculate_next_run FALIED: {e}", exc_info=True)
def _scheduler_worker(self):
"""Background worker for AI Scheduler"""
import time
from datetime import datetime
# Init state from DB if possible
try:
saved_enabled = self.db.get_config("scheduler_enabled") == "True"
saved_cycle = self.db.get_config("scheduler_cycle") or "4h"
self.scheduler_enabled = saved_enabled
self.scheduler_cycle = saved_cycle
self._calculate_next_run()
except:
self.scheduler_enabled = False # Default to FALSE (Manual First)
self.scheduler_cycle = "4h"
self.next_run_time = None
while self.is_active:
try:
if self.scheduler_enabled and self.next_run_time:
now = datetime.now()
if now >= self.next_run_time:
logger.info(f"⏰ Scheduler Triggered! (Now={now} >= Next={self.next_run_time})")
# 1. Update Data
if self.mt5_connected:
logger.info("DEBUG: Scheduler step 1: Updating OHLC...")
self.mt5.extract_all(is_update=True)
logger.info("DEBUG: Scheduler step 1: Updating OHLC DONE")
# 2. Train Models
logger.info("DEBUG: Scheduler step 2: Training Models...")
self.train_all(force_global=False, source="AUTO")
logger.info("DEBUG: Scheduler step 2: Training Models DONE")
# 3. Predict
logger.info("DEBUG: Scheduler step 3: Running Predictions...")
self.predict_all()
logger.info("DEBUG: Scheduler step 3: Running Predictions DONE")
# 3. Recalculate next run
logger.info("DEBUG: Scheduler step 4: Recalculating next run...")
self._calculate_next_run()
logger.info(f"Scheduler: Next run set to {self.next_run_time}")
time.sleep(10) # Check every 10 seconds
except Exception as e:
logger.error(f"Scheduler worker CRASHED: {e}", exc_info=True)
time.sleep(30)
[docs]
def sync_ea_config(self):
"""Headless synchronization of EA settings to MT5 folder"""
try:
config_path = DATA_DIR / "ea_config.json"
if not config_path.exists():
logger.info("No EA config found to sync.")
return
import json
with open(config_path, 'r') as f:
settings = json.load(f)
# Target directory detection
target_dir = None
if self.mt5_connected and self.mt5.data_path:
target_dir = Path(self.mt5.data_path) / "MQL5" / "Files" / "Cerebrum"
if not target_dir:
from config.settings import SIGNALS_DIR
target_dir = SIGNALS_DIR.parent
target_dir.mkdir(parents=True, exist_ok=True)
target_file = target_dir / "ea_config.json"
# Flattening logic (Keep in sync with EAManagerWidget)
flat = {}
o = settings.get("order", {})
flat["fixed_lots"] = o.get("volume", 0.1)
flat["atr_mult_sl"] = 1.5
flat["atr_mult_tp"] = 3.0
flat["inp_order_type"] = o.get("type", "MARKET")
flat["inp_direction"] = o.get("direction", "AUTO")
flat["inp_price"] = o.get("price", 0.0)
flat["inp_sl_pts"] = o.get("sl", 0)
flat["inp_tp_pts"] = o.get("tp", 20)
flat["inp_fixed_sl"] = o.get("fixed_sl", 10) # Default 1 pip
flat["inp_trailing"] = o.get("trailing", 10) # Default 1 pip
flat["inp_max_spread"] = o.get("max_spread", 20)
flat["inp_comment"] = o.get("comment", "Cerebrum Trend Capture")
flat["inp_magic"] = o.get("magic", 123456)
p = settings.get("position", {})
flat["inp_allow_modif"] = p.get("allow_modif", True)
flat["inp_partial_close"] = p.get("partial_close", 0)
flat["inp_breakeven"] = p.get("auto_be", 15) # Default 1.5 pips
flat["inp_breakeven_lock"] = p.get("be_lock", 10) # Default 1 pip
flat["inp_reverse"] = p.get("reverse", False)
flat["inp_multi_pos"] = p.get("multi_pos", False)
r = settings.get("risk", {})
flat["inp_risk_per_trade"] = r.get("risk_per_trade", 1.0)
flat["inp_dynamic_lot"] = r.get("dynamic_lot", False)
flat["inp_max_dd"] = r.get("max_dd", 5.0)
flat["inp_stop_loss_count"] = r.get("stop_after_loss", 3)
flat["inp_equity_protect"] = r.get("equity_protect", 0)
flat["inp_max_pos"] = r.get("max_pos", 1)
flat["inp_max_vol"] = r.get("max_vol", 10.0)
flat["session_filter_enabled"] = r.get("session_filter_enabled", True)
flat["session_start_hr"] = 7
flat["session_end_hr"] = 20
with open(target_file, 'w') as f:
json.dump(flat, f, indent=4)
logger.info(f"EA Auto-Sync Complete: {target_file}")
except Exception as e:
logger.warning(f"EA Auto-Sync failed: {e}")
[docs]
def log_training_event(self, source: str, status: str, details: str):
"""Log a training event to history file with source [AUTO|MANUAL|SYSTEM]"""
try:
from config.settings import DATA_DIR
import json
from datetime import datetime
history_file = DATA_DIR / "training_history.json"
event = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"source": source,
"status": status,
"details": details
}
history = []
if history_file.exists():
try:
with open(history_file, 'r') as f:
history = json.load(f)
except:
history = []
history.insert(0, event) # Newest first
# Keep last 100 events
history = history[:100]
with open(history_file, 'w') as f:
json.dump(history, f, indent=4)
logger.info(f"Training Event logged: [{source}] {status} - {details}")
except Exception as e:
logger.warning(f"Failed to log training event: {e}")
# (End of file)