Compare commits

...

7 Commits

Author SHA1 Message Date
nuluh
855114d633 refactor(notebooks): clean up imports, adjust damage case processing, and improve model training structure
- Removed unnecessary imports (os, pandas, numpy) from the STFT notebook.
- Adjusted the number of damage cases in the multiprocessing pool to correctly reflect the range.
- Updated model training code for Sensor B to ensure consistent naming and structure.
- Cleaned up commented-out code for clarity and maintainability.
2025-08-17 23:39:57 +07:00
nuluh
4a1c0ed83e feat(src): implement inference function with damage probability calculations and visualization
Closes #103
2025-08-17 22:21:17 +07:00
nuluh
274cd60d27 refactor(src): update generate_df_tuples function signature to include type hints for better clarity 2025-08-11 18:49:41 +07:00
nuluh
9f23d82fab fix(src): correct file writing method in process.stft.process_damage_case function to fix incorrect first column name
Closes #104
2025-08-11 13:17:46 +07:00
nuluh
a8288b1426 refactor(src): enhance compute_stft function with type hints, improved documentation by moving column renaming process from process_damage_case to compute_stft 2025-08-11 13:15:48 +07:00
nuluh
860542f3f9 refactor(src): restructure compute_stft function to be pure function and include return parameters and improve clarity 2025-08-10 20:02:45 +07:00
nuluh
0e28ed6dd0 feat(notebooks): add cross-dataset validation for Sensor A and Sensor B models
Closes #74
2025-07-28 16:41:54 +07:00
4 changed files with 467 additions and 74 deletions

View File

@@ -217,9 +217,6 @@
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"import os\n",
"import pandas as pd\n",
"import numpy as np\n",
"from scipy.signal import hann\n", "from scipy.signal import hann\n",
"import multiprocessing" "import multiprocessing"
] ]
@@ -244,16 +241,6 @@
"Fs = 1024" "Fs = 1024"
] ]
}, },
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Define the base directory where DAMAGE_X folders are located\n",
"damage_base_path = 'D:/thesis/data/converted/raw'"
]
},
{ {
"cell_type": "markdown", "cell_type": "markdown",
"metadata": {}, "metadata": {},
@@ -304,11 +291,11 @@
"# Import custom in-house functions\n", "# Import custom in-house functions\n",
"from src.process_stft import process_damage_case\n", "from src.process_stft import process_damage_case\n",
"\n", "\n",
"num_damage_cases = 7 # DAMAGE_0 to DAMAGE_6\n", "num_damage_cases = 6 # DAMAGE_0 to DAMAGE_6\n",
"\n", "\n",
"with multiprocessing.Pool() as pool:\n", "with multiprocessing.Pool() as pool:\n",
" # Process each DAMAGE_X case in parallel\n", " # Process each DAMAGE_X case in parallel\n",
" pool.map(process_damage_case, range(num_damage_cases), Fs, window_size, hop_size, output_dirs)" " pool.map(process_damage_case, range(num_damage_cases + 1))"
] ]
}, },
{ {
@@ -665,24 +652,33 @@
" # \"Decision Tree\": DecisionTreeClassifier(),\n", " # \"Decision Tree\": DecisionTreeClassifier(),\n",
" # \"KNN\": KNeighborsClassifier(),\n", " # \"KNN\": KNeighborsClassifier(),\n",
" # \"LDA\": LinearDiscriminantAnalysis(),\n", " # \"LDA\": LinearDiscriminantAnalysis(),\n",
" # \"SVM\": make_pipeline(\n", " \"SVM\": make_pipeline(\n",
" # StandardScaler(),\n", " StandardScaler(),\n",
" # SVC(kernel='rbf', probability=True)\n", " SVC(kernel='rbf')\n",
" # ),\n", " ),\n",
" # \"SVM with StandardScaler and PCA\": make_pipeline(\n", " \"SVM with StandardScaler and PCA\": make_pipeline(\n",
" # StandardScaler(),\n", " StandardScaler(),\n",
" # PCA(n_components=10),\n", " PCA(n_components=10),\n",
" # SVC(kernel='rbf')\n", " SVC(kernel='rbf')\n",
" # ),\n", " ),\n",
"\n", "\n",
" # \"XGBoost\": XGBClassifier()\n", " # \"XGBoost\": XGBClassifier()\n",
" \"MLPClassifier\": make_pipeline(\n", " # \"MLPClassifier\": make_pipeline(\n",
" StandardScaler(),\n", " # StandardScaler(),\n",
" MLPClassifier(hidden_layer_sizes=(1, 10), max_iter=500, random_state=42)\n", " # MLPClassifier(hidden_layer_sizes=(1, 10), max_iter=500, random_state=42)\n",
" )\n", " # )\n",
"}" "}"
] ]
}, },
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"x_train1"
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
@@ -712,9 +708,15 @@
" # \"Decision Tree\": DecisionTreeClassifier(),\n", " # \"Decision Tree\": DecisionTreeClassifier(),\n",
" # \"KNN\": KNeighborsClassifier(),\n", " # \"KNN\": KNeighborsClassifier(),\n",
" # \"LDA\": LinearDiscriminantAnalysis(),\n", " # \"LDA\": LinearDiscriminantAnalysis(),\n",
" \"SVM\": SVC(),\n", " \"SVM\": make_pipeline(\n",
" # \"SVM with StandardScaler and PCA\": make_pipeline(\n", " StandardScaler(),\n",
" # StandardScaler(),\n", " SVC(kernel='rbf')\n",
" ),\n",
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
" StandardScaler(),\n",
" PCA(n_components=10),\n",
" SVC(kernel='rbf')\n",
" ),\n",
" # PCA(n_components=10),\n", " # PCA(n_components=10),\n",
" # SVC(kernel='rbf')\n", " # SVC(kernel='rbf')\n",
" # ),\n", " # ),\n",
@@ -730,8 +732,8 @@
"source": [ "source": [
"results_sensor2 = []\n", "results_sensor2 = []\n",
"for name, model in models_sensor2.items():\n", "for name, model in models_sensor2.items():\n",
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train2, x_test2, y_test2, \n", " res = train_and_evaluate_model(model, name, \"Sensor B\", x_train2, y_train2, x_test2, y_test2, \n",
" export='D:/thesis/models/sensor2')\n", " export='D:/thesis/models/Sensor B')\n",
" results_sensor2.append(res)\n", " results_sensor2.append(res)\n",
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n", " print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n",
"\n", "\n",
@@ -938,6 +940,194 @@
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n", "print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
"print(classification_report(y, y_pred_svm))" "print(classification_report(y, y_pred_svm))"
] ]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cross Dataset Validation"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Training"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.model_selection import train_test_split\n",
"\n",
"# sensor A\n",
"x_train1, x_test1, y_train1, y_test1 = train_test_split(X1b, y, test_size=0.2, random_state=2)\n",
"# sensor B\n",
"x_train2, x_test2, y_train2, y_test2 = train_test_split(X2b, y, test_size=0.2, random_state=2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"results_sensor1 = []\n",
"for name, model in models_sensor1.items():\n",
" res = train_and_evaluate_model(model, name, \"Sensor A\", x_train1, y_train1, x_test1, y_test1, \n",
" export='D:/thesis/datasetB/models/Sensor A')\n",
" results_sensor1.append(res)\n",
" print(f\"{name} on sensor1: Accuracy = {res['accuracy']:.2f}%\")\n",
"\n",
"# Display result\n",
"results_sensor1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"results_sensor2 = []\n",
"for name, model in models_sensor2.items():\n",
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train2, x_test2, y_test2, \n",
" export='D:/thesis/datasetB/models/sensor2')\n",
" results_sensor2.append(res)\n",
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n",
"\n",
"# Display result\n",
"results_sensor2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Testing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Sensor A"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 4. Sensor A Validate on Dataset A\n",
"from joblib import load\n",
"svm_model = load('D:/thesis/datasetB/models/sensor1/SVM.joblib')\n",
"y_pred_svm_1 = svm_model.predict(X1a)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.metrics import accuracy_score, classification_report\n",
"\n",
"# 5. Evaluate\n",
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm_1))\n",
"print(classification_report(y, y_pred_svm_1))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Confusion Matrix Sensor A"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
"\n",
"\n",
"cm = confusion_matrix(y, y_pred_svm_1) # -> ndarray\n",
"\n",
"# get the class labels\n",
"labels = svm_model.classes_\n",
"\n",
"# Plot\n",
"disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n",
"disp.plot(cmap=plt.cm.Blues) # You can change colormap\n",
"plt.title(\"Confusion Matrix of Sensor A Test on Dataset B\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Sensor B"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"svm_model = load('D:/thesis/datasetB/models/sensor2/SVM.joblib')\n",
"# svm_model = load('D:/thesis/models/sensor2/SVM with StandardScaler and PCA.joblib')\n",
"y_pred_svm_2 = svm_model.predict(X2a)\n",
"\n",
"# 5. Evaluate\n",
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm_2))\n",
"print(classification_report(y, y_pred_svm_2))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Confusion Matrix Sensor B"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
"\n",
"\n",
"cm = confusion_matrix(y, y_pred_svm_2) # -> ndarray\n",
"\n",
"# get the class labels\n",
"labels = svm_model.classes_\n",
"\n",
"# Plot\n",
"disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n",
"disp.plot(cmap=plt.cm.Blues) # You can change colormap\n",
"plt.title(\"Confusion Matrix of Sensor B Test on Dataset B\")\n",
"plt.show()"
]
} }
], ],
"metadata": { "metadata": {

View File

@@ -35,8 +35,8 @@ def complement_pairs(n, prefix, extension):
if a != orig_a: # skip original a if a != orig_a: # skip original a
yield (filename, [a, a + 25]) # use yield instead of return to return a generator of tuples yield (filename, [a, a + 25]) # use yield instead of return to return a generator of tuples
def generate_df_tuples(total_dfs, prefix, extension, first_col_start, last_col_offset, def generate_df_tuples(prefix: str, total_dfs: int=30, extension: str="TXT", first_col_start: int=1, last_col_offset: int=25,
group_size=5, special_groups=None, group=True): group_size: int=5, special_groups: list=None, group: bool=True):
""" """
Generate a structured list of tuples containing DataFrame references and column indices. Generate a structured list of tuples containing DataFrame references and column indices.

View File

@@ -1,16 +1,190 @@
from src.ml.model_selection import inference_model
from joblib import load from joblib import load
import pandas as pd
from src.data_preprocessing import *
from src.process_stft import compute_stft
from typing import List, Tuple
from sklearn.base import BaseEstimator
import json
x = 30 def probability_damage(pred: Tuple[np.ndarray, np.ndarray], model_classes: BaseEstimator, percentage=False) -> Dict[str, int]:
file = f"D:/thesis/data/dataset_B/zzzBD{x}.TXT" """
sensor = 1 Process the prediction output to return unique labels and their counts.
model = {"SVM": f"D:/thesis/models/sensor{sensor}/SVM.joblib", """
"SVM with PCA": f"D:/thesis/models/sensor{sensor}/SVM with StandardScaler and PCA.joblib", labels, counts = np.unique(pred, return_counts=True)
"XGBoost": f"D:/thesis/models/sensor{sensor}/XGBoost.joblib"} label_counts = dict(zip(labels, counts))
index = ((x-1) % 5) + 1 # init all models classes probability of damage with 0 in dictionary
inference_model(model["SVM"], file, column_question=index) pod: Dict[np.ndarray, int] = dict.fromkeys(model_classes.classes_, 0)
print("---")
inference_model(model["SVM with PCA"], file, column_question=index) # update corresponding data
print("---") pod.update(label_counts)
inference_model(model["XGBoost"], file, column_question=index)
# turn the value into ratio instead of prediction counts
for label, count in pod.items():
ratio: float = count/np.sum(counts)
if percentage:
pod[label] = ratio * 100
else:
pod[label] = ratio
return pod
def convert_keys_to_strings(obj):
"""
Recursively convert all dictionary keys to strings.
"""
if isinstance(obj, dict):
return {str(key): convert_keys_to_strings(value) for key, value in obj["data"].items()}
elif isinstance(obj, list):
return [convert_keys_to_strings(item) for item in obj["data"]]
else:
return obj
def inference(model_sensor_A_path: str, model_sensor_B_path: str, file_path: str):
# Generate column indices
column_index: List[Tuple[int, int]] = [
(i + 1, i + 26)
for i in range(5)
]
# Load a single case data
df: pd.DataFrame = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True)
# Take case name
case_name: str = file_path.split("/")[-1].split(".")[0]
# Extract relevant columns for each sensor
column_data: List[Tuple[pd.Series[float], pd.Series[float]]] = [
(df.iloc[:, i[0]], df.iloc[:, i[1]])
for i in column_index
]
column_data_stft: List[Tuple[pd.DataFrame, pd.DataFrame]] = [
(compute_stft(sensor_A), compute_stft(sensor_B))
for (sensor_A, sensor_B) in column_data
]
# Load the model
model_sensor_A = load(model_sensor_A_path)
model_sensor_B = load(model_sensor_B_path)
res = {}
for i, (stft_A, stft_B) in enumerate(column_data_stft):
# Make predictions using the model
pred_A: list[int] = model_sensor_A.predict(stft_A)
pred_B: list[int] = model_sensor_B.predict(stft_B)
percentage_A = probability_damage(pred_A, model_sensor_A)
percentage_B = probability_damage(pred_B, model_sensor_B)
res[f"Column_{i+1}"] = {
"Sensor_A": {
# "Predictions": pred_A,
"PoD": percentage_A
},
"Sensor_B": {
# "Predictions": pred_B,
"PoD": percentage_B
}
}
final_res = {"data": res, "case": case_name}
return final_res
def heatmap(result, damage_classes: list[int] = [1, 2, 3, 4, 5, 6]):
from scipy.interpolate import RectBivariateSpline
resolution = 300
y = list(range(1, len(damage_classes)+1))
# length of column
x = list(range(len(result["data"])))
# X, Y = np.meshgrid(x, y)
Z = []
for _, column_data in result["data"].items():
sensor_a_pod = column_data['Sensor_A']['PoD']
Z.append([sensor_a_pod.get(cls, 0) for cls in damage_classes])
Z = np.array(Z).T
y2 = np.linspace(1, len(damage_classes), resolution)
x2 = np.linspace(0,4,resolution)
f = RectBivariateSpline(x, y, Z.T, kx=2, ky=2) # 2nd degree quadratic spline interpolation
Z2 = f(x2, y2).T.clip(0, 1) # clip to ignores negative values from cubic interpolation
X2, Y2 = np.meshgrid(x2, y2)
# breakpoint()
c = plt.pcolormesh(X2, Y2, Z2, cmap='jet', shading='auto')
# Add a colorbar
plt.colorbar(c, label='Probability of Damage (PoD)')
plt.gca().invert_xaxis()
plt.grid(True, linestyle='-', alpha=0.7)
plt.xticks(np.arange(int(X2.min()), int(X2.max())+1, 1))
plt.xlabel("Column Index")
plt.ylabel("Damage Index")
plt.title(result["case"])
# plt.xticks(ticks=x2, labels=[f'Col_{i+1}' for i in range(len(result))])
# plt.gca().xaxis.set_major_locator(MultipleLocator(65/4))
plt.show()
if __name__ == "__main__":
import matplotlib.pyplot as plt
import json
from scipy.interpolate import UnivariateSpline
result = inference(
"D:/thesis/models/Sensor A/SVM with StandardScaler and PCA.joblib",
"D:/thesis/models/Sensor B/SVM with StandardScaler and PCA.joblib",
"D:/thesis/data/dataset_B/zzzBD19.TXT"
)
# heatmap(result)
# Convert all keys to strings before dumping to JSON
# result_with_string_keys = convert_keys_to_strings(result)
# print(json.dumps(result_with_string_keys, indent=4))
# Create a 5x2 subplot grid (5 rows for each column, 2 columns for sensors)
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(5, 50))
# # Define damage class labels for x-axis
damage_classes = [1, 2, 3, 4, 5, 6]
# # Loop through each column in the data
for row_idx, (column_name, column_data) in enumerate(result['data'].items()):
# Plot Sensor A in the first column of subplots
sensor_a_pod = column_data['Sensor_A']['PoD']
x_values = list(range(len(damage_classes)))
y_values = [sensor_a_pod.get(cls, 0) for cls in damage_classes]
# x2 = np.linspace(1, 6, 100)
# interp = UnivariateSpline(x_values, y_values, s=0)
axes[row_idx, 0].plot(x_values, y_values, '-', linewidth=2, markersize=8)
axes[row_idx, 0].set_title(f"{column_name} - Sensor A", fontsize=10)
axes[row_idx, 0].set_xticks(x_values)
axes[row_idx, 0].set_xticklabels(damage_classes)
axes[row_idx, 0].set_ylim(0, 1.05)
axes[row_idx, 0].set_ylabel('Probability')
axes[row_idx, 0].set_xlabel('Damage Class')
axes[row_idx, 0].grid(True, linestyle='-', alpha=0.5)
# Plot Sensor B in the second column of subplots
sensor_b_pod = column_data['Sensor_B']['PoD']
y_values = [sensor_b_pod.get(cls, 0) for cls in damage_classes]
axes[row_idx, 1].plot(x_values, y_values, '-', linewidth=2, markersize=8)
axes[row_idx, 1].set_title(f"{column_name} - Sensor B", fontsize=10)
axes[row_idx, 1].set_xticks(x_values)
axes[row_idx, 1].set_xticklabels(damage_classes)
axes[row_idx, 1].set_ylim(0, 1.05)
axes[row_idx, 1].set_ylabel('Probability')
axes[row_idx, 1].set_xlabel('Damage Class')
axes[row_idx, 1].grid(True, linestyle='-', alpha=0.5)
# Adjust layout to prevent overlap
fig.tight_layout(rect=[0, 0, 1, 0.96]) # Leave space for suptitle
plt.subplots_adjust(hspace=1, wspace=0.3) # Adjust spacing between subplots
plt.suptitle(f"Case {result['case']}", fontsize=16, y=0.98) # Adjust suptitle position
plt.show()

View File

@@ -1,9 +1,11 @@
import os import os
import pandas as pd import pandas as pd
import numpy as np import numpy as np
from scipy.signal import stft, hann from scipy.signal import stft
from scipy.signal.windows import hann
import glob import glob
import multiprocessing # Added import for multiprocessing import multiprocessing # Added import for multiprocessing
from typing import Union, Tuple
# Define the base directory where DAMAGE_X folders are located # Define the base directory where DAMAGE_X folders are located
damage_base_path = 'D:/thesis/data/converted/raw' damage_base_path = 'D:/thesis/data/converted/raw'
@@ -19,27 +21,59 @@ for dir_path in output_dirs.values():
os.makedirs(dir_path, exist_ok=True) os.makedirs(dir_path, exist_ok=True)
# Define STFT parameters # Define STFT parameters
window_size = 1024
hop_size = 512
window = hann(window_size)
Fs = 1024
# Number of damage cases (adjust as needed) # Number of damage cases (adjust as needed)
num_damage_cases = 0 # Change to 30 if you have 30 damage cases num_damage_cases = 6 # Change to 30 if you have 30 damage cases
# Function to perform STFT and return magnitude # Function to perform STFT and return magnitude
def compute_stft(vibration_data, Fs=Fs, window_size=window_size, hop_size=hop_size): def compute_stft(vibration_data: np.ndarray, return_param: bool = False) -> Union[pd.DataFrame, Tuple[pd.DataFrame, list[int, int, int]]]:
frequencies, times, Zxx = stft( """
vibration_data, Computes the Short-Time Fourier Transform (STFT) magnitude of the input vibration data.
fs=Fs,
window=window,
nperseg=window_size,
noverlap=window_size - hop_size
)
stft_magnitude = np.abs(Zxx)
return stft_magnitude.T # Transpose to have frequencies as columns
def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop_size, output_dirs=output_dirs): Parameters
----------
vibration_data : numpy.ndarray
The input vibration data as a 1D NumPy array.
return_param : bool, optional
If True, the function returns additional STFT parameters (window size, hop size, and sampling frequency).
Defaults to False.
Returns
-------
pd.DataFrame
The transposed STFT magnitude, with frequencies as columns, if `return_param` is False.
tuple
If `return_param` is True, returns a tuple containing:
- pd.DataFrame: The transposed STFT magnitude, with frequencies as columns.
- list[int, int, int]: A list of STFT parameters [window_size, hop_size, Fs].
"""
window_size = 1024
hop_size = 512
window = hann(window_size)
Fs = 1024
frequencies, times, Zxx = stft(
vibration_data,
fs=Fs,
window=window,
nperseg=window_size,
noverlap=window_size - hop_size
)
stft_magnitude = np.abs(Zxx)
# Convert STFT result to DataFrame
df_stft = pd.DataFrame(
stft_magnitude.T,
columns=[f"Freq_{freq:.2f}" for freq in np.linspace(0, Fs/2, stft_magnitude.shape[1])]
)
# breakpoint()
if return_param:
return df_stft, [window_size, hop_size, Fs]
else:
return df_stft
def process_damage_case(damage_num):
damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}') damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}')
if damage_num == 0: if damage_num == 0:
# Number of test runs per damage case # Number of test runs per damage case
@@ -83,13 +117,8 @@ def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop
vibration_data = df.iloc[:, 1].values vibration_data = df.iloc[:, 1].values
# Perform STFT # Perform STFT
stft_magnitude = compute_stft(vibration_data, Fs=Fs, window_size=window_size, hop_size=hop_size) df_stft = compute_stft(vibration_data)
# Convert STFT result to DataFrame
df_stft = pd.DataFrame(
stft_magnitude,
columns=[f"Freq_{freq:.2f}" for freq in np.linspace(0, Fs/2, stft_magnitude.shape[1])]
)
# only inlcude 21 samples vector features for first 45 num_test_runs else include 22 samples vector features # only inlcude 21 samples vector features for first 45 num_test_runs else include 22 samples vector features
if damage_num == 0: if damage_num == 0:
print(f"Processing damage_num = 0, test_num = {test_num}") print(f"Processing damage_num = 0, test_num = {test_num}")
@@ -117,11 +146,11 @@ def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop
# Save the aggregated STFT to CSV # Save the aggregated STFT to CSV
with open(output_file, 'w') as file: with open(output_file, 'w') as file:
file.write('sep=,\n') file.write('sep=,\n')
df_aggregated.to_csv(output_file, index=False) df_aggregated.to_csv(file, index=False)
print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}") print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}")
else: else:
print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.") print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.")
if __name__ == "__main__": # Added main guard for multiprocessing if __name__ == "__main__": # Added main guard for multiprocessing
with multiprocessing.Pool() as pool: with multiprocessing.Pool() as pool:
pool.map(process_damage_case, range(0, num_damage_cases + 1)) pool.map(process_damage_case, range(num_damage_cases + 1))