Files
thesis/code/src/process_stft.py

157 lines
5.9 KiB
Python

import os
import pandas as pd
import numpy as np
from scipy.signal import stft
from scipy.signal.windows import hann
import glob
import multiprocessing # Added import for multiprocessing
from typing import Union, Tuple
# Root folder that contains the DAMAGE_<n> input folders.
damage_base_path = 'D:/thesis/data/converted/raw_B'

# One output folder per sensor, placed directly under the base folder.
output_dirs = {
    f'sensor{sensor_id}': os.path.join(damage_base_path, f'sensor{sensor_id}')
    for sensor_id in (1, 2)
}

# Make sure every output folder exists before any result is written.
for dir_path in output_dirs.values():
    os.makedirs(dir_path, exist_ok=True)

# Number of damage cases (adjust as needed); the script entry point
# iterates over range(num_damage_cases + 1).
num_damage_cases = 6  # Change to 30 if you have 30 damage cases
# Function to perform STFT and return magnitude
def compute_stft(
    vibration_data: np.ndarray,
    return_param: bool = False,
    *,
    window_size: int = 1024,
    hop_size: int = 512,
    sampling_rate: int = 1024,
) -> Union[pd.DataFrame, Tuple[pd.DataFrame, list[int]]]:
    """
    Compute the Short-Time Fourier Transform (STFT) magnitude of a signal.

    Parameters
    ----------
    vibration_data : numpy.ndarray
        The input vibration data as a 1D NumPy array.
    return_param : bool, optional
        If True, also return the STFT parameters that were used.
        Defaults to False.
    window_size : int, optional
        Length of the Hann window / STFT segment in samples. Defaults to 1024.
    hop_size : int, optional
        Step between consecutive segments in samples; the overlap passed to
        ``scipy.signal.stft`` is ``window_size - hop_size``. Defaults to 512.
    sampling_rate : int, optional
        Sampling frequency passed to ``scipy.signal.stft`` as ``fs``.
        Defaults to 1024.

    Returns
    -------
    pd.DataFrame
        The transposed STFT magnitude (one row per time segment, one
        ``Freq_<f>`` column per frequency bin) if `return_param` is False.
    tuple
        If `return_param` is True, a tuple containing:
        - pd.DataFrame: the same magnitude frame as above.
        - list[int]: the parameters [window_size, hop_size, sampling_rate].
    """
    window = hann(window_size)
    frequencies, _times, Zxx = stft(
        vibration_data,
        fs=sampling_rate,
        window=window,
        nperseg=window_size,
        noverlap=window_size - hop_size,
    )
    stft_magnitude = np.abs(Zxx)

    # Zxx is laid out as (frequency bins, time segments); after transposing,
    # the columns are frequency bins. Label them from the frequency axis
    # returned by stft so the label count always equals the column count.
    df_stft = pd.DataFrame(
        stft_magnitude.T,
        columns=[f"Freq_{freq:.2f}" for freq in frequencies],
    )

    if return_param:
        return df_stft, [window_size, hop_size, sampling_rate]
    return df_stft
def process_damage_case(damage_num):
    """
    Build and save the aggregated STFT magnitude tables for one damage case.

    For each sensor (1 and 2), every test-run CSV named
    ``DAMAGE_<damage_num>_TEST<test_num>_0<sensor_num>.csv`` inside the
    ``DAMAGE_<damage_num>`` folder is read, its second column is passed to
    ``compute_stft``, and the per-run results are stacked vertically and
    written to ``stft_data<sensor_num>_<damage_num>.csv`` in that sensor's
    output folder. Missing, unreadable or unexpectedly shaped files are
    reported and skipped.

    Parameters
    ----------
    damage_num : int
        Index of the damage case to process. Case 0 is read with 125 test
        runs; every other case with 5.

    Returns
    -------
    None
    """
    damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}')
    # Number of test runs per damage case
    num_test_runs = 125 if damage_num == 0 else 5

    # Check if the damage folder exists
    if not os.path.isdir(damage_folder):
        print(f"Folder {damage_folder} does not exist. Skipping...")
        return

    # Process Sensor 1 and Sensor 2 separately
    for sensor_num in [1, 2]:
        aggregated_stft = []  # STFT frames from all test runs of this sensor

        for test_num in range(1, num_test_runs + 1):
            # Sensor 1 corresponds to '_01', Sensor 2 corresponds to '_02'
            sensor_suffix = f'_0{sensor_num}'
            file_name = f'DAMAGE_{damage_num}_TEST{test_num}{sensor_suffix}.csv'
            file_path = os.path.join(damage_folder, file_name)

            if not os.path.isfile(file_path):
                print(f"File {file_path} does not exist. Skipping...")
                continue

            # Best-effort read: any failure is reported and the run skipped.
            try:
                df = pd.read_csv(file_path)
            except Exception as e:
                print(f"Error reading {file_path}: {e}. Skipping...")
                continue

            # Ensure the CSV has exactly two columns: 'Timestamp (s)' and 'Sensor X'
            if df.shape[1] != 2:
                print(f"Unexpected number of columns in {file_path}. Expected 2, got {df.shape[1]}. Skipping...")
                continue

            vibration_data = df.iloc[:, 1].values
            df_stft = compute_stft(vibration_data)

            # Damage case 0 only: keep the first 20 STFT frames for test runs
            # 1-60 and the first 21 frames for the remaining runs.
            if damage_num == 0:
                print(f"Processing damage_num = 0, test_num = {test_num}")
                if test_num <= 60:
                    df_stft = df_stft.iloc[:20, :]
                    print(f"Reduced df_stft shape (20 samples): {df_stft.shape}")
                else:
                    df_stft = df_stft.iloc[:21, :]
                    print(f"Reduced df_stft shape (21 samples): {df_stft.shape}")

            aggregated_stft.append(df_stft)
            # Running total of rows collected so far for this sensor.
            print(sum(frame.shape[0] for frame in aggregated_stft))

        if not aggregated_stft:
            print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.")
            continue

        # Concatenate all STFT DataFrames vertically
        df_aggregated = pd.concat(aggregated_stft, ignore_index=True)
        output_file = os.path.join(
            output_dirs[f'sensor{sensor_num}'],
            f'stft_data{sensor_num}_{damage_num}.csv'
        )

        # newline='' disables newline translation on the text handle, as the
        # pandas to_csv documentation asks for; otherwise the os.linesep row
        # terminators pandas writes become '\r\r\n' on Windows.
        with open(output_file, 'w', newline='') as file:
            # Leading 'sep=,' line ahead of the CSV header; written with the
            # platform line ending because translation is now off.
            file.write(f'sep=,{os.linesep}')
            df_aggregated.to_csv(file, index=False)
        print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}")
if __name__ == "__main__":  # main guard for multiprocessing
    # One task per damage case index, 0 through num_damage_cases inclusive.
    damage_case_ids = range(num_damage_cases + 1)
    with multiprocessing.Pool() as worker_pool:
        worker_pool.map(process_damage_case, damage_case_ids)