"""
Cerebrum Forex - Feature Engine
Calculate technical indicators from OHLC data.
"""
import logging
from typing import Optional
import pandas as pd
import numpy as np
try:
import ta
TA_AVAILABLE = True
except ImportError:
TA_AVAILABLE = False
try:
from core.economic_calendar import get_economic_calendar
CALENDAR_AVAILABLE = True
except ImportError:
CALENDAR_AVAILABLE = False
logger = logging.getLogger(__name__)
class FeatureEngine:
"""Calculate technical indicators and features for ML models"""
def __init__(self):
if not TA_AVAILABLE:
logger.warning("ta library not available, some indicators will be missing")
def calculate_all_features(self, df: pd.DataFrame, timeframe: str = "1h", progress_callback=None, is_training: bool = False) -> pd.DataFrame:
"""
Calculate all technical indicators with Turbo I/O (Parquet Caching).
Args:
df: Input OHLC DataFrame
timeframe: Current timeframe (for cache naming)
progress_callback: Optional UI status function
is_training: If True, bypasses the profile-based row cap and enables Parquet caching
Returns:
DataFrame with all calculated feature columns appended
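Example (illustrative; 'ohlc_df' is any DataFrame with open/high/low/close/volume columns):
engine = FeatureEngine()
features = engine.calculate_all_features(ohlc_df, timeframe="1h", is_training=True)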
"""
if df.empty:
return df
# --- BLOCK 1: TURBO I/O CACHE CHECK ---
# Caching is only used during training runs; small live requests skip the disk I/O entirely
if is_training:
from config.settings import FEATURES_DIR
import hashlib
# Generate a unique hash based on the input data and timeframe
# This ensures that if the OHLC changes, the cache is invalidated automatically.
data_hash = hashlib.md5(pd.util.hash_pandas_object(df, index=True).values).hexdigest()
cache_file = FEATURES_DIR / f"features_{timeframe}_{data_hash}.parquet"
if cache_file.exists():
if progress_callback: progress_callback(f" 🚀 Turbo I/O: Loading {timeframe} features from cache...")
try:
cached_df = pd.read_parquet(cache_file)
logger.info(f"[{timeframe}] Turbo I/O: Cache Hit! ({len(cached_df)} rows)")
return cached_df
except Exception as e:
logger.warning(f"[{timeframe}] Cache read failed: {e}. Recalculating...")
# --- BLOCK 2: DATA PREPARATION ---
# OPTIMIZATION: copy once so the caller's original DataFrame is never modified
df = df.copy()
# Ensure numeric types for calculation robustness
cols_to_numeric = ['open', 'high', 'low', 'close', 'volume', 'spread']
for col in cols_to_numeric:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# SAFEGUARD: Adaptive history capping based on Profile
if not is_training:
from config.settings import default_settings
profile = getattr(default_settings, "performance_profile", "BALANCED")
# Map profiles to row limits
if profile == "ECO": max_rows = 500
elif profile == "PRO": max_rows = 5000
else: max_rows = 1500 # BALANCED
if len(df) > max_rows:
df = df.tail(max_rows).reset_index(drop=True)
logger.debug(f"FeatureEngine enforced profile ({profile}) limit: {max_rows} rows")
# --- BLOCK 3: MASSIVE INDICATOR CALCULATION ---
# Calculate indicators IN-PLACE to save memory
if progress_callback: progress_callback(" ⚙️ Trend Indicators...")
self._add_trend_indicators(df)
if progress_callback: progress_callback(" ⚙️ Momentum Indicators...")
self._add_momentum_indicators(df)
if progress_callback: progress_callback(" ⚙️ Volatility Indicators...")
self._add_volatility_indicators(df)
if progress_callback: progress_callback(" ⚙️ Volume Indicators...")
self._add_volume_indicators(df)
if progress_callback: progress_callback(" ⚙️ Custom Features...")
self._add_custom_features(df)
if progress_callback: progress_callback(" ⚙️ Noble KPI Features...")
df = self._add_noble_kpi_features(df, timeframe)  # the KPI helper may return a new DataFrame, so keep the assignment
# Add economic calendar features
if progress_callback: progress_callback(" ⚙️ Calendar Features...")
df = self._add_calendar_features(df)  # capture the return value; the helper may return a new DataFrame
logger.debug(f"Features calculated: {len(df.columns)} columns for {len(df)} rows")
# --- BLOCK 4: CLEANUP & CACHE SAVE ---
# Drop NaN rows (common at the start of technical indicator arrays)
df.dropna(inplace=True)
# Store valid row count for logging
logger.debug(f"After dropna: {len(df)} rows remain")
if is_training:
try:
if progress_callback: progress_callback(f" 💾 Turbo I/O: Saving {timeframe} features to cache...")
# Save as Parquet for high-speed binary I/O
df.to_parquet(cache_file, compression='snappy')
logger.info(f"[{timeframe}] Turbo I/O: Features cached successfully.")
except Exception as e:
logger.error(f"[{timeframe}] Failed to save cache: {e}")
return df
def _add_trend_indicators(self, df: pd.DataFrame):
"""Add trend indicators (In-Place)"""
close = df['close']
high = df['high']
low = df['low']
# Simple Moving Averages
df['sma_5'] = close.rolling(window=5).mean()
df['sma_10'] = close.rolling(window=10).mean()
df['sma_20'] = close.rolling(window=20).mean()
df['sma_50'] = close.rolling(window=50).mean()
df['sma_100'] = close.rolling(window=100).mean()
df['sma_200'] = close.rolling(window=200).mean()
# Exponential Moving Averages
df['ema_5'] = close.ewm(span=5, adjust=False).mean()
df['ema_10'] = close.ewm(span=10, adjust=False).mean()
df['ema_20'] = close.ewm(span=20, adjust=False).mean()
df['ema_50'] = close.ewm(span=50, adjust=False).mean()
# MACD
ema_12 = close.ewm(span=12, adjust=False).mean()
ema_26 = close.ewm(span=26, adjust=False).mean()
df['macd'] = ema_12 - ema_26
df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
df['macd_histogram'] = df['macd'] - df['macd_signal']
# ADX (Average Directional Index)
if TA_AVAILABLE:
try:
adx_indicator = ta.trend.ADXIndicator(high, low, close, window=14)
df['adx'] = adx_indicator.adx()
df['adx_pos'] = adx_indicator.adx_pos()
df['adx_neg'] = adx_indicator.adx_neg()
except Exception:
# Fallback to manual if TA fails
self._calculate_manual_adx(df, high, low, close)
else:
# Manual fallback
self._calculate_manual_adx(df, high, low, close)
# Ichimoku Cloud
if TA_AVAILABLE:
try:
ichimoku = ta.trend.IchimokuIndicator(high, low)
df['ichimoku_a'] = ichimoku.ichimoku_a()
df['ichimoku_b'] = ichimoku.ichimoku_b()
df['ichimoku_base'] = ichimoku.ichimoku_base_line()
df['ichimoku_conv'] = ichimoku.ichimoku_conversion_line()
except Exception:
self._calculate_manual_ichimoku(df, high, low)
else:
self._calculate_manual_ichimoku(df, high, low)
def _add_momentum_indicators(self, df: pd.DataFrame):
"""Add momentum indicators (In-Place)"""
close = df['close']
high = df['high']
low = df['low']
# RSI
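# Wilder-style smoothing below: EMA with alpha = 1/period approximates the classic RSI average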
delta = close.diff()
gain = delta.where(delta > 0, 0)
loss = -delta.where(delta < 0, 0)
avg_gain = gain.ewm(alpha=1/14, min_periods=14, adjust=False).mean()
avg_loss = loss.ewm(alpha=1/14, min_periods=14, adjust=False).mean()
rs = avg_gain / avg_loss.replace(0, 0.001) # Avoid div by zero
df['rsi'] = 100 - (100 / (1 + rs))
# Stochastic Oscillator
lowest_low = low.rolling(window=14).min()
highest_high = high.rolling(window=14).max()
denom = (highest_high - lowest_low).replace(0, 0.001)
df['stoch_k'] = 100 * (close - lowest_low) / denom
df['stoch_d'] = df['stoch_k'].rolling(window=3).mean()
# Williams %R
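# Ranges from -100 (close at the period low) to 0 (close at the period high)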
df['williams_r'] = -100 * (highest_high - close) / denom
# CCI (Commodity Channel Index)
if TA_AVAILABLE:
try:
df['cci'] = ta.trend.CCIIndicator(high, low, close, window=20).cci()
except Exception:
self._calculate_manual_cci(df, high, low, close)
else:
self._calculate_manual_cci(df, high, low, close)
# ROC (Rate of Change)
df['roc'] = close.pct_change(periods=10) * 100
# Momentum
df['momentum'] = close - close.shift(10)
def _add_volatility_indicators(self, df: pd.DataFrame):
"""Add volatility indicators (In-Place)"""
close = df['close']
high = df['high']
low = df['low']
# ATR (Average True Range)
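# True Range = max(high - low, |high - prev_close|, |low - prev_close|), averaged over 14 bars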
tr1 = high - low
tr2 = abs(high - close.shift())
tr3 = abs(low - close.shift())
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
df['atr'] = tr.rolling(window=14).mean()
# Bollinger Bands
sma_20 = close.rolling(window=20).mean()
std_20 = close.rolling(window=20).std()
df['bb_upper'] = sma_20 + (std_20 * 2)
df['bb_middle'] = sma_20
df['bb_lower'] = sma_20 - (std_20 * 2)
width_denom = df['bb_middle'].replace(0, 0.001)
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / width_denom
pband_denom = (df['bb_upper'] - df['bb_lower']).replace(0, 0.001)
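# %B: position of close within the bands (0 = at lower band, 1 = at upper band)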
df['bb_pband'] = (close - df['bb_lower']) / pband_denom
# Keltner Channel
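# EMA(20) centerline with bands at +/- 2 * ATR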
ema_20 = close.ewm(span=20, adjust=False).mean()
df['kc_upper'] = ema_20 + (df['atr'] * 2)
df['kc_middle'] = ema_20
df['kc_lower'] = ema_20 - (df['atr'] * 2)
# Volatility (standard deviation)
df['volatility'] = close.rolling(window=20).std()
def _add_volume_indicators(self, df: pd.DataFrame):
"""Add volume indicators (In-Place)"""
if 'volume' not in df.columns:
return
close = df['close']
volume = df['volume']
high = df['high']
low = df['low']
# Volume SMA
df['volume_sma'] = volume.rolling(window=20).mean()
df['volume_ratio'] = volume / df['volume_sma'].replace(0, 1)
# OBV (stored as its 20-bar change rather than the raw cumulative series)
price_direction = np.sign(close.diff())
_obv_cumulative = (price_direction * volume).fillna(0).cumsum()
df['obv'] = _obv_cumulative.diff(20).fillna(0)
# MFI
if TA_AVAILABLE:
try:
df['mfi'] = ta.volume.MFIIndicator(high, low, close, volume, window=14).money_flow_index()
except Exception:
self._calculate_manual_mfi(df, high, low, close, volume)
else:
self._calculate_manual_mfi(df, high, low, close, volume)
# VWAP
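# Rolling 20-bar VWAP of the typical price (high + low + close) / 3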
typical_price = (high + low + close) / 3
volume_price_product = typical_price * volume
vol_sum = volume.rolling(window=20).sum().replace(0, 1) # ZERO safe
df['vwap'] = volume_price_product.rolling(window=20).sum() / vol_sum
def _add_custom_features(self, df: pd.DataFrame):
"""Add custom forex-specific features (In-Place)"""
close = df['close']
high = df['high']
low = df['low']
open_ = df['open']
# Candle patterns
df['candle_body'] = close - open_
df['candle_body_pct'] = df['candle_body'] / open_.replace(0, 1) * 100
df['candle_upper_shadow'] = high - pd.concat([open_, close], axis=1).max(axis=1)
df['candle_lower_shadow'] = pd.concat([open_, close], axis=1).min(axis=1) - low
df['candle_range'] = high - low
# Price position
range_hl = (high - low).replace(0, 0.0001)
df['price_position'] = (close - low) / range_hl
# Returns
df['return_1'] = close.pct_change(periods=1, fill_method=None)
df['return_5'] = close.pct_change(periods=5, fill_method=None)
df['return_10'] = close.pct_change(periods=10, fill_method=None)
df['return_20'] = close.pct_change(periods=20, fill_method=None)
# Trend strength
df['trend_sma'] = np.where(close > df['sma_20'], 1, -1)
df['trend_ema'] = np.where(close > df['ema_20'], 1, -1)
# Support/Resistance proximity
df['distance_to_high_20'] = (high.rolling(20).max() - close) / close * 100
df['distance_to_low_20'] = (close - low.rolling(20).min()) / close * 100
# Lag features
for lag in [1, 2, 3, 5]:
df[f'close_lag_{lag}'] = close.shift(lag)
df[f'return_lag_{lag}'] = df['return_1'].shift(lag)
def _add_noble_kpi_features(self, df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
"""Add Noble KPI features for AI training"""
from core.noble_kpi import calculate_atr, noble_safe_range, K_CONSTANTS
# Calculate ATR (needed for Noble KPI)
if 'atr' not in df.columns:
high = df['high']
low = df['low']
close = df['close']
tr1 = high - low
tr2 = abs(high - close.shift())
tr3 = abs(low - close.shift())
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(window=14).mean()
else:
atr = df['atr']
# Get K constant for this timeframe
k = K_CONSTANTS.get(timeframe, 4.5)
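# Noble safe range: open +/- k * ATR, where k is a per-timeframe constant (default 4.5)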
# Calculate Safe High/Low for each row
df['kpi_safe_high'] = df['open'] + (k * atr)
df['kpi_safe_low'] = df['open'] - (k * atr)
# Distance features (normalized by ATR for scale independence)
df['distance_to_safe_high'] = (df['kpi_safe_high'] - df['close']) / atr
df['distance_to_safe_low'] = (df['close'] - df['kpi_safe_low']) / atr
# KPI Zone: -1 = below safe_low (danger), 0 = in range (safe), 1 = above safe_high (danger)
df['kpi_zone'] = 0
df.loc[df['close'] > df['kpi_safe_high'], 'kpi_zone'] = 1
df.loc[df['close'] < df['kpi_safe_low'], 'kpi_zone'] = -1
# Position within range (0-100%)
range_size = (df['kpi_safe_high'] - df['kpi_safe_low']).replace(0, 0.001)  # guard against zero range
df['kpi_position'] = ((df['close'] - df['kpi_safe_low']) / range_size) * 100
df['kpi_position'] = df['kpi_position'].clip(0, 100)
return df
def _add_calendar_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Add economic calendar features.
Features:
- hours_to_high_impact: Hours until next high-impact event
- hours_since_high_impact: Hours since the last high-impact event
- is_high_impact_day: 1 if high-impact event today
- eur_events_today, usd_events_today: Event counts
"""
if not CALENDAR_AVAILABLE:
return df
try:
calendar = get_economic_calendar()
# Refresh if no events cached
if len(calendar._events) == 0:
calendar.refresh()
# Add calendar features
df = calendar.create_features(df)
logger.debug(f"Added 5 calendar features")
# SAFEGUARD: Fill NaNs in calendar columns to prevent data loss
# (e.g. if we are predicting far into future where no events exist)
cal_cols = ['hours_to_high_impact', 'hours_since_high_impact',
'is_high_impact_day', 'eur_events_today', 'usd_events_today']
for col in cal_cols:
if col in df.columns:
# Fill 'hours' with 999 (far away), counts/flags with 0
fill_val = 999 if 'hours' in col else 0
df[col] = df[col].fillna(fill_val)
except Exception as e:
logger.warning(f"Could not add calendar features: {e}")
# Add default values
df['hours_to_high_impact'] = 999
df['hours_since_high_impact'] = 999
df['is_high_impact_day'] = 0
df['eur_events_today'] = 0
df['usd_events_today'] = 0
return df
def get_feature_columns(self) -> list:
"""Get list of feature column names (excluding OHLC and time)"""
# Create a sample dataframe to get column names
sample = pd.DataFrame({
'time': pd.date_range('2015-01-01', periods=300, freq='H'),
'open': np.random.randn(300).cumsum() + 100,
'high': np.random.randn(300).cumsum() + 101,
'low': np.random.randn(300).cumsum() + 99,
'close': np.random.randn(300).cumsum() + 100,
'volume': np.random.randint(1000, 10000, 300),
})
result = self.calculate_all_features(sample)
exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
return [col for col in result.columns if col not in exclude]
def select_best_features(self, df: pd.DataFrame, target, n_features: int = 30) -> list:
"""
Select top N features using XGBoost importance.
OPTIMIZED: Uses larger sample (50k) and outlier treatment for robust selection.
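Example (illustrative; 'feature_df' and 'labels' are an aligned feature frame and target array):
top_cols = engine.select_best_features(feature_df, labels, n_features=30)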
"""
logger.info(f"Selecting Top {n_features} features from {len(df.columns)} columns...")
try:
# 1. Prepare Data
exclude = ['time', 'open', 'high', 'low', 'close', 'volume', 'spread']
feature_cols = [c for c in df.columns if c not in exclude]
# OPTIMIZATION: Use 50k samples for better coverage (user request)
sample_size = min(len(df), 50000)
df_sample = df.tail(sample_size).copy()
y_sample = target[-sample_size:]
# --- OUTLIER TREATMENT (WINSORIZATION) ---
# Clip features to 1st and 99th percentiles to improve model quality
for col in feature_cols:
if col in df_sample.columns:
lower = df_sample[col].quantile(0.01)
upper = df_sample[col].quantile(0.99)
df_sample[col] = df_sample[col].clip(lower, upper)
X = df_sample[feature_cols].fillna(0).replace([np.inf, -np.inf], 0)
y = y_sample
logger.info(f"Feature selection on {len(X)} samples, {len(feature_cols)} features...")
# 2. Train Quick XGBoost with early stopping
from xgboost import XGBClassifier
model = XGBClassifier(
n_estimators=30,
max_depth=4,
learning_rate=0.1, # Slightly slower for 50k samples
n_jobs=2,
random_state=42,
early_stopping_rounds=5,
eval_metric='mlogloss'
)
# Split for early stopping
split_idx = int(len(X) * 0.8)
X_train, X_val = X.iloc[:split_idx], X.iloc[split_idx:]
y_train, y_val = y[:split_idx], y[split_idx:]
model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)
# 3. Get Importance
importances = model.feature_importances_
indices = np.argsort(importances)[::-1]
# 4. Select Top N
top_indices = indices[:n_features]
top_features = [feature_cols[i] for i in top_indices]
# Always ensure 'atr' is included if present
if 'atr' not in top_features and 'atr' in feature_cols:
top_features.append('atr')
logger.info(f"Feature Selection DONE. Top 5: {top_features[:5]}")
return top_features
except Exception as e:
logger.error(f"Feature Selection failed: {e}")
return []
def _calculate_manual_adx(self, df: pd.DataFrame, high: pd.Series, low: pd.Series, close: pd.Series, window: int = 14):
"""
Calculate ADX manually without TA library dependency.
Uses standard Wilder's Smoothing (approximated with EMA/Rolling).
"""
try:
# True Range
tr1 = high - low
tr2 = abs(high - close.shift())
tr3 = abs(low - close.shift())
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
atr = tr.rolling(window=window).mean()
# Directional Movement
up_move = high.diff()
down_move = -low.diff()
plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
plus_dm = pd.Series(plus_dm, index=df.index)
minus_dm = pd.Series(minus_dm, index=df.index)
# Smoothing (Simple Rolling for robustness/speed)
plus_di = 100 * (plus_dm.rolling(window=window).mean() / atr)
minus_di = 100 * (minus_dm.rolling(window=window).mean() / atr)
# DX
dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di + 1e-10)
# ADX
df['adx'] = dx.rolling(window=window).mean().fillna(20.0)
df['adx_pos'] = plus_di.fillna(0.0)
df['adx_neg'] = minus_di.fillna(0.0)
except Exception as e:
logger.error(f"Manual ADX calculation failed: {e}")
df['adx'] = 25.0
df['adx_pos'] = 20.0
df['adx_neg'] = 20.0
def _calculate_manual_cci(self, df: pd.DataFrame, high, low, close, window=20):
try:
tp = (high + low + close) / 3
sma = tp.rolling(window).mean()
mad = tp.rolling(window).apply(lambda x: np.mean(np.abs(x - np.mean(x))), raw=True)
df['cci'] = (tp - sma) / (0.015 * mad.replace(0, 0.001))  # avoid div by zero when MAD is 0
df['cci'] = df['cci'].fillna(0.0)
except Exception:
df['cci'] = 0.0
def _calculate_manual_mfi(self, df: pd.DataFrame, high, low, close, volume, window=14):
try:
typical_price = (high + low + close) / 3
raw_money_flow = typical_price * volume
# Identify positive and negative flow
# 1. Compare Typical Price vs yesterday
tp_diff = typical_price.diff()
pos_flow = np.where(tp_diff > 0, raw_money_flow, 0)
neg_flow = np.where(tp_diff < 0, raw_money_flow, 0)
# Sum over window
pos_mf = pd.Series(pos_flow, index=df.index).rolling(window).sum()
neg_mf = pd.Series(neg_flow, index=df.index).rolling(window).sum()
# MFI Formula: 100 - (100 / (1 + MFR))
mfr = pos_mf / neg_mf.replace(0, 1) # Avoid div by zero
df['mfi'] = 100 - (100 / (1 + mfr))
df['mfi'] = df['mfi'].fillna(50.0)
except Exception:
df['mfi'] = 50.0
def _calculate_manual_ichimoku(self, df: pd.DataFrame, high, low):
try:
# Tenkan-sen (Conversion Line): (9-period high + 9-period low)/2
high_9 = high.rolling(window=9).max()
low_9 = low.rolling(window=9).min()
df['ichimoku_conv'] = (high_9 + low_9) / 2
# Kijun-sen (Base Line): (26-period high + 26-period low)/2
high_26 = high.rolling(window=26).max()
low_26 = low.rolling(window=26).min()
df['ichimoku_base'] = (high_26 + low_26) / 2
# Senkou Span A (Leading Span A): (Conversion Line + Base Line)/2
df['ichimoku_a'] = ((df['ichimoku_conv'] + df['ichimoku_base']) / 2).shift(26)
# Senkou Span B (Leading Span B): (52-period high + 52-period low)/2
high_52 = high.rolling(window=52).max()
low_52 = low.rolling(window=52).min()
df['ichimoku_b'] = ((high_52 + low_52) / 2).shift(26)
# Fill NaNs from shift
df['ichimoku_a'] = df['ichimoku_a'].bfill()
df['ichimoku_b'] = df['ichimoku_b'].bfill()
except Exception:
df['ichimoku_conv'] = (high + low)/2
df['ichimoku_base'] = (high + low)/2
df['ichimoku_a'] = (high + low)/2
df['ichimoku_b'] = (high + low)/2