import hopsworks import pandas as pd import os from datetime import datetime, timedelta from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score from sklearn.model_selection import train_test_split import joblib from pathlib import Path import hsfs import hsml # Define the base directory as the project root BASE_DIR = Path(__file__).resolve().parent.parent.parent class Trainer: def __init__(self, project_name, feature_group_name, model_registry_name, api_key): self.project_name = project_name self.feature_group_name = feature_group_name self.model_registry_name = model_registry_name self.api_key = api_key self.project = hopsworks.login(api_key_value=self.api_key) self.fs = self.project.get_feature_store() self.model_registry = self.project.get_model_registry() self.feature_view = None self.deployment = None def create_feature_view(self): """Select features from the feature group and create a feature view.""" selected_features = self.fs.get_or_create_feature_group( name=self.feature_group_name, version=1 ).select_all() print("Feature group selected successfully......... --->>") """Create or get a feature view for the last 30 days of data.""" try: self.feature_view = self.fs.get_or_create_feature_view( name=f"{self.feature_group_name}_view", version=1, description="Feature view with last 30 days of data for model training", query=selected_features, ) print("Feature view created or retrieved successfully.") except hsfs.client.exceptions.RestAPIError as e: print(f"Error creating feature view: {e}") def delete_feature_view(self): """Delete the feature view.""" try: self.feature_view.delete() print("Feature view deleted successfully.") except hsfs.client.exceptions.RestAPIError as e: print(f"Error deleting feature view: {e}") def get_retrain_data_from_feature_view(self): """Pull the last 30 days of data from the feature view till today.""" start_time = datetime.now() - timedelta(days=30) end_time = datetime.now() # Get the data as a DataFrame from the feature view df = self.feature_view.get_batch_data( start_time=start_time, end_time=end_time) # sort by datetime df = df.sort_values(by='datetime', ascending=False) print("Data pulled from feature view for retraining successfully.") return df def get_plot_data_from_feature_view(self, hours): # get last 12 hours of data starting from current hour to plot start_time = datetime.now() - timedelta(hours=hours) end_time = datetime.now() # Get the data as a DataFrame from the feature view df = self.feature_view.get_batch_data( start_time=start_time, end_time=end_time) # sort by datetime df = df.sort_values(by='datetime', ascending=False) print("Data pulled from feature view for plotting successfully.") return df def train_test_split(self, df, test_size=0.2): """Split data into training and test sets.""" # Define feature columns based on lagged features feature_columns = [ f"{prefix}_lag_{i}" for i in range(0, 13) for prefix in ["open", "high", "low", "close"] ] # Separate features and target X = df[feature_columns] y = df['target'] # Split into train and test sets X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=test_size, random_state=42) print("Data split into train and test sets.") return X_train, X_test, y_train, y_test def get_features_labels(self, df): """Split data into features and labels.""" # Define feature columns based on lagged features feature_columns = [ f"{prefix}_lag_{i}" for i in range(0, 13) for prefix in ["open", "high", "low", "close"] ] # Separate features and target X = df[feature_columns] y = df['target'] return X, y def train_model(self, model, X_train, y_train): """Train the model on training data.""" model.fit(X_train, y_train) print("Model training completed.") return model def evaluate_model(self, model, X_test, y_test, **kwargs): """Evaluate the model on the hold-out test set.""" y_pred = model.predict(X_test) # if show_pred in kwargs is true, print the predictions if "show_pred" in kwargs: print(f"Predictions: {y_pred}") mse = mean_squared_error(y_test, y_pred) mae = mean_absolute_error(y_test, y_pred) r2 = r2_score(y_test, y_pred) print(f"Model Evaluation:\nMSE: {mse}\nMAE: {mae}\nR2 Score: {r2}") return {"mse": mse, "mae": mae, "r2": r2} def save_model_to_registry(self, model, metrics, model_schema, X_train): """Save the trained model to Hopsworks Model Registry.""" # Use BASE_DIR to define the model directory and path model_dir = BASE_DIR / "models" # Ensure the directory exists if not model_dir.exists(): model_dir.mkdir(parents=True, exist_ok=True) model_path = model_dir / f"{self.model_registry_name}.pkl" joblib.dump(model, model_path) new_model = self.model_registry.sklearn.create_model( name=self.model_registry_name, metrics=metrics, model_schema=model_schema, input_example=X_train.sample(), description="Trained model with 30-day feature view data", ) # Register the model and serve as endpoint new_model.save(str(model_path)) # new_model.deploy() print("Model saved to registry successfully.") def model_deploy(self): model = self.model_registry.get_model( self.model_registry_name) # strip all _ from self.model_registry_name and keep only alphanumeric characters deploy_name = self.model_registry_name.replace("_", "") # Get the dataset API for the project dataset_api = self.project.get_dataset_api() # Upload the file "predict_example.py" to the "Models" dataset # If a file with the same name already exists, overwrite it predictor_local_path = BASE_DIR / "src" / \ "training_pipeline" / "kserve_predict_script.py" uploaded_file_path = dataset_api.upload( predictor_local_path, "Models", overwrite=True) # Construct the full path to the uploaded predictor script predictor_script_path = os.path.join( "/Projects", self.project_name, uploaded_file_path) self.deployment = model.deploy( name=deploy_name, script_file=predictor_script_path,) # start the deployment self.deployment.start() def predict_with_hopsworks_api(self, X): """Use the deployed model to make predictions via the Hopsworks API.""" # Get model serving handle from the project model_serving = self.project.get_model_serving() model = self.model_registry.get_model( self.model_registry_name, version=1) # Ensure the deployment name follows the required regex pattern deploy_name = self.model_registry_name.replace("_", "") try: # Get the deployment deployment = model_serving.get_deployment(name=deploy_name) # Make predictions predictions = deployment.predict(inputs=X.values.tolist()) print("Predictions made via Hopsworks model API.") return predictions except hsml.client.exceptions.RestAPIError as e: print(f"Error making predictions: {e}") return None except Exception as e: print(f"Unexpected error: {e}") return None def stop_model_deployment(self): model = self.model_registry.get_model( self.model_registry_name, version=1) # Ensure the deployment name follows the required regex pattern deploy_name = self.model_registry_name.replace("_", "") # Get model serving handle model_serving = self.project.get_model_serving() try: # List deployments deployments = model_serving.get_deployments(model) for deployment in deployments: if deployment.name == deploy_name: # deployment.stop() deployment.delete(force=True) print( f"Deployment {deploy_name} stopped and deleted successfully.") break else: print(f"No deployment found with name: {deploy_name}") except hsml.client.exceptions.RestAPIError as e: print(f"Error stopping or deleting deployment: {e}") return model