import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from joblib import dump, load


def create_ready_data(
    stft_data_path: str,
    stratify: np.ndarray = None,
) -> tuple[pd.DataFrame, np.ndarray]:
    """
    Load every STFT CSV file in a directory and build one combined dataset.

    Rows read from the i-th file (in sorted filename order) are labelled
    with the integer i.

    Parameters
    ----------
    stft_data_path : str
        Path to the directory containing STFT data files
        (e.g. 'data/converted/raw/sensor1').
    stratify : np.ndarray, optional
        Labels to use for stratified sampling.
        NOTE(review): currently unused — kept for interface compatibility.

    Returns
    -------
    tuple (pd.DataFrame, np.ndarray)
        Combined data and the corresponding per-row labels.
    """
    # sorted() makes the file -> label mapping deterministic; the order of
    # os.listdir() is platform-dependent.
    frames = [
        pd.read_csv(os.path.join(stft_data_path, file_name), skiprows=1)
        for file_name in sorted(os.listdir(stft_data_path))
    ]

    if not frames:
        print("No data available in ready_data list")
        print("No labels available in y_data list")
        return pd.DataFrame(), np.array([])

    # Single concat instead of iterative concatenation (linear, not quadratic).
    combined_data = pd.concat(frames, axis=0, ignore_index=True)
    print(f"Type of combined data: {type(combined_data)}")
    print(f"Shape of combined data: {combined_data.shape}")

    # One integer label per source file, repeated once per row of that file.
    # TODO: Should be replaced with actual desired labels
    y = np.concatenate(
        [np.full(frame.shape[0], label) for label, frame in enumerate(frames)]
    )
    return combined_data, y


def train_and_evaluate_model(
    model, model_name, sensor_label, x_train, y_train, x_test, y_test, export=None
):
    """
    Train a model, evaluate its accuracy on test data, and optionally export it.

    Parameters
    ----------
    model : estimator object
        The machine learning model to train (scikit-learn compatible:
        must provide fit/predict).
    model_name : str
        Name of the model, used for the export filename and in the result.
    sensor_label : str
        Label identifying which sensor's data the model is trained on.
    x_train : array-like or pandas.DataFrame
        The training input samples.
    y_train : array-like
        The target values for training.
    x_test : array-like or pandas.DataFrame
        The test input samples.
    y_test : array-like
        The target values for testing.
    export : str, optional
        Directory path where the trained model should be saved.
        If None, the model is not saved.

    Returns
    -------
    dict
        Always contains 'model', 'sensor' and 'success'. On success it also
        holds 'y_pred' and 'accuracy' (percentage). On failure an 'error'
        message is set instead; a failed export adds 'export_error' but does
        not mark the run unsuccessful.

    Example
    -------
    >>> result = train_and_evaluate_model(
    ...     SVC(), "SVM", "sensor1",
    ...     X_train, y_train, X_test, y_test,
    ...     export="models/sensor1",
    ... )
    >>> print(f"Model accuracy: {result['accuracy']:.2f}%")
    """
    result = {"model": model_name, "sensor": sensor_label, "success": False}

    try:
        model.fit(x_train, y_train)
    except Exception as e:
        result["error"] = f"Training error: {str(e)}"
        return result

    try:
        y_pred = model.predict(x_test)
        result["y_pred"] = y_pred
    except Exception as e:
        result["error"] = f"Prediction error: {str(e)}"
        return result

    try:
        result["accuracy"] = accuracy_score(y_test, y_pred) * 100
    except Exception as e:
        result["error"] = f"Accuracy calculation error: {str(e)}"
        return result

    if export:
        try:
            full_path = os.path.join(export, f"{model_name}.joblib")
            # Create the export directory itself (dirname(full_path) would
            # miss it if model_name ever contained a path separator).
            os.makedirs(export, exist_ok=True)
            dump(model, full_path)
            print(f"Model saved to {full_path}")
        except Exception as e:
            # Export failure is non-fatal: training and evaluation succeeded.
            print(f"Warning: Failed to export model to {export}: {str(e)}")
            result["export_error"] = str(e)

    result["success"] = True
    return result


def plot_confusion_matrix(results_sensor, y_test, title, models_dir="D:/thesis/models"):
    """
    Plot one confusion matrix per model result.

    Parameters
    ----------
    results_sensor : list of dict
        Model results as produced by train_and_evaluate_model(); each entry
        must provide the keys 'model', 'sensor' and 'y_pred'.
    y_test : array-like
        True labels for the test samples.
    title : str
        Title shown on every plotted matrix.
    models_dir : str, optional
        Root directory the exported models were saved under. Defaults to the
        previously hard-coded location for backward compatibility.

    Returns
    -------
    None
        One confusion-matrix figure is drawn per entry in results_sensor.
    """
    for entry in results_sensor:
        # The exported model is loaded only to recover its fitted class
        # labels for the display axis.
        fitted = load(
            os.path.join(models_dir, entry["sensor"], f"{entry['model']}.joblib")
        )
        matrix = confusion_matrix(y_test, entry["y_pred"])
        disp = ConfusionMatrixDisplay(
            confusion_matrix=matrix, display_labels=fitted.classes_
        )
        disp.plot(cmap=plt.cm.Blues)  # You can change colormap
        plt.title(f"{title}")


def calculate_label_percentages(labels):
    """
    Calculate and print the percentage distribution of unique labels.

    Parameters
    ----------
    labels : np.ndarray
        Input array of labels.

    Returns
    -------
    None
        The distribution is printed, one "Label x: y%" line per unique label.
    """
    unique, counts = np.unique(labels, return_counts=True)
    percentages = counts / len(labels) * 100
    lines = [
        f"Label {label}: {percentage:.2f}%"
        for label, percentage in zip(unique, percentages)
    ]
    print("\n".join(lines))


def inference_model(
    models,
    raw_file,
    column_question: int = None,
):
    """
    Run a saved model on one column of a raw vibration data file.

    The questioned column is transformed with a 1024-point STFT (Hann
    window, 50% overlap) and the first 21 spectra are fed to the model
    loaded from ``models``; the predicted label distribution is printed
    via calculate_label_percentages().

    Parameters
    ----------
    models : str
        Path to an exported .joblib model file.
    raw_file : str
        Path to the whitespace-delimited raw vibration data file.
    column_question : int
        Index of the data column to analyse. Required.

    Raises
    ------
    ValueError
        If column_question is not given.

    Example
    -------
    >>> model = {"SVM": "models/sensor1/SVM.joblib"}
    >>> inference_model(model["SVM"], "zzzAD1.TXT", column_question=1)
    """
    if column_question is None:
        raise ValueError("column_question must be given (index of the data column)")

    # delim_whitespace= is deprecated in pandas; sep=r"\s+" is the
    # documented equivalent.
    df = pd.read_csv(raw_file, sep=r"\s+", skiprows=10, header=0, memory_map=True)
    vibration_data = df.iloc[:, column_question].values

    # scipy.signal.hann was removed (SciPy >= 1.13); the same symmetric
    # window now lives in scipy.signal.windows.
    from scipy.signal import stft
    from scipy.signal.windows import hann

    freq, _times, Zxx = stft(
        vibration_data,
        fs=1024,
        window=hann(1024),
        nperseg=1024,
        noverlap=512,
    )

    # Zxx has shape (n_freqs, n_times); after transposing, each DataFrame
    # column is one frequency bin, so the names come from the `freq` array
    # itself. (np.linspace(0, fs/2, Zxx.shape[1]) produced the wrong number
    # of names — Zxx.shape[1] counts time frames, not frequency bins.)
    data = pd.DataFrame(
        np.abs(Zxx).T,
        columns=[f"Freq_{f:.2f}" for f in freq],
    )
    data = data.rename(columns={"Freq_0.00": "00"})  # To match the model input format

    trained = load(models)  # Load the model from the provided path
    return calculate_label_percentages(trained.predict(data.iloc[:21, :]))