Feat inference function #106
@@ -217,9 +217,6 @@
|
|||||||
"metadata": {},
|
"metadata": {},
|
||||||
"outputs": [],
|
"outputs": [],
|
||||||
"source": [
|
"source": [
|
||||||
"import os\n",
|
|
||||||
"import pandas as pd\n",
|
|
||||||
"import numpy as np\n",
|
|
||||||
"from scipy.signal import hann\n",
|
"from scipy.signal import hann\n",
|
||||||
"import multiprocessing"
|
"import multiprocessing"
|
||||||
]
|
]
|
||||||
@@ -244,16 +241,6 @@
|
|||||||
"Fs = 1024"
|
"Fs = 1024"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"cell_type": "code",
|
|
||||||
"execution_count": null,
|
|
||||||
"metadata": {},
|
|
||||||
"outputs": [],
|
|
||||||
"source": [
|
|
||||||
"# Define the base directory where DAMAGE_X folders are located\n",
|
|
||||||
"damage_base_path = 'D:/thesis/data/converted/raw'"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"cell_type": "markdown",
|
"cell_type": "markdown",
|
||||||
"metadata": {},
|
"metadata": {},
|
||||||
@@ -304,11 +291,11 @@
|
|||||||
"# Import custom in-house functions\n",
|
"# Import custom in-house functions\n",
|
||||||
"from src.process_stft import process_damage_case\n",
|
"from src.process_stft import process_damage_case\n",
|
||||||
"\n",
|
"\n",
|
||||||
"num_damage_cases = 7 # DAMAGE_0 to DAMAGE_6\n",
|
"num_damage_cases = 6 # DAMAGE_0 to DAMAGE_6\n",
|
||||||
"\n",
|
"\n",
|
||||||
"with multiprocessing.Pool() as pool:\n",
|
"with multiprocessing.Pool() as pool:\n",
|
||||||
" # Process each DAMAGE_X case in parallel\n",
|
" # Process each DAMAGE_X case in parallel\n",
|
||||||
" pool.map(process_damage_case, range(num_damage_cases), Fs, window_size, hop_size, output_dirs)"
|
" pool.map(process_damage_case, range(num_damage_cases + 1))"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@@ -665,24 +652,33 @@
|
|||||||
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
|
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
|
||||||
" # \"KNN\": KNeighborsClassifier(),\n",
|
" # \"KNN\": KNeighborsClassifier(),\n",
|
||||||
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
|
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
|
||||||
" # \"SVM\": make_pipeline(\n",
|
" \"SVM\": make_pipeline(\n",
|
||||||
" # StandardScaler(),\n",
|
" StandardScaler(),\n",
|
||||||
" # SVC(kernel='rbf', probability=True)\n",
|
" SVC(kernel='rbf')\n",
|
||||||
" # ),\n",
|
" ),\n",
|
||||||
" # \"SVM with StandardScaler and PCA\": make_pipeline(\n",
|
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
|
||||||
" # StandardScaler(),\n",
|
" StandardScaler(),\n",
|
||||||
" # PCA(n_components=10),\n",
|
" PCA(n_components=10),\n",
|
||||||
" # SVC(kernel='rbf')\n",
|
" SVC(kernel='rbf')\n",
|
||||||
" # ),\n",
|
" ),\n",
|
||||||
"\n",
|
"\n",
|
||||||
" # \"XGBoost\": XGBClassifier()\n",
|
" # \"XGBoost\": XGBClassifier()\n",
|
||||||
" \"MLPClassifier\": make_pipeline(\n",
|
" # \"MLPClassifier\": make_pipeline(\n",
|
||||||
" StandardScaler(),\n",
|
" # StandardScaler(),\n",
|
||||||
" MLPClassifier(hidden_layer_sizes=(1, 10), max_iter=500, random_state=42)\n",
|
" # MLPClassifier(hidden_layer_sizes=(1, 10), max_iter=500, random_state=42)\n",
|
||||||
" )\n",
|
" # )\n",
|
||||||
"}"
|
"}"
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"x_train1"
|
||||||
|
]
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"execution_count": null,
|
"execution_count": null,
|
||||||
@@ -712,9 +708,15 @@
|
|||||||
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
|
" # \"Decision Tree\": DecisionTreeClassifier(),\n",
|
||||||
" # \"KNN\": KNeighborsClassifier(),\n",
|
" # \"KNN\": KNeighborsClassifier(),\n",
|
||||||
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
|
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
|
||||||
" \"SVM\": SVC(),\n",
|
" \"SVM\": make_pipeline(\n",
|
||||||
" # \"SVM with StandardScaler and PCA\": make_pipeline(\n",
|
" StandardScaler(),\n",
|
||||||
" # StandardScaler(),\n",
|
" SVC(kernel='rbf')\n",
|
||||||
|
" ),\n",
|
||||||
|
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
|
||||||
|
" StandardScaler(),\n",
|
||||||
|
" PCA(n_components=10),\n",
|
||||||
|
" SVC(kernel='rbf')\n",
|
||||||
|
" ),\n",
|
||||||
" # PCA(n_components=10),\n",
|
" # PCA(n_components=10),\n",
|
||||||
" # SVC(kernel='rbf')\n",
|
" # SVC(kernel='rbf')\n",
|
||||||
" # ),\n",
|
" # ),\n",
|
||||||
@@ -730,8 +732,8 @@
|
|||||||
"source": [
|
"source": [
|
||||||
"results_sensor2 = []\n",
|
"results_sensor2 = []\n",
|
||||||
"for name, model in models_sensor2.items():\n",
|
"for name, model in models_sensor2.items():\n",
|
||||||
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train2, x_test2, y_test2, \n",
|
" res = train_and_evaluate_model(model, name, \"Sensor B\", x_train2, y_train2, x_test2, y_test2, \n",
|
||||||
" export='D:/thesis/models/sensor2')\n",
|
" export='D:/thesis/models/Sensor B')\n",
|
||||||
" results_sensor2.append(res)\n",
|
" results_sensor2.append(res)\n",
|
||||||
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n",
|
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n",
|
||||||
"\n",
|
"\n",
|
||||||
@@ -938,6 +940,194 @@
|
|||||||
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
|
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
|
||||||
"print(classification_report(y, y_pred_svm))"
|
"print(classification_report(y, y_pred_svm))"
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"## Cross Dataset Validation"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"---"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"### Training"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from sklearn.model_selection import train_test_split\n",
|
||||||
|
"\n",
|
||||||
|
"# sensor A\n",
|
||||||
|
"x_train1, x_test1, y_train1, y_test1 = train_test_split(X1b, y, test_size=0.2, random_state=2)\n",
|
||||||
|
"# sensor B\n",
|
||||||
|
"x_train2, x_test2, y_train2, y_test2 = train_test_split(X2b, y, test_size=0.2, random_state=2)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"results_sensor1 = []\n",
|
||||||
|
"for name, model in models_sensor1.items():\n",
|
||||||
|
" res = train_and_evaluate_model(model, name, \"Sensor A\", x_train1, y_train1, x_test1, y_test1, \n",
|
||||||
|
" export='D:/thesis/datasetB/models/Sensor A')\n",
|
||||||
|
" results_sensor1.append(res)\n",
|
||||||
|
" print(f\"{name} on sensor1: Accuracy = {res['accuracy']:.2f}%\")\n",
|
||||||
|
"\n",
|
||||||
|
"# Display result\n",
|
||||||
|
"results_sensor1"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"results_sensor2 = []\n",
|
||||||
|
"for name, model in models_sensor2.items():\n",
|
||||||
|
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train2, x_test2, y_test2, \n",
|
||||||
|
" export='D:/thesis/datasetB/models/sensor2')\n",
|
||||||
|
" results_sensor2.append(res)\n",
|
||||||
|
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n",
|
||||||
|
"\n",
|
||||||
|
"# Display result\n",
|
||||||
|
"results_sensor2"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"### Testing"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"#### Sensor A"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"# 4. Sensor A Validate on Dataset A\n",
|
||||||
|
"from joblib import load\n",
|
||||||
|
"svm_model = load('D:/thesis/datasetB/models/sensor1/SVM.joblib')\n",
|
||||||
|
"y_pred_svm_1 = svm_model.predict(X1a)"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"from sklearn.metrics import accuracy_score, classification_report\n",
|
||||||
|
"\n",
|
||||||
|
"# 5. Evaluate\n",
|
||||||
|
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm_1))\n",
|
||||||
|
"print(classification_report(y, y_pred_svm_1))"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"#### Confusion Matrix Sensor A"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"import matplotlib.pyplot as plt\n",
|
||||||
|
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
|
||||||
|
"\n",
|
||||||
|
"\n",
|
||||||
|
"cm = confusion_matrix(y, y_pred_svm_1) # -> ndarray\n",
|
||||||
|
"\n",
|
||||||
|
"# get the class labels\n",
|
||||||
|
"labels = svm_model.classes_\n",
|
||||||
|
"\n",
|
||||||
|
"# Plot\n",
|
||||||
|
"disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n",
|
||||||
|
"disp.plot(cmap=plt.cm.Blues) # You can change colormap\n",
|
||||||
|
"plt.title(\"Confusion Matrix of Sensor A Test on Dataset B\")\n",
|
||||||
|
"plt.show()"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"#### Sensor B"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"svm_model = load('D:/thesis/datasetB/models/sensor2/SVM.joblib')\n",
|
||||||
|
"# svm_model = load('D:/thesis/models/sensor2/SVM with StandardScaler and PCA.joblib')\n",
|
||||||
|
"y_pred_svm_2 = svm_model.predict(X2a)\n",
|
||||||
|
"\n",
|
||||||
|
"# 5. Evaluate\n",
|
||||||
|
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm_2))\n",
|
||||||
|
"print(classification_report(y, y_pred_svm_2))"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "markdown",
|
||||||
|
"metadata": {},
|
||||||
|
"source": [
|
||||||
|
"#### Confusion Matrix Sensor B"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"metadata": {},
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"import matplotlib.pyplot as plt\n",
|
||||||
|
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
|
||||||
|
"\n",
|
||||||
|
"\n",
|
||||||
|
"cm = confusion_matrix(y, y_pred_svm_2) # -> ndarray\n",
|
||||||
|
"\n",
|
||||||
|
"# get the class labels\n",
|
||||||
|
"labels = svm_model.classes_\n",
|
||||||
|
"\n",
|
||||||
|
"# Plot\n",
|
||||||
|
"disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n",
|
||||||
|
"disp.plot(cmap=plt.cm.Blues) # You can change colormap\n",
|
||||||
|
"plt.title(\"Confusion Matrix of Sensor B Test on Dataset B\")\n",
|
||||||
|
"plt.show()"
|
||||||
|
]
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
"metadata": {
|
"metadata": {
|
||||||
|
|||||||
@@ -35,8 +35,8 @@ def complement_pairs(n, prefix, extension):
|
|||||||
if a != orig_a: # skip original a
|
if a != orig_a: # skip original a
|
||||||
yield (filename, [a, a + 25]) # use yield instead of return to return a generator of tuples
|
yield (filename, [a, a + 25]) # use yield instead of return to return a generator of tuples
|
||||||
|
|
||||||
def generate_df_tuples(total_dfs, prefix, extension, first_col_start, last_col_offset,
|
def generate_df_tuples(prefix: str, total_dfs: int=30, extension: str="TXT", first_col_start: int=1, last_col_offset: int=25,
|
||||||
group_size=5, special_groups=None, group=True):
|
group_size: int=5, special_groups: list=None, group: bool=True):
|
||||||
"""
|
"""
|
||||||
Generate a structured list of tuples containing DataFrame references and column indices.
|
Generate a structured list of tuples containing DataFrame references and column indices.
|
||||||
|
|
||||||
|
|||||||
@@ -1,16 +1,190 @@
|
|||||||
from src.ml.model_selection import inference_model
|
|
||||||
from joblib import load
|
from joblib import load
|
||||||
|
import pandas as pd
|
||||||
|
from src.data_preprocessing import *
|
||||||
|
from src.process_stft import compute_stft
|
||||||
|
from typing import List, Tuple
|
||||||
|
from sklearn.base import BaseEstimator
|
||||||
|
import json
|
||||||
|
|
||||||
x = 30
|
def probability_damage(pred: Tuple[np.ndarray, np.ndarray], model_classes: BaseEstimator, percentage=False) -> Dict[str, int]:
|
||||||
file = f"D:/thesis/data/dataset_B/zzzBD{x}.TXT"
|
"""
|
||||||
sensor = 1
|
Process the prediction output to return unique labels and their counts.
|
||||||
model = {"SVM": f"D:/thesis/models/sensor{sensor}/SVM.joblib",
|
"""
|
||||||
"SVM with PCA": f"D:/thesis/models/sensor{sensor}/SVM with StandardScaler and PCA.joblib",
|
labels, counts = np.unique(pred, return_counts=True)
|
||||||
"XGBoost": f"D:/thesis/models/sensor{sensor}/XGBoost.joblib"}
|
label_counts = dict(zip(labels, counts))
|
||||||
|
|
||||||
|
# init all models classes probability of damage with 0 in dictionary
|
||||||
|
pod: Dict[np.ndarray, int] = dict.fromkeys(model_classes.classes_, 0)
|
||||||
|
|
||||||
|
# update corresponding data
|
||||||
|
pod.update(label_counts)
|
||||||
|
|
||||||
|
# turn the value into ratio instead of prediction counts
|
||||||
|
for label, count in pod.items():
|
||||||
|
|
||||||
|
ratio: float = count/np.sum(counts)
|
||||||
|
|
||||||
|
if percentage:
|
||||||
|
pod[label] = ratio * 100
|
||||||
|
else:
|
||||||
|
pod[label] = ratio
|
||||||
|
return pod
|
||||||
|
|
||||||
|
def convert_keys_to_strings(obj):
|
||||||
|
"""
|
||||||
|
Recursively convert all dictionary keys to strings.
|
||||||
|
"""
|
||||||
|
if isinstance(obj, dict):
|
||||||
|
return {str(key): convert_keys_to_strings(value) for key, value in obj["data"].items()}
|
||||||
|
elif isinstance(obj, list):
|
||||||
|
return [convert_keys_to_strings(item) for item in obj["data"]]
|
||||||
|
else:
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def inference(model_sensor_A_path: str, model_sensor_B_path: str, file_path: str):
|
||||||
|
|
||||||
|
# Generate column indices
|
||||||
|
column_index: List[Tuple[int, int]] = [
|
||||||
|
(i + 1, i + 26)
|
||||||
|
for i in range(5)
|
||||||
|
]
|
||||||
|
# Load a single case data
|
||||||
|
df: pd.DataFrame = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True)
|
||||||
|
# Take case name
|
||||||
|
case_name: str = file_path.split("/")[-1].split(".")[0]
|
||||||
|
# Extract relevant columns for each sensor
|
||||||
|
column_data: List[Tuple[pd.Series[float], pd.Series[float]]] = [
|
||||||
|
(df.iloc[:, i[0]], df.iloc[:, i[1]])
|
||||||
|
for i in column_index
|
||||||
|
]
|
||||||
|
|
||||||
|
column_data_stft: List[Tuple[pd.DataFrame, pd.DataFrame]] = [
|
||||||
|
(compute_stft(sensor_A), compute_stft(sensor_B))
|
||||||
|
for (sensor_A, sensor_B) in column_data
|
||||||
|
]
|
||||||
|
|
||||||
|
# Load the model
|
||||||
|
model_sensor_A = load(model_sensor_A_path)
|
||||||
|
model_sensor_B = load(model_sensor_B_path)
|
||||||
|
|
||||||
|
res = {}
|
||||||
|
|
||||||
|
for i, (stft_A, stft_B) in enumerate(column_data_stft):
|
||||||
|
# Make predictions using the model
|
||||||
|
pred_A: list[int] = model_sensor_A.predict(stft_A)
|
||||||
|
pred_B: list[int] = model_sensor_B.predict(stft_B)
|
||||||
|
|
||||||
|
|
||||||
|
percentage_A = probability_damage(pred_A, model_sensor_A)
|
||||||
|
percentage_B = probability_damage(pred_B, model_sensor_B)
|
||||||
|
|
||||||
|
|
||||||
|
res[f"Column_{i+1}"] = {
|
||||||
|
"Sensor_A": {
|
||||||
|
# "Predictions": pred_A,
|
||||||
|
"PoD": percentage_A
|
||||||
|
},
|
||||||
|
"Sensor_B": {
|
||||||
|
# "Predictions": pred_B,
|
||||||
|
"PoD": percentage_B
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final_res = {"data": res, "case": case_name}
|
||||||
|
return final_res
|
||||||
|
|
||||||
|
def heatmap(result, damage_classes: list[int] = [1, 2, 3, 4, 5, 6]):
|
||||||
|
from scipy.interpolate import RectBivariateSpline
|
||||||
|
resolution = 300
|
||||||
|
y = list(range(1, len(damage_classes)+1))
|
||||||
|
|
||||||
|
# length of column
|
||||||
|
x = list(range(len(result["data"])))
|
||||||
|
|
||||||
|
# X, Y = np.meshgrid(x, y)
|
||||||
|
Z = []
|
||||||
|
for _, column_data in result["data"].items():
|
||||||
|
sensor_a_pod = column_data['Sensor_A']['PoD']
|
||||||
|
Z.append([sensor_a_pod.get(cls, 0) for cls in damage_classes])
|
||||||
|
Z = np.array(Z).T
|
||||||
|
|
||||||
|
y2 = np.linspace(1, len(damage_classes), resolution)
|
||||||
|
x2 = np.linspace(0,4,resolution)
|
||||||
|
f = RectBivariateSpline(x, y, Z.T, kx=2, ky=2) # 2nd degree quadratic spline interpolation
|
||||||
|
|
||||||
|
Z2 = f(x2, y2).T.clip(0, 1) # clip to ignores negative values from cubic interpolation
|
||||||
|
|
||||||
|
X2, Y2 = np.meshgrid(x2, y2)
|
||||||
|
# breakpoint()
|
||||||
|
c = plt.pcolormesh(X2, Y2, Z2, cmap='jet', shading='auto')
|
||||||
|
|
||||||
|
# Add a colorbar
|
||||||
|
plt.colorbar(c, label='Probability of Damage (PoD)')
|
||||||
|
plt.gca().invert_xaxis()
|
||||||
|
plt.grid(True, linestyle='-', alpha=0.7)
|
||||||
|
plt.xticks(np.arange(int(X2.min()), int(X2.max())+1, 1))
|
||||||
|
plt.xlabel("Column Index")
|
||||||
|
plt.ylabel("Damage Index")
|
||||||
|
plt.title(result["case"])
|
||||||
|
# plt.xticks(ticks=x2, labels=[f'Col_{i+1}' for i in range(len(result))])
|
||||||
|
# plt.gca().xaxis.set_major_locator(MultipleLocator(65/4))
|
||||||
|
plt.show()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
import json
|
||||||
|
from scipy.interpolate import UnivariateSpline
|
||||||
|
|
||||||
|
|
||||||
|
result = inference(
|
||||||
|
"D:/thesis/models/Sensor A/SVM with StandardScaler and PCA.joblib",
|
||||||
|
"D:/thesis/models/Sensor B/SVM with StandardScaler and PCA.joblib",
|
||||||
|
"D:/thesis/data/dataset_B/zzzBD19.TXT"
|
||||||
|
)
|
||||||
|
|
||||||
|
# heatmap(result)
|
||||||
|
# Convert all keys to strings before dumping to JSON
|
||||||
|
# result_with_string_keys = convert_keys_to_strings(result)
|
||||||
|
# print(json.dumps(result_with_string_keys, indent=4))
|
||||||
|
|
||||||
|
# Create a 5x2 subplot grid (5 rows for each column, 2 columns for sensors)
|
||||||
|
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(5, 50))
|
||||||
|
|
||||||
|
# # Define damage class labels for x-axis
|
||||||
|
damage_classes = [1, 2, 3, 4, 5, 6]
|
||||||
|
|
||||||
|
# # Loop through each column in the data
|
||||||
|
for row_idx, (column_name, column_data) in enumerate(result['data'].items()):
|
||||||
|
# Plot Sensor A in the first column of subplots
|
||||||
|
sensor_a_pod = column_data['Sensor_A']['PoD']
|
||||||
|
x_values = list(range(len(damage_classes)))
|
||||||
|
y_values = [sensor_a_pod.get(cls, 0) for cls in damage_classes]
|
||||||
|
|
||||||
|
# x2 = np.linspace(1, 6, 100)
|
||||||
|
# interp = UnivariateSpline(x_values, y_values, s=0)
|
||||||
|
axes[row_idx, 0].plot(x_values, y_values, '-', linewidth=2, markersize=8)
|
||||||
|
axes[row_idx, 0].set_title(f"{column_name} - Sensor A", fontsize=10)
|
||||||
|
axes[row_idx, 0].set_xticks(x_values)
|
||||||
|
axes[row_idx, 0].set_xticklabels(damage_classes)
|
||||||
|
axes[row_idx, 0].set_ylim(0, 1.05)
|
||||||
|
axes[row_idx, 0].set_ylabel('Probability')
|
||||||
|
axes[row_idx, 0].set_xlabel('Damage Class')
|
||||||
|
axes[row_idx, 0].grid(True, linestyle='-', alpha=0.5)
|
||||||
|
|
||||||
|
# Plot Sensor B in the second column of subplots
|
||||||
|
sensor_b_pod = column_data['Sensor_B']['PoD']
|
||||||
|
y_values = [sensor_b_pod.get(cls, 0) for cls in damage_classes]
|
||||||
|
axes[row_idx, 1].plot(x_values, y_values, '-', linewidth=2, markersize=8)
|
||||||
|
axes[row_idx, 1].set_title(f"{column_name} - Sensor B", fontsize=10)
|
||||||
|
axes[row_idx, 1].set_xticks(x_values)
|
||||||
|
axes[row_idx, 1].set_xticklabels(damage_classes)
|
||||||
|
axes[row_idx, 1].set_ylim(0, 1.05)
|
||||||
|
axes[row_idx, 1].set_ylabel('Probability')
|
||||||
|
axes[row_idx, 1].set_xlabel('Damage Class')
|
||||||
|
axes[row_idx, 1].grid(True, linestyle='-', alpha=0.5)
|
||||||
|
|
||||||
|
# Adjust layout to prevent overlap
|
||||||
|
fig.tight_layout(rect=[0, 0, 1, 0.96]) # Leave space for suptitle
|
||||||
|
plt.subplots_adjust(hspace=1, wspace=0.3) # Adjust spacing between subplots
|
||||||
|
plt.suptitle(f"Case {result['case']}", fontsize=16, y=0.98) # Adjust suptitle position
|
||||||
|
plt.show()
|
||||||
|
|
||||||
index = ((x-1) % 5) + 1
|
|
||||||
inference_model(model["SVM"], file, column_question=index)
|
|
||||||
print("---")
|
|
||||||
inference_model(model["SVM with PCA"], file, column_question=index)
|
|
||||||
print("---")
|
|
||||||
inference_model(model["XGBoost"], file, column_question=index)
|
|
||||||
@@ -1,9 +1,11 @@
|
|||||||
import os
|
import os
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from scipy.signal import stft, hann
|
from scipy.signal import stft
|
||||||
|
from scipy.signal.windows import hann
|
||||||
import glob
|
import glob
|
||||||
import multiprocessing # Added import for multiprocessing
|
import multiprocessing # Added import for multiprocessing
|
||||||
|
from typing import Union, Tuple
|
||||||
|
|
||||||
# Define the base directory where DAMAGE_X folders are located
|
# Define the base directory where DAMAGE_X folders are located
|
||||||
damage_base_path = 'D:/thesis/data/converted/raw'
|
damage_base_path = 'D:/thesis/data/converted/raw'
|
||||||
@@ -19,16 +21,38 @@ for dir_path in output_dirs.values():
|
|||||||
os.makedirs(dir_path, exist_ok=True)
|
os.makedirs(dir_path, exist_ok=True)
|
||||||
|
|
||||||
# Define STFT parameters
|
# Define STFT parameters
|
||||||
|
|
||||||
|
# Number of damage cases (adjust as needed)
|
||||||
|
num_damage_cases = 6 # Change to 30 if you have 30 damage cases
|
||||||
|
|
||||||
|
# Function to perform STFT and return magnitude
|
||||||
|
def compute_stft(vibration_data: np.ndarray, return_param: bool = False) -> Union[pd.DataFrame, Tuple[pd.DataFrame, list[int, int, int]]]:
|
||||||
|
"""
|
||||||
|
Computes the Short-Time Fourier Transform (STFT) magnitude of the input vibration data.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
vibration_data : numpy.ndarray
|
||||||
|
The input vibration data as a 1D NumPy array.
|
||||||
|
return_param : bool, optional
|
||||||
|
If True, the function returns additional STFT parameters (window size, hop size, and sampling frequency).
|
||||||
|
Defaults to False.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
pd.DataFrame
|
||||||
|
The transposed STFT magnitude, with frequencies as columns, if `return_param` is False.
|
||||||
|
tuple
|
||||||
|
If `return_param` is True, returns a tuple containing:
|
||||||
|
- pd.DataFrame: The transposed STFT magnitude, with frequencies as columns.
|
||||||
|
- list[int, int, int]: A list of STFT parameters [window_size, hop_size, Fs].
|
||||||
|
"""
|
||||||
|
|
||||||
window_size = 1024
|
window_size = 1024
|
||||||
hop_size = 512
|
hop_size = 512
|
||||||
window = hann(window_size)
|
window = hann(window_size)
|
||||||
Fs = 1024
|
Fs = 1024
|
||||||
|
|
||||||
# Number of damage cases (adjust as needed)
|
|
||||||
num_damage_cases = 0 # Change to 30 if you have 30 damage cases
|
|
||||||
|
|
||||||
# Function to perform STFT and return magnitude
|
|
||||||
def compute_stft(vibration_data, Fs=Fs, window_size=window_size, hop_size=hop_size):
|
|
||||||
frequencies, times, Zxx = stft(
|
frequencies, times, Zxx = stft(
|
||||||
vibration_data,
|
vibration_data,
|
||||||
fs=Fs,
|
fs=Fs,
|
||||||
@@ -37,9 +61,19 @@ def compute_stft(vibration_data, Fs=Fs, window_size=window_size, hop_size=hop_si
|
|||||||
noverlap=window_size - hop_size
|
noverlap=window_size - hop_size
|
||||||
)
|
)
|
||||||
stft_magnitude = np.abs(Zxx)
|
stft_magnitude = np.abs(Zxx)
|
||||||
return stft_magnitude.T # Transpose to have frequencies as columns
|
|
||||||
|
|
||||||
def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop_size, output_dirs=output_dirs):
|
# Convert STFT result to DataFrame
|
||||||
|
df_stft = pd.DataFrame(
|
||||||
|
stft_magnitude.T,
|
||||||
|
columns=[f"Freq_{freq:.2f}" for freq in np.linspace(0, Fs/2, stft_magnitude.shape[1])]
|
||||||
|
)
|
||||||
|
# breakpoint()
|
||||||
|
if return_param:
|
||||||
|
return df_stft, [window_size, hop_size, Fs]
|
||||||
|
else:
|
||||||
|
return df_stft
|
||||||
|
|
||||||
|
def process_damage_case(damage_num):
|
||||||
damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}')
|
damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}')
|
||||||
if damage_num == 0:
|
if damage_num == 0:
|
||||||
# Number of test runs per damage case
|
# Number of test runs per damage case
|
||||||
@@ -83,13 +117,8 @@ def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop
|
|||||||
vibration_data = df.iloc[:, 1].values
|
vibration_data = df.iloc[:, 1].values
|
||||||
|
|
||||||
# Perform STFT
|
# Perform STFT
|
||||||
stft_magnitude = compute_stft(vibration_data, Fs=Fs, window_size=window_size, hop_size=hop_size)
|
df_stft = compute_stft(vibration_data)
|
||||||
|
|
||||||
# Convert STFT result to DataFrame
|
|
||||||
df_stft = pd.DataFrame(
|
|
||||||
stft_magnitude,
|
|
||||||
columns=[f"Freq_{freq:.2f}" for freq in np.linspace(0, Fs/2, stft_magnitude.shape[1])]
|
|
||||||
)
|
|
||||||
# only inlcude 21 samples vector features for first 45 num_test_runs else include 22 samples vector features
|
# only inlcude 21 samples vector features for first 45 num_test_runs else include 22 samples vector features
|
||||||
if damage_num == 0:
|
if damage_num == 0:
|
||||||
print(f"Processing damage_num = 0, test_num = {test_num}")
|
print(f"Processing damage_num = 0, test_num = {test_num}")
|
||||||
@@ -117,11 +146,11 @@ def process_damage_case(damage_num, Fs=Fs, window_size=window_size, hop_size=hop
|
|||||||
# Save the aggregated STFT to CSV
|
# Save the aggregated STFT to CSV
|
||||||
with open(output_file, 'w') as file:
|
with open(output_file, 'w') as file:
|
||||||
file.write('sep=,\n')
|
file.write('sep=,\n')
|
||||||
df_aggregated.to_csv(output_file, index=False)
|
df_aggregated.to_csv(file, index=False)
|
||||||
print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}")
|
print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}")
|
||||||
else:
|
else:
|
||||||
print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.")
|
print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.")
|
||||||
|
|
||||||
if __name__ == "__main__": # Added main guard for multiprocessing
|
if __name__ == "__main__": # Added main guard for multiprocessing
|
||||||
with multiprocessing.Pool() as pool:
|
with multiprocessing.Pool() as pool:
|
||||||
pool.map(process_damage_case, range(0, num_damage_cases + 1))
|
pool.map(process_damage_case, range(num_damage_cases + 1))
|
||||||
|
|||||||
Reference in New Issue
Block a user