Files
thesis/code/notebooks/stft.ipynb
nuluh a93adc8af3 feat(notebooks): minimize stft.ipynb notebooks and add STFT data preview plot.
- Consolidated import statements for pandas and matplotlib.
- Updated STFT plotting for Sensor 1 and Sensor 2 datasets with improved visualization using pcolormesh.
- Enhanced subplot organization for better clarity in visual representation.
- Added titles and adjusted layout for all plots.
2025-06-30 01:36:44 +07:00

928 lines
30 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sensor1 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_0/DAMAGE_0_TEST1_01.csv',sep=',')\n",
"sensor2 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_0/DAMAGE_0_TEST1_02.csv',sep=',')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sensor1.columns"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df1 = pd.DataFrame()\n",
"df1['s1'] = sensor1[sensor1.columns[-1]]\n",
"df1['s2'] = sensor2[sensor2.columns[-1]]\n",
"df1\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def merge_two_sensors(damage_path, damage):\n",
"    \"\"\"Read the per-sensor CSVs in ``damage_path`` into one two-column DataFrame.\n",
"\n",
"    Files must be named DAMAGE_<n>_TEST<n>_<nn>.csv; files ending in _01.csv\n",
"    fill column 'sensor 1' and _02.csv fill column 'sensor 2'. Non-matching\n",
"    files are skipped with a message. Each assignment overwrites the column,\n",
"    so only the last matching file per suffix is kept.\n",
"    \"\"\"\n",
"    # Compile the filename pattern once instead of once per file.\n",
"    pattern = re.compile(r'DAMAGE_\\d+_TEST\\d+_\\d{2}\\.csv')\n",
"    df = pd.DataFrame()\n",
"    for file in os.listdir(damage_path):\n",
"        # Plain control flow instead of assert: asserts vanish under `python -O`.\n",
"        if not pattern.match(file):\n",
"            print(f\"File {file} does not match the required format, skipping...\")\n",
"            continue\n",
"        print(f\"Processing file: {file}\")\n",
"        # usecols=[1] keeps only the signal column of the raw CSV.\n",
"        if file.endswith('_01.csv'):\n",
"            df['sensor 1'] = pd.read_csv(os.path.join('D:/thesis/data/converted/raw', damage, file), sep=',', usecols=[1])\n",
"        elif file.endswith('_02.csv'):\n",
"            df['sensor 2'] = pd.read_csv(os.path.join('D:/thesis/data/converted/raw', damage, file), sep=',', usecols=[1])\n",
"    return df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import re\n",
"\n",
"df = []\n",
"for damage in os.listdir('D:/thesis/data/converted/raw'):\n",
" damage_path = os.path.join('D:/thesis/data/converted/raw', damage)\n",
" df.append(merge_two_sensors(damage_path, damage))\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"len(df)\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Combined Plot for sensor 1 and sensor 2 from data1 file in which motor is operated at 800 rpm\n",
"\n",
"# Fix: labels were swapped (s2 was labelled 'Sensor 1' and vice versa);\n",
"# colors now track the sensor number (C0 = sensor 1, C1 = sensor 2).\n",
"plt.plot(df1['s2'], label='Sensor 2', color='C1', alpha=0.6)\n",
"plt.plot(df1['s1'], label='Sensor 1', color='C0', alpha=0.6)\n",
"plt.xlabel(\"Number of samples\")\n",
"plt.ylabel(\"Amplitude\")\n",
"plt.title(\"Raw vibration signal\")\n",
"plt.legend()\n",
"plt.locator_params(axis='x', nbins=8)\n",
"plt.ylim(-1, 1) # Adjust range as needed; the earlier ylim(-7.5, 5) call was dead code and is removed\n",
"plt.grid(True, linestyle='--', alpha=0.5)\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"signal_sensor1_test1 = []\n",
"signal_sensor2_test1 = []\n",
"\n",
"for data in df:\n",
" if not data.empty and 'sensor 1' in data.columns and 'sensor 2' in data.columns:\n",
" signal_sensor1_test1.append(data['sensor 1'].values)\n",
" signal_sensor2_test1.append(data['sensor 2'].values)\n",
"\n",
"print(len(signal_sensor1_test1))\n",
"print(len(signal_sensor2_test1))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Applying Short-Time Fourier Transform (STFT)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"os.getcwd()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pandas as pd\n",
"import numpy as np\n",
"from scipy.signal import stft\n",
"from scipy.signal.windows import hann  # scipy.signal.hann was deprecated in SciPy 1.1 and removed in 1.13\n",
"# from multiprocessing import Pool\n",
"\n",
"# Function to compute and append STFT data\n",
"def process_stft(args):\n",
" # Define STFT parameters\n",
" window_size = 1024\n",
" hop_size = 512\n",
" window = hann(window_size)\n",
"\n",
" Fs = 1024 # Sampling frequency in Hz\n",
" \n",
" damage_num, test_num, sensor_suffix = args\n",
" sensor_name = active_sensors[sensor_suffix]\n",
" sensor_num = sensor_suffix[-1] # '1' or '2'\n",
" \n",
" # Construct the file path\n",
" file_name = f'DAMAGE_{damage_num}_TEST{test_num}_{sensor_suffix}.csv'\n",
" file_path = os.path.join(damage_base_path, f'DAMAGE_{damage_num}', file_name)\n",
" \n",
" # Check if the file exists\n",
" if not os.path.isfile(file_path):\n",
" print(f\"File {file_path} does not exist. Skipping...\")\n",
" return\n",
" \n",
" # Read the CSV\n",
" try:\n",
" df = pd.read_csv(file_path)\n",
" except Exception as e:\n",
" print(f\"Error reading {file_path}: {e}. Skipping...\")\n",
" return\n",
" \n",
" # Ensure the CSV has exactly two columns\n",
" if df.shape[1] != 2:\n",
" print(f\"Unexpected number of columns in {file_path}. Skipping...\")\n",
" return\n",
" \n",
" # Extract sensor data\n",
" sensor_column = df.columns[1]\n",
" sensor_data = df[sensor_column].values\n",
" \n",
" # Compute STFT\n",
" frequencies, times, Zxx = stft(sensor_data, fs=Fs, window=window, nperseg=window_size, noverlap=window_size - hop_size)\n",
" magnitude = np.abs(Zxx)\n",
" df_stft = pd.DataFrame(magnitude, index=frequencies, columns=times).T\n",
" df_stft.columns = [f\"Freq_{i}\" for i in frequencies]\n",
" \n",
" # Define the output CSV file path\n",
" stft_file_name = f'stft_data{sensor_num}_{damage_num}.csv'\n",
" sensor_output_dir = os.path.join(damage_base_path, sensor_name.lower())\n",
" os.makedirs(sensor_output_dir, exist_ok=True)\n",
" stft_file_path = os.path.join(sensor_output_dir, stft_file_name)\n",
" # Append the flattened STFT to the CSV\n",
" try:\n",
" if not os.path.isfile(stft_file_path):\n",
" # Create a new CSV\n",
" df_stft.to_csv(stft_file_path, index=False, header=False)\n",
" else:\n",
" # Append to existing CSV\n",
" df_stft.to_csv(stft_file_path, mode='a', index=False, header=False)\n",
" print(f\"Appended STFT data to {stft_file_path}\")\n",
" except Exception as e:\n",
" print(f\"Error writing to {stft_file_path}: {e}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the base path where DAMAGE_X folders are located\n",
"damage_base_path = 'D:/thesis/data/converted/raw/'\n",
"\n",
"# Define active sensors\n",
"active_sensors = {\n",
" '01': 'sensor1', # Beginning map sensor\n",
" '02': 'sensor2' # End map sensor\n",
"}\n",
"\n",
"# Define damage cases and test runs\n",
"# NOTE(review): range(1, 7) skips DAMAGE_0 (the undamaged case read elsewhere\n",
"# in this notebook) -- confirm the exclusion is intentional.\n",
"damage_cases = range(1, 7) # Adjust based on actual number of damage cases\n",
"test_runs = range(1, 6) # TEST01 to TEST05\n",
"args_list = []\n",
"\n",
"# Prepare the list of arguments for parallel processing\n",
"for damage_num in damage_cases:\n",
" for test_num in test_runs:\n",
" for sensor_suffix in active_sensors.keys():\n",
" args_list.append((damage_num, test_num, sensor_suffix))\n",
"\n",
"print(len(args_list))\n",
"args_list"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Process STFTs sequentially instead of in parallel\n",
"if __name__ == \"__main__\":\n",
" print(f\"Starting sequential STFT processing...\")\n",
" for i, arg in enumerate(args_list, 1):\n",
" process_stft(arg)\n",
" print(f\"Processed {i}/{len(args_list)} files\")\n",
" print(\"STFT processing completed.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from scipy.signal import stft\n",
"from scipy.signal.windows import hann  # scipy.signal.hann was deprecated in SciPy 1.1 and removed in 1.13\n",
"\n",
"# Applying STFT\n",
"vibration_data = df1['s1'].values # Using sensor 1 data for STFT\n",
"window_size = 1024\n",
"hop_size = 512\n",
"window = hann(window_size) # Creating a Hanning window\n",
"Fs = 1024\n",
"\n",
"frequencies, times, Zxx = stft(vibration_data, \n",
" fs=Fs, \n",
" window=window, \n",
" nperseg=window_size, \n",
" noverlap=window_size - hop_size)\n",
"# Plotting the STFT Data\n",
"plt.pcolormesh(times, frequencies, np.abs(Zxx), cmap='jet', vmax=0.03, vmin=0.0)\n",
"# plt.ylabel(f'Frequency [Hz]')\n",
"# plt.xlabel(f'Time [sec]')\n",
"plt.show()\n",
"\n",
"# get current y ticks in list\n",
"print(len(frequencies))\n",
"print(len(times))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Loading STFT Data from CSV Files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"os.listdir('D:/thesis/data/working')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ready_data1a = []\n",
"for file in os.listdir('D:/thesis/data/converted/raw/sensor1'):\n",
" ready_data1a.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw/sensor1', file), skiprows=1))\n",
"# colormesh give title x is frequency and y is time and rotate/transpose the data\n",
"# Plotting the STFT Data\n",
"plt.figure(dpi=300) # Set figure size and DPI\n",
"plt.pcolormesh(ready_data1a[0].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
"plt.title('STFT of Sensor 1 Dataset A Label 0 Undamaged')\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ready_data2a = []\n",
"for file in os.listdir('D:/thesis/data/converted/raw/sensor2'):\n",
" ready_data2a.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw/sensor2', file), skiprows=1))\n",
"\n",
"plt.figure(dpi=300) # Set figure size and DPI\n",
"plt.pcolormesh(ready_data2a[0].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
"plt.title('STFT of Sensor 2 Dataset A Label 0 Undamaged')\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cmcrameri import cm\n",
"# Create a figure and subplots\n",
"fig, axes = plt.subplots(2, 3, figsize=(15, 8), sharex=True, sharey=True)\n",
"\n",
"# Flatten the axes array for easier iteration\n",
"axes = axes.flatten()\n",
"\n",
"# Loop through each subplot and plot the data\n",
"for i in range(6):\n",
" pcm = axes[i].pcolormesh(ready_data1a[i+1].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
" axes[i].set_title(f'Label {i+1}', fontsize=12)\n",
"\n",
"# Add a single color bar for all subplots\n",
"# Use the first `pcolormesh` object (or any valid one) for the color bar\n",
"cbar = fig.colorbar(pcm, ax=axes, orientation='vertical')\n",
"# cbar.set_label('Magnitude')\n",
"\n",
"# Set shared labels\n",
"fig.text(0.5, 0.04, 'Time Frames', ha='center', fontsize=12)\n",
"fig.text(0.04, 0.5, 'Frequency [Hz]', va='center', rotation='vertical', fontsize=12)\n",
"fig.suptitle('STFT of Sensor 1 Dataset A', fontsize=16)\n",
"\n",
"# Adjust layout\n",
"# plt.tight_layout(rect=[0.05, 0.05, 1, 1]) # Leave space for shared labels\n",
"plt.subplots_adjust(left=0.1, right=0.75, top=0.9, bottom=0.1, wspace=0.2, hspace=0.2)\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cmcrameri import cm\n",
"# Create a figure and subplots\n",
"fig, axes = plt.subplots(2, 3, figsize=(15, 8), sharex=True, sharey=True)\n",
"\n",
"# Flatten the axes array for easier iteration\n",
"axes = axes.flatten()\n",
"\n",
"# Loop through each subplot and plot the data\n",
"for i in range(6):\n",
" pcm = axes[i].pcolormesh(ready_data2a[i+1].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
" axes[i].set_title(f'Label {i+1}', fontsize=12)\n",
"\n",
"# Add a single color bar for all subplots\n",
"# Use the first `pcolormesh` object (or any valid one) for the color bar\n",
"cbar = fig.colorbar(pcm, ax=axes, orientation='vertical')\n",
"# cbar.set_label('Magnitude')\n",
"\n",
"# Set shared labels\n",
"fig.text(0.5, 0.04, 'Time Frames', ha='center', fontsize=12)\n",
"fig.text(0.04, 0.5, 'Frequency [Hz]', va='center', rotation='vertical', fontsize=12)\n",
"fig.suptitle('STFT of Sensor 2 Dataset A', fontsize=16)\n",
"\n",
"# Adjust layout\n",
"# plt.tight_layout(rect=[0.05, 0.05, 1, 1]) # Leave space for shared labels\n",
"plt.subplots_adjust(left=0.1, right=0.75, top=0.9, bottom=0.1, wspace=0.2, hspace=0.2)\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ready_data1b = []\n",
"for file in os.listdir('D:/thesis/data/converted/raw_B/sensor1'):\n",
" ready_data1b.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw_B/sensor1', file), skiprows=1))\n",
"\n",
"plt.figure(dpi=300) # Set figure size and DPI\n",
"plt.pcolormesh(ready_data1b[0].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
"plt.title('STFT of Sensor 1 Dataset B Label 0 Undamaged')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ready_data2b = []\n",
"for file in os.listdir('D:/thesis/data/converted/raw_B/sensor2'):\n",
" ready_data2b.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw_B/sensor2', file), skiprows=1))\n",
"\n",
"plt.figure(dpi=300) # Set figure size and DPI\n",
"plt.pcolormesh(ready_data2b[0].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
"plt.title('STFT of Sensor 2 Dataset B Label 0 Undamaged')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cmcrameri import cm\n",
"# Create a figure and subplots\n",
"fig, axes = plt.subplots(2, 3, figsize=(15, 8), sharex=True, sharey=True)\n",
"\n",
"# Flatten the axes array for easier iteration\n",
"axes = axes.flatten()\n",
"\n",
"# Loop through each subplot and plot the data\n",
"for i in range(6):\n",
" pcm = axes[i].pcolormesh(ready_data1b[i+1].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
" axes[i].set_title(f'Label {i+1}', fontsize=12)\n",
"\n",
"# Add a single color bar for all subplots\n",
"# Use the first `pcolormesh` object (or any valid one) for the color bar\n",
"cbar = fig.colorbar(pcm, ax=axes, orientation='vertical')\n",
"# cbar.set_label('Magnitude')\n",
"\n",
"# Set shared labels\n",
"fig.text(0.5, 0.04, 'Time Frames', ha='center', fontsize=12)\n",
"fig.text(0.04, 0.5, 'Frequency [Hz]', va='center', rotation='vertical', fontsize=12)\n",
"fig.suptitle('STFT of Sensor 1 Dataset B', fontsize=16)\n",
"\n",
"# Adjust layout\n",
"# plt.tight_layout(rect=[0.05, 0.05, 1, 1]) # Leave space for shared labels\n",
"plt.subplots_adjust(left=0.1, right=0.75, top=0.9, bottom=0.1, wspace=0.2, hspace=0.2)\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cmcrameri import cm\n",
"# Create a figure and subplots\n",
"fig, axes = plt.subplots(2, 3, figsize=(15, 8), sharex=True, sharey=True)\n",
"\n",
"# Flatten the axes array for easier iteration\n",
"axes = axes.flatten()\n",
"\n",
"# Loop through each subplot and plot the data\n",
"for i in range(6):\n",
" pcm = axes[i].pcolormesh(ready_data2b[i+1].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
" axes[i].set_title(f'Label {i+1}', fontsize=12)\n",
"\n",
"# Add a single color bar for all subplots\n",
"# Use the first `pcolormesh` object (or any valid one) for the color bar\n",
"cbar = fig.colorbar(pcm, ax=axes, orientation='vertical')\n",
"# cbar.set_label('Magnitude')\n",
"\n",
"# Set shared labels\n",
"fig.text(0.5, 0.04, 'Time Frames', ha='center', fontsize=12)\n",
"fig.text(0.04, 0.5, 'Frequency [Hz]', va='center', rotation='vertical', fontsize=12)\n",
"fig.suptitle('STFT of Sensor 2 Dataset B', fontsize=16)\n",
"\n",
"# Adjust layout\n",
"# plt.tight_layout(rect=[0.05, 0.05, 1, 1]) # Leave space for shared labels\n",
"plt.subplots_adjust(left=0.1, right=0.75, top=0.9, bottom=0.1, wspace=0.2, hspace=0.2)\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(len(ready_data1a))\n",
"print(len(ready_data2a))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.ml.model_selection import create_ready_data\n",
"\n",
"X1a, y = create_ready_data('D:/thesis/data/converted/raw/sensor1')\n",
"X2a, y = create_ready_data('D:/thesis/data/converted/raw/sensor2')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# X1a.iloc[-1,:]\n",
"y[2564]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.model_selection import train_test_split\n",
"\n",
"# sensor A\n",
"x_train1, x_test1, y_train, y_test = train_test_split(X1a, y, test_size=0.2, random_state=2)\n",
"# sensor B\n",
"x_train2, x_test2, y_train, y_test = train_test_split(X2a, y, test_size=0.2, random_state=2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Check the shapes of x_train and y_train\n",
"print(\"Shape of x1_train:\", x_train1.shape)\n",
"print(\"Shape of x2_train:\", x_train2.shape)\n",
"print(\"Shape of y_train:\", y_train.shape)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.ml.model_selection import train_and_evaluate_model\n",
"from sklearn.svm import SVC\n",
"from sklearn.pipeline import make_pipeline\n",
"from sklearn.preprocessing import StandardScaler\n",
"from sklearn.decomposition import PCA\n",
"from xgboost import XGBClassifier\n",
"from sklearn.ensemble import RandomForestClassifier, BaggingClassifier\n",
"from sklearn.tree import DecisionTreeClassifier\n",
"from sklearn.neighbors import KNeighborsClassifier\n",
"from sklearn.discriminant_analysis import LinearDiscriminantAnalysis\n",
"\n",
"# Define models for sensor1\n",
"models_sensor1 = {\n",
" # \"Random Forest\": RandomForestClassifier(),\n",
" # \"Bagged Trees\": BaggingClassifier(estimator=DecisionTreeClassifier(), n_estimators=10),\n",
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
" # \"KNN\": KNeighborsClassifier(),\n",
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
" \"SVM\": SVC(),\n",
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
" StandardScaler(),\n",
" PCA(n_components=10),\n",
" SVC(kernel='rbf')\n",
" ),\n",
"\n",
" # \"XGBoost\": XGBClassifier()\n",
"}\n",
"\n",
"results_sensor1 = []\n",
"for name, model in models_sensor1.items():\n",
" res = train_and_evaluate_model(model, name, \"Sensor A\", x_train1, y_train, x_test1, y_test, export='D:/thesis/models/Sensor A')\n",
" results_sensor1.append(res)\n",
" print(f\"{name} on sensor1: Accuracy = {res['accuracy']:.2f}%\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.ml.model_selection import plot_confusion_matrix\n",
"\n",
"# Plot confusion matrix for sensor1\n",
"plot_confusion_matrix(results_sensor1, y_test, \"Confusion Matrix of Sensor A Validation Dataset A\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"models_sensor2 = {\n",
" # \"Random Forest\": RandomForestClassifier(),\n",
" # \"Bagged Trees\": BaggingClassifier(estimator=DecisionTreeClassifier(), n_estimators=10),\n",
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
" # \"KNN\": KNeighborsClassifier(),\n",
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
" \"SVM\": SVC(),\n",
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
" StandardScaler(),\n",
" PCA(n_components=10),\n",
" SVC(kernel='rbf')\n",
" ),\n",
" \"XGBoost\": XGBClassifier()\n",
"}\n",
"\n",
"results_sensor2 = []\n",
"for name, model in models_sensor2.items():\n",
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train, x_test2, y_test, export='D:/thesis/models/sensor2')\n",
" results_sensor2.append(res)\n",
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_results = {\n",
" \"sensor1\": results_sensor1,\n",
" \"sensor2\": results_sensor2\n",
"}\n",
"\n",
"print(all_results)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"\n",
"def prepare_plot_data(results_dict):\n",
" # Gather unique model names\n",
" models_set = {entry['model'] for sensor in results_dict.values() for entry in sensor}\n",
" models = sorted(list(models_set))\n",
" \n",
" # Create dictionaries mapping sensor -> accuracy list ordered by model name\n",
" sensor_accuracies = {}\n",
" for sensor, entries in results_dict.items():\n",
" # Build a mapping: model -> accuracy for the given sensor\n",
" mapping = {entry['model']: entry['accuracy'] for entry in entries}\n",
" # Order the accuracies consistent with the sorted model names\n",
" sensor_accuracies[sensor] = [mapping.get(model, 0) for model in models]\n",
" \n",
" return models, sensor_accuracies\n",
"\n",
"def plot_accuracies(models, sensor_accuracies):\n",
" bar_width = 0.35\n",
" x = np.arange(len(models))\n",
" sensors = list(sensor_accuracies.keys())\n",
" \n",
" plt.figure(figsize=(10, 6))\n",
" # Assume two sensors for plotting grouped bars\n",
" plt.bar(x - bar_width/2, sensor_accuracies[sensors[0]], width=bar_width, color='blue', label=sensors[0])\n",
" plt.bar(x + bar_width/2, sensor_accuracies[sensors[1]], width=bar_width, color='orange', label=sensors[1])\n",
" \n",
" # Add text labels on top of bars\n",
" for i, (a1, a2) in enumerate(zip(sensor_accuracies[sensors[0]], sensor_accuracies[sensors[1]])):\n",
" plt.text(x[i] - bar_width/2, a1 + 0.1, f\"{a1:.2f}%\", ha='center', va='bottom', color='black')\n",
" plt.text(x[i] + bar_width/2, a2 + 0.1, f\"{a2:.2f}%\", ha='center', va='bottom', color='black')\n",
" \n",
" plt.xlabel('Model Name')\n",
" plt.ylabel('Accuracy (%)')\n",
" plt.title('Accuracy of Classifiers for Each Sensor')\n",
" plt.xticks(x, models)\n",
" plt.legend()\n",
" plt.ylim(0, 105)\n",
" plt.tight_layout()\n",
" plt.show()\n",
"\n",
"# Use the functions\n",
"models, sensor_accuracies = prepare_plot_data(all_results)\n",
"plot_accuracies(models, sensor_accuracies)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import os\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Inference"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.ml.model_selection import create_ready_data\n",
"\n",
"X1b, y = create_ready_data('D:/thesis/data/converted/raw_B/sensor1')\n",
"X2b, y = create_ready_data('D:/thesis/data/converted/raw_B/sensor2')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"y.shape"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.metrics import accuracy_score, classification_report\n",
"# 4. Validate on Dataset B\n",
"from joblib import load\n",
"svm_model = load('D:/thesis/models/sensor1/SVM.joblib')\n",
"# svm_model = load('D:/thesis/models/sensor1/SVM with StandardScaler and PCA.joblib')\n",
"y_pred_svm = svm_model.predict(X1b)\n",
"\n",
"# 5. Evaluate\n",
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
"print(classification_report(y, y_pred_svm))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Model sensor 1 to predict sensor 2 data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.metrics import accuracy_score, classification_report\n",
"# 4. Validate on Dataset B\n",
"from joblib import load\n",
"svm_model = load('D:/thesis/models/sensor1/SVM.joblib')\n",
"y_pred_svm = svm_model.predict(X2b)\n",
"\n",
"# 5. Evaluate\n",
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
"print(classification_report(y, y_pred_svm))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.metrics import accuracy_score, classification_report\n",
"# 4. Validate on Dataset B\n",
"# NOTE(review): `rf_model2` is never defined in this notebook -- this cell\n",
"# relies on leftover kernel state and will fail on Restart & Run All.\n",
"y_pred = rf_model2.predict(X2b)\n",
"\n",
"# 5. Evaluate\n",
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred))\n",
"print(classification_report(y, y_pred))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# NOTE(review): `svm_model2` is never defined in this notebook -- relies on\n",
"# leftover kernel state and will fail on Restart & Run All.\n",
"y_predict = svm_model2.predict(X2b.iloc[[5312],:])\n",
"print(y_predict)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"y[5312]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Confusion Matrix"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
"\n",
"\n",
"cm = confusion_matrix(y, y_pred_svm) # -> ndarray\n",
"\n",
"# get the class labels\n",
"labels = svm_model.classes_\n",
"\n",
"# Plot\n",
"disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n",
"disp.plot(cmap=plt.cm.Blues) # You can change colormap\n",
"plt.title(\"SVM Sensor1 CM Train w/ Dataset A Val w/ Dataset B from Sensor1 readings\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Self-test CM"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 1. Predict sensor 1 on Dataset A\n",
"y_test_pred = svm_model.predict(x_test1)\n",
"\n",
"# 2. Import confusion matrix tools\n",
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# 3. Create and plot confusion matrix\n",
"cm_train = confusion_matrix(y_test, y_test_pred)\n",
"labels = svm_model.classes_\n",
"\n",
"disp = ConfusionMatrixDisplay(confusion_matrix=cm_train, display_labels=labels)\n",
"disp.plot(cmap=plt.cm.Blues)\n",
"plt.title(\"Confusion Matrix: Train & Test on Dataset A\")\n",
"plt.show()\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}