{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sensor1 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_1/DAMAGE_0_TEST1_01.csv',sep=',')\n", "sensor2 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_1/DAMAGE_0_TEST1_02.csv',sep=',')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sensor1.columns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df1 = pd.DataFrame()\n", "df1['s1'] = sensor1[sensor1.columns[-1]]\n", "df1['s2'] = sensor2[sensor2.columns[-1]]\n", "df1\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def merge_two_sensors(damage_path, damage):\n", " df = pd.DataFrame()\n", " for file in os.listdir(damage_path):\n", " pattern = re.compile(r'DAMAGE_\\d+_TEST\\d+_\\d{2}\\.csv')\n", " try:\n", " assert pattern.match(file), f\"File {file} does not match the required format, skipping...\"\n", " # assert \"TEST01\" in file, f\"File {file} does not contain 'TEST01', skipping...\" #TODO: should be trained using the whole test file\n", " print(f\"Processing file: {file}\")\n", " # Append the full path of the file to sensor1 or sensor2 based on the filename\n", " if file.endswith('_01.csv'):\n", " df['sensor 1'] = pd.read_csv(os.path.join('D:/thesis/data/converted/raw', damage, file), sep=',', usecols=[1])\n", " elif file.endswith('_02.csv'):\n", " df['sensor 2'] = pd.read_csv(os.path.join('D:/thesis/data/converted/raw', damage, file), sep=',', usecols=[1])\n", " except AssertionError as e:\n", " print(e)\n", " continue # Skip to the next iteration\n", " return df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import re\n", "\n", "df = []\n", "for damage in os.listdir('D:/thesis/data/converted/raw'):\n", " damage_path = os.path.join('D:/thesis/data/converted/raw', damage)\n", " df.append(merge_two_sensors(damage_path, damage))\n", " " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "len(df)\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Combined Plot for sensor 1 and sensor 2 from data1 file in which motor is operated at 800 rpm\n", "\n", "plt.plot(df1['s2'], label='Sensor 1', color='C1', alpha=0.6)\n", "plt.plot(df1['s1'], label='Sensor 2', color='C0', alpha=0.6)\n", "plt.xlabel(\"Number of samples\")\n", "plt.ylabel(\"Amplitude\")\n", "plt.title(\"Raw vibration signal\")\n", "plt.ylim(-7.5, 5)\n", "plt.legend()\n", "plt.locator_params(axis='x', nbins=8)\n", "plt.ylim(-1, 1) # Adjust range as needed\n", "plt.grid(True, linestyle='--', alpha=0.5)\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "signal_sensor1_test1 = []\n", "signal_sensor2_test1 = []\n", "\n", "for data in df:\n", " if not data.empty and 'sensor 1' in data.columns and 'sensor 2' in data.columns:\n", " signal_sensor1_test1.append(data['sensor 1'].values)\n", " signal_sensor2_test1.append(data['sensor 2'].values)\n", "\n", "print(len(signal_sensor1_test1))\n", "print(len(signal_sensor2_test1))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Applying Short-Time Fourier Transform (STFT)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "os.getcwd()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import pandas as pd\n", "import numpy as np\n", "from scipy.signal import stft, hann\n", "# from multiprocessing import Pool\n", "\n", "# Function to compute and append STFT data\n", "def process_stft(args):\n", " # Define STFT parameters\n", " window_size = 1024\n", " hop_size = 512\n", " window = hann(window_size)\n", "\n", " Fs = 1024 # Sampling frequency in Hz\n", " \n", " damage_num, test_num, sensor_suffix = args\n", " sensor_name = active_sensors[sensor_suffix]\n", " sensor_num = sensor_suffix[-1] # '1' or '2'\n", " \n", " # Construct the file path\n", " file_name = f'DAMAGE_{damage_num}_TEST{test_num}_{sensor_suffix}.csv'\n", " file_path = os.path.join(damage_base_path, f'DAMAGE_{damage_num}', file_name)\n", " \n", " # Check if the file exists\n", " if not os.path.isfile(file_path):\n", " print(f\"File {file_path} does not exist. Skipping...\")\n", " return\n", " \n", " # Read the CSV\n", " try:\n", " df = pd.read_csv(file_path)\n", " except Exception as e:\n", " print(f\"Error reading {file_path}: {e}. Skipping...\")\n", " return\n", " \n", " # Ensure the CSV has exactly two columns\n", " if df.shape[1] != 2:\n", " print(f\"Unexpected number of columns in {file_path}. Skipping...\")\n", " return\n", " \n", " # Extract sensor data\n", " sensor_column = df.columns[1]\n", " sensor_data = df[sensor_column].values\n", " \n", " # Compute STFT\n", " frequencies, times, Zxx = stft(sensor_data, fs=Fs, window=window, nperseg=window_size, noverlap=window_size - hop_size)\n", " magnitude = np.abs(Zxx)\n", " df_stft = pd.DataFrame(magnitude, index=frequencies, columns=times).T\n", " df_stft.columns = [f\"Freq_{i}\" for i in frequencies]\n", " \n", " # Define the output CSV file path\n", " stft_file_name = f'stft_data{sensor_num}_{damage_num}.csv'\n", " sensor_output_dir = os.path.join(damage_base_path, sensor_name.lower())\n", " os.makedirs(sensor_output_dir, exist_ok=True)\n", " stft_file_path = os.path.join(sensor_output_dir, stft_file_name)\n", " # Append the flattened STFT to the CSV\n", " try:\n", " if not os.path.isfile(stft_file_path):\n", " # Create a new CSV\n", " df_stft.to_csv(stft_file_path, index=False, header=False)\n", " else:\n", " # Append to existing CSV\n", " df_stft.to_csv(stft_file_path, mode='a', index=False, header=False)\n", " print(f\"Appended STFT data to {stft_file_path}\")\n", " except Exception as e:\n", " print(f\"Error writing to {stft_file_path}: {e}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define the base path where DAMAGE_X folders are located\n", "damage_base_path = 'D:/thesis/data/converted/raw/'\n", "\n", "# Define active sensors\n", "active_sensors = {\n", " '01': 'sensor1', # Beginning map sensor\n", " '02': 'sensor2' # End map sensor\n", "}\n", "\n", "# Define damage cases and test runs\n", "damage_cases = range(1, 7) # Adjust based on actual number of damage cases\n", "test_runs = range(1, 6) # TEST01 to TEST05\n", "args_list = []\n", "\n", "# Prepare the list of arguments for parallel processing\n", "for damage_num in damage_cases:\n", " for test_num in test_runs:\n", " for sensor_suffix in active_sensors.keys():\n", " args_list.append((damage_num, test_num, sensor_suffix))\n", "\n", "print(len(args_list))\n", "args_list" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Process STFTs sequentially instead of in parallel\n", "if __name__ == \"__main__\":\n", " print(f\"Starting sequential STFT processing...\")\n", " for i, arg in enumerate(args_list, 1):\n", " process_stft(arg)\n", " print(f\"Processed {i}/{len(args_list)} files\")\n", " print(\"STFT processing completed.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from scipy.signal import stft, hann\n", "\n", "# Applying STFT\n", "vibration_data = signal_sensor1_test1[1]\n", "window_size = 1024\n", "hop_size = 512\n", "window = hann(window_size) # Creating a Hanning window\n", "Fs = 1024\n", "\n", "frequencies, times, Zxx = stft(vibration_data, \n", " fs=Fs, \n", " window=window, \n", " nperseg=window_size, \n", " noverlap=window_size - hop_size)\n", "# Plotting the STFT Data\n", "plt.pcolormesh(times, frequencies, np.abs(Zxx), shading='gouraud')\n", "plt.title(f'STFT Magnitude for case {1} signal sensor 2')\n", "plt.ylabel(f'Frequency [Hz]')\n", "plt.xlabel(f'Time [sec]')\n", "plt.show()\n", "\n", "# get current y ticks in list\n", "print(len(frequencies))\n", "print(len(times))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading STFT Data from CSV Files" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "os.listdir('D:/thesis/data/working')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "ready_data1a = []\n", "for file in os.listdir('D:/thesis/data/converted/raw/sensor1'):\n", " ready_data1a.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw/sensor1', file)))\n", "# colormesh give title x is frequency and y is time and rotate/transpose the data\n", "# Plotting the STFT Data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from mpl_toolkits.mplot3d import Axes3D\n", "\n", "# Assuming ready_data1a[0] is a DataFrame or 2D array\n", "spectrogram_data = ready_data1a[0].values # Convert to NumPy array if it's a DataFrame\n", "\n", "# Get the dimensions of the spectrogram\n", "num_frequencies, num_time_frames = spectrogram_data.shape\n", "\n", "# Create frequency and time arrays\n", "frequencies = np.arange(num_frequencies) # Replace with actual frequency values if available\n", "time_frames = np.arange(num_time_frames) # Replace with actual time values if available\n", "\n", "# Create a meshgrid for plotting\n", "T, F = np.meshgrid(time_frames, frequencies)\n", "\n", "# Create a 3D plot\n", "fig = plt.figure(figsize=(12, 8))\n", "ax = fig.add_subplot(111, projection='3d')\n", "\n", "# Plot the surface\n", "surf = ax.plot_surface(T, F, spectrogram_data, cmap='bwr', edgecolor='none')\n", "\n", "# Add labels and a color bar\n", "ax.set_xlabel('Time Frames')\n", "ax.set_ylabel('Frequency [Hz]')\n", "ax.set_zlabel('Magnitude')\n", "ax.set_title('3D Spectrogram')\n", "# Resize the z-axis (shrink it)\n", "z_min, z_max = 0, 0.1 # Replace with your desired range\n", "ax.set_zlim(z_min, z_max)\n", "ax.get_proj = lambda: np.dot(Axes3D.get_proj(ax), np.diag([1, 1, 0.5, 1])) # Shrink z-axis by 50%\n", "ax.set_facecolor('white')\n", "fig.colorbar(surf, ax=ax, shrink=0.5, aspect=10)\n", "\n", "# Show the plot\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from cmcrameri import cm\n", "# Create a figure and subplots\n", "fig, axes = plt.subplots(2, 3, figsize=(15, 8), sharex=True, sharey=True)\n", "\n", "# Flatten the axes array for easier iteration\n", "axes = axes.flatten()\n", "\n", "# Loop through each subplot and plot the data\n", "for i in range(6):\n", " pcm = axes[i].pcolormesh(ready_data1a[i].transpose(), cmap='bwr', vmax=0.03, vmin=0.0)\n", " axes[i].set_title(f'Case {i} Sensor A', fontsize=12)\n", "\n", "# Add a single color bar for all subplots\n", "# Use the first `pcolormesh` object (or any valid one) for the color bar\n", "cbar = fig.colorbar(pcm, ax=axes, orientation='vertical')\n", "# cbar.set_label('Magnitude')\n", "\n", "# Set shared labels\n", "fig.text(0.5, 0.04, 'Time Frames', ha='center', fontsize=12)\n", "fig.text(0.04, 0.5, 'Frequency [Hz]', va='center', rotation='vertical', fontsize=12)\n", "\n", "# Adjust layout\n", "# plt.tight_layout(rect=[0.05, 0.05, 1, 1]) # Leave space for shared labels\n", "plt.subplots_adjust(left=0.1, right=0.75, top=0.9, bottom=0.1, wspace=0.2, hspace=0.2)\n", "\n", "plt.show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ready_data2a = []\n", "for file in os.listdir('D:/thesis/data/converted/raw/sensor2'):\n", " ready_data2a.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw/sensor2', file)))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(len(ready_data1a))\n", "print(len(ready_data2a))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x1a = 0\n", "print(type(ready_data1a[0]))\n", "ready_data1a[0].iloc[:,0]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Checking length of the total array" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x1a = 0\n", "print(type(x1a))\n", "for i in range(len(ready_data1a)):\n", " print(type(ready_data1a[i].shape[0]))\n", " x1a = x1a + ready_data1a[i].shape[0]\n", " print(type(x1a))\n", "\n", "print(x1a)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "x2a = 0\n", "\n", "for i in range(len(ready_data2a)):\n", " print(ready_data2a[i].shape)\n", " x2a = x2a + ready_data2a[i].shape[0]\n", "\n", "print(x2a)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Flatten 6 array into one array" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Combine all dataframes in ready_data1a into a single dataframe\n", "if ready_data1a: # Check if the list is not empty\n", " # Use pandas concat function instead of iterative concatenation\n", " combined_data = pd.concat(ready_data1a, axis=0, ignore_index=True)\n", " \n", " print(f\"Type of combined data: {type(combined_data)}\")\n", " print(f\"Shape of combined data: {combined_data.shape}\")\n", " \n", " # Display the combined dataframe\n", " combined_data\n", "else:\n", " print(\"No data available in ready_data1a list\")\n", " combined_data = pd.DataFrame()\n", "\n", "# Store the result in x1a for compatibility with subsequent code\n", "x1a = combined_data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Combine all dataframes in ready_data1a into a single dataframe\n", "if ready_data2a: # Check if the list is not empty\n", " # Use pandas concat function instead of iterative concatenation\n", " combined_data = pd.concat(ready_data2a, axis=0, ignore_index=True)\n", " \n", " print(f\"Type of combined data: {type(combined_data)}\")\n", " print(f\"Shape of combined data: {combined_data.shape}\")\n", " \n", " # Display the combined dataframe\n", " combined_data\n", "else:\n", " print(\"No data available in ready_data1a list\")\n", " combined_data = pd.DataFrame()\n", "\n", "# Store the result in x1a for compatibility with subsequent code\n", "x2a = combined_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating the label" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y_1 = 0\n", "y_2 = 1\n", "y_3 = 2\n", "y_4 = 3\n", "y_5 = 4\n", "y_6 = 5" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y_data = [y_1, y_2, y_3, y_4, y_5, y_6]\n", "y_data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i in range(len(y_data)):\n", " print(ready_data1a[i].shape[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "for i in range(len(y_data)):\n", " y_data[i] = [y_data[i]]*ready_data1a[i].shape[0]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "len(y_data[0])\n", "# y_data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y = y_data[0]\n", "\n", "for i in range(len(y_data) - 1):\n", " #print(i)\n", " y = np.concatenate((y, y_data[i+1]), axis=0)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(y.shape)\n", "print(np.unique(y))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from src.ml.model_selection import create_ready_data\n", "\n", "X1a, y = create_ready_data('D:/thesis/data/converted/raw/sensor1')\n", "X2a, y = create_ready_data('D:/thesis/data/converted/raw/sensor2')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X1a.iloc[-1,:]\n", "# y[2565]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.model_selection import train_test_split\n", "\n", "x_train1, x_test1, y_train, y_test = train_test_split(X1a, y, test_size=0.2, random_state=2)\n", "x_train2, x_test2, y_train, y_test = train_test_split(X2a, y, test_size=0.2, random_state=2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import accuracy_score\n", "from sklearn.ensemble import RandomForestClassifier, BaggingClassifier\n", "from sklearn.tree import DecisionTreeClassifier\n", "from sklearn.neighbors import KNeighborsClassifier\n", "from sklearn.discriminant_analysis import LinearDiscriminantAnalysis\n", "from sklearn.svm import SVC\n", "from xgboost import XGBClassifier" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check the shapes of x_train and y_train\n", "print(\"Shape of x1_train:\", x_train1.shape)\n", "print(\"Shape of x2_train:\", x_train2.shape)\n", "print(\"Shape of y_train:\", y_train.shape)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def train_and_evaluate_model(model, model_name, sensor_label, x_train, y_train, x_test, y_test):\n", " model.fit(x_train, y_train)\n", " y_pred = model.predict(x_test)\n", " accuracy = accuracy_score(y_test, y_pred) * 100\n", " return {\n", " \"model\": model_name,\n", " \"sensor\": sensor_label,\n", " \"accuracy\": accuracy\n", " }" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define models for sensor1\n", "models_sensor1 = {\n", " # \"Random Forest\": RandomForestClassifier(),\n", " # \"Bagged Trees\": BaggingClassifier(estimator=DecisionTreeClassifier(), n_estimators=10),\n", " # \"Decision Tree\": DecisionTreeClassifier(),\n", " # \"KNN\": KNeighborsClassifier(),\n", " # \"LDA\": LinearDiscriminantAnalysis(),\n", " \"SVM\": SVC(),\n", " # \"XGBoost\": XGBClassifier()\n", "}\n", "\n", "results_sensor1 = []\n", "for name, model in models_sensor1.items():\n", " res = train_and_evaluate_model(model, name, \"sensor1\", x_train1, y_train, x_test1, y_test)\n", " results_sensor1.append(res)\n", " print(f\"{name} on sensor1: Accuracy = {res['accuracy']:.2f}%\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "models_sensor2 = {\n", " # \"Random Forest\": RandomForestClassifier(),\n", " # \"Bagged Trees\": BaggingClassifier(estimator=DecisionTreeClassifier(), n_estimators=10),\n", " # \"Decision Tree\": DecisionTreeClassifier(),\n", " # \"KNN\": KNeighborsClassifier(),\n", " # \"LDA\": LinearDiscriminantAnalysis(),\n", " \"SVM\": SVC(),\n", " # \"XGBoost\": XGBClassifier()\n", "}\n", "\n", "results_sensor2 = []\n", "for name, model in models_sensor2.items():\n", " res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train, x_test2, y_test)\n", " results_sensor2.append(res)\n", " print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "all_results = {\n", " \"sensor1\": results_sensor1,\n", " \"sensor2\": results_sensor2\n", "}\n", "\n", "print(all_results)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "\n", "def prepare_plot_data(results_dict):\n", " # Gather unique model names\n", " models_set = {entry['model'] for sensor in results_dict.values() for entry in sensor}\n", " models = sorted(list(models_set))\n", " \n", " # Create dictionaries mapping sensor -> accuracy list ordered by model name\n", " sensor_accuracies = {}\n", " for sensor, entries in results_dict.items():\n", " # Build a mapping: model -> accuracy for the given sensor\n", " mapping = {entry['model']: entry['accuracy'] for entry in entries}\n", " # Order the accuracies consistent with the sorted model names\n", " sensor_accuracies[sensor] = [mapping.get(model, 0) for model in models]\n", " \n", " return models, sensor_accuracies\n", "\n", "def plot_accuracies(models, sensor_accuracies):\n", " bar_width = 0.35\n", " x = np.arange(len(models))\n", " sensors = list(sensor_accuracies.keys())\n", " \n", " plt.figure(figsize=(10, 6))\n", " # Assume two sensors for plotting grouped bars\n", " plt.bar(x - bar_width/2, sensor_accuracies[sensors[0]], width=bar_width, color='blue', label=sensors[0])\n", " plt.bar(x + bar_width/2, sensor_accuracies[sensors[1]], width=bar_width, color='orange', label=sensors[1])\n", " \n", " # Add text labels on top of bars\n", " for i, (a1, a2) in enumerate(zip(sensor_accuracies[sensors[0]], sensor_accuracies[sensors[1]])):\n", " plt.text(x[i] - bar_width/2, a1 + 0.1, f\"{a1:.2f}%\", ha='center', va='bottom', color='black')\n", " plt.text(x[i] + bar_width/2, a2 + 0.1, f\"{a2:.2f}%\", ha='center', va='bottom', color='black')\n", " \n", " plt.xlabel('Model Name')\n", " plt.ylabel('Accuracy (%)')\n", " plt.title('Accuracy of Classifiers for Each Sensor')\n", " plt.xticks(x, models)\n", " plt.legend()\n", " plt.ylim(0, 105)\n", " plt.tight_layout()\n", " plt.show()\n", "\n", "# Use the functions\n", "models, sensor_accuracies = prepare_plot_data(all_results)\n", "plot_accuracies(models, sensor_accuracies)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import os\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from src.ml.model_selection import create_ready_data\n", "\n", "X1b, y = create_ready_data('D:/thesis/data/converted/raw_B/sensor1')\n", "X2b, y = create_ready_data('D:/thesis/data/converted/raw_B/sensor2')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y.shape" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import accuracy_score, classification_report\n", "# 4. Validate on Dataset B\n", "y_pred_svm = svm_model.predict(X1b)\n", "\n", "# 5. Evaluate\n", "print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n", "print(classification_report(y, y_pred_svm))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import accuracy_score, classification_report\n", "# 4. Validate on Dataset B\n", "y_pred = rf_model2.predict(X2b)\n", "\n", "# 5. Evaluate\n", "print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred))\n", "print(classification_report(y, y_pred))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y_predict = svm_model2.predict(X2b.iloc[[5312],:])\n", "print(y_predict)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "y[5312]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Confusion Matrix" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n", "\n", "\n", "cm = confusion_matrix(y, y_pred_svm) # -> ndarray\n", "\n", "# get the class labels\n", "labels = svm_model.classes_\n", "\n", "# Plot\n", "disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n", "disp.plot(cmap=plt.cm.Blues) # You can change colormap\n", "plt.title(\"SVM Sensor1 CM Train w/ Dataset A Val w/ Dataset B\")\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Self-test CM" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# 1. Predict sensor 1 on Dataset A\n", "y_train_pred = svm_model.predict(x_train1)\n", "\n", "# 2. Import confusion matrix tools\n", "from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n", "import matplotlib.pyplot as plt\n", "\n", "# 3. Create and plot confusion matrix\n", "cm_train = confusion_matrix(y_train, y_train_pred)\n", "labels = svm_model.classes_\n", "\n", "disp = ConfusionMatrixDisplay(confusion_matrix=cm_train, display_labels=labels)\n", "disp.plot(cmap=plt.cm.Blues)\n", "plt.title(\"Confusion Matrix: Train & Test on Dataset A\")\n", "plt.show()\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" } }, "nbformat": 4, "nbformat_minor": 2 }