import time
import logging
from PyQt6.QtCore import QObject, pyqtSignal
logger = logging.getLogger(__name__)
[docs]
class PredictionWorker(QObject):
    """
    Worker for running prediction logic in a separate thread.

    Communicates via signals to ensure thread safety, and registers
    ``_prediction_callback`` on the controller's ``prediction_engine`` (when
    that engine exposes ``add_callback``) to receive granular progress events.

    Signals:
        step_updated(str, str, float): step key, state ("active" / "done")
            and a seconds value for the step.
        finished(dict): the result returned by ``predict_manual``.
        error(str, str): step key the failure is attributed to, and a message.
    """

    # Signal definitions
    step_updated = pyqtSignal(str, str, float)  # step_key, state, elapsed_sec
    finished = pyqtSignal(dict)  # result_data
    error = pyqtSignal(str, str)  # step_key, error_message

    # Model types that have their own progress step.
    _MODEL_STEPS = ('xgboost', 'lightgbm', 'randomforest', 'catboost', 'stacking')

    # Steps force-marked "done" once the engine reports "model_prediction".
    _STEPS_BEFORE_PREDICTION = (
        "data", "optimize", "features",
        "xgboost", "lightgbm", "randomforest", "catboost", "stacking",
        "congress", "risk",
    )

    def __init__(self, app_controller, tf):
        """
        Store the collaborators needed by ``run``.

        Args:
            app_controller: object providing ``predict_manual(tf)`` and a
                ``prediction_engine`` attribute.
            tf: value forwarded unchanged to ``predict_manual``; also used in
                the thread name and log messages.
        """
        super().__init__()
        self.app_controller = app_controller
        self.tf = tf
        # While False, engine callbacks are ignored. Nothing inside this class
        # clears the flag.
        self._is_running = True
        self._start_time = 0
        self._step_times = {}

    def _prediction_callback(self, event: str, data: dict):
        """
        Translate a PredictionEngine event into ``step_updated`` emissions.

        The seconds values emitted here are fixed constants, not measurements.
        Any exception is logged and swallowed so that it does not propagate
        back into the engine that invoked the callback.

        Args:
            event: event name reported by the engine.
            data: event payload; only ``model_type`` is read. A missing
                payload (``None``) is treated as empty.
        """
        if not self._is_running:
            return
        try:
            emit = self.step_updated.emit
            if event in ("extraction_start", "data_update"):
                emit("data", "active", 0.0)
            elif event == "extraction_complete":
                # Data done, but features might start next
                emit("data", "done", 0.5)
            elif event == "feature_calc":
                emit("data", "done", 0.5)  # Ensure data is marked done
                emit("features", "active", 0.0)
                model_type = (data or {}).get("model_type", "")
                if model_type in self._MODEL_STEPS:
                    emit(model_type, "done", 0.2)
            elif event == "model_prediction":
                # Ensure features are done
                emit("features", "done", 1.0)
                # Mark all steps as done just in case some events were missed
                for step in self._STEPS_BEFORE_PREDICTION:
                    emit(step, "done", 0.1)
            elif event == "prediction_complete":
                # Mark remaining as done
                for step in ("congress", "risk", "signal"):
                    emit(step, "done", 0.1)
        except Exception as e:
            logger.error("Worker callback error: %s", e)

    @staticmethod
    def _classify_error(exc):
        """Map an exception raised in ``run`` to a ``(step_key, message)`` pair."""
        err_msg = str(exc)
        # Matched by type name because the exception class is not in scope here.
        if "DataMissingError" in str(type(exc)):
            return "data", "Data Missing (Check Extraction)"
        # str(exc) is only the message and is empty for a bare MemoryError(),
        # so the type must be checked; the substring test is kept for errors
        # that merely mention MemoryError in their text.
        if isinstance(exc, MemoryError) or "MemoryError" in err_msg:
            return "hardware", "Memory Limit Exceeded"
        # Default fallback for anything unrecognised.
        return "features", f"System Error: {err_msg}"

    def run(self):
        """
        Main execution method intended to run in QThread.

        Registers the progress callback, calls
        ``app_controller.predict_manual(self.tf)`` synchronously and emits
        ``finished`` with the result. Every failure path emits ``error``
        instead; no exception escapes this method. The callback is removed
        from the engine in all cases.
        """
        try:
            logger.info("[WORKER] Thread STARTED - Entering run()")
            import threading
            threading.current_thread().name = f"PredWorker-{self.tf}"
            self._start_time = time.time()

            logger.info("[WORKER] Accessing AppController...")
            if not self.app_controller:
                logger.critical("[WORKER] AppController is None!")
                raise RuntimeError("AppController is None")

            logger.info("[WORKER] Accessing PredictionEngine...")
            # Use getattr to avoid crash if attribute is missing
            engine = getattr(self.app_controller, 'prediction_engine', None)
            if engine is None:
                logger.critical("[WORKER] PredictionEngine is None!")
                # Cannot proceed without the engine; report it and stop.
                self.error.emit("system", "Critical: PredictionEngine not initialized")
                return

            logger.info("[WORKER] Adding callback...")
            # Register callback
            if hasattr(engine, 'add_callback'):
                engine.add_callback(self._prediction_callback)

            logger.info("[WORKER] Starting prediction pipeline for %s", self.tf)

            # === STEP 1: HARDWARE CHECK ===
            self.step_updated.emit("hardware", "done", 0.1)
            self.step_updated.emit("data", "active", 0.0)

            # === STEP 2-8: RUN PREDICTION PIPELINE ===
            # The app_controller.predict_manual is synchronous/heavy,
            # so we run it directly here since we are ALREADY in a background thread.
            logger.info("[WORKER] Calling predict_manual...")
            t0 = time.time()
            # Only the timeframe is passed to predict_manual.
            result = self.app_controller.predict_manual(self.tf)
            dt = time.time() - t0
            logger.info("[WORKER] predict_manual returned in %.2fs", dt)

            if not result:
                logger.warning("[WORKER] Result is None/Empty")
                self.error.emit("features", "Prediction returned None")
                return

            # If result implies error
            if result.get("signal") == "ERROR":
                logger.error("[WORKER] Result contains error: %s", result)
                self.error.emit("features", result.get("error", "Unknown error"))
                return

            # Note: intermediate steps are reported by the callback; only the
            # final "result" step is marked here.
            # === FINAL RESULT ===
            total_time = time.time() - self._start_time
            self.step_updated.emit("result", "done", total_time)
            self.finished.emit(result)
            logger.info("[WORKER] Prediction finished for %s in %.2fs", self.tf, total_time)
        except Exception as e:
            logger.critical("[WORKER CRASH] Unhandled exception in run: %s", e, exc_info=True)
            # Format error for UI
            step_key, err_msg = self._classify_error(e)
            self.error.emit(step_key, err_msg)
        finally:
            # Cleanup callback
            try:
                engine = None
                if self.app_controller:
                    engine = getattr(self.app_controller, 'prediction_engine', None)
                if engine and hasattr(engine, 'remove_callback'):
                    engine.remove_callback(self._prediction_callback)
            except Exception as final_e:
                logger.error("[WORKER] Cleanup error: %s", final_e)