1081 lines
32 KiB
Plaintext
1081 lines
32 KiB
Plaintext
{
|
|
"cells": [
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import pandas as pd\n",
|
|
"import numpy as np\n",
|
|
"import matplotlib.pyplot as plt"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"sensor1 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_0/DAMAGE_0_TEST1_01.csv',sep=',')\n",
|
|
"sensor2 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_0/DAMAGE_0_TEST1_02.csv',sep=',')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"sensor1.columns"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"df1 = pd.DataFrame()\n",
|
|
"df1['s1'] = sensor1[sensor1.columns[-1]]\n",
|
|
"df1['s2'] = sensor2[sensor2.columns[-1]]\n",
|
|
"df1\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"def merge_two_sensors(damage_path, damage):\n",
|
|
" df = pd.DataFrame()\n",
|
|
" for file in os.listdir(damage_path):\n",
|
|
" pattern = re.compile(r'DAMAGE_\\d+_TEST\\d+_\\d{2}\\.csv')\n",
|
|
" try:\n",
|
|
" assert pattern.match(file), f\"File {file} does not match the required format, skipping...\"\n",
|
|
" # assert \"TEST01\" in file, f\"File {file} does not contain 'TEST01', skipping...\" #TODO: should be trained using the whole test file\n",
|
|
" print(f\"Processing file: {file}\")\n",
|
|
" # Append the full path of the file to sensor1 or sensor2 based on the filename\n",
|
|
" if file.endswith('_01.csv'):\n",
|
|
" df['sensor 1'] = pd.read_csv(os.path.join('D:/thesis/data/converted/raw', damage, file), sep=',', usecols=[1])\n",
|
|
" elif file.endswith('_02.csv'):\n",
|
|
" df['sensor 2'] = pd.read_csv(os.path.join('D:/thesis/data/converted/raw', damage, file), sep=',', usecols=[1])\n",
|
|
" except AssertionError as e:\n",
|
|
" print(e)\n",
|
|
" continue # Skip to the next iteration\n",
|
|
" return df"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import os\n",
|
|
"import re\n",
|
|
"\n",
|
|
"df = []\n",
|
|
"for damage in os.listdir('D:/thesis/data/converted/raw'):\n",
|
|
" damage_path = os.path.join('D:/thesis/data/converted/raw', damage)\n",
|
|
" df.append(merge_two_sensors(damage_path, damage))\n",
|
|
" "
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"len(df)\n",
|
|
"df"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Combined Plot for sensor 1 and sensor 2 from data1 file in which motor is operated at 800 rpm\n",
|
|
"\n",
|
|
"plt.plot(df1['s2'], label='Sensor 1', color='C1', alpha=0.6)\n",
|
|
"plt.plot(df1['s1'], label='Sensor 2', color='C0', alpha=0.6)\n",
|
|
"plt.xlabel(\"Number of samples\")\n",
|
|
"plt.ylabel(\"Amplitude\")\n",
|
|
"plt.title(\"Raw vibration signal\")\n",
|
|
"plt.ylim(-7.5, 5)\n",
|
|
"plt.legend()\n",
|
|
"plt.locator_params(axis='x', nbins=8)\n",
|
|
"plt.ylim(-1, 1) # Adjust range as needed\n",
|
|
"plt.grid(True, linestyle='--', alpha=0.5)\n",
|
|
"plt.show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"signal_sensor1_test1 = []\n",
|
|
"signal_sensor2_test1 = []\n",
|
|
"\n",
|
|
"for data in df:\n",
|
|
" if not data.empty and 'sensor 1' in data.columns and 'sensor 2' in data.columns:\n",
|
|
" signal_sensor1_test1.append(data['sensor 1'].values)\n",
|
|
" signal_sensor2_test1.append(data['sensor 2'].values)\n",
|
|
"\n",
|
|
"print(len(signal_sensor1_test1))\n",
|
|
"print(len(signal_sensor2_test1))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Applying Short-Time Fourier Transform (STFT)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"os.getcwd()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import os\n",
|
|
"import pandas as pd\n",
|
|
"import numpy as np\n",
|
|
"from scipy.signal import stft, hann\n",
|
|
"# from multiprocessing import Pool\n",
|
|
"\n",
|
|
"# Function to compute and append STFT data\n",
|
|
"def process_stft(args):\n",
|
|
" # Define STFT parameters\n",
|
|
" window_size = 1024\n",
|
|
" hop_size = 512\n",
|
|
" window = hann(window_size)\n",
|
|
"\n",
|
|
" Fs = 1024 # Sampling frequency in Hz\n",
|
|
" \n",
|
|
" damage_num, test_num, sensor_suffix = args\n",
|
|
" sensor_name = active_sensors[sensor_suffix]\n",
|
|
" sensor_num = sensor_suffix[-1] # '1' or '2'\n",
|
|
" \n",
|
|
" # Construct the file path\n",
|
|
" file_name = f'DAMAGE_{damage_num}_TEST{test_num}_{sensor_suffix}.csv'\n",
|
|
" file_path = os.path.join(damage_base_path, f'DAMAGE_{damage_num}', file_name)\n",
|
|
" \n",
|
|
" # Check if the file exists\n",
|
|
" if not os.path.isfile(file_path):\n",
|
|
" print(f\"File {file_path} does not exist. Skipping...\")\n",
|
|
" return\n",
|
|
" \n",
|
|
" # Read the CSV\n",
|
|
" try:\n",
|
|
" df = pd.read_csv(file_path)\n",
|
|
" except Exception as e:\n",
|
|
" print(f\"Error reading {file_path}: {e}. Skipping...\")\n",
|
|
" return\n",
|
|
" \n",
|
|
" # Ensure the CSV has exactly two columns\n",
|
|
" if df.shape[1] != 2:\n",
|
|
" print(f\"Unexpected number of columns in {file_path}. Skipping...\")\n",
|
|
" return\n",
|
|
" \n",
|
|
" # Extract sensor data\n",
|
|
" sensor_column = df.columns[1]\n",
|
|
" sensor_data = df[sensor_column].values\n",
|
|
" \n",
|
|
" # Compute STFT\n",
|
|
" frequencies, times, Zxx = stft(sensor_data, fs=Fs, window=window, nperseg=window_size, noverlap=window_size - hop_size)\n",
|
|
" magnitude = np.abs(Zxx)\n",
|
|
" df_stft = pd.DataFrame(magnitude, index=frequencies, columns=times).T\n",
|
|
" df_stft.columns = [f\"Freq_{i}\" for i in frequencies]\n",
|
|
" \n",
|
|
" # Define the output CSV file path\n",
|
|
" stft_file_name = f'stft_data{sensor_num}_{damage_num}.csv'\n",
|
|
" sensor_output_dir = os.path.join(damage_base_path, sensor_name.lower())\n",
|
|
" os.makedirs(sensor_output_dir, exist_ok=True)\n",
|
|
" stft_file_path = os.path.join(sensor_output_dir, stft_file_name)\n",
|
|
" # Append the flattened STFT to the CSV\n",
|
|
" try:\n",
|
|
" if not os.path.isfile(stft_file_path):\n",
|
|
" # Create a new CSV\n",
|
|
" df_stft.to_csv(stft_file_path, index=False, header=False)\n",
|
|
" else:\n",
|
|
" # Append to existing CSV\n",
|
|
" df_stft.to_csv(stft_file_path, mode='a', index=False, header=False)\n",
|
|
" print(f\"Appended STFT data to {stft_file_path}\")\n",
|
|
" except Exception as e:\n",
|
|
" print(f\"Error writing to {stft_file_path}: {e}\")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Define the base path where DAMAGE_X folders are located\n",
|
|
"damage_base_path = 'D:/thesis/data/converted/raw/'\n",
|
|
"\n",
|
|
"# Define active sensors\n",
|
|
"active_sensors = {\n",
|
|
" '01': 'sensor1', # Beginning map sensor\n",
|
|
" '02': 'sensor2' # End map sensor\n",
|
|
"}\n",
|
|
"\n",
|
|
"# Define damage cases and test runs\n",
|
|
"damage_cases = range(1, 7) # Adjust based on actual number of damage cases\n",
|
|
"test_runs = range(1, 6) # TEST01 to TEST05\n",
|
|
"args_list = []\n",
|
|
"\n",
|
|
"# Prepare the list of arguments for parallel processing\n",
|
|
"for damage_num in damage_cases:\n",
|
|
" for test_num in test_runs:\n",
|
|
" for sensor_suffix in active_sensors.keys():\n",
|
|
" args_list.append((damage_num, test_num, sensor_suffix))\n",
|
|
"\n",
|
|
"print(len(args_list))\n",
|
|
"args_list"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Process STFTs sequentially instead of in parallel\n",
|
|
"if __name__ == \"__main__\":\n",
|
|
" print(f\"Starting sequential STFT processing...\")\n",
|
|
" for i, arg in enumerate(args_list, 1):\n",
|
|
" process_stft(arg)\n",
|
|
" print(f\"Processed {i}/{len(args_list)} files\")\n",
|
|
" print(\"STFT processing completed.\")"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from scipy.signal import stft, hann\n",
|
|
"\n",
|
|
"# Applying STFT\n",
|
|
"vibration_data = df1['s1'].values # Using sensor 1 data for STFT\n",
|
|
"window_size = 1024\n",
|
|
"hop_size = 512\n",
|
|
"window = hann(window_size) # Creating a Hanning window\n",
|
|
"Fs = 1024\n",
|
|
"\n",
|
|
"frequencies, times, Zxx = stft(vibration_data, \n",
|
|
" fs=Fs, \n",
|
|
" window=window, \n",
|
|
" nperseg=window_size, \n",
|
|
" noverlap=window_size - hop_size)\n",
|
|
"# Plotting the STFT Data\n",
|
|
"plt.pcolormesh(times, frequencies, np.abs(Zxx), cmap='jet', vmax=0.03, vmin=0.0)\n",
|
|
"# plt.ylabel(f'Frequency [Hz]')\n",
|
|
"# plt.xlabel(f'Time [sec]')\n",
|
|
"plt.show()\n",
|
|
"\n",
|
|
"# get current y ticks in list\n",
|
|
"print(len(frequencies))\n",
|
|
"print(len(times))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"## Loading STFT Data from CSV Files"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import os\n",
|
|
"os.listdir('D:/thesis/data/working')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import pandas as pd\n",
|
|
"import matplotlib.pyplot as plt\n",
|
|
"ready_data1a = []\n",
|
|
"for file in os.listdir('D:/thesis/data/converted/raw/sensor1'):\n",
|
|
" ready_data1a.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw/sensor1', file), skiprows=1))\n",
|
|
"# colormesh give title x is frequency and y is time and rotate/transpose the data\n",
|
|
"# Plotting the STFT Data"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import numpy as np\n",
|
|
"import matplotlib.pyplot as plt\n",
|
|
"from mpl_toolkits.mplot3d import Axes3D\n",
|
|
"\n",
|
|
"# Assuming ready_data1a[0] is a DataFrame or 2D array\n",
|
|
"spectrogram_data = ready_data1a[0].values # Convert to NumPy array if it's a DataFrame\n",
|
|
"\n",
|
|
"# Get the dimensions of the spectrogram\n",
|
|
"num_frequencies, num_time_frames = spectrogram_data.shape\n",
|
|
"\n",
|
|
"# Create frequency and time arrays\n",
|
|
"frequencies = np.arange(num_frequencies) # Replace with actual frequency values if available\n",
|
|
"time_frames = np.arange(num_time_frames) # Replace with actual time values if available\n",
|
|
"\n",
|
|
"# Create a meshgrid for plotting\n",
|
|
"T, F = np.meshgrid(time_frames, frequencies)\n",
|
|
"\n",
|
|
"# Create a 3D plot\n",
|
|
"fig = plt.figure(figsize=(12, 8))\n",
|
|
"ax = fig.add_subplot(111, projection='3d')\n",
|
|
"\n",
|
|
"# Plot the surface\n",
|
|
"surf = ax.plot_surface(T, F, spectrogram_data, cmap='bwr', edgecolor='none')\n",
|
|
"\n",
|
|
"# Add labels and a color bar\n",
|
|
"ax.set_xlabel('Time Frames')\n",
|
|
"ax.set_ylabel('Frequency [Hz]')\n",
|
|
"ax.set_zlabel('Magnitude')\n",
|
|
"ax.set_title('3D Spectrogram')\n",
|
|
"# Resize the z-axis (shrink it)\n",
|
|
"z_min, z_max = 0, 0.1 # Replace with your desired range\n",
|
|
"ax.set_zlim(z_min, z_max)\n",
|
|
"ax.get_proj = lambda: np.dot(Axes3D.get_proj(ax), np.diag([1, 1, 0.5, 1])) # Shrink z-axis by 50%\n",
|
|
"ax.set_facecolor('white')\n",
|
|
"fig.colorbar(surf, ax=ax, shrink=0.5, aspect=10)\n",
|
|
"\n",
|
|
"# Show the plot\n",
|
|
"plt.show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# dpi\n",
|
|
"plt.figure(dpi=300) # Set figure size and DPI\n",
|
|
"plt.pcolormesh(ready_data1a[0].transpose(), cmap='jet', vmax=0.03, vmin=0.0)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from cmcrameri import cm\n",
|
|
"# Create a figure and subplots\n",
|
|
"fig, axes = plt.subplots(2, 3, figsize=(15, 8), sharex=True, sharey=True)\n",
|
|
"\n",
|
|
"# Flatten the axes array for easier iteration\n",
|
|
"axes = axes.flatten()\n",
|
|
"\n",
|
|
"# Loop through each subplot and plot the data\n",
|
|
"for i in range(6):\n",
|
|
" pcm = axes[i].pcolormesh(ready_data1a[i].transpose(), cmap='jet', vmax=0.03, vmin=0.0)\n",
|
|
" axes[i].set_title(f'Case {i} Sensor A', fontsize=12)\n",
|
|
"\n",
|
|
"# Add a single color bar for all subplots\n",
|
|
"# Use the first `pcolormesh` object (or any valid one) for the color bar\n",
|
|
"cbar = fig.colorbar(pcm, ax=axes, orientation='vertical')\n",
|
|
"# cbar.set_label('Magnitude')\n",
|
|
"\n",
|
|
"# Set shared labels\n",
|
|
"fig.text(0.5, 0.04, 'Time Frames', ha='center', fontsize=12)\n",
|
|
"fig.text(0.04, 0.5, 'Frequency [Hz]', va='center', rotation='vertical', fontsize=12)\n",
|
|
"\n",
|
|
"# Adjust layout\n",
|
|
"# plt.tight_layout(rect=[0.05, 0.05, 1, 1]) # Leave space for shared labels\n",
|
|
"plt.subplots_adjust(left=0.1, right=0.75, top=0.9, bottom=0.1, wspace=0.2, hspace=0.2)\n",
|
|
"\n",
|
|
"plt.show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"ready_data2a = []\n",
|
|
"for file in os.listdir('D:/thesis/data/converted/raw/sensor2'):\n",
|
|
" ready_data2a.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw/sensor2', file), skiprows=1))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# dpi\n",
|
|
"plt.figure(dpi=300) # Set figure size and DPI\n",
|
|
"plt.pcolormesh(ready_data2a[0].transpose(), cmap='jet', vmax=0.03, vmin=0.0)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"ready_data1b = []\n",
|
|
"for file in os.listdir('D:/thesis/data/converted/raw_B/sensor1'):\n",
|
|
" ready_data1b.append(pd.read_csv(os.path.join('D:/thesis/data/converted/raw_B/sensor1', file), skiprows=1))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# dpi\n",
|
|
"plt.figure(dpi=300) # Set figure size and DPI\n",
|
|
"plt.pcolormesh(ready_data1b[0].iloc[:22,:].transpose(), cmap='jet', vmax=0.03, vmin=0.0)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"len(ready_data1b[0])"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"print(len(ready_data1a))\n",
|
|
"print(len(ready_data2a))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"x1a = 0\n",
|
|
"print(type(ready_data1a[0]))\n",
|
|
"ready_data1a[0].iloc[:,0]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"#### Checking length of the total array"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"x1a = 0\n",
|
|
"print(type(x1a))\n",
|
|
"for i in range(len(ready_data1a)):\n",
|
|
" print(type(ready_data1a[i].shape[0]))\n",
|
|
" x1a = x1a + ready_data1a[i].shape[0]\n",
|
|
" print(type(x1a))\n",
|
|
"\n",
|
|
"print(x1a)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"x2a = 0\n",
|
|
"\n",
|
|
"for i in range(len(ready_data2a)):\n",
|
|
" print(ready_data2a[i].shape)\n",
|
|
" x2a = x2a + ready_data2a[i].shape[0]\n",
|
|
"\n",
|
|
"print(x2a)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Flatten 6 array into one array"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Combine all dataframes in ready_data1a into a single dataframe\n",
|
|
"if ready_data1a: # Check if the list is not empty\n",
|
|
" # Use pandas concat function instead of iterative concatenation\n",
|
|
" combined_data = pd.concat(ready_data1a, axis=0, ignore_index=True)\n",
|
|
" \n",
|
|
" print(f\"Type of combined data: {type(combined_data)}\")\n",
|
|
" print(f\"Shape of combined data: {combined_data.shape}\")\n",
|
|
" \n",
|
|
" # Display the combined dataframe\n",
|
|
" combined_data\n",
|
|
"else:\n",
|
|
" print(\"No data available in ready_data1a list\")\n",
|
|
" combined_data = pd.DataFrame()\n",
|
|
"\n",
|
|
"# Store the result in x1a for compatibility with subsequent code\n",
|
|
"x1a = combined_data"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Combine all dataframes in ready_data1a into a single dataframe\n",
|
|
"if ready_data2a: # Check if the list is not empty\n",
|
|
" # Use pandas concat function instead of iterative concatenation\n",
|
|
" combined_data = pd.concat(ready_data2a, axis=0, ignore_index=True)\n",
|
|
" \n",
|
|
" print(f\"Type of combined data: {type(combined_data)}\")\n",
|
|
" print(f\"Shape of combined data: {combined_data.shape}\")\n",
|
|
" \n",
|
|
" # Display the combined dataframe\n",
|
|
" combined_data\n",
|
|
"else:\n",
|
|
" print(\"No data available in ready_data1a list\")\n",
|
|
" combined_data = pd.DataFrame()\n",
|
|
"\n",
|
|
"# Store the result in x1a for compatibility with subsequent code\n",
|
|
"x2a = combined_data"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Creating the label"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"y_1 = 0\n",
|
|
"y_2 = 1\n",
|
|
"y_3 = 2\n",
|
|
"y_4 = 3\n",
|
|
"y_5 = 4\n",
|
|
"y_6 = 5"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"y_data = [y_1, y_2, y_3, y_4, y_5, y_6]\n",
|
|
"y_data"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"for i in range(len(y_data)):\n",
|
|
" print(ready_data1a[i].shape[0])"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import numpy as np\n",
|
|
"for i in range(len(y_data)):\n",
|
|
" y_data[i] = [y_data[i]]*ready_data1a[i].shape[0]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"len(y_data[0])\n",
|
|
"# y_data"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"y = y_data[0]\n",
|
|
"\n",
|
|
"for i in range(len(y_data) - 1):\n",
|
|
" #print(i)\n",
|
|
" y = np.concatenate((y, y_data[i+1]), axis=0)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"print(y.shape)\n",
|
|
"print(np.unique(y))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from src.ml.model_selection import create_ready_data\n",
|
|
"\n",
|
|
"X1a, y = create_ready_data('D:/thesis/data/converted/raw/sensor1')\n",
|
|
"X2a, y = create_ready_data('D:/thesis/data/converted/raw/sensor2')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# X1a.iloc[-1,:]\n",
|
|
"y[2564]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from sklearn.model_selection import train_test_split\n",
|
|
"\n",
|
|
"# sensor A\n",
|
|
"x_train1, x_test1, y_train, y_test = train_test_split(X1a, y, test_size=0.2, random_state=2)\n",
|
|
"# sensor B\n",
|
|
"x_train2, x_test2, y_train, y_test = train_test_split(X2a, y, test_size=0.2, random_state=2)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from sklearn.metrics import accuracy_score\n",
|
|
"from sklearn.ensemble import RandomForestClassifier, BaggingClassifier\n",
|
|
"from sklearn.tree import DecisionTreeClassifier\n",
|
|
"from sklearn.neighbors import KNeighborsClassifier\n",
|
|
"from sklearn.discriminant_analysis import LinearDiscriminantAnalysis\n",
|
|
"from sklearn.svm import SVC\n",
|
|
"from xgboost import XGBClassifier"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# Check the shapes of x_train and y_train\n",
|
|
"print(\"Shape of x1_train:\", x_train1.shape)\n",
|
|
"print(\"Shape of x2_train:\", x_train2.shape)\n",
|
|
"print(\"Shape of y_train:\", y_train.shape)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from src.ml.model_selection import train_and_evaluate_model\n",
|
|
"from sklearn.svm import SVC\n",
|
|
"from sklearn.pipeline import make_pipeline\n",
|
|
"from sklearn.preprocessing import StandardScaler\n",
|
|
"from sklearn.svm import SVC\n",
|
|
"from sklearn.decomposition import PCA\n",
|
|
"from xgboost import XGBClassifier\n",
|
|
"# Define models for sensor1\n",
|
|
"models_sensor1 = {\n",
|
|
" # \"Random Forest\": RandomForestClassifier(),\n",
|
|
" # \"Bagged Trees\": BaggingClassifier(estimator=DecisionTreeClassifier(), n_estimators=10),\n",
|
|
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
|
|
" # \"KNN\": KNeighborsClassifier(),\n",
|
|
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
|
|
" # \"SVM\": SVC(),\n",
|
|
" # \"SVM with StandardScaler and PCA\": make_pipeline(\n",
|
|
" # StandardScaler(),\n",
|
|
" # PCA(n_components=10),\n",
|
|
" # SVC(kernel='rbf')\n",
|
|
" # ),\n",
|
|
"\n",
|
|
" \"XGBoost\": XGBClassifier()\n",
|
|
"}\n",
|
|
"\n",
|
|
"results_sensor1 = []\n",
|
|
"for name, model in models_sensor1.items():\n",
|
|
" res = train_and_evaluate_model(model, name, \"sensor1\", x_train1, y_train, x_test1, y_test, export='D:/thesis/models/sensor1')\n",
|
|
" results_sensor1.append(res)\n",
|
|
" print(f\"{name} on sensor1: Accuracy = {res['accuracy']:.2f}%\")\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from src.ml.model_selection import plot_confusion_matrix\n",
|
|
"\n",
|
|
"# Plot confusion matrix for sensor1\n",
|
|
"plot_confusion_matrix(results_sensor1, y_test)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"models_sensor2 = {\n",
|
|
" # \"Random Forest\": RandomForestClassifier(),\n",
|
|
" # \"Bagged Trees\": BaggingClassifier(estimator=DecisionTreeClassifier(), n_estimators=10),\n",
|
|
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
|
|
" # \"KNN\": KNeighborsClassifier(),\n",
|
|
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
|
|
" \"SVM\": SVC(),\n",
|
|
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
|
|
" StandardScaler(),\n",
|
|
" PCA(n_components=10),\n",
|
|
" SVC(kernel='rbf')\n",
|
|
" ),\n",
|
|
" \"XGBoost\": XGBClassifier()\n",
|
|
"}\n",
|
|
"\n",
|
|
"results_sensor2 = []\n",
|
|
"for name, model in models_sensor2.items():\n",
|
|
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train, x_test2, y_test, export='D:/thesis/models/sensor2')\n",
|
|
" results_sensor2.append(res)\n",
|
|
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"all_results = {\n",
|
|
" \"sensor1\": results_sensor1,\n",
|
|
" \"sensor2\": results_sensor2\n",
|
|
"}\n",
|
|
"\n",
|
|
"print(all_results)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import numpy as np\n",
|
|
"import matplotlib.pyplot as plt\n",
|
|
"\n",
|
|
"def prepare_plot_data(results_dict):\n",
|
|
" # Gather unique model names\n",
|
|
" models_set = {entry['model'] for sensor in results_dict.values() for entry in sensor}\n",
|
|
" models = sorted(list(models_set))\n",
|
|
" \n",
|
|
" # Create dictionaries mapping sensor -> accuracy list ordered by model name\n",
|
|
" sensor_accuracies = {}\n",
|
|
" for sensor, entries in results_dict.items():\n",
|
|
" # Build a mapping: model -> accuracy for the given sensor\n",
|
|
" mapping = {entry['model']: entry['accuracy'] for entry in entries}\n",
|
|
" # Order the accuracies consistent with the sorted model names\n",
|
|
" sensor_accuracies[sensor] = [mapping.get(model, 0) for model in models]\n",
|
|
" \n",
|
|
" return models, sensor_accuracies\n",
|
|
"\n",
|
|
"def plot_accuracies(models, sensor_accuracies):\n",
|
|
" bar_width = 0.35\n",
|
|
" x = np.arange(len(models))\n",
|
|
" sensors = list(sensor_accuracies.keys())\n",
|
|
" \n",
|
|
" plt.figure(figsize=(10, 6))\n",
|
|
" # Assume two sensors for plotting grouped bars\n",
|
|
" plt.bar(x - bar_width/2, sensor_accuracies[sensors[0]], width=bar_width, color='blue', label=sensors[0])\n",
|
|
" plt.bar(x + bar_width/2, sensor_accuracies[sensors[1]], width=bar_width, color='orange', label=sensors[1])\n",
|
|
" \n",
|
|
" # Add text labels on top of bars\n",
|
|
" for i, (a1, a2) in enumerate(zip(sensor_accuracies[sensors[0]], sensor_accuracies[sensors[1]])):\n",
|
|
" plt.text(x[i] - bar_width/2, a1 + 0.1, f\"{a1:.2f}%\", ha='center', va='bottom', color='black')\n",
|
|
" plt.text(x[i] + bar_width/2, a2 + 0.1, f\"{a2:.2f}%\", ha='center', va='bottom', color='black')\n",
|
|
" \n",
|
|
" plt.xlabel('Model Name')\n",
|
|
" plt.ylabel('Accuracy (%)')\n",
|
|
" plt.title('Accuracy of Classifiers for Each Sensor')\n",
|
|
" plt.xticks(x, models)\n",
|
|
" plt.legend()\n",
|
|
" plt.ylim(0, 105)\n",
|
|
" plt.tight_layout()\n",
|
|
" plt.show()\n",
|
|
"\n",
|
|
"# Use the functions\n",
|
|
"models, sensor_accuracies = prepare_plot_data(all_results)\n",
|
|
"plot_accuracies(models, sensor_accuracies)\n"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import pandas as pd\n",
|
|
"import numpy as np\n",
|
|
"import os\n",
|
|
"import matplotlib.pyplot as plt"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Inference"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from src.ml.model_selection import create_ready_data\n",
|
|
"\n",
|
|
"X1b, y = create_ready_data('D:/thesis/data/converted/raw_B/sensor1')\n",
|
|
"X2b, y = create_ready_data('D:/thesis/data/converted/raw_B/sensor2')"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"y.shape"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from sklearn.metrics import accuracy_score, classification_report\n",
|
|
"# 4. Validate on Dataset B\n",
|
|
"from joblib import load\n",
|
|
"# svm_model = load('D:/thesis/models/sensor1/SVM.joblib')\n",
|
|
"svm_model = load('D:/thesis/models/sensor1/SVM with StandardScaler and PCA.joblib')\n",
|
|
"y_pred_svm = svm_model.predict(X1b)\n",
|
|
"\n",
|
|
"# 5. Evaluate\n",
|
|
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
|
|
"print(classification_report(y, y_pred_svm))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Model sensor 1 to predict sensor 2 data"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from sklearn.metrics import accuracy_score, classification_report\n",
|
|
"# 4. Validate on Dataset B\n",
|
|
"from joblib import load\n",
|
|
"svm_model = load('D:/thesis/models/sensor1/SVM.joblib')\n",
|
|
"y_pred_svm = svm_model.predict(X2b)\n",
|
|
"\n",
|
|
"# 5. Evaluate\n",
|
|
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
|
|
"print(classification_report(y, y_pred_svm))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"from sklearn.metrics import accuracy_score, classification_report\n",
|
|
"# 4. Validate on Dataset B\n",
|
|
"y_pred = rf_model2.predict(X2b)\n",
|
|
"\n",
|
|
"# 5. Evaluate\n",
|
|
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred))\n",
|
|
"print(classification_report(y, y_pred))"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"y_predict = svm_model2.predict(X2b.iloc[[5312],:])\n",
|
|
"print(y_predict)"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"y[5312]"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"### Confusion Matrix"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"import matplotlib.pyplot as plt\n",
|
|
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
|
|
"\n",
|
|
"\n",
|
|
"cm = confusion_matrix(y, y_pred_svm) # -> ndarray\n",
|
|
"\n",
|
|
"# get the class labels\n",
|
|
"labels = svm_model.classes_\n",
|
|
"\n",
|
|
"# Plot\n",
|
|
"disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n",
|
|
"disp.plot(cmap=plt.cm.Blues) # You can change colormap\n",
|
|
"plt.title(\"SVM Sensor1 CM Train w/ Dataset A Val w/ Dataset B from Sensor1 readings\")\n",
|
|
"plt.show()"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "markdown",
|
|
"metadata": {},
|
|
"source": [
|
|
"#### Self-test CM"
|
|
]
|
|
},
|
|
{
|
|
"cell_type": "code",
|
|
"execution_count": null,
|
|
"metadata": {},
|
|
"outputs": [],
|
|
"source": [
|
|
"# 1. Predict sensor 1 on Dataset A\n",
|
|
"y_test_pred = svm_model.predict(x_test1)\n",
|
|
"\n",
|
|
"# 2. Import confusion matrix tools\n",
|
|
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
|
|
"import matplotlib.pyplot as plt\n",
|
|
"\n",
|
|
"# 3. Create and plot confusion matrix\n",
|
|
"cm_train = confusion_matrix(y_test, y_test_pred)\n",
|
|
"labels = svm_model.classes_\n",
|
|
"\n",
|
|
"disp = ConfusionMatrixDisplay(confusion_matrix=cm_train, display_labels=labels)\n",
|
|
"disp.plot(cmap=plt.cm.Blues)\n",
|
|
"plt.title(\"Confusion Matrix: Train & Test on Dataset A\")\n",
|
|
"plt.show()\n"
|
|
]
|
|
}
|
|
],
|
|
"metadata": {
|
|
"kernelspec": {
|
|
"display_name": "Python 3",
|
|
"language": "python",
|
|
"name": "python3"
|
|
},
|
|
"language_info": {
|
|
"codemirror_mode": {
|
|
"name": "ipython",
|
|
"version": 3
|
|
},
|
|
"file_extension": ".py",
|
|
"mimetype": "text/x-python",
|
|
"name": "python",
|
|
"nbconvert_exporter": "python",
|
|
"pygments_lexer": "ipython3",
|
|
"version": "3.10.8"
|
|
}
|
|
},
|
|
"nbformat": 4,
|
|
"nbformat_minor": 2
|
|
}
|