import os

import numpy as np
import pandas as pd
from scipy.signal import stft
# NOTE: scipy.signal.hann was deprecated and removed in SciPy 1.13;
# the window lives in scipy.signal.windows.
from scipy.signal.windows import hann

# Base directory containing the DAMAGE_X folders and the sensorN output folders.
damage_base_path = 'D:/thesis/data/converted/raw/'

# Directories holding the aggregated STFT CSVs, one per sensor.
sensor_dirs = {
    'sensor1': os.path.join(damage_base_path, 'sensor1'),
    'sensor2': os.path.join(damage_base_path, 'sensor2'),
}

# STFT parameters — must match the ones used when building the aggregated CSVs.
window_size = 1024
hop_size = 512
window = hann(window_size)
Fs = 1024  # sampling frequency in Hz


def verify_stft(damage_num, test_num, sensor_num, base_path=None):
    """Verify the STFT of one individual test run against the aggregated CSV.

    Recomputes the STFT magnitude of the raw per-test CSV and spot-checks a
    few randomly sampled rows against the corresponding row block of the
    aggregated ``stft_data{sensor}_{damage}.csv`` file.

    Parameters
    ----------
    damage_num : int
        Damage case number.
    test_num : int
        Test run number (1-based; selects the row block inside the aggregate).
    sensor_num : int
        Sensor number (1 or 2).
    base_path : str, optional
        Root directory holding the ``DAMAGE_X`` and ``sensorN`` folders.
        Defaults to the module-level ``damage_base_path``.

    Returns
    -------
    bool or None
        ``True`` when every sampled row matches, ``False`` on a mismatch,
        ``None`` when verification was skipped (missing/unreadable files or
        unexpected layout).
    """
    if base_path is None:
        base_path = damage_base_path

    # Raw CSV of the individual run, e.g. DAMAGE_3/DAMAGE_3_TEST2_01.csv
    sensor_suffix = f'_0{sensor_num}'
    individual_file_name = f'DAMAGE_{damage_num}_TEST{test_num}{sensor_suffix}.csv'
    individual_file_path = os.path.join(base_path, f'DAMAGE_{damage_num}', individual_file_name)

    if not os.path.isfile(individual_file_path):
        print(f"File {individual_file_path} does not exist. Skipping verification for this test run.")
        return None

    try:
        df_individual = pd.read_csv(individual_file_path)
    except Exception as e:
        print(f"Error reading {individual_file_path}: {e}. Skipping verification for this test run.")
        return None

    # Expect exactly two columns: 'Timestamp (s)' and the sensor reading.
    if df_individual.shape[1] != 2:
        print(f"Unexpected number of columns in {individual_file_path}. "
              f"Expected 2, got {df_individual.shape[1]}. Skipping.")
        return None

    # Second column holds the vibration signal.
    vibration_data = df_individual.iloc[:, 1].values
    if vibration_data.size == 0:
        print(f"No samples in {individual_file_path}. Skipping.")
        return None

    # Recompute the STFT with the exact parameters used during aggregation.
    frequencies, times, Zxx = stft(
        vibration_data,
        fs=Fs,
        window=window,
        nperseg=window_size,
        noverlap=window_size - hop_size,
    )

    # Rows = time frames, columns = frequency bins — matches the CSV layout.
    stft_magnitude = np.abs(Zxx).T
    n_rows = stft_magnitude.shape[0]

    # Sample up to 3 rows to spot-check (seeded for reproducibility).
    np.random.seed(42)
    sample_row_indices = np.random.choice(n_rows, size=min(3, n_rows), replace=False)

    aggregated_file_name = f'stft_data{sensor_num}_{damage_num}.csv'
    aggregated_file_path = os.path.join(base_path, f'sensor{sensor_num}', aggregated_file_name)

    if not os.path.isfile(aggregated_file_path):
        print(f"Aggregated file {aggregated_file_path} does not exist. Skipping verification for this test run.")
        return None

    try:
        df_aggregated = pd.read_csv(aggregated_file_path)
    except Exception as e:
        print(f"Error reading {aggregated_file_path}: {e}. Skipping verification for this test run.")
        return None

    # Each test run contributes one contiguous block of n_rows rows, stored
    # in test order.  NOTE(review): assumes every test run has the same
    # length (same frame count) — the aggregation scheme requires this.
    start_row = (test_num - 1) * n_rows
    end_row = start_row + n_rows  # exclusive

    if df_aggregated.shape[0] < end_row:
        print(f"Aggregated file {aggregated_file_path} does not have enough rows for Test {test_num}. Skipping.")
        return None

    # Extract the corresponding STFT block from the aggregated CSV.
    df_aggregated_block = df_aggregated.iloc[start_row:end_row].values

    all_match = True
    for row_idx in sample_row_indices:
        individual_row = stft_magnitude[row_idx]
        aggregated_row = df_aggregated_block[row_idx]

        # Rows must agree within a small absolute tolerance (CSV round-trip).
        if np.allclose(individual_row, aggregated_row, atol=1e-6):
            verification_status = "MATCH"
        else:
            verification_status = "MISMATCH"
            all_match = False

        print(f"Comparing Damage {damage_num}, Test {test_num}, Sensor {sensor_num}, "
              f"Row {row_idx}: {verification_status}")
        print(f"Individual STFT Row {row_idx}: {individual_row[:5]} ... {individual_row[-5:]}")
        print(f"Aggregated STFT Row {row_idx + start_row}: {aggregated_row[:5]} ... {aggregated_row[-5:]}\n")

    if all_match:
        print(f"STFT of {individual_file_name} is verified. On `{aggregated_file_name}` "
              f"rows {start_row} to {end_row} ({n_rows} rows).\n")
    else:
        print(f"STFT of {individual_file_name} has discrepancies in `{aggregated_file_name}` "
              f"rows {start_row} to {end_row} ({n_rows} rows).\n")
    return all_match


if __name__ == "__main__":
    # Sweep every damage case, test run, and sensor.
    num_damage_cases = 6  # adjust to the full dataset size (e.g. 30)
    num_test_runs = 5

    for damage_num in range(1, num_damage_cases + 1):
        for test_num in range(1, num_test_runs + 1):
            for sensor_num in [1, 2]:
                verify_stft(damage_num, test_num, sensor_num)