Merge branch 'feat/103-feat-inference-function' into dev

This commit was merged in pull request #106.
This commit is contained in:
nuluh
2025-08-19 06:05:32 +07:00
3 changed files with 244 additions and 41 deletions

View File

@@ -35,8 +35,8 @@ def complement_pairs(n, prefix, extension):
if a != orig_a: # skip original a if a != orig_a: # skip original a
yield (filename, [a, a + 25]) # use yield instead of return to return a generator of tuples yield (filename, [a, a + 25]) # use yield instead of return to return a generator of tuples
def generate_df_tuples(total_dfs, prefix, extension, first_col_start, last_col_offset, def generate_df_tuples(prefix: str, total_dfs: int=30, extension: str="TXT", first_col_start: int=1, last_col_offset: int=25,
group_size=5, special_groups=None, group=True): group_size: int=5, special_groups: list=None, group: bool=True):
""" """
Generate a structured list of tuples containing DataFrame references and column indices. Generate a structured list of tuples containing DataFrame references and column indices.

View File

@@ -1,16 +1,190 @@
from src.ml.model_selection import inference_model
from joblib import load from joblib import load
import pandas as pd
from src.data_preprocessing import *
from src.process_stft import compute_stft
from typing import List, Tuple
from sklearn.base import BaseEstimator
import json
def probability_damage(pred: np.ndarray, model_classes: "BaseEstimator", percentage: bool = False) -> dict:
    """
    Process the prediction output to return the probability of damage (PoD)
    per class.

    Parameters
    ----------
    pred : np.ndarray
        Array of class predictions produced by ``model.predict``.
    model_classes : BaseEstimator
        The fitted estimator; only its ``classes_`` attribute is read, so
        every class the model knows appears in the output even when it was
        never predicted.
    percentage : bool, optional
        If True, express each ratio as a percentage (0-100) instead of a
        fraction (0-1). Defaults to False.

    Returns
    -------
    dict
        Mapping of class label -> fraction (or percentage) of predictions.
    """
    labels, counts = np.unique(pred, return_counts=True)
    label_counts = dict(zip(labels, counts))
    # init all model classes' probability of damage with 0 in dictionary,
    # then overwrite the classes that were actually predicted
    pod = dict.fromkeys(model_classes.classes_, 0)
    pod.update(label_counts)
    # hoisted out of the loop: total number of predictions
    total = np.sum(counts)
    if total == 0:
        # no predictions at all: report 0 for every class instead of 0/0
        return {label: 0.0 for label in pod}
    scale = 100 if percentage else 1
    # turn the counts into ratios (or percentages) of the total
    return {label: (count / total) * scale for label, count in pod.items()}
def convert_keys_to_strings(obj):
    """
    Recursively convert all dictionary keys to strings.

    Needed before ``json.dumps`` because non-string keys (e.g. numpy scalar
    class labels) are not valid JSON object keys.

    Parameters
    ----------
    obj : Any
        An arbitrarily nested structure of dicts, lists and leaf values.

    Returns
    -------
    Any
        A copy of ``obj`` in which every dict key has been passed through
        ``str``; leaf values are returned unchanged.
    """
    if isinstance(obj, dict):
        # BUG FIX: recurse over obj itself, not obj["data"] — the old code
        # raised KeyError for any dict without a "data" key and silently
        # dropped all sibling entries.
        return {str(key): convert_keys_to_strings(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        # BUG FIX: likewise iterate the list itself, not obj["data"].
        return [convert_keys_to_strings(item) for item in obj]
    else:
        return obj
def inference(model_sensor_A_path: str, model_sensor_B_path: str, file_path: str):
    """
    Run damage inference for every column of a single measurement file.

    Parameters
    ----------
    model_sensor_A_path : str
        Path to the joblib-serialized model for sensor A.
    model_sensor_B_path : str
        Path to the joblib-serialized model for sensor B.
    file_path : str
        Path to the whitespace-delimited measurement file (10 header rows).

    Returns
    -------
    dict
        ``{"data": {"Column_<n>": {"Sensor_A": {"PoD": ...},
        "Sensor_B": {"PoD": ...}}, ...}, "case": <file stem>}``.
    """
    # Each physical column maps to a (sensor A, sensor B) pair of data
    # columns: (1, 26), (2, 27), ..., (5, 30).
    pair_indices: List[Tuple[int, int]] = [(col + 1, col + 26) for col in range(5)]

    # Load the raw case data; the first 10 rows are file headers.
    raw: pd.DataFrame = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True)

    # The case name is the file stem, e.g. ".../zzzBD19.TXT" -> "zzzBD19".
    case_name: str = file_path.split("/")[-1].split(".")[0]

    # Deserialize both per-sensor models.
    model_A = load(model_sensor_A_path)
    model_B = load(model_sensor_B_path)

    data = {}
    for position, (idx_a, idx_b) in enumerate(pair_indices, start=1):
        # STFT features for each sensor of this column.
        feats_a = compute_stft(raw.iloc[:, idx_a])
        feats_b = compute_stft(raw.iloc[:, idx_b])
        # Per-class probability of damage from each sensor's predictions.
        data[f"Column_{position}"] = {
            "Sensor_A": {
                "PoD": probability_damage(model_A.predict(feats_a), model_A)
            },
            "Sensor_B": {
                "PoD": probability_damage(model_B.predict(feats_b), model_B)
            }
        }
    return {"data": data, "case": case_name}
def heatmap(result, damage_classes: list = None):
    """
    Render a smoothed PoD heatmap (columns x damage classes) for one case.

    Only the Sensor A probabilities are plotted.

    Parameters
    ----------
    result : dict
        Output of ``inference``:
        ``{"data": {column: {"Sensor_A": {"PoD": {class: prob}}}}, "case": str}``.
    damage_classes : list, optional
        Damage class labels for the y-axis. Defaults to [1, 2, 3, 4, 5, 6]
        (a None sentinel avoids a mutable default argument).
    """
    # BUG FIX: plt was only imported inside the __main__ guard, so calling
    # heatmap() from another module raised NameError. Import locally, like
    # the existing local RectBivariateSpline import.
    import matplotlib.pyplot as plt
    from scipy.interpolate import RectBivariateSpline

    if damage_classes is None:
        damage_classes = [1, 2, 3, 4, 5, 6]

    resolution = 300
    n_columns = len(result["data"])
    y = list(range(1, len(damage_classes) + 1))
    # one x position per physical column
    x = list(range(n_columns))

    # One PoD row per column, one entry per damage class (0 when absent).
    Z = []
    for column_data in result["data"].values():
        sensor_a_pod = column_data['Sensor_A']['PoD']
        Z.append([sensor_a_pod.get(cls, 0) for cls in damage_classes])
    Z = np.array(Z).T

    # Upsample onto a fine grid. The x range now follows the data instead of
    # the previous hard-coded linspace(0, 4) that assumed exactly 5 columns.
    y2 = np.linspace(1, len(damage_classes), resolution)
    x2 = np.linspace(0, n_columns - 1, resolution)
    f = RectBivariateSpline(x, y, Z.T, kx=2, ky=2)  # 2nd-degree (quadratic) spline surface
    Z2 = f(x2, y2).T.clip(0, 1)  # clip spline overshoot back into [0, 1]
    X2, Y2 = np.meshgrid(x2, y2)

    c = plt.pcolormesh(X2, Y2, Z2, cmap='jet', shading='auto')
    # Add a colorbar
    plt.colorbar(c, label='Probability of Damage (PoD)')
    plt.gca().invert_xaxis()
    plt.grid(True, linestyle='-', alpha=0.7)
    plt.xticks(np.arange(int(X2.min()), int(X2.max()) + 1, 1))
    plt.xlabel("Column Index")
    plt.ylabel("Damage Index")
    plt.title(result["case"])
    plt.show()
if __name__ == "__main__":
    import matplotlib.pyplot as plt
    import json
    from scipy.interpolate import UnivariateSpline
    # Run inference for a single dataset-B case with the per-sensor
    # SVM (StandardScaler + PCA) models.
    # NOTE(review): paths are machine-specific (Windows drive) — adjust locally.
    result = inference(
        "D:/thesis/models/Sensor A/SVM with StandardScaler and PCA.joblib",
        "D:/thesis/models/Sensor B/SVM with StandardScaler and PCA.joblib",
        "D:/thesis/data/dataset_B/zzzBD19.TXT"
    )
    # heatmap(result)
    # Convert all keys to strings before dumping to JSON
    # result_with_string_keys = convert_keys_to_strings(result)
    # print(json.dumps(result_with_string_keys, indent=4))
    # Create a 5x2 subplot grid (5 rows for each column, 2 columns for sensors)
    fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(5, 50))
    # # Define damage class labels for x-axis
    damage_classes = [1, 2, 3, 4, 5, 6]
    # # Loop through each column in the data
    for row_idx, (column_name, column_data) in enumerate(result['data'].items()):
        # Plot Sensor A in the first column of subplots
        sensor_a_pod = column_data['Sensor_A']['PoD']
        # One x tick position per damage class; real labels set below.
        x_values = list(range(len(damage_classes)))
        # Classes absent from the PoD dict default to probability 0.
        y_values = [sensor_a_pod.get(cls, 0) for cls in damage_classes]
        # x2 = np.linspace(1, 6, 100)
        # interp = UnivariateSpline(x_values, y_values, s=0)
        axes[row_idx, 0].plot(x_values, y_values, '-', linewidth=2, markersize=8)
        axes[row_idx, 0].set_title(f"{column_name} - Sensor A", fontsize=10)
        axes[row_idx, 0].set_xticks(x_values)
        axes[row_idx, 0].set_xticklabels(damage_classes)
        # Probabilities are in [0, 1]; small headroom so markers are not clipped.
        axes[row_idx, 0].set_ylim(0, 1.05)
        axes[row_idx, 0].set_ylabel('Probability')
        axes[row_idx, 0].set_xlabel('Damage Class')
        axes[row_idx, 0].grid(True, linestyle='-', alpha=0.5)
        # Plot Sensor B in the second column of subplots
        sensor_b_pod = column_data['Sensor_B']['PoD']
        y_values = [sensor_b_pod.get(cls, 0) for cls in damage_classes]
        axes[row_idx, 1].plot(x_values, y_values, '-', linewidth=2, markersize=8)
        axes[row_idx, 1].set_title(f"{column_name} - Sensor B", fontsize=10)
        axes[row_idx, 1].set_xticks(x_values)
        axes[row_idx, 1].set_xticklabels(damage_classes)
        axes[row_idx, 1].set_ylim(0, 1.05)
        axes[row_idx, 1].set_ylabel('Probability')
        axes[row_idx, 1].set_xlabel('Damage Class')
        axes[row_idx, 1].grid(True, linestyle='-', alpha=0.5)
    # Adjust layout to prevent overlap (order matters: tight_layout first,
    # then manual spacing, then the suptitle placement).
    fig.tight_layout(rect=[0, 0, 1, 0.96])  # Leave space for suptitle
    plt.subplots_adjust(hspace=1, wspace=0.3)  # Adjust spacing between subplots
    plt.suptitle(f"Case {result['case']}", fontsize=16, y=0.98)  # Adjust suptitle position
    plt.show()
index = ((x-1) % 5) + 1
inference_model(model["SVM"], file, column_question=index)
print("---")
inference_model(model["SVM with PCA"], file, column_question=index)
print("---")
inference_model(model["XGBoost"], file, column_question=index)

View File

@@ -1,9 +1,11 @@
import os import os
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from scipy.signal import stft, hann from scipy.signal import stft
from scipy.signal.windows import hann
import glob import glob
import multiprocessing # Added import for multiprocessing import multiprocessing # Added import for multiprocessing
from typing import Union, Tuple
# Define the base directory where DAMAGE_X folders are located # Define the base directory where DAMAGE_X folders are located
damage_base_path = 'D:/thesis/data/converted/raw' damage_base_path = 'D:/thesis/data/converted/raw'
@@ -19,16 +21,38 @@ for dir_path in output_dirs.values():
os.makedirs(dir_path, exist_ok=True) os.makedirs(dir_path, exist_ok=True)
# Define STFT parameters # Define STFT parameters
# Number of damage cases (adjust as needed)
num_damage_cases = 6 # Change to 30 if you have 30 damage cases
# Function to perform STFT and return magnitude
def compute_stft(vibration_data: np.ndarray, return_param: bool = False) -> Union[pd.DataFrame, Tuple[pd.DataFrame, list]]:
    """
    Computes the Short-Time Fourier Transform (STFT) magnitude of the input
    vibration data.

    Parameters
    ----------
    vibration_data : numpy.ndarray
        The input vibration data as a 1D NumPy array.
    return_param : bool, optional
        If True, the function also returns the STFT parameters
        [window_size, hop_size, Fs]. Defaults to False.

    Returns
    -------
    pd.DataFrame
        The transposed STFT magnitude (time frames as rows, frequency bins
        as columns) if `return_param` is False.
    tuple
        If `return_param` is True, returns a tuple containing:
        - pd.DataFrame: the transposed STFT magnitude, frequencies as columns.
        - list: the STFT parameters [window_size, hop_size, Fs].
    """
    window_size = 1024
    hop_size = 512
    window = hann(window_size)
    Fs = 1024
    frequencies, times, Zxx = stft(
        vibration_data,
        fs=Fs,
        window=window,
        nperseg=window_size,
        noverlap=window_size - hop_size
    )
    stft_magnitude = np.abs(Zxx)  # shape: (frequency bins, time frames)
    # Convert STFT result to DataFrame, transposed so frequencies are columns.
    # BUG FIX: the number of column labels must equal the number of frequency
    # bins, stft_magnitude.shape[0] — not shape[1] (the time-frame count),
    # which only coincides for signals of exactly 2**18 samples.
    df_stft = pd.DataFrame(
        stft_magnitude.T,
        columns=[f"Freq_{freq:.2f}" for freq in np.linspace(0, Fs/2, stft_magnitude.shape[0])]
    )
    if return_param:
        return df_stft, [window_size, hop_size, Fs]
    else:
        return df_stft
def process_damage_case(damage_num):
damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}') damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}')
if damage_num == 0: if damage_num == 0:
# Number of test runs per damage case # Number of test runs per damage case
@@ -83,13 +117,8 @@ def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop
vibration_data = df.iloc[:, 1].values vibration_data = df.iloc[:, 1].values
# Perform STFT # Perform STFT
stft_magnitude = compute_stft(vibration_data, Fs=Fs, window_size=window_size, hop_size=hop_size) df_stft = compute_stft(vibration_data)
# Convert STFT result to DataFrame
df_stft = pd.DataFrame(
stft_magnitude,
columns=[f"Freq_{freq:.2f}" for freq in np.linspace(0, Fs/2, stft_magnitude.shape[1])]
)
# only include 21 samples vector features for first 45 num_test_runs else include 22 samples vector features # only include 21 samples vector features for first 45 num_test_runs else include 22 samples vector features
if damage_num == 0: if damage_num == 0:
print(f"Processing damage_num = 0, test_num = {test_num}") print(f"Processing damage_num = 0, test_num = {test_num}")
@@ -117,11 +146,11 @@ def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop
# Save the aggregated STFT to CSV # Save the aggregated STFT to CSV
with open(output_file, 'w') as file: with open(output_file, 'w') as file:
file.write('sep=,\n') file.write('sep=,\n')
df_aggregated.to_csv(output_file, index=False) df_aggregated.to_csv(file, index=False)
print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}") print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}")
else: else:
print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.") print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.")
if __name__ == "__main__": # Added main guard for multiprocessing if __name__ == "__main__": # Added main guard for multiprocessing
with multiprocessing.Pool() as pool: with multiprocessing.Pool() as pool:
pool.map(process_damage_case, range(0, num_damage_cases + 1)) pool.map(process_damage_case, range(num_damage_cases + 1))