Source code for core.mt5_connector

"""
Cerebrum Forex - MT5 Connector
Handles connection to MetaTrader 5 and OHLC data extraction.
"""

import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable, Tuple
import threading
import time

import pandas as pd

try:
    import MetaTrader5 as mt5
    MT5_AVAILABLE = True
except ImportError:
    MT5_AVAILABLE = False

from config.settings import (
    TIMEFRAMES, 
    OHLC_DIR, 
    get_timeframe_by_name,
    TimeframeConfig,
    default_settings,
)
from database import get_database
from core.debug_manager import get_debug_manager
from core.sentinel import get_sentinel

logger = logging.getLogger(__name__)


class MT5Connector:
    """MetaTrader 5 connector for OHLC data extraction"""

    def __init__(self, symbol: str = "EURUSD"):
        self.symbol = symbol
        self.connected = False
        self._callbacks: list[Callable] = []

        # Locks for thread safety
        self._lock = threading.RLock()      # For MT5 connection/API
        self._file_lock = threading.Lock()  # For file I/O (CSV)

        # Paths (populated on connect)
        self.terminal_path: Optional[str] = None
        self.data_path: Optional[str] = None

        # Ensure OHLC directory exists
        OHLC_DIR.mkdir(parents=True, exist_ok=True)
    def add_callback(self, callback: Callable):
        """Add callback for status updates"""
        self._callbacks.append(callback)
    def _notify(self, event: str, data: dict):
        """Notify all callbacks"""
        for callback in self._callbacks:
            try:
                callback(event, data)
            except Exception as e:
                logger.error(f"Callback error: {e}")
    def close_all_positions(self) -> Tuple[int, int]:
        """
        PANIC BUTTON: Close all open positions immediately.

        Returns:
            (closed_count, error_count)
        """
        closed = 0
        errors = 0
        with self._lock:
            if not self.connected and not self.connect():
                logger.error("Cannot close positions: MT5 disconnected")
                return 0, 0

            positions = mt5.positions_get(symbol=self.symbol)
            if not positions:
                logger.info("No open positions to close.")
                return 0, 0

            logger.warning(f"🚨 PANIC CLOSE INITIATED: Closing {len(positions)} positions...")

            for pos in positions:
                # Close logic: send the opposite order
                order_type = mt5.ORDER_TYPE_SELL if pos.type == mt5.ORDER_TYPE_BUY else mt5.ORDER_TYPE_BUY
                tick = mt5.symbol_info_tick(self.symbol)
                price = tick.bid if order_type == mt5.ORDER_TYPE_SELL else tick.ask

                request = {
                    "action": mt5.TRADE_ACTION_DEAL,
                    "symbol": self.symbol,
                    "volume": pos.volume,
                    "type": order_type,
                    "position": pos.ticket,
                    "price": price,
                    "deviation": 20,
                    "magic": 234000,
                    "comment": "panic_close",
                    "type_time": mt5.ORDER_TIME_GTC,
                    "type_filling": mt5.ORDER_FILLING_IOC,
                }

                result = mt5.order_send(request)
                if result.retcode != mt5.TRADE_RETCODE_DONE:
                    logger.error(f"Failed to close ticket {pos.ticket}: {result.comment}")
                    errors += 1
                else:
                    logger.info(f"Closed ticket {pos.ticket} ({pos.profit:.2f})")
                    closed += 1

        return closed, errors
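    # --- Usage sketch (illustrative, not part of the original module) ---
    # A minimal panic-close flow using the API above; assumes a live,
    # logged-in terminal with open positions on the connector's symbol:
    #
    #     connector = MT5Connector("EURUSD")
    #     if connector.connect():
    #         closed, errors = connector.close_all_positions()
    #         print(f"Panic close: {closed} closed, {errors} failed")
    #         connector.disconnect()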
    def connect(self, timeout: float = 5.0) -> bool:
        """Connect to MT5 terminal with timeout protection"""
        from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

        if self.connected:
            return True

        with self._lock:
            if not MT5_AVAILABLE:
                logger.error("MetaTrader5 package not installed")
                return False

            # Run initialization in a worker thread so a hung terminal
            # cannot block the caller indefinitely.
            def _init_mt5():
                path = getattr(default_settings, 'mt5_path', "")
                if path:
                    self.terminal_path = path
                    return mt5.initialize(path=path)
                success = mt5.initialize()
                if success:
                    info = mt5.terminal_info()
                    if info:
                        self.terminal_path = info.path
                        self.data_path = info.data_path
                return success

            def _resolve_symbol(target: str) -> str:
                # 1. Direct check
                if mt5.symbol_info(target):
                    return target
                # 2. Search for broker variants (e.g. EURUSD.m, EURUSD+, EURUSDpro)
                symbols = mt5.symbols_get()
                if not symbols:
                    logger.error(f"Failed to get symbols from MT5: {mt5.last_error()}")
                    return target
                # Filter for base match
                matches = [s.name for s in symbols if target in s.name and "EUR" in s.name and "USD" in s.name]
                logger.info(f"Candidate symbols for {target}: {matches}")
                if matches:
                    # Prefer the shortest name (EURUSD over EURUSD.pro);
                    # broker suffixes are usually longer variants.
                    matches.sort(key=len)
                    logger.info(f"Symbol {target} resolved to {matches[0]}")
                    return matches[0]
                return target

            try:
                executor = ThreadPoolExecutor(max_workers=1)
                future = executor.submit(_init_mt5)
                result = future.result(timeout=timeout)

                if result:
                    logger.info("Connected to MT5")
                    # Capture terminal info for diagnostics
                    info = mt5.terminal_info()
                    if info:
                        self.terminal_path = info.path
                        self.data_path = info.data_path
                    logger.info(f"MT5 Terminal Path: {self.terminal_path}")
                    logger.info(f"MT5 Data Path: {self.data_path}")
                    get_debug_manager().log("EXTRACTION", "✅ MT5 CONNECT SUCCESS", {"symbol": self.symbol})
                else:
                    err = mt5.last_error()
                    # AUTO-RECOVERY: if no path is configured, search for the terminal
                    current_path = getattr(default_settings, 'mt5_path', "")
                    if not current_path:
                        logger.warning("Standard MT5 init failed. Attempting deep search for terminal64.exe...")
                        found_path = self._auto_discover_terminal()
                        if found_path:
                            logger.info(f"Auto-discovered MT5 at: {found_path}")
                            if mt5.initialize(path=found_path):
                                logger.info("Connected to MT5 via auto-discovery")
                                self.terminal_path = found_path
                                self.connected = True
                                return True
                    logger.error(f"MT5 initialization failed: {err}")
                    get_debug_manager().log("EXTRACTION", "❌ MT5 CONNECT FAILED", str(err))
                    return False

                # Resolve symbol dynamically (runtime only; not persisted to settings)
                resolved = _resolve_symbol(self.symbol)
                if resolved != self.symbol:
                    logger.warning(f"Symbol auto-resolved: '{self.symbol}' -> '{resolved}'")
                    self.symbol = resolved
            except FuturesTimeout:
                logger.error(f"MT5 connection timed out after {timeout}s (MT5 may be unresponsive)")
                # Index DX-100: Connection Recovery
                get_sentinel(mt5=self).repair("DX-100")
                return False
            except Exception as e:
                logger.error(f"MT5 connection error: {e}")
                get_sentinel(mt5=self).repair("DX-100")
                return False
            finally:
                # wait=False so a worker stuck in C code cannot hang the main thread
                executor.shutdown(wait=False)

            self.connected = True

            # Ensure symbol is selected in Market Watch
            if not mt5.symbol_select(self.symbol, True):
                logger.warning(f"Failed to select symbol {self.symbol}, trying to proceed anyway...")

            self._notify("connected", {"status": True})
            return True
    def check_health(self) -> tuple[bool, str]:
        """Detailed connection diagnostic"""
        if not MT5_AVAILABLE:
            return False, "MetaTrader5 package not installed. Try: pip install MetaTrader5"

        with self._lock:
            # 1. Try standard connect
            if self.connect():
                return True, "Successfully connected to MetaTrader 5."

            # 2. Get last error
            err = mt5.last_error()

            # 3. Specific diagnostics
            if err[0] == -1:
                # Search for terminal
                t_path = self._auto_discover_terminal()
                if not t_path:
                    return False, "MT5 Terminal NOT FOUND. Please ensure MetaTrader 5 is installed or set the path in Settings."
                return False, f"MT5 found at {t_path}, but failed to initialize. Try running MT5 as Administrator."

            if err[0] == -10004:
                return False, "MT5 connection failed: IPC (Inter-Process Communication) error. Restart your terminal."

            return False, f"MT5 Connection Failed: {err[1]} (Code: {err[0]})"
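    # --- Usage sketch (illustrative, not part of the original module) ---
    # check_health() wraps connect() and maps MT5 error codes to readable
    # hints, so a settings dialog can surface them directly; the
    # show_error_dialog helper below is hypothetical:
    #
    #     ok, message = connector.check_health()
    #     if not ok:
    #         show_error_dialog(message)  # hypothetical UI helper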
    def _auto_discover_terminal(self) -> Optional[str]:
        """Deep search for the MT5 terminal executable"""
        common_paths = [
            Path("C:/Program Files/MetaTrader 5/terminal64.exe"),
            Path("C:/Program Files (x86)/MetaTrader 5/terminal64.exe"),
        ]

        # Check standard paths first
        for p in common_paths:
            if p.exists():
                return str(p)

        # Search Program Files for ANY MT5 terminal
        try:
            for pf in [Path("C:/Program Files"), Path("C:/Program Files (x86)")]:
                if not pf.exists():
                    continue
                # Look for terminal64.exe in any subfolder containing 'MetaTrader'
                for folder in pf.glob("*MetaTrader*"):
                    t_app = folder / "terminal64.exe"
                    if t_app.exists():
                        return str(t_app)
        except Exception:
            pass
        return None
    def disconnect(self):
        """Disconnect from MT5"""
        with self._lock:
            if MT5_AVAILABLE and self.connected:
                mt5.shutdown()
                self.connected = False
                logger.info("Disconnected from MT5")
                self._notify("disconnected", {"status": True})
    def test_connection(self) -> bool:
        """Test MT5 connection"""
        with self._lock:
            if not self.connect():
                return False

            # Try to get symbol info
            if MT5_AVAILABLE:
                info = mt5.symbol_info(self.symbol)
                if info is None:
                    logger.error(f"Symbol {self.symbol} not found")
                    self.disconnect()
                    return False

            self.disconnect()
            return True
    def preload_history(self, timeframe: Optional[str] = None) -> dict:
        """
        Force MT5 to download maximum available history for timeframes.
        This triggers the broker server to send all cached historical data.

        Args:
            timeframe: Specific timeframe to preload, or None for all

        Returns:
            dict with preload results per timeframe
        """
        if not MT5_AVAILABLE:
            return {"error": "MT5 not available"}

        results = {}

        with self._lock:
            if not self.connected and not self.connect():
                return {"error": "Failed to connect to MT5"}

            # Ensure symbol is selected in Market Watch (triggers data sync)
            mt5.symbol_select(self.symbol, True)

            # Get timeframes to preload
            if timeframe:
                timeframes = [timeframe]
            else:
                # All timeframes from config
                timeframes = [tf.name for tf in TIMEFRAMES]

            for tf_name in timeframes:
                tf_config = get_timeframe_by_name(tf_name)
                if tf_config is None:
                    continue

                try:
                    # Request maximum bars (500,000) to force a full history
                    # download; MT5 caps the result to what the broker allows.
                    logger.info(f"[PRELOAD] Requesting max history for {tf_name}...")

                    rates = mt5.copy_rates_from_pos(
                        self.symbol,
                        tf_config.mt5_timeframe,
                        0,       # Start from most recent
                        500000,  # Request 500k bars (MT5 will cap to available)
                    )

                    if rates is not None and len(rates) > 0:
                        # Convert to get the date range
                        first_date = pd.to_datetime(rates[0]['time'], unit='s')
                        last_date = pd.to_datetime(rates[-1]['time'], unit='s')
                        results[tf_name] = {
                            "success": True,
                            "bars": len(rates),
                            "from": first_date.strftime("%Y-%m-%d"),
                            "to": last_date.strftime("%Y-%m-%d"),
                        }
                        logger.info(f"[PRELOAD] {tf_name}: {len(rates)} bars ({first_date.date()} to {last_date.date()})")
                    else:
                        error = mt5.last_error()
                        results[tf_name] = {"success": False, "error": str(error)}
                        logger.warning(f"[PRELOAD] {tf_name}: Failed - {error}")
                except Exception as e:
                    results[tf_name] = {"success": False, "error": str(e)}
                    logger.error(f"[PRELOAD] {tf_name}: Exception - {e}")

        loaded = sum(1 for r in results.values() if r.get('success'))
        logger.info(f"[PRELOAD] History preload complete: {loaded}/{len(results)} timeframes loaded")
        return results
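    # --- Usage sketch (illustrative, not part of the original module) ---
    # preload_history() returns one entry per timeframe; a caller can log
    # which timeframes the broker actually served:
    #
    #     results = connector.preload_history()
    #     for tf, info in results.items():
    #         if isinstance(info, dict) and info.get("success"):
    #             print(f"{tf}: {info['bars']} bars ({info['from']} -> {info['to']})")
    #         else:
    #             print(f"{tf}: preload failed ({info})")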
    def get_terminal_data_path(self) -> Optional[str]:
        """Robustly get the MT5 terminal data path (AppData directory, for MQL5/Files)"""
        # Fast path: reuse the value captured on connect
        if self.data_path:
            return self.data_path

        with self._lock:
            if not self.connected and not self.connect():
                return None
            if MT5_AVAILABLE:
                info = mt5.terminal_info()
                if not info:
                    # Attempt re-init
                    mt5.initialize()
                    info = mt5.terminal_info()
                if info:
                    self.data_path = info.data_path
                    return self.data_path
        return None
    def get_current_server_time(self) -> datetime:
        """
        Get the ABSOLUTE LATEST server time from the last tick/quote.
        This represents 'NOW' on the server.
        """
        if not self.connected and not self.connect():
            return datetime.now()  # Fallback

        try:
            # Fast check via symbol tick
            tick = mt5.symbol_info_tick(self.symbol)
            if tick:
                return datetime.fromtimestamp(tick.time)
            # Slower check via symbol info
            info = mt5.symbol_info(self.symbol)
            if info:
                return datetime.fromtimestamp(info.time)
        except Exception:
            pass

        return datetime.now()  # Absolute fallback
    def get_server_time(self) -> Optional[datetime]:
        """Get current server time (legacy/optional wrapper)"""
        return self.get_current_server_time()
    def get_time_offset(self) -> float:
        """
        Calculate the offset in seconds between local time and server time.
        Positive = local is ahead of the server.
        Negative = local is behind the server.
        """
        server_time = self.get_server_time()
        if not server_time:
            return 0.0
        local_time = datetime.now()
        return (local_time - server_time).total_seconds()
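    # --- Worked example (illustrative, not part of the original module) ---
    # With the sign convention above (positive = local ahead of server),
    # a local timestamp is mapped onto server time by subtracting the offset:
    #
    #     from datetime import timedelta
    #     offset = connector.get_time_offset()           # e.g. 3600.0 (local 1h ahead)
    #     server_now = datetime.now() - timedelta(seconds=offset)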
    def get_account_info(self) -> Optional[dict]:
        """Get MT5 account information (non-blocking for UI)"""
        # Try to acquire the lock with a short timeout to prevent UI freeze
        if not self._lock.acquire(timeout=0.05):
            return None
        try:
            if not MT5_AVAILABLE or not self.connected:
                return None

            info = mt5.account_info()
            if info is None:
                return None

            # Fetch open positions
            positions = mt5.positions_get()
            pos_list = []
            if positions:
                for pos in positions:
                    pos_list.append({
                        "ticket": pos.ticket,
                        "symbol": pos.symbol,
                        "type": pos.type,  # 0=Buy, 1=Sell
                        "volume": pos.volume,
                        "price_open": pos.price_open,
                        "profit": pos.profit,
                        "comment": pos.comment,
                    })

            # Return a dictionary compatible with the UI
            return {
                "login": info.login,
                "name": info.name,
                "server": info.server,
                "currency": info.currency,
                "company": info.company,
                "leverage": info.leverage,
                "balance": info.balance,
                "equity": info.equity,
                "profit": info.profit,
                "margin": info.margin,
                "margin_free": info.margin_free,
                "margin_level": info.margin_level,
                "trade_mode": info.trade_mode,
                "positions": pos_list,
            }
        finally:
            self._lock.release()
    def get_market_status(self) -> dict:
        """
        Check real-time market status for the current symbol via MT5.
        Returns a dict with 'status', 'description', and an 'is_open' boolean.
        """
        # Try to acquire the lock with a short timeout to prevent UI freeze
        if not self._lock.acquire(timeout=0.05):
            return {"status": "CHECKING", "is_open": False, "description": "Terminal Busy"}
        try:
            if not MT5_AVAILABLE or not self.connected:
                return {"status": "OFFLINE", "is_open": False, "description": "MT5 Not Connected"}

            # 1. Get symbol info
            info = mt5.symbol_info(self.symbol)
            if info is None:
                return {"status": "ERROR", "is_open": False, "description": "Symbol Not Found"}

            # 2. Check trade mode
            # trade_mode: 0=Disabled, 1=LongOnly, 2=ShortOnly, 3=CloseOnly, 4=Full
            if info.trade_mode == 0:
                return {"status": "CLOSED", "is_open": False, "description": "Trading Disabled"}

            # 3. Check session status (holiday/weekend via session gaps)
            from datetime import timezone
            now_utc = datetime.now(timezone.utc)
            day_of_week = now_utc.weekday()  # Monday=0 ... Sunday=6

            # Market window: Sunday 22:00 UTC to Friday 22:00 UTC
            if day_of_week == 5:  # Saturday
                return {"status": "CLOSED", "is_open": False, "description": "Weekend (Saturday)"}
            if day_of_week == 6 and now_utc.hour < 22:  # Sunday before 22:00
                return {"status": "CLOSED", "is_open": False, "description": "Weekend (Sunday)"}
            if day_of_week == 4 and now_utc.hour >= 22:  # Friday after 22:00
                return {"status": "CLOSED", "is_open": False, "description": "Market Closed (Friday)"}

            # 4. Check quote freshness (active ticks)
            tick = mt5.symbol_info_tick(self.symbol)
            if tick:
                tick_time = datetime.fromtimestamp(tick.time)
                server_time = self.get_current_server_time()
                age_seconds = (server_time - tick_time).total_seconds()

                # Very old ticks on a weekday usually mean a bank holiday:
                # MT5 is up but the symbol is paused.
                if age_seconds > 3600:
                    return {"status": "HOLIDAY", "is_open": False, "description": "Market Inactive (Holiday?)"}

                return {
                    "status": "OPEN",
                    "is_open": True,
                    "description": "Trading Active",
                    "server_time": server_time.strftime("%H:%M:%S"),
                }

            return {"status": "CLOSED", "is_open": False, "description": "No Quote Signal"}
        except Exception as e:
            return {"status": "ERROR", "is_open": False, "description": str(e)}
        finally:
            self._lock.release()
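    # --- Usage sketch (illustrative, not part of the original module) ---
    # The short lock timeout makes this safe to poll from a UI timer;
    # "CHECKING" simply means the terminal lock was busy, not an error.
    # The label widget below is hypothetical:
    #
    #     status = connector.get_market_status()
    #     if status["is_open"]:
    #         label.setText(f"OPEN ({status.get('server_time', '--')})")
    #     else:
    #         label.setText(f"{status['status']}: {status['description']}")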
    def get_ohlc_path(self, timeframe: str) -> Path:
        """Get path for OHLC CSV file"""
        return OHLC_DIR / f"ohlc_{self.symbol}_{timeframe}.csv"
    def extract_ohlc(self, timeframe: str, from_date: Optional[datetime] = None,
                     to_date: Optional[datetime] = None, is_update: bool = False) -> Optional[pd.DataFrame]:
        """Extract OHLC data for a timeframe."""
        if not MT5_AVAILABLE:
            logger.error("MT5 not available")
            return None

        tf_config = get_timeframe_by_name(timeframe)
        if tf_config is None:
            logger.error(f"Unknown timeframe: {timeframe}")
            return None

        db = get_database()
        log_id = db.log_extraction_start(timeframe, is_update)

        # --- PHASE 1: MT5 INTERACTION (NEEDS LOCK) ---
        rates = None
        try:
            with self._lock:  # Only lock for MT5 calls
                if not self.connected and not self.connect():
                    raise Exception("Failed to connect to MT5")

                # --- AUTO-PRELOAD for fresh extractions ---
                # Force MT5 to download maximum available history before extraction.
                # The preload runs WITHOUT releasing the lock to avoid race conditions.
                if not is_update:
                    logger.info(f"[{timeframe}] Running history preload to maximize data...")
                    try:
                        rates_preload = mt5.copy_rates_from_pos(
                            self.symbol,
                            tf_config.mt5_timeframe,
                            0,
                            100000,  # Request max bars to trigger history download
                        )
                        if rates_preload is not None:
                            logger.info(f"[{timeframe}] Preload successful: {len(rates_preload)} bars available")
                    except Exception as pe:
                        logger.warning(f"[{timeframe}] Preload failed (non-critical): {pe}")

                # Determine date range
                if is_update:
                    try:
                        # Use the file lock for peeking, to avoid racing a concurrent save
                        with self._file_lock:
                            existing_df = pd.read_csv(self.get_ohlc_path(timeframe))
                        if existing_df is not None and not existing_df.empty:
                            if 'time' not in existing_df.columns:
                                # Handle malformed CSV
                                from_date = None
                            else:
                                # Resume from the newest stored timestamp
                                existing_df['time'] = pd.to_datetime(existing_df['time'])
                                from_date = existing_df['time'].max()
                    except Exception:
                        from_date = None

                if from_date is None:
                    # Turbo Extraction (disabled): cap 1m history at the source level
                    # if timeframe == '1m':
                    #     months = getattr(default_settings, 'history_1m_months', 6)
                    #     from_date = datetime.now() - pd.DateOffset(months=months)
                    #     logger.info(f"[{timeframe}] Turbo Extraction Active: Capped at {months} months.")
                    # else:
                    # Default: respect the user's from_year choice in Settings (no profile override)
                    from_year = getattr(default_settings, 'extract_from_year', 2015)
                    from_date = datetime(from_year, 1, 1)
                    logger.info(f"[{timeframe}] Using user-configured from_year: {from_year}")

                if to_date is None:
                    # STRICT SERVER TIME: extract everything up to 'server now'
                    # plus one day, to catch the forming candle.
                    server_now = self.get_current_server_time()
                    to_date = server_now + pd.Timedelta(days=1)
                    logger.info(f"[{timeframe}] Server Time: {server_now} -> Extracting until: {to_date}")

                # Ensure dates are native datetimes (fix pandas Timestamp issues)
                if hasattr(from_date, 'to_pydatetime'):
                    from_date = from_date.to_pydatetime()
                if hasattr(to_date, 'to_pydatetime'):
                    to_date = to_date.to_pydatetime()

                # Check for NaT just in case
                if pd.isna(from_date):
                    from_date = datetime(2010, 1, 1)

                logger.info(f"[{timeframe}] Extracting from {from_date} to {to_date}")

                self._notify("extraction_start", {
                    "timeframe": timeframe,
                    "from_date": from_date,
                    "to_date": to_date,
                    "is_update": is_update,
                })

                # Extract from MT5 WITH TIMEOUT
                from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

                def _fetch():
                    import numpy as np
                    # Retry loop inside the worker thread
                    for attempt in range(3):
                        try:
                            # 1. Range fetch (history)
                            rates_range = mt5.copy_rates_range(
                                self.symbol, tf_config.mt5_timeframe, from_date, to_date
                            )
                            # 2. Latest fetch (freshness guarantee)
                            rates_latest = mt5.copy_rates_from_pos(
                                self.symbol, tf_config.mt5_timeframe, 0, 1000
                            )

                            if rates_range is None and rates_latest is None:
                                err = mt5.last_error()
                                # RECOVERY: if the connection is lost, re-initialize
                                if err[0] == -10004:  # No IPC connection
                                    logger.warning(f"IPC lost for {timeframe}, attempting re-init...")
                                    path = getattr(default_settings, 'mt5_path', "")
                                    if path:
                                        mt5.initialize(path=path)
                                    else:
                                        mt5.initialize()
                                logger.warning(f"Attempt {attempt + 1} failed for {timeframe}: {err}")
                                raise Exception(f"MT5 error: {err}")

                            # Merge logic
                            if rates_range is None:
                                return rates_latest
                            if rates_latest is None:
                                return rates_range
                            return np.concatenate((rates_range, rates_latest))
                        except Exception as e:
                            if attempt == 2:
                                raise e
                            time.sleep(1)  # Wait before retry

                # Dynamic timeout: 120s for 1m (large data), 60s for others
                timeout_s = 120 if timeframe == '1m' else 60
                try:
                    with ThreadPoolExecutor(max_workers=1) as executor:
                        future = executor.submit(_fetch)
                        rates = future.result(timeout=timeout_s)
                except FuturesTimeout:
                    logger.error(f"MT5 Extraction timed out for {timeframe} ({timeout_s}s limit)")
                    raise Exception("MT5 Extraction Timed Out")
        except Exception as e:
            logger.error(f"MT5 Extraction failed for {timeframe}: {e}")
            db.log_extraction_failed(log_id, str(e))
            # Index DX-110: OHLC Data Recovery
            get_sentinel(mt5=self).repair("DX-110", {"timeframe": timeframe})
            return None

        # --- PHASE 2: PROCESSING (NO LOCK) ---
        # The lock is released here so UI calls (get_account_info) can proceed
        try:
            if rates is None or len(rates) == 0:
                if is_update:
                    logger.info(f"No new data for {timeframe} (already up to date)")
                else:
                    logger.warning(f"No data extracted for {timeframe}")
                db.log_extraction_complete(log_id, 0)
                return None

            # Convert to DataFrame (heavy CPU)
            df = pd.DataFrame(rates)
            df['time'] = pd.to_datetime(df['time'], unit='s')

            # --- DEDUPLICATION (fix overlap from the range/latest merge) ---
            original_len = len(df)
            df = df.drop_duplicates(subset=['time'], keep='last')
            if len(df) < original_len:
                logger.info(f"[{timeframe}] Removed {original_len - len(df)} duplicate rows")

            # Rename and keep only the required columns
            df = df.rename(columns={'tick_volume': 'volume'})
            df = df[['time', 'open', 'high', 'low', 'close', 'volume', 'spread']]

            # --- DATA VALIDATION ---
            # Remove rows with invalid OHLC values
            invalid_mask = (
                (df['open'] <= 0) |
                (df['high'] <= 0) |
                (df['low'] <= 0) |
                (df['close'] <= 0) |
                (df['high'] < df['low']) |  # High must be >= Low
                (df['spread'] < 0)          # Spread cannot be negative
            )
            invalid_count = invalid_mask.sum()
            if invalid_count > 0:
                logger.warning(f"[{timeframe}] Removed {invalid_count} invalid OHLC rows")
                df = df[~invalid_mask]

            # Sort by time to ensure chronological order
            df = df.sort_values('time').reset_index(drop=True)

            # Save to CSV (protected by the file lock inside _save_ohlc)
            self._save_ohlc(timeframe, df, is_update)

            db.log_extraction_complete(log_id, len(df))
            self._notify("extraction_complete", {
                "timeframe": timeframe,
                "candles": len(df),
                "is_update": is_update,
            })

            logger.info(f"Extracted {len(df)} candles for {timeframe}")
            return df[["time", "open", "high", "low", "close", "volume", "spread"]]
        except Exception as e:
            logger.error(f"Processing failed for {timeframe}: {e}")
            db.log_extraction_failed(log_id, str(e))
            self._notify("extraction_error", {
                "timeframe": timeframe,
                "error": str(e),
            })
            return None
    def update_all_ohlc(self, timeframes: Optional[list] = None, callback: Optional[Callable] = None) -> dict:
        """
        Bulk update OHLC for all (or specified) timeframes.

        This is a DEDICATED extraction phase that should be called BEFORE
        training, so that training threads never need to wait for MT5 IPC.

        Args:
            timeframes: List of TF names to update. Defaults to all standard TFs.
            callback: Optional callback(tf, status, msg) for progress updates.

        Returns:
            Dict with {timeframe: candle_count or error}
        """
        from config.settings import TIMEFRAME_NAMES

        if timeframes is None:
            timeframes = list(TIMEFRAME_NAMES)

        results = {}
        logger.info(f"📥 [Bulk OHLC] Starting extraction for {len(timeframes)} timeframes...")

        for tf in timeframes:
            try:
                if callback:
                    callback(tf, "active", f"Updating {tf}...")

                # Check if an update is needed (fast file check)
                ohlc_path = self.get_ohlc_path(tf)
                needs_update = True

                if ohlc_path.exists():
                    # Compare file modification time against server time
                    import os
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(ohlc_path))
                    server_time = self.get_current_server_time()

                    # Freshness thresholds (in minutes)
                    freshness = {"1m": 5, "5m": 10, "15m": 20, "30m": 40, "1h": 70, "4h": 250}
                    limit_mins = freshness.get(tf, 60)
                    age_mins = (server_time - file_mtime).total_seconds() / 60
                    is_fresh = age_mins < limit_mins

                    # CHECK PROFILE CONSISTENCY (force an update if the stored
                    # history does not match the active performance profile)
                    if is_fresh:
                        try:
                            # Read the first data row to check the start date
                            with open(ohlc_path, 'r') as f:
                                f.readline()           # Header
                                first = f.readline()   # First data row
                            if first:
                                date_str = first.split(',')[0]
                                first_date = pd.to_datetime(date_str)

                                # Required start year per profile
                                profile = getattr(default_settings, 'performance_profile', 'PRO')
                                required_year = 2015
                                if profile == 'BALANCED':
                                    required_year = 2020
                                elif profile == 'ECO':
                                    required_year = 2023

                                if first_date.year < required_year:
                                    # File starts BEFORE the required year: trim via update
                                    logger.info(f"[{tf}] File trimming required ({first_date.year} < {required_year}). Forcing update.")
                                    is_fresh = False
                                elif first_date.year > required_year and required_year < 2024:
                                    # File starts AFTER the required year: history is missing.
                                    # Skip low timeframes, where broker limits make the gap expected.
                                    if tf not in ['1m', '5m', '15m']:
                                        logger.info(f"[{tf}] Missing history ({first_date.year} > {required_year}). Forcing update.")
                                        is_fresh = False
                        except Exception as e:
                            logger.warning(f"Could not verify file content for {tf}: {e}")
                            is_fresh = False

                    if is_fresh:
                        logger.info(f"[{tf}] OHLC is fresh ({age_mins:.0f}m < {limit_mins}m), skipping extraction.")
                        results[tf] = "fresh"
                        if callback:
                            callback(tf, "complete", "Fresh")
                        needs_update = False

                if needs_update:
                    df = self.extract_ohlc(tf, is_update=True)
                    if df is not None:
                        results[tf] = len(df)
                        logger.info(f"[{tf}] OHLC updated: {len(df)} candles.")
                        if callback:
                            callback(tf, "complete", f"{len(df):,}")
                    else:
                        results[tf] = "no_new_data"
                        if callback:
                            callback(tf, "complete", "Up-to-date")
            except Exception as e:
                logger.error(f"[{tf}] Bulk OHLC update failed: {e}")
                results[tf] = f"error: {e}"
                if callback:
                    callback(tf, "error", str(e))

        logger.info(f"📥 [Bulk OHLC] Complete. Results: {results}")
        return results
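    # --- Usage sketch (illustrative, not part of the original module) ---
    # Intended call order before training, so training threads never touch
    # MT5 IPC; the callback drives a progress view:
    #
    #     def on_progress(tf, status, msg):
    #         print(f"[{tf}] {status}: {msg}")
    #
    #     results = connector.update_all_ohlc(callback=on_progress)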
    def get_buffer(self, timeframe: str, n: int = 2000) -> Optional[pd.DataFrame]:
        """
        Get the latest N candles directly for in-memory prediction (FAST BUFFER).

        NOTE: This is NOT for training. It fetches a small buffer
        (e.g. 2000 candles) to allow rapid indicator calculation for
        real-time signals.

        CRITICAL OPTIMIZATION: save_to_disk=False
        We do NOT update the massive CSV on every tick. TrainingManager
        handles historical updates ("Smart Update"); prediction only needs
        the in-memory data.
        """
        return self.get_latest_candles(timeframe, n=n, save_to_disk=False)
    def get_latest_candles(self, timeframe: str, n: int = 1000, save_to_disk: bool = True) -> Optional[pd.DataFrame]:
        """Get latest N candles from MT5 (lightweight)"""
        # Phase 1: MT5 interaction (protected)
        rates = None
        try:
            with self._lock:
                if not self.connected and not self.connect():
                    return None

                tf_config = get_timeframe_by_name(timeframe)
                if tf_config is None:
                    return None

                # TIMEOUT WRAPPER
                from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

                def _fetch_pos():
                    return mt5.copy_rates_from_pos(self.symbol, tf_config.mt5_timeframe, 0, n)

                try:
                    t0 = datetime.now()
                    with ThreadPoolExecutor(max_workers=1) as executor:
                        future = executor.submit(_fetch_pos)
                        rates = future.result(timeout=2.0)  # 2s max timeout for live buffer
                    dt = (datetime.now() - t0).total_seconds() * 1000
                    if dt > 500:
                        logger.warning(f"SLOW MT5 FETCH [{timeframe}]: {dt:.1f}ms")
                    else:
                        logger.debug(f"MT5 fetch [{timeframe}]: {dt:.1f}ms")
                except FuturesTimeout:
                    logger.warning(f"MT5 get_latest_candles timed out for {timeframe} (2s)")
                    return None
        except Exception as e:
            logger.error(f"Error in get_latest_candles: {e}")
            return None

        # Phase 2: processing & I/O (not under the MT5 lock)
        if rates is None or len(rates) == 0:
            return None

        try:
            df = pd.DataFrame(rates)
            df['time'] = pd.to_datetime(df['time'], unit='s')
            df = df.rename(columns={'tick_volume': 'volume'})
            df = df[["time", "open", "high", "low", "close", "volume", "spread"]]

            # Save to CSV (protected by the file lock inside _save_ohlc);
            # skipped when save_to_disk=False for speed.
            if save_to_disk:
                self._save_ohlc(timeframe, df, is_update=True)

            # === DEBUG: EXTRACTION PROBE ===
            if get_debug_manager().is_enabled("EXTRACTION"):
                get_debug_manager().log("EXTRACTION", f"[{timeframe}] Fetched Buffer", {
                    "Rows": len(df),
                    "Start": str(df['time'].iloc[0]),
                    "End": str(df['time'].iloc[-1]),
                })

            return df
        except Exception as e:
            logger.error(f"Error processing latest candles for {timeframe}: {e}")
            return None
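    # --- Usage sketch (illustrative, not part of the original module) ---
    # Real-time signal path: a small in-memory buffer is enough for most
    # indicators, and skipping disk writes keeps the tick loop fast:
    #
    #     buf = connector.get_buffer("5m", n=500)
    #     if buf is not None:
    #         sma = buf["close"].rolling(20).mean().iloc[-1]  # 20-period SMA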
    def _save_ohlc(self, timeframe: str, df: pd.DataFrame, is_update: bool):
        """Save OHLC data to CSV with optimization for updates"""
        from core.data_validator import get_validator

        path = self.get_ohlc_path(timeframe)
        validator = get_validator()

        with self._file_lock:
            # OPTIMIZED APPEND PATH - DISABLED for stability (issues with freshness)
            if False and is_update and path.exists():
                try:
                    # Read ONLY the last timestamp. A full pandas read_csv is
                    # too slow for large files, so seek to the end of the file
                    # and backtrack until a complete last line is found.
                    last_time = None
                    with open(path, 'r', newline='') as f:
                        try:
                            f.seek(0, 2)  # End of file
                            f_len = f.tell()
                            found = False
                            for i in range(100, 2000, 100):  # Dynamic backtrack
                                if i > f_len:
                                    i = f_len
                                f.seek(f_len - i)
                                lines = f.readlines()
                                if len(lines) >= 2:  # Header + at least one data line
                                    last_line = lines[-1].strip()
                                    if last_line:
                                        # Parse last line (Time,Open,High,Low,Close,Volume,Spread)
                                        parts = last_line.split(',')
                                        if len(parts) >= 1:
                                            last_time = pd.to_datetime(parts[0])
                                            found = True
                                            break
                            if not found:
                                pass  # Fall through to the slow path
                        except Exception:
                            pass  # Formatting error: fall through to the slow path

                    if last_time:
                        logger.debug(f"[{timeframe}] File last_time: {last_time}. New data range: {df['time'].min()} - {df['time'].max()}")

                        # Filter NEW data only
                        df_new = df[df['time'] > last_time]
                        if df_new.empty:
                            logger.info(f"[{timeframe}] No new data after filtering (File up to {last_time}, Fetched up to {df['time'].max()})")
                            return  # Nothing to append
                        df = df_new

                        # Validate ONLY the new chunk
                        df, report = validator.validate_and_clean(df, timeframe)
                        if df.empty:
                            return

                        # Append
                        df.to_csv(path, mode='a', header=False, index=False)

                        # Metadata update is skipped on the fast path: the DB
                        # expects a full candle count, and recomputing it here
                        # would negate the speed gain.

                        logger.debug(f"Appended {len(df)} candles to {timeframe}")
                        return
                except Exception as e:
                    logger.warning(f"Fast append failed for {timeframe}, falling back to full rewrite: {e}")
                    # Fall through to the slow path below

            # SLOW PATH (full write / initial)
            if is_update and path.exists():
                try:
                    existing_df = pd.read_csv(path, parse_dates=['time'])
                    df = pd.concat([existing_df, df], ignore_index=True)
                    df = df.drop_duplicates(subset=['time'], keep='last')
                    df = df.sort_values('time')
                except Exception as e:
                    logger.error(f"Error reading existing OHLC for append: {e}")

            # Validate the full dataset
            df, report = validator.validate_and_clean(df, timeframe)

            # --- ENFORCE PROFILE LIMITS (TRIMMING) ---
            df = self._enforce_profile_limits(df, timeframe)

            if report['rows_removed'] > 0 or report['rows_cleaned'] > 0:
                logger.info(f"[{timeframe}] Cleaned: removed {report['rows_removed']} rows, fixed {report['rows_cleaned']} values")

            try:
                df.to_csv(path, index=False)
                logger.info(f"Saved OHLC to {path} (Range: {df['time'].min()} - {df['time'].max()})")
            except Exception as e:
                logger.error(f"Error saving OHLC to {path}: {e}")
                return

            # Save metadata (slow path only)
            try:
                db = get_database()
                file_size_kb = path.stat().st_size / 1024
                db.update_ohlc_metadata(
                    timeframe=timeframe,
                    first_date=df['time'].min(),
                    last_date=df['time'].max(),
                    candle_count=len(df),
                    file_size_kb=file_size_kb,
                    file_path=str(path),
                    source="MT5",
                )
            except Exception:
                pass

    def _enforce_profile_limits(self, df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
        """Trim DataFrame according to the active Performance Profile"""
        if df.empty:
            return df

        try:
            profile = getattr(default_settings, 'performance_profile', 'PRO')
            limit_year = 2015  # Default
            if profile == 'BALANCED':
                limit_year = 2020
            elif profile == 'ECO':
                limit_year = 2023

            limit_date = pd.Timestamp(f"{limit_year}-01-01")

            # Check if trimming is needed
            if df['time'].min() < limit_date:
                original_len = len(df)
                df = df[df['time'] >= limit_date]
                new_len = len(df)
                if original_len != new_len:
                    logger.info(f"[{timeframe}] Trimmed data for {profile} profile: {original_len} -> {new_len} candles (>= {limit_year})")
        except Exception as e:
            logger.error(f"Failed to enforce profile limits: {e}")

        return df
    def load_ohlc(self, timeframe: str) -> Optional[pd.DataFrame]:
        """Load OHLC data from CSV"""
        path = self.get_ohlc_path(timeframe)
        if not path.exists():
            return None

        with self._file_lock:
            try:
                return pd.read_csv(path, parse_dates=['time'])
            except Exception as e:
                logger.error(f"Failed to load OHLC for {timeframe}: {e}")
                return None
    def load_ohlc_buffer(self, timeframe: str, n: int = 5000) -> Optional[pd.DataFrame]:
        """
        Fast disk load: reads only the last N rows of the CSV.
        Crucial for large 1m files (200MB+) to prevent real-time stalls.
        """
        path = self.get_ohlc_path(timeframe)
        if not path.exists():
            logger.debug(f"OHLC path does not exist: {path}")
            return None

        with self._file_lock:
            try:
                # ROBUST TAIL READ STRATEGY (O(1) in file size)
                # Instead of guessing row counts, read only the last X bytes.
                # Rows average ~100 bytes, so use 150 bytes per row as a
                # safety margin.
                bytes_to_read = n * 150

                with open(path, 'rb') as f:
                    f.seek(0, 2)  # Move to end
                    size = f.tell()
                    if size < bytes_to_read:
                        # File smaller than the buffer: read it all
                        f.seek(0)
                        data = f.read()
                    else:
                        # Move back and read to the end
                        f.seek(-bytes_to_read, 2)
                        data = f.read()
                        # Discard the first (partial) line
                        first_nl = data.find(b'\n')
                        if first_nl != -1:
                            data = data[first_nl + 1:]

                from io import BytesIO
                # We need the header names since the tail read skipped the real header
                header = pd.read_csv(path, nrows=0).columns.tolist()
                df = pd.read_csv(
                    BytesIO(data),
                    names=header,
                    header=None,
                    parse_dates=['time'],
                )
                return df
            except Exception as e:
                logger.error(f"Failed to load OHLC buffer for {timeframe}: {e}")

        # Fallback to a full load if the optimized path fails.
        # NOTE: this must happen OUTSIDE the file lock, because load_ohlc
        # acquires the same non-reentrant lock.
        return self.load_ohlc(timeframe)
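    # --- Worked example (illustrative, not part of the original module) ---
    # The tail read is O(1) in file size: for n=5000 it reads at most
    # 5000 * 150 = 750,000 bytes (~0.75 MB) from the end of the CSV,
    # regardless of whether the file is 2 MB or 200 MB:
    #
    #     tail = connector.load_ohlc_buffer("1m", n=5000)  # fast, bounded read
    #     full = connector.load_ohlc("1m")                 # full parse, can be slow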
    def get_ohlc_status(self, timeframe: str) -> dict:
        """Get status of the OHLC file for a timeframe"""
        path = self.get_ohlc_path(timeframe)
        if not path.exists():
            return {
                "exists": False,
                "path": str(path),
                "candles": 0,
                "from_date": None,
                "to_date": None,
            }

        # Reuse load_ohlc, which already holds the file lock
        df = self.load_ohlc(timeframe)
        if df is None or df.empty:
            return {
                "exists": True,
                "path": str(path),
                "candles": 0,
                "from_date": None,
                "to_date": None,
            }

        return {
            "exists": True,
            "path": str(path),
            "candles": len(df),
            "from_date": df['time'].min(),
            "to_date": df['time'].max(),
        }
    def extract_all(self, is_update: bool = True, from_year: Optional[int] = None):
        """Extract OHLC for all timeframes"""
        from_date = None
        if from_year:
            from_date = datetime(from_year, 1, 1)

        for tf in TIMEFRAMES:
            self.extract_ohlc(tf.name, is_update=is_update, from_date=from_date)
    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
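    # --- Usage sketch (illustrative, not part of the original module) ---
    # The context manager pairs connect()/disconnect() automatically:
    #
    #     with MT5Connector("EURUSD") as conn:
    #         df = conn.load_ohlc("1h")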
# ─────────────────────────────────────────────────────────────────────────────
# SINGLETON ACCESSOR (for use by chart_widget and other modules)
# ─────────────────────────────────────────────────────────────────────────────
_global_connector: Optional[MT5Connector] = None
def get_mt5_connector() -> MT5Connector:
    """Get or create the global MT5Connector instance."""
    global _global_connector
    if _global_connector is None:
        _global_connector = MT5Connector()
    return _global_connector
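
# --- Illustrative smoke test (not part of the original module) ---
# Minimal end-to-end check using only the public API above; assumes
# MetaTrader 5 is installed and a terminal is reachable. Read-only:
# it fetches a small candle buffer and never places orders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    connector = get_mt5_connector()
    ok, message = connector.check_health()
    print(f"Health: {ok} - {message}")
    if ok:
        candles = connector.get_latest_candles("1h", n=10, save_to_disk=False)
        if candles is not None:
            print(candles.tail())
        connector.disconnect()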