"""
Cerebrum Forex - MT5 Connector
Handles connection to MetaTrader 5 and OHLC data extraction.
"""
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable, Tuple
import threading
import time
import pandas as pd
try:
import MetaTrader5 as mt5
MT5_AVAILABLE = True
except ImportError:
MT5_AVAILABLE = False
from config.settings import (
TIMEFRAMES,
OHLC_DIR,
get_timeframe_by_name,
TimeframeConfig,
default_settings,
)
from database import get_database
from core.debug_manager import get_debug_manager
from core.sentinel import get_sentinel
logger = logging.getLogger(__name__)
class MT5Connector:
"""MetaTrader 5 connector for OHLC data extraction"""
def __init__(self, symbol: str = "EURUSD"):
self.symbol = symbol
self.connected = False
self._callbacks: list[Callable] = []
# Locks for thread safety
self._lock = threading.RLock() # For MT5 connection/API
self._file_lock = threading.Lock() # For file I/O (CSV)
# Paths (Populated on connect)
self.terminal_path: Optional[str] = None
self.data_path: Optional[str] = None
# Ensure OHLC directory exists
OHLC_DIR.mkdir(parents=True, exist_ok=True)
def add_callback(self, callback: Callable):
"""Add callback for status updates"""
self._callbacks.append(callback)
def _notify(self, event: str, data: dict):
"""Notify all callbacks"""
for callback in self._callbacks:
try:
callback(event, data)
except Exception as e:
logger.error(f"Callback error: {e}")
def close_all_positions(self) -> Tuple[int, int]:
"""
PANIC BUTTON: Close all open positions immediately.
Returns: (closed_count, error_count)
"""
closed = 0
errors = 0
with self._lock:
if not self.connected and not self.connect():
logger.error("Cannot close positions: MT5 disonnected")
return 0, 0
positions = mt5.positions_get(symbol=self.symbol)
if not positions:
logger.info("No open positions to close.")
return 0, 0
logger.warning(f"π¨ PANIC CLOSE INITIATED: Closing {len(positions)} positions...")
for pos in positions:
# Close Logic: Send opposite order
                order_type = mt5.ORDER_TYPE_SELL if pos.type == mt5.ORDER_TYPE_BUY else mt5.ORDER_TYPE_BUY
                tick = mt5.symbol_info_tick(self.symbol)
                price = tick.bid if order_type == mt5.ORDER_TYPE_SELL else tick.ask
request = {
"action": mt5.TRADE_ACTION_DEAL,
"symbol": self.symbol,
"volume": pos.volume,
"type": order_type,
"position": pos.ticket,
"price": price,
"deviation": 20,
"magic": 234000,
"comment": "panic_close",
"type_time": mt5.ORDER_TIME_GTC,
"type_filling": mt5.ORDER_FILLING_IOC,
}
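                # With "position" set to an existing ticket, TRADE_ACTION_DEAL
                # sends an opposite-direction market order that closes it.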
result = mt5.order_send(request)
                if result is None or result.retcode != mt5.TRADE_RETCODE_DONE:
                    reason = result.comment if result else mt5.last_error()
                    logger.error(f"Failed to close ticket {pos.ticket}: {reason}")
                    errors += 1
else:
logger.info(f"Closed ticket {pos.ticket} ({pos.profit:.2f})")
closed += 1
return closed, errors
def connect(self, timeout: float = 5.0) -> bool:
"""Connect to MT5 terminal with timeout protection"""
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
if self.connected:
return True
with self._lock:
if not MT5_AVAILABLE:
logger.error("MetaTrader5 package not installed")
return False
# Use timeout to prevent infinite blocking
def _init_mt5():
path = getattr(default_settings, 'mt5_path', "")
if path:
self.terminal_path = path
return mt5.initialize(path=path)
success = mt5.initialize()
if success:
info = mt5.terminal_info()
if info:
self.terminal_path = info.path
self.data_path = info.data_path
return success
def _resolve_symbol(target: str) -> str:
# 1. Direct check
if mt5.symbol_info(target): return target
# 2. Search for matches (e.g. EURUSD.m, EURUSD+, EURUSDpro)
symbols = mt5.symbols_get()
if not symbols:
logger.error(f"Failed to get symbols from MT5: {mt5.last_error()}")
return target
                    # Filter for names containing the target (don't hardcode the
                    # pair here, or non-EURUSD symbols could never resolve)
                    matches = [s.name for s in symbols if target in s.name]
logger.info(f"Candidate symbols for {target}: {matches}")
if matches:
# Sort by length (prefer EURUSD over EURUSD.pro)
matches.sort(key=len)
                    # Shortest name is usually the plain instrument; broker-
                    # suffixed variants (.pro, .m, +) sort after it.
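                    # e.g. with only "EURUSD.pro" and "EURUSDm" listed,
                    # "EURUSD" resolves to "EURUSDm" (shortest match).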
logger.info(f"Symbol {target} resolved to {matches[0]}")
return matches[0]
return target
try:
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(_init_mt5)
result = future.result(timeout=timeout)
if result:
logger.info("Connected to MT5")
# Capture terminal info for diagnostics
info = mt5.terminal_info()
if info:
self.terminal_path = info.path
self.data_path = info.data_path
logger.info(f"MT5 Terminal Path: {self.terminal_path}")
logger.info(f"MT5 Data Path: {self.data_path}")
get_debug_manager().log("EXTRACTION", "β
MT5 CONNECT SUCCESS", {"symbol": self.symbol})
else:
err = mt5.last_error()
# AUTO-RECOVERY: If path not set, try to find it
current_path = getattr(default_settings, 'mt5_path', "")
if not current_path:
logger.warning("Standard MT5 init failed. Attempting deep search for terminal64.exe...")
found_path = self._auto_discover_terminal()
if found_path:
logger.info(f"Auto-discovered MT5 at: {found_path}")
                        if mt5.initialize(path=found_path):
                            logger.info("Connected to MT5 via auto-discovery")
                            self.terminal_path = found_path
                            self.connected = True
                            self._notify("connected", {"status": True})
                            return True
logger.error(f"MT5 initialization failed: {err}")
get_debug_manager().log("EXTRACTION", "β MT5 CONNECT FAILED", str(err))
return False
# Resolve symbol DYNAMICALLY
resolved = _resolve_symbol(self.symbol)
if resolved != self.symbol:
logger.warning(f"Symbol auto-resolved: '{self.symbol}' -> '{resolved}'")
self.symbol = resolved
                    # Kept for this runtime only; not persisted to settings.
except FuturesTimeout:
logger.error(f"MT5 connection timed out after {timeout}s (MT5 may be unresponsive)")
# Index DX-100: Connection Recovery
get_sentinel(mt5=self).repair("DX-100")
return False
except Exception as e:
logger.error(f"MT5 connection error: {e}")
get_sentinel(mt5=self).repair("DX-100")
return False
finally:
# Use wait=False to prevent hanging the main thread if mt5.initialize is stuck in C-code
executor.shutdown(wait=False)
self.connected = True
# Ensure symbol is selected in Market Watch
if not mt5.symbol_select(self.symbol, True):
logger.warning(f"Failed to select symbol {self.symbol}, trying to proceed anyway...")
logger.info("Connected to MT5")
self._notify("connected", {"status": True})
return True
def check_health(self) -> tuple[bool, str]:
"""Detailed connection diagnostic"""
if not MT5_AVAILABLE:
return False, "MetaTrader5 package not installed. Try: pip install MetaTrader5"
with self._lock:
# 1. Try standard connect
if self.connect():
return True, "Successfully connected to MetaTrader 5."
# 2. Get last error
err = mt5.last_error()
# 3. Specific diagnostics
if err[0] == -1:
# Search for terminal
t_path = self._auto_discover_terminal()
if not t_path:
return False, "MT5 Terminal NOT FOUND. Please ensure MetaTrader 5 is installed or set the path in Settings."
else:
return False, f"MT5 found at {t_path}, but failed to initialize. Try running MT5 as Administrator."
if err[0] == -10004:
return False, "MT5 connection failed: IPC (Inter-Process Communication) error. Restart your terminal."
return False, f"MT5 Connection Failed: {err[1]} (Code: {err[0]})"
def _auto_discover_terminal(self) -> Optional[str]:
"""Deep search for MT5 terminal executable"""
common_paths = [
Path("C:/Program Files/MetaTrader 5/terminal64.exe"),
Path("C:/Program Files (x86)/MetaTrader 5/terminal64.exe"),
]
# Check standard paths first
for p in common_paths:
if p.exists(): return str(p)
# Search in Program Files for ANY MT5 terminal
try:
for pf in [Path("C:/Program Files"), Path("C:/Program Files (x86)")]:
if not pf.exists(): continue
# Look for terminal64.exe in any subfolder containing 'MetaTrader'
for folder in pf.glob("*MetaTrader*"):
t_app = folder / "terminal64.exe"
if t_app.exists(): return str(t_app)
        except Exception:
            pass
return None
    def get_terminal_data_path(self) -> Optional[str]:
        """Robustly get the MT5 data path (AppData directory, for MQL5/Files)"""
        if self.data_path:
            return self.data_path
        with self._lock:
            if not self.connected and not self.connect():
                return None
            try:
                info = mt5.terminal_info()
                if not info:
                    # Attempt re-init before giving up
                    mt5.initialize()
                    info = mt5.terminal_info()
                if info:
                    self.data_path = info.data_path
                    return self.data_path
            except Exception:
                pass
            return None
def disconnect(self):
"""Disconnect from MT5"""
with self._lock:
if MT5_AVAILABLE and self.connected:
mt5.shutdown()
self.connected = False
logger.info("Disconnected from MT5")
self._notify("disconnected", {"status": True})
def test_connection(self) -> bool:
"""Test MT5 connection"""
with self._lock:
if not self.connect():
return False
# Try to get symbol info
if MT5_AVAILABLE:
info = mt5.symbol_info(self.symbol)
if info is None:
logger.error(f"Symbol {self.symbol} not found")
self.disconnect()
return False
self.disconnect()
return True
    def preload_history(self, timeframe: Optional[str] = None) -> dict:
"""
Force MT5 to download maximum available history for timeframes.
This triggers the broker server to send all cached historical data.
Args:
timeframe: Specific timeframe to preload, or None for all
Returns:
dict with preload results per timeframe
"""
if not MT5_AVAILABLE:
return {"error": "MT5 not available"}
results = {}
with self._lock:
if not self.connected and not self.connect():
return {"error": "Failed to connect to MT5"}
# Ensure symbol is selected in Market Watch (triggers data sync)
mt5.symbol_select(self.symbol, True)
# Get timeframes to preload
if timeframe:
timeframes = [timeframe]
else:
# All timeframes from config
timeframes = [tf.name for tf in TIMEFRAMES]
for tf_name in timeframes:
tf_config = get_timeframe_by_name(tf_name)
if tf_config is None:
continue
try:
# Request maximum bars (500,000) to force full history download
# MT5 will return what the broker allows
logger.info(f"[PRELOAD] Requesting max history for {tf_name}...")
# copy_rates_from_pos with very large count forces download
rates = mt5.copy_rates_from_pos(
self.symbol,
tf_config.mt5_timeframe,
0, # Start from most recent
500000 # Request 500k bars (MT5 will cap to available)
)
if rates is not None and len(rates) > 0:
# Convert to get date range
first_date = pd.to_datetime(rates[0]['time'], unit='s')
last_date = pd.to_datetime(rates[-1]['time'], unit='s')
results[tf_name] = {
"success": True,
"bars": len(rates),
"from": first_date.strftime("%Y-%m-%d"),
"to": last_date.strftime("%Y-%m-%d")
}
logger.info(f"[PRELOAD] {tf_name}: {len(rates)} bars ({first_date.date()} to {last_date.date()})")
else:
error = mt5.last_error()
results[tf_name] = {"success": False, "error": str(error)}
logger.warning(f"[PRELOAD] {tf_name}: Failed - {error}")
except Exception as e:
results[tf_name] = {"success": False, "error": str(e)}
logger.error(f"[PRELOAD] {tf_name}: Exception - {e}")
logger.info(f"[PRELOAD] History preload complete: {sum(1 for r in results.values() if r.get('success'))}/{len(results)} timeframes loaded")
return results
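    # Example preload_history("1h") result shape (values illustrative; bar
    # counts and date range depend on what history the broker serves):
    #   {"1h": {"success": True, "bars": 120000,
    #           "from": "2015-01-02", "to": "2025-06-30"}}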
def get_current_server_time(self) -> datetime:
"""
Get the ABSOLUTE LATEST server time from the last tick/quote.
This represents 'NOW' on the server.
"""
if not self.connected and not self.connect():
return datetime.now() # Fallback
try:
# Fast check via symbol tick
tick = mt5.symbol_info_tick(self.symbol)
if tick:
return datetime.fromtimestamp(tick.time)
# Slower check via symbol info
info = mt5.symbol_info(self.symbol)
if info:
return datetime.fromtimestamp(info.time)
except Exception:
pass
return datetime.now() # Absolute fallback
def get_server_time(self) -> Optional[datetime]:
"""Get current server time (Legacy/Optional Wrapper)"""
return self.get_current_server_time()
def get_time_offset(self) -> float:
"""
Calculate seconds offset between Local Time and Server Time.
Positive = Local is ahead of Server.
Negative = Local is behind Server.
"""
server_time = self.get_server_time()
if not server_time:
return 0.0
local_time = datetime.now()
        # Raw difference in seconds; no rounding is applied here
delta = (local_time - server_time).total_seconds()
return delta
def get_account_info(self) -> Optional[dict]:
"""Get MT5 account information (Non-blocking for UI)"""
# Try to acquire lock with short timeout to prevent UI freeze
if not self._lock.acquire(timeout=0.05):
return None
try:
if not MT5_AVAILABLE or not self.connected:
return None
info = mt5.account_info()
if info is None:
return None
# Fetch Open Positions
positions = mt5.positions_get()
pos_list = []
if positions:
for pos in positions:
pos_list.append({
"ticket": pos.ticket,
"symbol": pos.symbol,
"type": pos.type, # 0=Buy, 1=Sell
"volume": pos.volume,
"price_open": pos.price_open,
"profit": pos.profit,
"comment": pos.comment
})
# Return Dictionary compatible with UI
return {
"login": info.login,
"name": info.name,
"server": info.server,
"currency": info.currency,
"company": info.company,
"leverage": info.leverage,
"balance": info.balance,
"equity": info.equity,
"profit": info.profit,
"margin": info.margin,
"margin_free": info.margin_free,
"margin_level": info.margin_level,
"trade_mode": info.trade_mode,
"positions": pos_list
}
finally:
self._lock.release()
def get_market_status(self) -> dict:
"""
Check real-time market status for the current symbol via MT5.
Returns a dict with 'status', 'description', and 'is_open' boolean.
"""
# Try to acquire lock with short timeout to prevent UI freeze
if not self._lock.acquire(timeout=0.05):
return {"status": "CHECKING", "is_open": False, "description": "Terminal Busy"}
try:
if not MT5_AVAILABLE or not self.connected:
return {"status": "OFFLINE", "is_open": False, "description": "MT5 Not Connected"}
# 1. Get Symbol Info
info = mt5.symbol_info(self.symbol)
if info is None:
return {"status": "ERROR", "is_open": False, "description": "Symbol Not Found"}
# 2. Check Trade Mode
# trade_mode: 0=Disabled, 1=LongOnly, 2=ShortOnly, 3=CloseOnly, 4=Full
if info.trade_mode == 0:
return {"status": "CLOSED", "is_open": False, "description": "Trading Disabled"}
# 3. Check Session Status
# Check for holiday/weekend via session gaps
from datetime import datetime, timezone
now_utc = datetime.now(timezone.utc)
day_of_week = now_utc.weekday() # 0-6
# Market check (Sunday 22:00 UTC to Friday 22:00 UTC)
if day_of_week == 5: # Saturday
return {"status": "CLOSED", "is_open": False, "description": "Weekend (Saturday)"}
if day_of_week == 6 and now_utc.hour < 22: # Sunday before 22:00
return {"status": "CLOSED", "is_open": False, "description": "Weekend (Sunday)"}
if day_of_week == 4 and now_utc.hour >= 22: # Friday after 22:00
return {"status": "CLOSED", "is_open": False, "description": "Market Closed (Friday)"}
# 4. Check quote freshness (Active Ticks)
tick = mt5.symbol_info_tick(self.symbol)
if tick:
tick_time = datetime.fromtimestamp(tick.time)
server_time = self.get_current_server_time()
age_seconds = (server_time - tick_time).total_seconds()
# If ticks are very old during a weekday, it's likely a bank holiday where MT5 is up but symbol is paused
if age_seconds > 3600:
return {"status": "HOLIDAY", "is_open": False, "description": "Market Inactive (Holiday?)"}
return {
"status": "OPEN",
"is_open": True,
"description": "Trading Active",
"server_time": server_time.strftime("%H:%M:%S")
}
return {"status": "CLOSED", "is_open": False, "description": "No Quote Signal"}
except Exception as e:
return {"status": "ERROR", "is_open": False, "description": str(e)}
finally:
self._lock.release()
def get_ohlc_path(self, timeframe: str) -> Path:
"""Get path for OHLC CSV file"""
return OHLC_DIR / f"ohlc_{self.symbol}_{timeframe}.csv"
    def update_all_ohlc(self, timeframes: Optional[list] = None, callback: Optional[Callable] = None) -> dict:
"""
Bulk update OHLC for all (or specified) timeframes.
This is a DEDICATED extraction phase that should be called BEFORE training,
so that training threads never need to wait for MT5 IPC.
Args:
timeframes: List of TF names to update. Defaults to all standard TFs.
callback: Optional callback(tf, status, msg) for progress updates.
Returns:
Dict with {timeframe: candle_count or error}
"""
from config.settings import TIMEFRAME_NAMES
if timeframes is None:
timeframes = list(TIMEFRAME_NAMES)
results = {}
logger.info(f"π₯ [Bulk OHLC] Starting extraction for {len(timeframes)} timeframes...")
for tf in timeframes:
try:
if callback:
callback(tf, "active", f"Updating {tf}...")
# Check if update is needed (fast file check)
ohlc_path = self.get_ohlc_path(tf)
needs_update = True
if ohlc_path.exists():
# Check file modification time vs server time
import os
file_mtime = datetime.fromtimestamp(os.path.getmtime(ohlc_path))
server_time = self.get_current_server_time()
# Freshness thresholds (in minutes)
freshness = {"1m": 5, "5m": 10, "15m": 20, "30m": 40, "1h": 70, "4h": 250}
limit_mins = freshness.get(tf, 60)
age_mins = (server_time - file_mtime).total_seconds() / 60
# Default: Fresh
is_fresh = age_mins < limit_mins
# CHECK PROFILE CONSISTENCY (Force update if data is too old/wrong for profile)
if is_fresh:
try:
# Read first line to check start date
with open(ohlc_path, 'r') as f:
header = f.readline() # Header
first = f.readline() # First row
if first:
date_str = first.split(',')[0]
first_date = pd.to_datetime(date_str)
# Get limit
profile = getattr(default_settings, 'performance_profile', 'PRO')
required_year = 2015
if profile == 'BALANCED': required_year = 2020
elif profile == 'ECO': required_year = 2023
# If file starts BEFORE required year (e.g. 2015 < 2020), we must update/trim
# If file starts significantly AFTER required year (e.g. 2026 > 2020), we must update/fetch more
if first_date.year < required_year:
logger.info(f"[{tf}] File trimming required ({first_date.year} < {required_year}). Forcing update.")
is_fresh = False
                                elif first_date.year > required_year and required_year < 2024:
                                    # File starts after the required year (e.g. has 2026,
                                    # wants 2020): fetch more history. Low TFs are skipped
                                    # because broker depth limits make short 1m/5m/15m
                                    # history expected rather than an error.
                                    if tf not in ['1m', '5m', '15m']:
                                        logger.info(f"[{tf}] Missing history ({first_date.year} > {required_year}). Forcing update.")
                                        is_fresh = False
except Exception as e:
logger.warning(f"Could not verify file content for {tf}: {e}")
is_fresh = False
if is_fresh:
logger.info(f"[{tf}] OHLC is fresh ({age_mins:.0f}m < {limit_mins}m), skipping extraction.")
results[tf] = "fresh"
if callback:
callback(tf, "complete", "Fresh")
needs_update = False
if needs_update:
df = self.extract_ohlc(tf, is_update=True)
if df is not None:
results[tf] = len(df)
logger.info(f"[{tf}] OHLC updated: {len(df)} candles.")
if callback:
callback(tf, "complete", f"{len(df):,}")
else:
results[tf] = "no_new_data"
if callback:
callback(tf, "complete", "Up-to-date")
except Exception as e:
logger.error(f"[{tf}] Bulk OHLC update failed: {e}")
results[tf] = f"error: {e}"
if callback:
callback(tf, "error", str(e))
logger.info(f"π₯ [Bulk OHLC] Complete. Results: {results}")
return results
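    # Example update_all_ohlc() result shape (illustrative): per-TF candle
    # counts, or the strings "fresh" / "no_new_data" / "error: <message>":
    #   {"1h": 45210, "4h": "fresh", "1m": "error: <message>"}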
def get_buffer(self, timeframe: str, n: int = 2000) -> Optional[pd.DataFrame]:
"""
Get latest N candles directly for In-Memory prediction (FAST BUFFER).
NOTE: This is NOT for training. It fetches a small buffer (e.g. 2000 candles)
to allow rapid indicator calculation for real-time signals.
CRITICAL OPTIMIZATION: save_to_disk=False
We do NOT update the massive CSV on every tick.
TrainingManager handles historical updates ("Smart Update").
Prediction only needs the in-memory data.
"""
return self.get_latest_candles(timeframe, n=n, save_to_disk=False)
def get_latest_candles(self, timeframe: str, n: int = 1000, save_to_disk: bool = True) -> Optional[pd.DataFrame]:
"""Get latest N candles from MT5 (lightweight)"""
# Phase 1: MT5 Interaction (Protected)
rates = None
try:
with self._lock:
if not self.connected and not self.connect():
return None
tf_config = get_timeframe_by_name(timeframe)
if tf_config is None: return None
# TIMEOUT WRAPPER
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout
def _fetch_pos():
return mt5.copy_rates_from_pos(self.symbol, tf_config.mt5_timeframe, 0, n)
try:
t0 = datetime.now()
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(_fetch_pos)
rates = future.result(timeout=2.0) # 2s max timeout for live buffer
dt = (datetime.now() - t0).total_seconds() * 1000
if dt > 500:
logger.warning(f"SLOW MT5 FETCH [{timeframe}]: {dt:.1f}ms")
else:
logger.debug(f"MT5 fetch [{timeframe}]: {dt:.1f}ms")
except FuturesTimeout:
logger.warning(f"MT5 get_latest_candles timed out for {timeframe} (10s)")
return None
except Exception as e:
logger.error(f"Error in get_latest_candles: {e}")
return None
# Phase 2: Processing & IO (Unprotected by MT5 lock)
if rates is None or len(rates) == 0:
return None
try:
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')
df = df.rename(columns={'tick_volume': 'volume'})
df = df[["time", "open", "high", "low", "close", "volume", "spread"]]
# Save to CSV (Protected by File Lock internally)
if save_to_disk:
self._save_ohlc(timeframe, df, is_update=True)
            # (when save_to_disk is False we skip disk I/O entirely for speed)
# === DEBUG: EXTRACTION PROBE ===
if get_debug_manager().is_enabled("EXTRACTION"):
get_debug_manager().log("EXTRACTION", f"[{timeframe}] Fetched Buffer", {
"Rows": len(df),
"Start": str(df['time'].iloc[0]),
"End": str(df['time'].iloc[-1])
})
return df
except Exception as e:
logger.error(f"Error processing latest candles for {timeframe}: {e}")
return None
def _save_ohlc(self, timeframe: str, df: pd.DataFrame, is_update: bool):
"""Save OHLC data to CSV with optimization for updates"""
from core.data_validator import get_validator
path = self.get_ohlc_path(timeframe)
validator = get_validator()
with self._file_lock:
            # OPTIMIZED APPEND PATH - disabled for stability (freshness issues)
            if False and is_update and path.exists():
                try:
                    # Fast path: find the file's last timestamp without parsing
                    # the whole CSV, then append only newer rows. Seek near the
                    # end and widen the backtrack window until a complete final
                    # line is found.
                    last_time = None
                    with open(path, 'r', newline='') as f:
                        try:
                            f.seek(0, 2)  # Seek to end
                            f_len = f.tell()
                            found = False
                            for i in range(100, 2000, 100):  # Widening backtrack window
                                if i > f_len:
                                    i = f_len
                                f.seek(f_len - i)
                                lines = f.readlines()
                                if len(lines) >= 2:  # Header + at least one data line
                                    last_line = lines[-1].strip()
                                    if last_line:
                                        # Row format: Time,Open,High,Low,Close,Volume,Spread
                                        parts = last_line.split(',')
                                        if len(parts) >= 1:
                                            last_time = pd.to_datetime(parts[0])
                                            found = True
                                            break
                        except Exception:
                            pass  # Formatting error; fall back to full rewrite
                    if last_time:
                        logger.debug(f"[{timeframe}] File last_time: {last_time}. New data range: {df['time'].min()} - {df['time'].max()}")
                        # Keep only rows newer than the file's last timestamp
                        df_new = df[df['time'] > last_time]
                        if df_new.empty:
                            logger.info(f"[{timeframe}] No new data after filtering (File up to {last_time}, Fetched up to {df['time'].max()})")
                            return  # Nothing to append
                        df = df_new
                    if df.empty:
                        return  # Nothing to append
                    # Validate ONLY the new chunk
                    df, report = validator.validate_and_clean(df, timeframe)
                    if df.empty:
                        return
                    # Append in place; no full rewrite
                    df.to_csv(path, mode='a', header=False, index=False)
                    # DB metadata update is skipped on this fast path: it would
                    # require a full candle recount and negate the speed gain.
                    logger.debug(f"Appended {len(df)} candles to {timeframe}")
                    return
                except Exception as e:
                    logger.warning(f"Fast append failed for {timeframe}, falling back to full rewrite: {e}")
                    # Fall through to the slow path below
# SLOW PATH (Full Write / Initial)
if is_update and path.exists():
try:
existing_df = pd.read_csv(path, parse_dates=['time'])
df = pd.concat([existing_df, df], ignore_index=True)
df = df.drop_duplicates(subset=['time'], keep='last')
df = df.sort_values('time')
except Exception as e:
logger.error(f"Error reading existing OHLC for append: {e}")
# Validate full dataset
df, report = validator.validate_and_clean(df, timeframe)
# --- ENFORCE PROFILE LIMITS (TRIMMING) ---
df = self._enforce_profile_limits(df, timeframe)
if report['rows_removed'] > 0 or report['rows_cleaned'] > 0:
logger.info(f"[{timeframe}] Cleaned: removed {report['rows_removed']} rows, fixed {report['rows_cleaned']} values")
try:
df.to_csv(path, index=False)
logger.info(f"Saved OHLC to {path} (Range: {df['time'].min()} - {df['time'].max()})")
except Exception as e:
logger.error(f"Error saving OHLC to {path}: {e}")
return
# Save metadata (Slow path only or lightweight update)
try:
db = get_database()
file_size_kb = path.stat().st_size / 1024
db.update_ohlc_metadata(
timeframe=timeframe,
first_date=df['time'].min(),
last_date=df['time'].max(),
candle_count=len(df),
file_size_kb=file_size_kb,
file_path=str(path),
source="MT5"
)
except Exception: pass
def _enforce_profile_limits(self, df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
"""Trim DataFrame according to active Performance Profile"""
if df.empty: return df
try:
profile = getattr(default_settings, 'performance_profile', 'PRO')
limit_year = 2015 # Default
if profile == 'BALANCED': limit_year = 2020
elif profile == 'ECO': limit_year = 2023
limit_date = pd.Timestamp(f"{limit_year}-01-01")
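            # e.g. the BALANCED profile drops every candle before 2020-01-01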
# Check if trimming needed
if df['time'].min() < limit_date:
original_len = len(df)
df = df[df['time'] >= limit_date]
new_len = len(df)
if original_len != new_len:
logger.info(f"[{timeframe}] Trimmed data for {profile} profile: {original_len} -> {new_len} candles (>= {limit_year})")
except Exception as e:
logger.error(f"Failed to enforce profile limits: {e}")
return df
def load_ohlc(self, timeframe: str) -> Optional[pd.DataFrame]:
"""Load OHLC data from CSV"""
path = self.get_ohlc_path(timeframe)
if not path.exists():
return None
with self._file_lock:
try:
df = pd.read_csv(path, parse_dates=['time'])
return df
except Exception as e:
logger.error(f"Failed to load OHLC for {timeframe}: {e}")
return None
def load_ohlc_buffer(self, timeframe: str, n: int = 5000) -> Optional[pd.DataFrame]:
"""
Fast disk load: reads only the last N rows of the CSV.
Crucial for large 1m files (200MB+) to prevent real-time stalls.
"""
path = self.get_ohlc_path(timeframe)
if not path.exists():
logger.debug(f"OHLC path does not exist: {path}")
return None
with self._file_lock:
try:
                # ROBUST TAIL READ STRATEGY (O(1) w.r.t. file size)
                # Instead of guessing row counts, read only the last chunk of
                # the file: n rows * ~150 bytes/row, the 150 acting as a safety
                # margin over the typical row width.
                bytes_to_read = n * 150
with open(path, 'rb') as f:
# Move to end
f.seek(0, 2)
size = f.tell()
if size < bytes_to_read:
# File smaller than buffer, read all
f.seek(0)
data = f.read()
else:
# Move back
f.seek(-bytes_to_read, 2)
# Read to end
data = f.read()
                # Discard the first (likely partial) line in the chunk
                first_nl = data.find(b'\n')
                if first_nl != -1:
                    data = data[first_nl + 1:]
from io import BytesIO
# We need header names since we skipped the real header
header = pd.read_csv(path, nrows=0).columns.tolist()
df = pd.read_csv(
BytesIO(data),
names=header,
header=None,
parse_dates=['time']
)
return df
            except Exception as e:
                logger.error(f"Failed to load OHLC buffer for {timeframe}: {e}")
# Fallback to full load if optimized fails
return self.load_ohlc(timeframe)
def get_ohlc_status(self, timeframe: str) -> dict:
"""Get status of OHLC file for a timeframe"""
path = self.get_ohlc_path(timeframe)
if not path.exists():
return {
"exists": False,
"path": str(path),
"candles": 0,
"from_date": None,
"to_date": None,
}
# We reuse load_ohlc which has the lock
df = self.load_ohlc(timeframe)
if df is None or df.empty:
return {
"exists": True,
"path": str(path),
"candles": 0,
"from_date": None,
"to_date": None,
}
return {
"exists": True,
"path": str(path),
"candles": len(df),
"from_date": df['time'].min(),
"to_date": df['time'].max(),
}
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
# ─────────────────────────────────────────────────────────────────────────────
# SINGLETON ACCESSOR (for use by chart_widget and other modules)
# ─────────────────────────────────────────────────────────────────────────────
_global_connector: Optional[MT5Connector] = None
def get_mt5_connector() -> MT5Connector:
"""Get or create the global MT5Connector instance."""
global _global_connector
if _global_connector is None:
_global_connector = MT5Connector()
return _global_connector
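

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): assumes a local MT5 terminal
    # is installed, the MetaTrader5 package is importable, and "1h" is a
    # timeframe defined in config.settings.TIMEFRAMES.
    logging.basicConfig(level=logging.INFO)
    connector = get_mt5_connector()
    if connector.connect():
        buffer_df = connector.get_buffer("1h", n=500)
        if buffer_df is not None:
            print(buffer_df.tail())
        print(connector.get_market_status())
        connector.disconnect()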