feat(src): enhance heatmap and plotting functions for sensor data visualization
This commit is contained in:
@@ -1,13 +1,12 @@
|
|||||||
from joblib import load
|
from joblib import load
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
from data_preprocessing import *
|
import numpy as np
|
||||||
from process_stft import compute_stft
|
from process_stft import compute_stft
|
||||||
from typing import List, Tuple
|
from typing import List, Tuple
|
||||||
from sklearn.base import BaseEstimator
|
from sklearn.base import BaseEstimator
|
||||||
import json
|
|
||||||
|
|
||||||
|
|
||||||
def probability_damage(pred: Tuple[np.ndarray, np.ndarray], model_classes: BaseEstimator, percentage=False) -> Dict[str, int]:
|
def probability_damage(pred: Tuple[np.ndarray, np.ndarray], model_classes: BaseEstimator, percentage=False) -> dict[str, int]:
|
||||||
"""
|
"""
|
||||||
Process the prediction output to return unique labels and their counts.
|
Process the prediction output to return unique labels and their counts.
|
||||||
"""
|
"""
|
||||||
@@ -15,7 +14,7 @@ def probability_damage(pred: Tuple[np.ndarray, np.ndarray], model_classes: BaseE
|
|||||||
label_counts = dict(zip(labels, counts))
|
label_counts = dict(zip(labels, counts))
|
||||||
|
|
||||||
# init all models classes probability of damage with 0 in dictionary
|
# init all models classes probability of damage with 0 in dictionary
|
||||||
pod: Dict[np.ndarray, int] = dict.fromkeys(model_classes.classes_, 0)
|
pod: dict[np.ndarray, int] = dict.fromkeys(model_classes.classes_, 0)
|
||||||
|
|
||||||
# update corresponding data
|
# update corresponding data
|
||||||
pod.update(label_counts)
|
pod.update(label_counts)
|
||||||
@@ -93,7 +92,7 @@ def inference(model_sensor_A_path: str, model_sensor_B_path: str, file_path: str
|
|||||||
final_res = {"data": res, "case": case_name}
|
final_res = {"data": res, "case": case_name}
|
||||||
return final_res
|
return final_res
|
||||||
|
|
||||||
def heatmap(result, damage_classes: list[int] = [1, 2, 3, 4, 5, 6]):
|
def heatmap(result, damage_classes: list[int] = [1, 2, 3, 4, 5, 6], sensor: str = 'Sensor_A'):
|
||||||
from scipy.interpolate import RectBivariateSpline
|
from scipy.interpolate import RectBivariateSpline
|
||||||
resolution = 300
|
resolution = 300
|
||||||
y = list(range(1, len(damage_classes)+1))
|
y = list(range(1, len(damage_classes)+1))
|
||||||
@@ -104,7 +103,7 @@ def heatmap(result, damage_classes: list[int] = [1, 2, 3, 4, 5, 6]):
|
|||||||
# X, Y = np.meshgrid(x, y)
|
# X, Y = np.meshgrid(x, y)
|
||||||
Z = []
|
Z = []
|
||||||
for _, column_data in result["data"].items():
|
for _, column_data in result["data"].items():
|
||||||
sensor_a_pod = column_data['Sensor_A']['PoD']
|
sensor_a_pod = column_data[sensor]['PoD']
|
||||||
Z.append([sensor_a_pod.get(cls, 0) for cls in damage_classes])
|
Z.append([sensor_a_pod.get(cls, 0) for cls in damage_classes])
|
||||||
Z = np.array(Z).T
|
Z = np.array(Z).T
|
||||||
|
|
||||||
@@ -116,19 +115,70 @@ def heatmap(result, damage_classes: list[int] = [1, 2, 3, 4, 5, 6]):
|
|||||||
|
|
||||||
X2, Y2 = np.meshgrid(x2, y2)
|
X2, Y2 = np.meshgrid(x2, y2)
|
||||||
# breakpoint()
|
# breakpoint()
|
||||||
c = plt.pcolormesh(X2, Y2, Z2, cmap='jet', shading='auto')
|
plt.figure(figsize=(9, 6))
|
||||||
|
|
||||||
|
# Change the window title
|
||||||
|
plt.gcf().canvas.manager.set_window_title(f"Heatmap {sensor} - {result['case']}")
|
||||||
|
|
||||||
|
c = plt.pcolormesh(X2, Y2, Z2, cmap='jet', shading='auto', vmin=0, vmax=1)
|
||||||
|
|
||||||
# Add a colorbar
|
# Add a colorbar
|
||||||
plt.colorbar(c, label='Probability of Damage (PoD)')
|
plt.colorbar(c, label='Probability of Damage (PoD)', fraction=0.05)
|
||||||
plt.gca().invert_xaxis()
|
plt.gca().invert_xaxis()
|
||||||
plt.grid(True, linestyle='-', alpha=0.7)
|
plt.grid(True, linestyle='-', alpha=0.7)
|
||||||
plt.xticks(np.arange(int(X2.min()), int(X2.max())+1, 1))
|
plt.xticks(np.arange(0, 5, 1), np.arange(1, 6, 1))
|
||||||
plt.xlabel("Column Index")
|
plt.xlabel("Column Index")
|
||||||
plt.ylabel("Damage Index")
|
plt.ylabel("Damage Index")
|
||||||
plt.title(result["case"])
|
plt.title(result["case"])
|
||||||
# plt.xticks(ticks=x2, labels=[f'Col_{i+1}' for i in range(len(result))])
|
# plt.xticks(ticks=x2, labels=[f'Col_{i+1}' for i in range(len(result))])
|
||||||
# plt.gca().xaxis.set_major_locator(MultipleLocator(65/4))
|
# plt.gca().xaxis.set_major_locator(MultipleLocator(65/4))
|
||||||
plt.show()
|
plt.show()
|
||||||
|
|
||||||
|
def plot_sensor_pod(result, sensor: str = 'Sensor_A', damage_classes: list[int] = [1, 2, 3, 4, 5, 6]):
|
||||||
|
"""
|
||||||
|
Plot Probability of Damage (PoD) for all columns for a specific sensor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
result: Dictionary containing inference results
|
||||||
|
sensor: Sensor name ('Sensor_A' or 'Sensor_B')
|
||||||
|
damage_classes: List of damage class labels
|
||||||
|
"""
|
||||||
|
x_values = list(range(len(damage_classes)))
|
||||||
|
|
||||||
|
# Define colors for different columns
|
||||||
|
colors = plt.cm.tab10(np.linspace(0, 1, len(result['data'])))
|
||||||
|
|
||||||
|
# Create figure
|
||||||
|
plt.figure(figsize=(9, 6))
|
||||||
|
|
||||||
|
# Create a figure
|
||||||
|
|
||||||
|
# Change the window title
|
||||||
|
plt.gcf().canvas.manager.set_window_title(f"PoD {sensor} - {result['case']}")
|
||||||
|
# line_styles = ['-', '--', '-.', ':'] # Solid, dashed, dash-dot, dotted
|
||||||
|
markers = ['o', 's', '^', 'D', 'x'] # Circle, square, triangle, diamond, cross
|
||||||
|
|
||||||
|
# Loop through each column in the data
|
||||||
|
for row_idx, (column_name, column_data) in enumerate(result['data'].items()):
|
||||||
|
sensor_pod = column_data[sensor]['PoD']
|
||||||
|
y_values = [sensor_pod.get(cls, 0) for cls in damage_classes]
|
||||||
|
|
||||||
|
# Cycle through line styles and markers
|
||||||
|
# line_style = line_styles[row_idx % len(line_styles)]
|
||||||
|
marker = markers[row_idx % len(markers)]
|
||||||
|
|
||||||
|
plt.plot(x_values, y_values, linestyle='-', marker=marker, linewidth=2, markersize=8,
|
||||||
|
color=colors[row_idx], label=column_name, alpha=0.8)
|
||||||
|
|
||||||
|
# Configure plot
|
||||||
|
# plt.title(f"{sensor}", fontsize=14, fontweight='bold')
|
||||||
|
plt.xticks(x_values, damage_classes)
|
||||||
|
plt.ylim(0, 1.05)
|
||||||
|
plt.ylabel('Probability', fontsize=12)
|
||||||
|
plt.xlabel('Damage Class', fontsize=12)
|
||||||
|
plt.grid(True, linestyle='-', alpha=0.3)
|
||||||
|
plt.legend(loc='best', fontsize=10)
|
||||||
|
plt.show()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
@@ -137,55 +187,13 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
|
|
||||||
result = inference(
|
result = inference(
|
||||||
"D:/thesis/models/Sensor A/SVM with StandardScaler and PCA.joblib",
|
"D:/thesis/models/Sensor A/finegrid_pca32_c8_g-8.joblib",
|
||||||
"D:/thesis/models/Sensor B/SVM with StandardScaler and PCA.joblib",
|
"D:/thesis/models/Sensor B/finegrid_pca16_c3_g-5.5.joblib",
|
||||||
"D:/thesis/data/dataset_B/zzzBU.TXT"
|
"D:/thesis/data/dataset_B/zzzBD30.TXT"
|
||||||
)
|
)
|
||||||
|
|
||||||
heatmap(result)
|
heatmap(result, sensor='Sensor_A')
|
||||||
# Convert all keys to strings before dumping to JSON
|
heatmap(result, sensor='Sensor_B')
|
||||||
# result_with_string_keys = convert_keys_to_strings(result)
|
|
||||||
# print(json.dumps(result_with_string_keys, indent=4))
|
|
||||||
|
|
||||||
# Create a 5x2 subplot grid (5 rows for each column, 2 columns for sensors)
|
plot_sensor_pod(result, sensor='Sensor_A')
|
||||||
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(5, 50))
|
plot_sensor_pod(result, sensor='Sensor_B')
|
||||||
|
|
||||||
# # Define damage class labels for x-axis
|
|
||||||
damage_classes = [1, 2, 3, 4, 5, 6]
|
|
||||||
|
|
||||||
# # Loop through each column in the data
|
|
||||||
for row_idx, (column_name, column_data) in enumerate(result['data'].items()):
|
|
||||||
# Plot Sensor A in the first column of subplots
|
|
||||||
sensor_a_pod = column_data['Sensor_A']['PoD']
|
|
||||||
x_values = list(range(len(damage_classes)))
|
|
||||||
y_values = [sensor_a_pod.get(cls, 0) for cls in damage_classes]
|
|
||||||
|
|
||||||
# x2 = np.linspace(1, 6, 100)
|
|
||||||
# interp = UnivariateSpline(x_values, y_values, s=0)
|
|
||||||
axes[row_idx, 0].plot(x_values, y_values, '-', linewidth=2, markersize=8)
|
|
||||||
axes[row_idx, 0].set_title(f"{column_name} - Sensor A", fontsize=10)
|
|
||||||
axes[row_idx, 0].set_xticks(x_values)
|
|
||||||
axes[row_idx, 0].set_xticklabels(damage_classes)
|
|
||||||
axes[row_idx, 0].set_ylim(0, 1.05)
|
|
||||||
axes[row_idx, 0].set_ylabel('Probability')
|
|
||||||
axes[row_idx, 0].set_xlabel('Damage Class')
|
|
||||||
axes[row_idx, 0].grid(True, linestyle='-', alpha=0.5)
|
|
||||||
|
|
||||||
# Plot Sensor B in the second column of subplots
|
|
||||||
sensor_b_pod = column_data['Sensor_B']['PoD']
|
|
||||||
y_values = [sensor_b_pod.get(cls, 0) for cls in damage_classes]
|
|
||||||
axes[row_idx, 1].plot(x_values, y_values, '-', linewidth=2, markersize=8)
|
|
||||||
axes[row_idx, 1].set_title(f"{column_name} - Sensor B", fontsize=10)
|
|
||||||
axes[row_idx, 1].set_xticks(x_values)
|
|
||||||
axes[row_idx, 1].set_xticklabels(damage_classes)
|
|
||||||
axes[row_idx, 1].set_ylim(0, 1.05)
|
|
||||||
axes[row_idx, 1].set_ylabel('Probability')
|
|
||||||
axes[row_idx, 1].set_xlabel('Damage Class')
|
|
||||||
axes[row_idx, 1].grid(True, linestyle='-', alpha=0.5)
|
|
||||||
|
|
||||||
# Adjust layout to prevent overlap
|
|
||||||
fig.tight_layout(rect=[0, 0, 1, 0.96]) # Leave space for suptitle
|
|
||||||
plt.subplots_adjust(hspace=1, wspace=0.3) # Adjust spacing between subplots
|
|
||||||
plt.suptitle(f"Case {result['case']}", fontsize=16, y=0.98) # Adjust suptitle position
|
|
||||||
plt.show()
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user