feat: Implement STFT verification for individual test runs against aggregated data
This commit adds a new file, code/src/verify_stft.py (133 lines).
|
||||
import os
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from scipy.signal import stft, hann
|
||||
import glob
|
||||
|
||||
# Base directory containing the DAMAGE_X folders of converted raw data.
damage_base_path = 'D:/thesis/data/converted/raw/'

# Directories holding the aggregated STFT CSVs, one per sensor.
sensor_dirs = {
    'sensor1': os.path.join(damage_base_path, 'sensor1'),
    'sensor2': os.path.join(damage_base_path, 'sensor2')
}

# STFT parameters.
window_size = 1024  # samples per STFT segment (nperseg)
hop_size = 512      # hop between segments -> noverlap = window_size - hop_size
# np.hanning(M) is the symmetric Hann window, numerically identical to the
# old scipy.signal.hann(M) default (sym=True). The scipy.signal.hann alias
# was deprecated and removed (SciPy 1.13), so build the window with NumPy.
window = np.hanning(window_size)
Fs = 1024           # sampling frequency in Hz
|
||||
|
||||
def verify_stft(damage_num, test_num, sensor_num):
    """
    Verify the STFT of an individual test run against the aggregated STFT data.

    Reads DAMAGE_{d}/DAMAGE_{d}_TEST{t}_0{s}.csv, recomputes its STFT
    magnitude with the module-level parameters, and spot-checks a few randomly
    sampled rows against the corresponding block of rows in the aggregated CSV
    stft_data{s}_{d}.csv. All results are reported via print(); the function
    always returns None.

    Parameters:
    - damage_num (int): Damage case number.
    - test_num (int): Test run number (1-based; selects the row block in the
      aggregated CSV).
    - sensor_num (int): Sensor number (1 or 2).
    """
    # Individual files are named DAMAGE_<d>_TEST<t>_0<s>.csv.
    sensor_suffix = f'_0{sensor_num}'
    individual_file_name = f'DAMAGE_{damage_num}_TEST{test_num}{sensor_suffix}.csv'
    individual_file_path = os.path.join(damage_base_path, f'DAMAGE_{damage_num}', individual_file_name)

    if not os.path.isfile(individual_file_path):
        print(f"File {individual_file_path} does not exist. Skipping verification for this test run.")
        return

    try:
        df_individual = pd.read_csv(individual_file_path)
    except Exception as e:
        print(f"Error reading {individual_file_path}: {e}. Skipping verification for this test run.")
        return

    # Expect exactly two columns: a timestamp column and the sensor reading.
    if df_individual.shape[1] != 2:
        print(f"Unexpected number of columns in {individual_file_path}. Expected 2, got {df_individual.shape[1]}. Skipping.")
        return

    # Second column holds the vibration signal.
    vibration_data = df_individual.iloc[:, 1].values

    # Recompute the STFT with the same parameters used during aggregation.
    frequencies, times, Zxx = stft(
        vibration_data,
        fs=Fs,
        window=window,
        nperseg=window_size,
        noverlap=window_size - hop_size
    )

    # Magnitude, transposed so each row is one time frame across all
    # window_size // 2 + 1 frequency bins: shape (n_frames, 513).
    stft_magnitude = np.abs(Zxx).T

    # Each test run occupies one contiguous block of n_frames rows in the
    # aggregated CSV (513 for the original recordings). Derive the block size
    # from the computed STFT instead of hard-coding 513.
    # NOTE(review): assumes every test run has the same signal length -- confirm.
    rows_per_test = stft_magnitude.shape[0]

    # Spot-check 3 rows; a local seeded generator keeps the sample
    # reproducible without mutating NumPy's global RNG state.
    rng = np.random.default_rng(42)
    sample_row_indices = rng.choice(rows_per_test, size=3, replace=False)

    # Locate the aggregated STFT CSV for this sensor/damage case.
    aggregated_file_name = f'stft_data{sensor_num}_{damage_num}.csv'
    aggregated_file_path = os.path.join(sensor_dirs[f'sensor{sensor_num}'], aggregated_file_name)

    if not os.path.isfile(aggregated_file_path):
        print(f"Aggregated file {aggregated_file_path} does not exist. Skipping verification for this test run.")
        return

    try:
        df_aggregated = pd.read_csv(aggregated_file_path)
    except Exception as e:
        print(f"Error reading {aggregated_file_path}: {e}. Skipping verification for this test run.")
        return

    # Row block for this test run within the aggregated CSV.
    start_row = (test_num - 1) * rows_per_test
    end_row = start_row + rows_per_test  # exclusive

    if df_aggregated.shape[0] < end_row:
        print(f"Aggregated file {aggregated_file_path} does not have enough rows for Test {test_num}. Skipping.")
        return

    df_aggregated_block = df_aggregated.iloc[start_row:end_row].values

    # Compare the sampled rows within a small absolute tolerance.
    all_match = True
    for row_idx in sample_row_indices:
        individual_row = stft_magnitude[row_idx]
        aggregated_row = df_aggregated_block[row_idx]

        if np.allclose(individual_row, aggregated_row, atol=1e-6):
            verification_status = "MATCH"
        else:
            verification_status = "MISMATCH"
            all_match = False

        print(f"Comparing Damage {damage_num}, Test {test_num}, Sensor {sensor_num}, Row {row_idx}: {verification_status}")
        print(f"Individual STFT Row {row_idx}: {individual_row[:5]} ... {individual_row[-5:]}")
        print(f"Aggregated STFT Row {row_idx + start_row}: {aggregated_row[:5]} ... {aggregated_row[-5:]}\n")

    # Summary: report the actual on-disk file names (the old message dropped
    # the _0X suffix from the individual file name).
    if all_match:
        print(f"STFT of {individual_file_name} is verified. On `{aggregated_file_name}` start at rows {start_row} to {end_row} with {rows_per_test} rows.\n")
    else:
        print(f"STFT of {individual_file_name} has discrepancies in `{aggregated_file_name}` start at rows {start_row} to {end_row} with {rows_per_test} rows.\n")
|
||||
|
||||
# Dataset dimensions: number of damage cases and test runs per case.
num_damage_cases = 6  # Adjust to 30 as per your dataset
num_test_runs = 5

# Guard the sweep so importing this module (e.g. to reuse verify_stft)
# does not trigger the full verification run.
if __name__ == '__main__':
    # Iterate through all damage cases, test runs, and sensors.
    for damage_num in range(1, num_damage_cases + 1):
        for test_num in range(1, num_test_runs + 1):
            for sensor_num in [1, 2]:
                verify_stft(damage_num, test_num, sensor_num)
|
||||
Reference in New Issue
Block a user