# Source code for gui.workers.prediction_worker

import time
import logging
from PyQt6.QtCore import QObject, pyqtSignal

logger = logging.getLogger(__name__)

class PredictionWorker(QObject):
    """Worker for running prediction logic in a separate thread.

    Communicates via signals to ensure thread safety, and registers a
    callback on the PredictionEngine (when the engine exposes
    ``add_callback``) to receive granular progress updates.

    Signals:
        step_updated(str, str, float): step_key, state, elapsed_sec.
        finished(dict): result_data returned by ``predict_manual``.
        error(str, str): step_key, error_message.
    """

    # Signal definitions
    step_updated = pyqtSignal(str, str, float)  # step_key, state, elapsed_sec
    finished = pyqtSignal(dict)  # result_data
    error = pyqtSignal(str, str)  # step_key, error_message

    # Model step keys that a "feature_calc" event may report via "model_type".
    _MODEL_STEPS = ("xgboost", "lightgbm", "randomforest", "catboost", "stacking")

    # Steps force-marked "done" on "model_prediction" in case earlier
    # engine events were missed.
    _CATCH_UP_STEPS = (
        "data",
        "optimize",
        "features",
        "xgboost",
        "lightgbm",
        "randomforest",
        "catboost",
        "stacking",
        "congress",
        "risk",
    )

    def __init__(self, app_controller, tf):
        """Store the collaborators used by ``run``.

        Args:
            app_controller: Object providing ``predict_manual(tf)`` and a
                ``prediction_engine`` attribute.
            tf: Value passed through to ``predict_manual`` and used in the
                worker thread name and log messages.
        """
        super().__init__()
        self.app_controller = app_controller
        self.tf = tf
        self._is_running = True
        self._start_time = 0
        self._step_times = {}

    def _prediction_callback(self, event: str, data: dict):
        """Handle updates from PredictionEngine.

        Translates engine events into ``step_updated`` emissions. Any
        exception raised while handling an event is logged and swallowed so
        that a failing progress update cannot abort the caller.

        Args:
            event: Event name reported by the engine.
            data: Event payload; only ``"model_type"`` is read (for
                ``"feature_calc"``). ``None`` is treated as an empty payload.
        """
        if not self._is_running:
            return

        try:
            if event == "extraction_start" or event == "data_update":
                self.step_updated.emit("data", "active", 0.0)

            elif event == "extraction_complete":
                # Data done, but features might start next
                self.step_updated.emit("data", "done", 0.5)

            elif event == "feature_calc":
                self.step_updated.emit("data", "done", 0.5)  # Ensure data is marked done
                self.step_updated.emit("features", "active", 0.0)

                # Tolerate a missing payload instead of raising on None.get().
                model_type = (data or {}).get("model_type", "")
                if model_type in self._MODEL_STEPS:
                    self.step_updated.emit(model_type, "done", 0.2)

            elif event == "model_prediction":
                # Ensure features are done
                self.step_updated.emit("features", "done", 1.0)

                # Mark all steps as done just in case some events were missed
                for step_key in self._CATCH_UP_STEPS:
                    self.step_updated.emit(step_key, "done", 0.1)

            elif event == "prediction_complete":
                # Mark remaining as done
                self.step_updated.emit("congress", "done", 0.1)
                self.step_updated.emit("risk", "done", 0.1)
                self.step_updated.emit("signal", "done", 0.1)

        except Exception as e:
            logger.error(f"Worker callback error: {e}")

    def _report_exception(self, exc):
        """Map an unhandled exception from ``run`` onto an ``error`` emission."""
        err_msg = str(exc)

        if "DataMissingError" in str(type(exc)):
            self.error.emit("data", "Data Missing (Check Extraction)")
        elif isinstance(exc, MemoryError) or "MemoryError" in err_msg:
            # str(MemoryError()) is usually empty or lacks the class name, so
            # check the type as well as the message text.
            self.error.emit("hardware", "Memory Limit Exceeded")
        else:
            # Default fallback: attribute generic errors to the features step.
            self.error.emit("features", f"System Error: {err_msg}")

    def run(self):
        """Main execution method intended to run in QThread.

        Registers the progress callback on the engine, calls
        ``app_controller.predict_manual(self.tf)`` synchronously and emits
        ``finished`` with the result on success. Failures are reported via
        ``error`` and never propagate to the caller. The callback is removed
        from the engine in all cases.
        """
        try:
            logger.info("[WORKER] Thread STARTED - Entering run()")
            import threading

            threading.current_thread().name = f"PredWorker-{self.tf}"

            self._start_time = time.time()

            logger.info("[WORKER] Accessing AppController...")
            if not self.app_controller:
                logger.critical("[WORKER] AppController is None!")
                raise RuntimeError("AppController is None")

            logger.info("[WORKER] Accessing PredictionEngine...")
            # Use getattr to avoid crash if attribute is missing
            engine = getattr(self.app_controller, 'prediction_engine', None)

            if engine is None:
                logger.critical("[WORKER] PredictionEngine is None!")
                # We can't proceed without engine, but let's try to emit error
                self.error.emit("system", "Critical: PredictionEngine not initialized")
                return

            logger.info("[WORKER] Adding callback...")
            # Register callback
            if hasattr(engine, 'add_callback'):
                engine.add_callback(self._prediction_callback)

            logger.info(f"[WORKER] Starting prediction pipeline for {self.tf}")

            # === STEP 1: HARDWARE CHECK ===
            self.step_updated.emit("hardware", "done", 0.1)
            self.step_updated.emit("data", "active", 0.0)

            # === STEP 2-8: RUN PREDICTION PIPELINE ===
            # The app_controller.predict_manual is synchronous/heavy,
            # so we run it directly here since we are ALREADY in a background thread.
            logger.info("[WORKER] Calling predict_manual...")
            t0 = time.time()

            # Only the timeframe is passed to predict_manual
            result = self.app_controller.predict_manual(self.tf)

            dt = time.time() - t0
            logger.info(f"[WORKER] predict_manual returned in {dt:.2f}s")

            if not result:
                logger.warning("[WORKER] Result is None/Empty")
                self.error.emit("features", "Prediction returned None")
                return

            # If result implies error
            if result.get("signal") == "ERROR":
                logger.error(f"[WORKER] Result contains error: {result}")
                self.error.emit("features", result.get("error", "Unknown error"))
                return

            # Note: The callback handles intermediate steps; only the final
            # "result" step is emitted here.

            # === FINAL RESULT ===
            total_time = time.time() - self._start_time
            self.step_updated.emit("result", "done", total_time)
            self.finished.emit(result)

            logger.info(f"[WORKER] Prediction finished for {self.tf} in {total_time:.2f}s")

        except Exception as e:
            logger.critical(f"[WORKER CRASH] Unhandled exception in run: {e}", exc_info=True)
            # Format error for UI
            self._report_exception(e)

        finally:
            # Cleanup callback
            try:
                if self.app_controller and hasattr(self.app_controller, 'prediction_engine'):
                    engine = getattr(self.app_controller, 'prediction_engine', None)
                    if engine and hasattr(engine, 'remove_callback'):
                        engine.remove_callback(self._prediction_callback)
            except Exception as final_e:
                logger.error(f"[WORKER] Cleanup error: {final_e}")