Source code for core.feature_engine

"""
Cerebrum Forex - Feature Engine
Calculate technical indicators from OHLC data.
"""

import logging
from typing import Optional

import pandas as pd
import numpy as np

try:
    import ta
    TA_AVAILABLE = True
except ImportError:
    TA_AVAILABLE = False

try:
    from core.economic_calendar import get_economic_calendar
    CALENDAR_AVAILABLE = True
except ImportError:
    CALENDAR_AVAILABLE = False

logger = logging.getLogger(__name__)


class FeatureEngine:
    """Calculate technical indicators and features for ML models"""

    def __init__(self):
        if not TA_AVAILABLE:
            logger.warning("ta library not available, some indicators will be missing")
    def calculate_all_features(self, df: pd.DataFrame, timeframe: str = "1h",
                               progress_callback=None, is_training: bool = False) -> pd.DataFrame:
        """
        Calculate all technical indicators with Turbo I/O (Parquet caching).

        Args:
            df: Input OHLC DataFrame
            timeframe: Current timeframe (for cache naming)
            progress_callback: Optional UI status function
            is_training: If True, bypasses the 5000 row limit and uses caching

        Returns:
            DataFrame with 1000+ calculated features
        """
        if df.empty:
            return df

        # --- BLOCK 1: TURBO I/O CACHE CHECK ---
        # We only cache during training or for large datasets to save disk I/O on small requests
        if is_training:
            from config.settings import FEATURES_DIR
            import hashlib

            # Generate a unique hash based on the input data and timeframe.
            # This ensures that if the OHLC changes, the cache is invalidated automatically.
            data_hash = hashlib.md5(pd.util.hash_pandas_object(df, index=True).values).hexdigest()
            cache_file = FEATURES_DIR / f"features_{timeframe}_{data_hash}.parquet"

            if cache_file.exists():
                if progress_callback:
                    progress_callback(f" 🚀 Turbo I/O: Loading {timeframe} features from cache...")
                try:
                    cached_df = pd.read_parquet(cache_file)
                    logger.info(f"[{timeframe}] Turbo I/O: Cache Hit! ({len(cached_df)} rows)")
                    return cached_df
                except Exception as e:
                    logger.warning(f"[{timeframe}] Cache read failed: {e}. Recalculating...")

        # --- BLOCK 2: DATA PREPARATION ---
        # OPTIMIZATION: copy once to avoid touching the original DataFrame
        df = df.copy()

        # Ensure numeric types for calculation robustness
        cols_to_numeric = ['open', 'high', 'low', 'close', 'volume', 'spread']
        for col in cols_to_numeric:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

        # SAFEGUARD: Adaptive history capping based on performance profile
        if not is_training:
            from config.settings import default_settings
            profile = getattr(default_settings, "performance_profile", "BALANCED")

            # Map profiles to row limits
            if profile == "ECO":
                max_rows = 500
            elif profile == "PRO":
                max_rows = 5000
            else:
                max_rows = 1500  # BALANCED

            if len(df) > max_rows:
                df = df.tail(max_rows).reset_index(drop=True)
                logger.debug(f"FeatureEngine enforced profile ({profile}) limit: {max_rows} rows")

        # --- BLOCK 3: MASSIVE INDICATOR CALCULATION ---
        # Calculate indicators IN-PLACE to save memory
        if progress_callback:
            progress_callback(" ⚙️ Trend Indicators...")
        self._add_trend_indicators(df)

        if progress_callback:
            progress_callback(" ⚙️ Momentum Indicators...")
        self._add_momentum_indicators(df)

        if progress_callback:
            progress_callback(" ⚙️ Volatility Indicators...")
        self._add_volatility_indicators(df)

        if progress_callback:
            progress_callback(" ⚙️ Volume Indicators...")
        self._add_volume_indicators(df)

        if progress_callback:
            progress_callback(" ⚙️ Custom Features...")
        self._add_custom_features(df)

        if progress_callback:
            progress_callback(" ⚙️ Noble KPI Features...")
        # KPI may return a new DF due to Noble logic, safer to keep the assignment
        df = self._add_noble_kpi_features(df, timeframe)

        # Add economic calendar features (keep the assignment for the same reason)
        if progress_callback:
            progress_callback(" ⚙️ Calendar Features...")
        df = self._add_calendar_features(df)

        logger.debug(f"Features calculated: {len(df.columns)} columns for {len(df)} rows")

        # --- BLOCK 4: CLEANUP & CACHE SAVE ---
        # Drop NaN rows (common at the start of technical indicator arrays)
        df.dropna(inplace=True)

        # Store valid row count for logging
        logger.debug(f"After dropna: {len(df)} rows remain")

        if is_training:
            try:
                if progress_callback:
                    progress_callback(f" 💾 Turbo I/O: Saving {timeframe} features to cache...")
                # Save as Parquet for high-speed binary I/O
                df.to_parquet(cache_file, compression='snappy')
                logger.info(f"[{timeframe}] Turbo I/O: Features cached successfully.")
            except Exception as e:
                logger.error(f"[{timeframe}] Failed to save cache: {e}")

        return df
    def _add_trend_indicators(self, df: pd.DataFrame):
        """Add trend indicators (In-Place)"""
        close = df['close']
        high = df['high']
        low = df['low']

        # Simple Moving Averages
        df['sma_5'] = close.rolling(window=5).mean()
        df['sma_10'] = close.rolling(window=10).mean()
        df['sma_20'] = close.rolling(window=20).mean()
        df['sma_50'] = close.rolling(window=50).mean()
        df['sma_100'] = close.rolling(window=100).mean()
        df['sma_200'] = close.rolling(window=200).mean()

        # Exponential Moving Averages
        df['ema_5'] = close.ewm(span=5, adjust=False).mean()
        df['ema_10'] = close.ewm(span=10, adjust=False).mean()
        df['ema_20'] = close.ewm(span=20, adjust=False).mean()
        df['ema_50'] = close.ewm(span=50, adjust=False).mean()

        # MACD
        ema_12 = close.ewm(span=12, adjust=False).mean()
        ema_26 = close.ewm(span=26, adjust=False).mean()
        df['macd'] = ema_12 - ema_26
        df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
        df['macd_histogram'] = df['macd'] - df['macd_signal']

        # ADX (Average Directional Index)
        if TA_AVAILABLE:
            try:
                adx_indicator = ta.trend.ADXIndicator(high, low, close, window=14)
                df['adx'] = adx_indicator.adx()
                df['adx_pos'] = adx_indicator.adx_pos()
                df['adx_neg'] = adx_indicator.adx_neg()
            except Exception:
                # Fallback to manual if TA fails
                self._calculate_manual_adx(df, high, low, close)
        else:
            # Manual fallback
            self._calculate_manual_adx(df, high, low, close)

        # Ichimoku Cloud
        if TA_AVAILABLE:
            try:
                ichimoku = ta.trend.IchimokuIndicator(high, low)
                df['ichimoku_a'] = ichimoku.ichimoku_a()
                df['ichimoku_b'] = ichimoku.ichimoku_b()
                df['ichimoku_base'] = ichimoku.ichimoku_base_line()
                df['ichimoku_conv'] = ichimoku.ichimoku_conversion_line()
            except Exception:
                self._calculate_manual_ichimoku(df, high, low)
        else:
            self._calculate_manual_ichimoku(df, high, low)

    def _add_momentum_indicators(self, df: pd.DataFrame):
        """Add momentum indicators (In-Place)"""
        close = df['close']
        high = df['high']
        low = df['low']

        # RSI
        delta = close.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.ewm(alpha=1/14, min_periods=14, adjust=False).mean()
        avg_loss = loss.ewm(alpha=1/14, min_periods=14, adjust=False).mean()
        rs = avg_gain / avg_loss.replace(0, 0.001)  # Avoid div by zero
        df['rsi'] = 100 - (100 / (1 + rs))

        # Stochastic Oscillator
        lowest_low = low.rolling(window=14).min()
        highest_high = high.rolling(window=14).max()
        denom = (highest_high - lowest_low).replace(0, 0.001)
        df['stoch_k'] = 100 * (close - lowest_low) / denom
        df['stoch_d'] = df['stoch_k'].rolling(window=3).mean()

        # Williams %R
        df['williams_r'] = -100 * (highest_high - close) / denom

        # CCI (Commodity Channel Index)
        if TA_AVAILABLE:
            try:
                df['cci'] = ta.trend.CCIIndicator(high, low, close, window=20).cci()
            except Exception:
                self._calculate_manual_cci(df, high, low, close)
        else:
            self._calculate_manual_cci(df, high, low, close)

        # ROC (Rate of Change)
        df['roc'] = close.pct_change(periods=10) * 100

        # Momentum
        df['momentum'] = close - close.shift(10)

    def _add_volatility_indicators(self, df: pd.DataFrame):
        """Add volatility indicators (In-Place)"""
        close = df['close']
        high = df['high']
        low = df['low']

        # ATR (Average True Range)
        tr1 = high - low
        tr2 = abs(high - close.shift())
        tr3 = abs(low - close.shift())
        tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
        df['atr'] = tr.rolling(window=14).mean()

        # Bollinger Bands
        sma_20 = close.rolling(window=20).mean()
        std_20 = close.rolling(window=20).std()
        df['bb_upper'] = sma_20 + (std_20 * 2)
        df['bb_middle'] = sma_20
        df['bb_lower'] = sma_20 - (std_20 * 2)
        width_denom = df['bb_middle'].replace(0, 0.001)
        df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / width_denom
        pband_denom = (df['bb_upper'] - df['bb_lower']).replace(0, 0.001)
        df['bb_pband'] = (close - df['bb_lower']) / pband_denom

        # Keltner Channel
        ema_20 = close.ewm(span=20, adjust=False).mean()
        df['kc_upper'] = ema_20 + (df['atr'] * 2)
        df['kc_middle'] = ema_20
        df['kc_lower'] = ema_20 - (df['atr'] * 2)

        # Volatility (standard deviation)
        df['volatility'] = close.rolling(window=20).std()

    def _add_volume_indicators(self, df: pd.DataFrame):
        """Add volume indicators (In-Place)"""
        if 'volume' not in df.columns:
            return

        close = df['close']
        volume = df['volume']
        high = df['high']
        low = df['low']

        # Volume SMA
        df['volume_sma'] = volume.rolling(window=20).mean()
        df['volume_ratio'] = volume / df['volume_sma'].replace(0, 1)

        # OBV
        price_direction = np.sign(close.diff())
        _obv_cumulative = (price_direction * volume).fillna(0).cumsum()
        df['obv'] = _obv_cumulative.diff(20).fillna(0)

        # MFI
        if TA_AVAILABLE:
            try:
                df['mfi'] = ta.volume.MFIIndicator(high, low, close, volume, window=14).money_flow_index()
            except Exception:
                self._calculate_manual_mfi(df, high, low, close, volume)
        else:
            self._calculate_manual_mfi(df, high, low, close, volume)

        # VWAP
        typical_price = (high + low + close) / 3
        volume_price_product = typical_price * volume
        vol_sum = volume.rolling(window=20).sum().replace(0, 1)  # ZERO safe
        df['vwap'] = volume_price_product.rolling(window=20).sum() / vol_sum

    def _add_custom_features(self, df: pd.DataFrame):
        """Add custom forex-specific features (In-Place)"""
        close = df['close']
        high = df['high']
        low = df['low']
        open_ = df['open']

        # Candle patterns
        df['candle_body'] = close - open_
        df['candle_body_pct'] = df['candle_body'] / open_.replace(0, 1) * 100
        df['candle_upper_shadow'] = high - pd.concat([open_, close], axis=1).max(axis=1)
        df['candle_lower_shadow'] = pd.concat([open_, close], axis=1).min(axis=1) - low
        df['candle_range'] = high - low

        # Price position
        range_hl = (high - low).replace(0, 0.0001)
        df['price_position'] = (close - low) / range_hl

        # Returns
        df['return_1'] = close.pct_change(periods=1, fill_method=None)
        df['return_5'] = close.pct_change(periods=5, fill_method=None)
        df['return_10'] = close.pct_change(periods=10, fill_method=None)
        df['return_20'] = close.pct_change(periods=20, fill_method=None)

        # Trend strength
        df['trend_sma'] = np.where(close > df['sma_20'], 1, -1)
        df['trend_ema'] = np.where(close > df['ema_20'], 1, -1)

        # Support/Resistance proximity
        df['distance_to_high_20'] = (high.rolling(20).max() - close) / close * 100
        df['distance_to_low_20'] = (close - low.rolling(20).min()) / close * 100

        # Lag features
        for lag in [1, 2, 3, 5]:
            df[f'close_lag_{lag}'] = close.shift(lag)
            df[f'return_lag_{lag}'] = df['return_1'].shift(lag)

    def _add_noble_kpi_features(self, df: pd.DataFrame, timeframe: str) -> pd.DataFrame:
        """Add Noble KPI features for AI training"""
        from core.noble_kpi import calculate_atr, noble_safe_range, K_CONSTANTS

        # Calculate ATR (needed for Noble KPI)
        if 'atr' not in df.columns:
            high = df['high']
            low = df['low']
            close = df['close']
            tr1 = high - low
            tr2 = abs(high - close.shift())
            tr3 = abs(low - close.shift())
            tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
            atr = tr.rolling(window=14).mean()
        else:
            atr = df['atr']

        # Get K constant for this timeframe
        k = K_CONSTANTS.get(timeframe, 4.5)

        # Calculate Safe High/Low for each row
        df['kpi_safe_high'] = df['open'] + (k * atr)
        df['kpi_safe_low'] = df['open'] - (k * atr)

        # Distance features (normalized by ATR for scale independence)
        df['distance_to_safe_high'] = (df['kpi_safe_high'] - df['close']) / atr
        df['distance_to_safe_low'] = (df['close'] - df['kpi_safe_low']) / atr

        # KPI Zone: -1 = below safe_low (danger), 0 = in range (safe), 1 = above safe_high (danger)
        df['kpi_zone'] = 0
        df.loc[df['close'] > df['kpi_safe_high'], 'kpi_zone'] = 1
        df.loc[df['close'] < df['kpi_safe_low'], 'kpi_zone'] = -1

        # Position within range (0-100%)
        range_size = df['kpi_safe_high'] - df['kpi_safe_low']
        df['kpi_position'] = ((df['close'] - df['kpi_safe_low']) / range_size) * 100
        df['kpi_position'] = df['kpi_position'].clip(0, 100)

        return df

    def _add_calendar_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add economic calendar features.

        Features:
        - hours_to_high_impact: Hours until next high-impact event
        - hours_since_high_impact: Hours since last event
        - is_high_impact_day: 1 if high-impact event today
        - eur_events_today, usd_events_today: Event counts
        """
        if not CALENDAR_AVAILABLE:
            return df

        try:
            calendar = get_economic_calendar()

            # Refresh if no events cached
            if len(calendar._events) == 0:
                calendar.refresh()

            # Add calendar features
            df = calendar.create_features(df)
            logger.debug("Added 5 calendar features")

            # SAFEGUARD: Fill NaNs in calendar columns to prevent data loss
            # (e.g. if we are predicting far into the future where no events exist)
            cal_cols = ['hours_to_high_impact', 'hours_since_high_impact',
                        'is_high_impact_day', 'eur_events_today', 'usd_events_today']
            for col in cal_cols:
                if col in df.columns:
                    # Fill 'hours' with 999 (far away), counts/flags with 0
                    fill_val = 999 if 'hours' in col else 0
                    df[col] = df[col].fillna(fill_val)

        except Exception as e:
            logger.warning(f"Could not add calendar features: {e}")
            # Add default values
            df['hours_to_high_impact'] = 999
            df['hours_since_high_impact'] = 999
            df['is_high_impact_day'] = 0
            df['eur_events_today'] = 0
            df['usd_events_today'] = 0

        return df
    def get_feature_columns(self) -> list:
        """Get list of feature column names (excluding OHLC and time)"""
        # Create a sample dataframe to get column names
        sample = pd.DataFrame({
            'time': pd.date_range('2015-01-01', periods=300, freq='H'),
            'open': np.random.randn(300).cumsum() + 100,
            'high': np.random.randn(300).cumsum() + 101,
            'low': np.random.randn(300).cumsum() + 99,
            'close': np.random.randn(300).cumsum() + 100,
            'volume': np.random.randint(1000, 10000, 300),
        })
        result = self.calculate_all_features(sample)
        exclude = ['time', 'open', 'high', 'low', 'close', 'volume']
        return [col for col in result.columns if col not in exclude]
    def select_best_features(self, df: pd.DataFrame, target, n_features: int = 30) -> list:
        """
        Select top N features using XGBoost importance.

        OPTIMIZED: Uses a larger sample (50k) and outlier treatment for robust selection.
        """
        logger.info(f"Selecting Top {n_features} features from {len(df.columns)} columns...")
        try:
            # 1. Prepare Data
            exclude = ['time', 'open', 'high', 'low', 'close', 'volume', 'spread']
            feature_cols = [c for c in df.columns if c not in exclude]

            # OPTIMIZATION: Use 50k samples for better coverage (user request)
            sample_size = min(len(df), 50000)
            df_sample = df.tail(sample_size).copy()
            y_sample = target[-sample_size:]

            # --- OUTLIER TREATMENT (WINSORIZATION) ---
            # Clip features to the 1st and 99th percentiles to improve model quality
            for col in feature_cols:
                if col in df_sample.columns:
                    lower = df_sample[col].quantile(0.01)
                    upper = df_sample[col].quantile(0.99)
                    df_sample[col] = df_sample[col].clip(lower, upper)

            X = df_sample[feature_cols].fillna(0).replace([np.inf, -np.inf], 0)
            y = y_sample

            logger.info(f"Feature selection on {len(X)} samples, {len(feature_cols)} features...")

            # 2. Train quick XGBoost with early stopping
            from xgboost import XGBClassifier
            model = XGBClassifier(
                n_estimators=30,
                max_depth=4,
                learning_rate=0.1,  # Slightly slower for 50k samples
                n_jobs=2,
                random_state=42,
                early_stopping_rounds=5,
                eval_metric='mlogloss'
            )

            # Split for early stopping
            split_idx = int(len(X) * 0.8)
            X_train, X_val = X.iloc[:split_idx], X.iloc[split_idx:]
            y_train, y_val = y[:split_idx], y[split_idx:]

            model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=False)

            # 3. Get Importance
            importances = model.feature_importances_
            indices = np.argsort(importances)[::-1]

            # 4. Select Top N
            top_indices = indices[:n_features]
            top_features = [feature_cols[i] for i in top_indices]

            # Always ensure 'atr' is included if present
            if 'atr' not in top_features and 'atr' in feature_cols:
                top_features.append('atr')

            logger.info(f"Feature Selection DONE. Top 5: {top_features[:5]}")
            return top_features

        except Exception as e:
            logger.error(f"Feature Selection failed: {e}")
            return []
    def _calculate_manual_adx(self, df: pd.DataFrame, high: pd.Series, low: pd.Series,
                              close: pd.Series, window: int = 14):
        """
        Calculate ADX manually without TA library dependency.
        Uses standard Wilder's Smoothing (approximated with EMA/Rolling).
        """
        try:
            # True Range
            tr1 = high - low
            tr2 = abs(high - close.shift())
            tr3 = abs(low - close.shift())
            tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
            atr = tr.rolling(window=window).mean()

            # Directional Movement
            up_move = high.diff()
            down_move = -low.diff()
            plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
            minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
            plus_dm = pd.Series(plus_dm, index=df.index)
            minus_dm = pd.Series(minus_dm, index=df.index)

            # Smoothing (simple rolling for robustness/speed)
            plus_di = 100 * (plus_dm.rolling(window=window).mean() / atr)
            minus_di = 100 * (minus_dm.rolling(window=window).mean() / atr)

            # DX
            dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di + 1e-10)

            # ADX
            df['adx'] = dx.rolling(window=window).mean().fillna(20.0)
            df['adx_pos'] = plus_di.fillna(0.0)
            df['adx_neg'] = minus_di.fillna(0.0)
        except Exception as e:
            logger.error(f"Manual ADX calculation failed: {e}")
            df['adx'] = 25.0
            df['adx_pos'] = 20.0
            df['adx_neg'] = 20.0

    def _calculate_manual_cci(self, df: pd.DataFrame, high, low, close, window=20):
        try:
            tp = (high + low + close) / 3
            sma = tp.rolling(window).mean()
            mad = tp.rolling(window).apply(lambda x: np.mean(np.abs(x - np.mean(x))), raw=True)
            df['cci'] = (tp - sma) / (0.015 * mad)
            df['cci'] = df['cci'].fillna(0.0)
        except Exception:
            df['cci'] = 0.0

    def _calculate_manual_mfi(self, df: pd.DataFrame, high, low, close, volume, window=14):
        try:
            typical_price = (high + low + close) / 3
            raw_money_flow = typical_price * volume

            # Identify positive and negative flow:
            # compare typical price against the previous bar
            tp_diff = typical_price.diff()
            pos_flow = np.where(tp_diff > 0, raw_money_flow, 0)
            neg_flow = np.where(tp_diff < 0, raw_money_flow, 0)

            # Sum over window (keep df's index so the assignment below aligns)
            pos_mf = pd.Series(pos_flow, index=df.index).rolling(window).sum()
            neg_mf = pd.Series(neg_flow, index=df.index).rolling(window).sum()

            # MFI Formula: 100 - (100 / (1 + MFR))
            mfr = pos_mf / neg_mf.replace(0, 1)  # Avoid div by zero
            df['mfi'] = 100 - (100 / (1 + mfr))
            df['mfi'] = df['mfi'].fillna(50.0)
        except Exception:
            df['mfi'] = 50.0

    def _calculate_manual_ichimoku(self, df: pd.DataFrame, high, low):
        try:
            # Tenkan-sen (Conversion Line): (9-period high + 9-period low) / 2
            high_9 = high.rolling(window=9).max()
            low_9 = low.rolling(window=9).min()
            df['ichimoku_conv'] = (high_9 + low_9) / 2

            # Kijun-sen (Base Line): (26-period high + 26-period low) / 2
            high_26 = high.rolling(window=26).max()
            low_26 = low.rolling(window=26).min()
            df['ichimoku_base'] = (high_26 + low_26) / 2

            # Senkou Span A (Leading Span A): (Conversion Line + Base Line) / 2
            df['ichimoku_a'] = ((df['ichimoku_conv'] + df['ichimoku_base']) / 2).shift(26)

            # Senkou Span B (Leading Span B): (52-period high + 52-period low) / 2
            high_52 = high.rolling(window=52).max()
            low_52 = low.rolling(window=52).min()
            df['ichimoku_b'] = ((high_52 + low_52) / 2).shift(26)

            # Fill NaNs introduced by the shift
            df['ichimoku_a'] = df['ichimoku_a'].bfill()
            df['ichimoku_b'] = df['ichimoku_b'].bfill()
        except Exception:
            df['ichimoku_conv'] = (high + low) / 2
            df['ichimoku_base'] = (high + low) / 2
            df['ichimoku_a'] = (high + low) / 2
            df['ichimoku_b'] = (high + low) / 2
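
Illustrative usage sketch (not part of core.feature_engine): it assumes the module runs inside the Cerebrum Forex project so that config.settings and core.noble_kpi resolve, and it uses synthetic OHLC data plus a dummy 3-class target purely to show the call pattern of calculate_all_features and select_best_features.

# Illustrative sketch only — synthetic data, dummy target, project imports assumed.
import numpy as np
import pandas as pd

from core.feature_engine import FeatureEngine

n = 600
close = 1.10 + np.random.randn(n).cumsum() * 0.001
ohlc = pd.DataFrame({
    'time': pd.date_range('2024-01-01', periods=n, freq='h'),
    'open': close + np.random.randn(n) * 0.0005,
    'high': close + np.abs(np.random.randn(n)) * 0.001,
    'low': close - np.abs(np.random.randn(n)) * 0.001,
    'close': close,
    'volume': np.random.randint(100, 1000, n),
})

engine = FeatureEngine()
features = engine.calculate_all_features(ohlc, timeframe="1h")
print(f"{len(features)} rows, {len(features.columns)} columns after feature calculation")

# Dummy target: next-bar return bucketed into 3 roughly equal classes (0/1/2),
# matching the multiclass eval metric used in select_best_features
fwd_ret = features['close'].pct_change(fill_method=None).shift(-1).fillna(0)
target = pd.qcut(fwd_ret, q=3, labels=False, duplicates='drop').values

top = engine.select_best_features(features, target, n_features=20)
print(top[:10])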