Compare commits

...

44 Commits

Author SHA1 Message Date
nuluh
855114d633 refactor(notebooks): clean up imports, adjust damage case processing, and improve model training structure
- Removed unnecessary imports (os, pandas, numpy) from the STFT notebook.
- Adjusted the number of damage cases in the multiprocessing pool to correctly reflect the range.
- Updated model training code for Sensor B to ensure consistent naming and structure.
- Cleaned up commented-out code for clarity and maintainability.
2025-08-17 23:39:57 +07:00
nuluh
4a1c0ed83e feat(src): implement inference function with damage probability calculations and visualization
Closes #103
2025-08-17 22:21:17 +07:00
nuluh
274cd60d27 refactor(src): update generate_df_tuples function signature to include type hints for better clarity 2025-08-11 18:49:41 +07:00
nuluh
9f23d82fab fix(src): correct the file writing method in process_stft.process_damage_case to fix the incorrect first column name
Closes #104
2025-08-11 13:17:46 +07:00
nuluh
a8288b1426 refactor(src): enhance compute_stft with type hints and improved documentation; move the column renaming step from process_damage_case into compute_stft 2025-08-11 13:15:48 +07:00
nuluh
860542f3f9 refactor(src): restructure compute_stft into a pure function, add an optional return of the STFT parameters, and improve clarity 2025-08-10 20:02:45 +07:00
nuluh
0e28ed6dd0 feat(notebooks): add cross-dataset validation for Sensor A and Sensor B models
Closes #74
2025-07-28 16:41:54 +07:00
Rifqi D. Panuluh
80d4a66925 Merge pull request #100 from nuluh/feature/99-exp-alternative-undamage-case-data
[EXP] Alternative Undamaged Case Data
2025-07-24 18:09:05 +07:00
nuluh
9b018efc15 refactor(notebooks): update STFT notebook to improve clarity and structure of sensor evaluation sections 2025-07-24 17:00:31 +07:00
nuluh
2fbdeac1eb refactor(test): update import statement to use data_preprocessing module 2025-07-18 19:29:02 +07:00
nuluh
086032c250 refactor(notebooks): clean up notebooks for readability 2025-07-18 19:28:43 +07:00
nuluh
f6c71739df refactor(ml): clean up model_selection.py by removing unused code and improving function structure 2025-07-18 19:27:46 +07:00
Rifqi D. Panuluh
2dc915949b chore(.gitignore): add additional LaTeX file types to ignore list 2025-07-17 14:05:59 +00:00
nuluh
18824e05c0 refactor(ml): update inference calls to use new model structure and improve clarity 2025-07-17 00:18:01 +07:00
nuluh
2504157b29 feat(src): replace convert.py with src/data_preprocessing.py and fix the prefix parameter in several functions 2025-07-02 03:25:18 +07:00
nuluh
5ba628b678 refactor(src): make compute_stft and process_damage_case pure functions that explicitly require the STFT arguments to be passed 2025-07-01 14:32:52 +07:00
nuluh
a93adc8af3 feat(notebooks): slim down the stft.ipynb notebook and add an STFT data preview plot.
- Consolidated import statements for pandas and matplotlib.
- Updated STFT plotting for Sensor 1 and Sensor 2 datasets with improved visualization using pcolormesh.
- Enhanced subplot organization for better clarity in visual representation.
- Added titles and adjusted layout for all plots.
2025-06-30 01:36:44 +07:00
nuluh
c2df42cc2b feat(ml): add XGBoost model to inference options and update commented inference calls 2025-06-27 10:35:27 +07:00
nuluh
465ed121f9 feat(notebooks): train models with the new alternative undamaged (label 0) data 2025-06-27 10:34:23 +07:00
nuluh
d6975b4817 feat(src): update the damage base path and adjust test-run logic in damage case processing for the new undamaged case method 2025-06-27 10:33:54 +07:00
nuluh
79070921d7 feat(data): add complement_pairs function to generate complement tuples for the alternative undamaged case method 2025-06-27 10:33:36 +07:00
nuluh
e8eb07a91b refactor(data): improve variable naming in generate_df_tuples function for clarity 2025-06-26 10:53:10 +07:00
nuluh
c98c6a091b refactor(data): update generate_df_tuples function for improved code readability 2025-06-26 10:51:29 +07:00
nuluh
9921d7663b feat(src): add inference script for model evaluation 2025-06-24 14:08:38 +07:00
nuluh
459fbcc17a refactor(notebooks): improve visualization for sensor analysis and streamline data processing 2025-06-24 14:08:02 +07:00
nuluh
5041ee3feb feat(src): add confusion matrix plotting and label percentage calculation 2025-06-24 14:06:56 +07:00
nuluh
114ab849b9 feat(src): Add confusion matrix plotting function for model evaluation 2025-06-24 00:27:15 +07:00
nuluh
6196523ea0 feat(notebooks): Add confusion matrix plotting loop for Sensor 1 models 2025-06-21 01:10:03 +07:00
Rifqi D. Panuluh
46b66e0a90 Merge pull request #98 from nuluh/feat/53-feat-include-undamaged-node-classification
Closes #53
2025-06-18 09:06:04 +07:00
nuluh
18892c1188 WIP(notebooks): Add SVM with StandardScaler and PCA to sensor model definitions 2025-06-18 08:31:55 +07:00
nuluh
d0b603ba9f fix(data): Update DataProcessor instantiation for new data preprocessing implementation 2025-06-18 08:30:12 +07:00
nuluh
a7d8f1ef56 fix(data): Fix pool mapping to include undamaged case and add csv header separator line for Excel compatibility 2025-06-18 08:25:01 +07:00
nuluh
1164627bac fix(data): Fix export_to_csv to handle the newly added undamaged scenario and add an include_time parameter to include the 'Time' data 2025-06-18 01:54:12 +07:00
nuluh
58a672a680 fix(data): Fix generate_df_tuples function output bug when special_groups args is passed 2025-06-17 13:20:27 +07:00
nuluh
24c1484300 feat(data): Enhance DataProcessor to support dynamic base path and improve data loading with error handling and memory efficiency 2025-06-16 17:35:27 +07:00
nuluh
60ff4e0fa9 feat(data): Propose new damage file index generation to improve structure and flexibility in DataFrame handling 2025-06-16 03:13:07 +07:00
nuluh
3e652accfb refactor(data): remove unnecessary variable declaration in DataProcessor for loading dataframes 2025-06-14 04:02:42 +07:00
nuluh
66a09e0ddf feat(data): Enhance damage file index generation with undamaged file handling and improved error management (WIP) 2025-06-14 04:02:42 +07:00
nuluh
195f8143f0 refactor(data): remove redundant column extraction method and simplify dataframe loading 2025-06-14 00:57:54 +07:00
nuluh
e7332252a6 Merge branch 'feat/90-feat-preserve-trained-model' into dev 2025-06-12 03:38:15 +07:00
nuluh
4b0819f94e feat(notebooks): Enhance STFT notebook and model selection functionality
- Updated paths in the STFT notebook to reflect new data files.
- Improved plotting aesthetics for combined plots and added grid lines.
- Introduced a 3D spectrogram visualization for better data representation.
- Refactored model training function to include error handling and model export functionality.
- Adjusted model training calls to include export paths for saved models.
- Added additional markdown cells for better documentation and clarity in the notebook.
Closes #90
2025-06-12 03:35:21 +07:00
nuluh
7613c08ebd feat(figures): add data preprocessing illustration diagram 2025-06-10 17:21:49 +07:00
nuluh
ad6cda4270 fix(notebooks): update sensor data paths and improve plotting aesthetics 2025-06-10 17:20:13 +07:00
nuluh
ebaa263781 chore(convert): comment out create_damage_files obsolete function 2025-06-09 18:59:51 +07:00
10 changed files with 1808 additions and 1090 deletions

15
.gitignore vendored
View File

@@ -2,4 +2,17 @@
data/**/*.csv
.venv/
*.pyc
*.egg-info/
*.egg-info/
# Latex
*.aux
*.log
*.out
*.toc
*.bbl
*.blg
*.fdb_latexmk
*.fls
*.synctex.gz
*.dvi

.vscode/settings.json
View File

@@ -1,4 +1,7 @@
{
"python.analysis.extraPaths": ["./code/src/features"],
"python.analysis.extraPaths": [
"./code/src/features",
"${workspaceFolder}/code/src"
],
"jupyter.notebookFileRoot": "${workspaceFolder}/code"
}

File diff suppressed because one or more lines are too long

357
code/src/data_preprocessing.py Normal file
View File

@@ -0,0 +1,357 @@
import pandas as pd
import os
import re
import sys
import numpy as np
from colorama import Fore, Style, init
from typing import TypedDict, Dict, List
from joblib import load
from pprint import pprint
# class DamageFilesIndices(TypedDict):
# damage_index: int
# files: list[int]
OriginalSingleDamageScenarioFilePath = str
DamageScenarioGroupIndex = int
OriginalSingleDamageScenario = pd.DataFrame
SensorIndex = int
VectorColumnIndex = List[SensorIndex]
VectorColumnIndices = List[VectorColumnIndex]
DamageScenarioGroup = List[OriginalSingleDamageScenario]
GroupDataset = List[DamageScenarioGroup]
class DamageFilesIndices(TypedDict):
damage_index: int
files: List[str]
def complement_pairs(n, prefix, extension):
"""
Yield the four complement (filename, [bottom, top]) tuples for {prefix}<n>.{extension}
"""
filename = f"{prefix}{n}.{extension}" # TODO: shouldn't be hardcoded
orig_a = (n - 1) % 5 + 1 # 1 … 5
for a in range(1, 6): # a = 1 … 5
if a != orig_a: # skip original a
yield (filename, [a, a + 25]) # use yield instead of return to return a generator of tuples
def generate_df_tuples(prefix: str, total_dfs: int=30, extension: str="TXT", first_col_start: int=1, last_col_offset: int=25,
group_size: int=5, special_groups: list=None, group: bool=True):
"""
Generate a structured list of tuples containing DataFrame references and column indices.
Parameters:
-----------
total_dfs : int, default 30
Total number of DataFrames to include in the tuples
group_size : int, default 5
Number of DataFrames in each group (determines the pattern repeat)
prefix : str
Prefix for the data file names (e.g. 'zzzAD')
first_col_start : int, default 1
Starting value for the first column index (1-indexed)
last_col_offset : int, default 25
Offset to add to first_col_start to get the last column index
special_groups : list, optional
A pre-built group (e.g. a list of (file_name, [first_col, last_col]) tuples)
that is inserted as-is at position 0 of the result
Returns:
--------
list
List of tuple groups; each tuple contains (file_name, [first_col, last_col])
"""
result = []
if group:
# Group tuples into sublists of group_size
for g in range(6): # TODO: shouldn't be hardcoded
group_tuples = [] # renamed from `group` to avoid shadowing the boolean parameter
for i in range(1, 6): # TODO: shouldn't be hardcoded
n = g * 5 + i
bottom_end = i # 1, 2, 3, 4, 5
top_end = bottom_end + 25 # 26, 27, 28, 29, 30 # TODO: shouldn't be hardcoded
group_tuples.append((f"{prefix}{n}.{extension}", [bottom_end, top_end]))
result.append(group_tuples)
# Add special groups at specified positions (other than beginning)
if special_groups:
result.insert(0, special_groups)
return result
# file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
# df = pd.read_csv(file_path, sep="\t", skiprows=10) # Read with explicit column names
class DataProcessor:
def __init__(self, file_index, cache_path: str = None, base_path: str = None, include_time: bool = False):
self.file_index = file_index
self.base_path = base_path
self.include_time = include_time
if cache_path:
self.data = load(cache_path)
else:
self.data = self.load_data()
def load_data(self):
for idxs, group in enumerate(self.file_index):
for idx, pair in enumerate(group): # `pair` avoids shadowing the built-in `tuple`
file_path = os.path.join(self.base_path, pair[0]) # e.g. 'zzzAD1.TXT'
if self.include_time:
col_indices = [0] + pair[1] # [0] + [1, 26] -> [0, 1, 26]
else:
col_indices = pair[1] # [1, 26]
try:
# Read the CSV file
df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True)
self.file_index[idxs][idx] = df.iloc[:, col_indices].copy() # Extract the specified columns
print(f"Processed {file_path}, extracted columns: {col_indices}")
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario:
"""
Loads a single data file into a pandas DataFrame.
:param file_path: Path to the data file.
:return: DataFrame containing the numerical data.
"""
df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True, nrows=1)
return df
def _load_all_data(self) -> GroupDataset:
"""
Loads all data files based on the grouping dictionary and returns a nested list.
:return: A nested list of DataFrames where the outer index corresponds to group_idx - 1.
"""
data = []
# Find the maximum group index to determine the list size
max_group_idx = len(self.file_index) if self.file_index else 0
# Handle case when file_index is empty
if max_group_idx == 0:
raise ValueError("No file index provided; file_index is empty.")
# Initialize empty lists
for _ in range(max_group_idx):
data.append([])
# Fill the list with data
for group_idx, file_list in self.file_index.items():
group_idx -= 1 # adjust due to undamage file
data[group_idx] = [self._load_dataframe(file) for file in file_list]
return data
def get_group_data(self, group_idx: int) -> List[pd.DataFrame]:
"""
Returns the list of DataFrames for the given group index.
:param group_idx: Index of the group.
:return: List of DataFrames.
"""
return self.data[group_idx] if 0 <= group_idx < len(self.data) else []
def get_column_names(self, group_idx: int, file_idx: int = 0) -> List[str]:
"""
Returns the column names for the given group and file indices.
:param group_idx: Index of the group.
:param file_idx: Index of the file in the group.
:return: List of column names.
"""
if group_idx in self.data and len(self.data[group_idx]) > file_idx:
return self.data[group_idx][file_idx].columns.tolist()
return []
def get_data_info(self):
"""
Print information about the loaded data structure.
Adapted for when self.data is a List instead of a Dictionary.
"""
if isinstance(self.data, list):
# For each sublist in self.data, get the type names of all elements
pprint(
[
(
[type(item).__name__ for item in sublist]
if isinstance(sublist, list)
else type(sublist).__name__
)
for sublist in self.data
]
)
else:
pprint(
{
key: [type(df).__name__ for df in value]
for key, value in self.data.items()
}
if isinstance(self.data, dict)
else type(self.data).__name__
)
def _create_vector_column_index(self) -> VectorColumnIndices:
vector_col_idx: VectorColumnIndices = []
y = 0
for data_group in self.data: # each data_group holds the five scenarios of one damage group
for j in data_group: # one column-vector index per scenario
c: VectorColumnIndex = []
x = 0
for _ in range(6): # TODO: range(6) should be dynamic and parameterized
c.append(x + y)
x += 5
vector_col_idx.append(c)
y += 1
return vector_col_idx # TODO: refactor this so that it returns just from first data_group without using for loops through the self.data that seems unnecessary
def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]:
"""
Create a vector column from the loaded data.
:param overwrite: Overwrite the original data with vector column-based data.
"""
idxs = self._create_vector_column_index()
for i, group in enumerate(self.data):
# add 1 to all indices to account for 'Time' being at position 0
for j, df in enumerate(group):
idx = [_ + 1 for _ in idxs[j]]
# slice out the desired columns, copy into a fresh DataFrame,
# then overwrite self.data[i][j] with it
self.data[i][j] = df.iloc[:, idx].copy()
# TODO: if !overwrite:
def create_limited_sensor_vector_column(self, overwrite=True):
"""
Create a vector column from the loaded data.
:param overwrite: Overwrite the original data with vector column-based data.
"""
idx = self._create_vector_column_index()
# if overwrite:
for i in range(len(self.data)): # damage(s)
for j in range(len(self.data[i])): # col(s)
# Get the appropriate indices for slicing from idx
indices = idx[j]
# Get the current DataFrame
df = self.data[i][j]
# Keep the 'Time' column and select only the specified 'Real' columns
# First, add 1 to all indices to account for 'Time' being at position 0
real_indices = [index + 1 for index in indices]
# Create a list with the Time column index (0) and the first and last adjusted Real indices
all_indices = [0] + [real_indices[0]] + [real_indices[-1]]
# Apply the slicing
self.data[i][j] = df.iloc[:, all_indices]
# TODO: if !overwrite:
def export_to_csv(self, output_dir: str, file_prefix: str = "DAMAGE"):
"""
Export the processed data to CSV files in the required folder structure.
:param output_dir: Directory to save the CSV files.
:param file_prefix: Prefix for the output filenames.
"""
for group_idx, group in enumerate(self.file_index, start=0):
group_folder = os.path.join(output_dir, f"{file_prefix}_{group_idx}")
os.makedirs(group_folder, exist_ok=True)
for test_idx, df in enumerate(group, start=1):
out1 = os.path.join(group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_01.csv")
cols_to_export = [0, 1] if self.include_time else [1]
df.iloc[:, cols_to_export].to_csv(out1, index=False)
out2 = os.path.join(group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_02.csv")
cols_to_export = [0, 2] if self.include_time else [2]
df.iloc[:, cols_to_export].to_csv(out2, index=False)
# def create_damage_files(base_path, output_base, prefix):
# # Initialize colorama
# init(autoreset=True)
# # Generate column labels based on expected duplication in input files
# columns = ["Real"] + [
# f"Real.{i}" for i in range(1, 30)
# ] # Explicitly setting column names
# sensor_end_map = {
# 1: "Real.25",
# 2: "Real.26",
# 3: "Real.27",
# 4: "Real.28",
# 5: "Real.29",
# }
# # Define the damage scenarios and the corresponding original file indices
# damage_scenarios = {
# 1: range(1, 6), # Damage 1 files from zzzAD1.csv to zzzAD5.csv
# 2: range(6, 11), # Damage 2 files from zzzAD6.csv to zzzAD10.csv
# 3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csv
# 4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv
# 5: range(21, 26), # Damage 5 files from zzzAD21.csv to zzzAD25.csv
# 6: range(26, 31), # Damage 6 files from zzzAD26.csv to zzzAD30.csv
# }
# damage_pad = len(str(len(damage_scenarios)))
# test_pad = len(str(30))
# for damage, files in damage_scenarios.items():
# for i, file_index in enumerate(files, start=1):
# # Load original data file
# file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
# df = pd.read_csv(
# file_path, sep="\t", skiprows=10
# ) # Read with explicit column names
# top_sensor = columns[i - 1]
# print(top_sensor, type(top_sensor))
# output_file_1 = os.path.join(
# output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_01.csv"
# )
# print(f"Creating {output_file_1} from taking zzz{prefix}D{file_index}.TXT")
# print("Taking datetime column on index 0...")
# print(f"Taking `{top_sensor}`...")
# os.makedirs(os.path.dirname(output_file_1), exist_ok=True)
# df[["Time", top_sensor]].to_csv(output_file_1, index=False)
# print(Fore.GREEN + "Done")
# bottom_sensor = sensor_end_map[i]
# output_file_2 = os.path.join(
# output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_02.csv"
# )
# print(f"Creating {output_file_2} from taking zzz{prefix}D{file_index}.TXT")
# print("Taking datetime column on index 0...")
# print(f"Taking `{bottom_sensor}`...")
# os.makedirs(os.path.dirname(output_file_2), exist_ok=True)
# df[["Time", bottom_sensor]].to_csv(output_file_2, index=False)
# print(Fore.GREEN + "Done")
# print("---")
def main():
if len(sys.argv) < 4:
print("Usage: python data_preprocessing.py <base_path> <output_base> <prefix>")
sys.exit(1)
base_path = sys.argv[1]
output_base = sys.argv[2]
prefix = sys.argv[3]
# Create output folders if they don't exist
# for i in range(1, 7):
# os.makedirs(os.path.join(output_base, f'DAMAGE_{i}'), exist_ok=True)
# NOTE: create_damage_files is obsolete and commented out above, so this call is disabled
# create_damage_files(base_path, output_base, prefix)
print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.")
if __name__ == "__main__":
main()
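A minimal usage sketch of the pipeline this file adds, for review context. The import path and data locations are assumptions for illustration; they mirror the driver script later in this diff.

# Sketch: build the grouped file index with the undamaged complement group at
# position 0, load the raw files, and export per-damage-case CSV folders.
# Assumed: src/ is importable and the dataset lives under D:/thesis/data/dataset_A.
from src.data_preprocessing import complement_pairs, generate_df_tuples, DataProcessor

undamaged = [comp
             for n in range(1, 31)
             for comp in complement_pairs(n, prefix="zzzAD", extension="TXT")]  # 30 files x 4 complements = 120 tuples
index = generate_df_tuples(special_groups=undamaged, prefix="zzzAD")  # [undamaged group, damage groups 1..6]
proc = DataProcessor(file_index=index, base_path="D:/thesis/data/dataset_A", include_time=True)
proc.export_to_csv("D:/thesis/data/converted/raw")  # writes DAMAGE_0 ... DAMAGE_6 folders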

190
code/src/ml/inference.py Normal file
View File

@@ -0,0 +1,190 @@
from joblib import load
import pandas as pd
from src.data_preprocessing import *
from src.process_stft import compute_stft
from typing import List, Tuple
from sklearn.base import BaseEstimator
import json
def probability_damage(pred: np.ndarray, model: BaseEstimator, percentage=False) -> Dict[int, float]:
"""
Process the prediction output to return each class label's prediction ratio (its probability of damage).
"""
labels, counts = np.unique(pred, return_counts=True)
label_counts = dict(zip(labels, counts))
# initialise the probability of damage for every class the model knows to 0
pod: Dict[int, float] = dict.fromkeys(model.classes_, 0)
# overwrite with the observed prediction counts
pod.update(label_counts)
# turn the value into ratio instead of prediction counts
for label, count in pod.items():
ratio: float = count/np.sum(counts)
if percentage:
pod[label] = ratio * 100
else:
pod[label] = ratio
return pod
def convert_keys_to_strings(obj):
"""
Recursively convert all dictionary keys to strings.
"""
if isinstance(obj, dict):
return {str(key): convert_keys_to_strings(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [convert_keys_to_strings(item) for item in obj]
else:
return obj
def inference(model_sensor_A_path: str, model_sensor_B_path: str, file_path: str):
# Generate column indices
column_index: List[Tuple[int, int]] = [
(i + 1, i + 26)
for i in range(5)
]
# Load a single case data
df: pd.DataFrame = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True)
# Take case name
case_name: str = file_path.split("/")[-1].split(".")[0]
# Extract relevant columns for each sensor
column_data: List[Tuple[pd.Series[float], pd.Series[float]]] = [
(df.iloc[:, i[0]], df.iloc[:, i[1]])
for i in column_index
]
column_data_stft: List[Tuple[pd.DataFrame, pd.DataFrame]] = [
(compute_stft(sensor_A), compute_stft(sensor_B))
for (sensor_A, sensor_B) in column_data
]
# Load the model
model_sensor_A = load(model_sensor_A_path)
model_sensor_B = load(model_sensor_B_path)
res = {}
for i, (stft_A, stft_B) in enumerate(column_data_stft):
# Make predictions using the model
pred_A: list[int] = model_sensor_A.predict(stft_A)
pred_B: list[int] = model_sensor_B.predict(stft_B)
percentage_A = probability_damage(pred_A, model_sensor_A)
percentage_B = probability_damage(pred_B, model_sensor_B)
res[f"Column_{i+1}"] = {
"Sensor_A": {
# "Predictions": pred_A,
"PoD": percentage_A
},
"Sensor_B": {
# "Predictions": pred_B,
"PoD": percentage_B
}
}
final_res = {"data": res, "case": case_name}
return final_res
def heatmap(result, damage_classes: list[int] = [1, 2, 3, 4, 5, 6]):
import matplotlib.pyplot as plt # needed when heatmap is called outside the __main__ block below
from scipy.interpolate import RectBivariateSpline
resolution = 300
y = list(range(1, len(damage_classes)+1))
# length of column
x = list(range(len(result["data"])))
# X, Y = np.meshgrid(x, y)
Z = []
for _, column_data in result["data"].items():
sensor_a_pod = column_data['Sensor_A']['PoD']
Z.append([sensor_a_pod.get(cls, 0) for cls in damage_classes])
Z = np.array(Z).T
y2 = np.linspace(1, len(damage_classes), resolution)
x2 = np.linspace(0, len(x) - 1, resolution) # span the column indices instead of hardcoding 4
f = RectBivariateSpline(x, y, Z.T, kx=2, ky=2) # 2nd degree quadratic spline interpolation
Z2 = f(x2, y2).T.clip(0, 1) # clip to suppress negative overshoot from the spline interpolation
X2, Y2 = np.meshgrid(x2, y2)
# breakpoint()
c = plt.pcolormesh(X2, Y2, Z2, cmap='jet', shading='auto')
# Add a colorbar
plt.colorbar(c, label='Probability of Damage (PoD)')
plt.gca().invert_xaxis()
plt.grid(True, linestyle='-', alpha=0.7)
plt.xticks(np.arange(int(X2.min()), int(X2.max())+1, 1))
plt.xlabel("Column Index")
plt.ylabel("Damage Index")
plt.title(result["case"])
# plt.xticks(ticks=x2, labels=[f'Col_{i+1}' for i in range(len(result))])
# plt.gca().xaxis.set_major_locator(MultipleLocator(65/4))
plt.show()
if __name__ == "__main__":
import matplotlib.pyplot as plt
import json
from scipy.interpolate import UnivariateSpline
result = inference(
"D:/thesis/models/Sensor A/SVM with StandardScaler and PCA.joblib",
"D:/thesis/models/Sensor B/SVM with StandardScaler and PCA.joblib",
"D:/thesis/data/dataset_B/zzzBD19.TXT"
)
# heatmap(result)
# Convert all keys to strings before dumping to JSON
# result_with_string_keys = convert_keys_to_strings(result)
# print(json.dumps(result_with_string_keys, indent=4))
# Create a 5x2 subplot grid (5 rows for each column, 2 columns for sensors)
fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(5, 50))
# # Define damage class labels for x-axis
damage_classes = [1, 2, 3, 4, 5, 6]
# # Loop through each column in the data
for row_idx, (column_name, column_data) in enumerate(result['data'].items()):
# Plot Sensor A in the first column of subplots
sensor_a_pod = column_data['Sensor_A']['PoD']
x_values = list(range(len(damage_classes)))
y_values = [sensor_a_pod.get(cls, 0) for cls in damage_classes]
# x2 = np.linspace(1, 6, 100)
# interp = UnivariateSpline(x_values, y_values, s=0)
axes[row_idx, 0].plot(x_values, y_values, '-', linewidth=2, markersize=8)
axes[row_idx, 0].set_title(f"{column_name} - Sensor A", fontsize=10)
axes[row_idx, 0].set_xticks(x_values)
axes[row_idx, 0].set_xticklabels(damage_classes)
axes[row_idx, 0].set_ylim(0, 1.05)
axes[row_idx, 0].set_ylabel('Probability')
axes[row_idx, 0].set_xlabel('Damage Class')
axes[row_idx, 0].grid(True, linestyle='-', alpha=0.5)
# Plot Sensor B in the second column of subplots
sensor_b_pod = column_data['Sensor_B']['PoD']
y_values = [sensor_b_pod.get(cls, 0) for cls in damage_classes]
axes[row_idx, 1].plot(x_values, y_values, '-', linewidth=2, markersize=8)
axes[row_idx, 1].set_title(f"{column_name} - Sensor B", fontsize=10)
axes[row_idx, 1].set_xticks(x_values)
axes[row_idx, 1].set_xticklabels(damage_classes)
axes[row_idx, 1].set_ylim(0, 1.05)
axes[row_idx, 1].set_ylabel('Probability')
axes[row_idx, 1].set_xlabel('Damage Class')
axes[row_idx, 1].grid(True, linestyle='-', alpha=0.5)
# Adjust layout to prevent overlap
fig.tight_layout(rect=[0, 0, 1, 0.96]) # Leave space for suptitle
plt.subplots_adjust(hspace=1, wspace=0.3) # Adjust spacing between subplots
plt.suptitle(f"Case {result['case']}", fontsize=16, y=0.98) # Adjust suptitle position
plt.show()
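To make the PoD output concrete, here is a small sketch of probability_damage on a toy prediction vector. DummyModel is a hypothetical stand-in for a fitted estimator; only its classes_ attribute is used.

import numpy as np
from src.ml.inference import probability_damage  # path as added in this PR; assumed importable

class DummyModel:
    classes_ = np.array([0, 1, 2, 3, 4, 5, 6])  # label 0 = undamaged, 1..6 = damage cases

pred = np.array([1, 1, 2, 6, 6, 6])  # six per-frame predictions for one sensor column
pod = probability_damage(pred, DummyModel())
# pod -> {0: 0.0, 1: 2/6, 2: 1/6, 3: 0.0, 4: 0.0, 5: 0.0, 6: 3/6}; the ratios sum to 1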

code/src/ml/model_selection.py
View File

@@ -1,13 +1,14 @@
import numpy as np
import pandas as pd
import os
from sklearn.model_selection import train_test_split as sklearn_split
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from joblib import load
def create_ready_data(
stft_data_path: str,
stratify: np.ndarray = None,
) -> tuple:
) -> tuple[pd.DataFrame, np.ndarray]:
"""
Create a stratified train-test split from STFT data.
@@ -21,13 +22,13 @@ def create_ready_data(
Returns:
--------
tuple
(X_train, X_test, y_train, y_test) - Split datasets
(pd.DataFrame, np.ndarray) - Combined data and corresponding labels
"""
ready_data = []
for file in os.listdir(stft_data_path):
ready_data.append(pd.read_csv(os.path.join(stft_data_path, file)))
ready_data.append(pd.read_csv(os.path.join(stft_data_path, file), skiprows=1))
y_data = [i for i in range(len(ready_data))]
y_data = [i for i in range(len(ready_data))] # TODO: Should be replaced with actual desired labels
# Combine all dataframes in ready_data into a single dataframe
if ready_data: # Check if the list is not empty
@@ -55,3 +56,207 @@ def create_ready_data(
y = np.array([])
return X, y
def train_and_evaluate_model(
model, model_name, sensor_label, x_train, y_train, x_test, y_test, export=None
):
"""
Train a machine learning model, evaluate its performance, and optionally export it.
This function trains the provided model on the training data, evaluates its
performance on test data using accuracy score, and can save the trained model
to disk if an export path is provided.
Parameters
----------
model : estimator object
The machine learning model to train.
model_name : str
Name of the model, used for the export filename and in the returned results.
sensor_label : str
Label identifying which sensor's data the model is being trained on.
x_train : array-like or pandas.DataFrame
The training input samples.
y_train : array-like
The target values for training.
x_test : array-like or pandas.DataFrame
The test input samples.
y_test : array-like
The target values for testing.
export : str, optional
Directory path where the trained model should be saved. If None, model won't be saved.
Returns
-------
dict
Dictionary containing:
- 'model': model_name (str)
- 'sensor': sensor_label (str)
- 'accuracy': accuracy percentage (float)
Example
-------
>>> from sklearn.svm import SVC
>>> from sklearn.model_selection import train_test_split
>>> X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2)
>>> result = train_and_evaluate_model(
... SVC(),
... "SVM",
... "sensor1",
... X_train,
... y_train,
... X_test,
... y_test,
... export="models/sensor1"
... )
>>> print(f"Model accuracy: {result['accuracy']:.2f}%")
"""
from sklearn.metrics import accuracy_score
result = {"model": model_name, "sensor": sensor_label, "success": False}
try:
# Train the model
model.fit(x_train, y_train)
try:
y_pred = model.predict(x_test)
result["y_pred"] = y_pred # Convert to numpy array
except Exception as e:
result["error"] = f"Prediction error: {str(e)}"
return result
# Calculate accuracy
try:
accuracy = accuracy_score(y_test, y_pred) * 100
result["accuracy"] = accuracy
except Exception as e:
result["error"] = f"Accuracy calculation error: {str(e)}"
return result
# Export model if requested
if export:
try:
import joblib
full_path = os.path.join(export, f"{model_name}.joblib")
os.makedirs(os.path.dirname(full_path), exist_ok=True)
joblib.dump(model, full_path)
print(f"Model saved to {full_path}")
except Exception as e:
print(f"Warning: Failed to export model to {export}: {str(e)}")
result["export_error"] = str(e)
# Continue despite export error
result["success"] = True
return result
except Exception as e:
result["error"] = f"Training error: {str(e)}"
return result
def plot_confusion_matrix(results_sensor, y_test, title):
"""
Plot a confusion matrix for each model result in results_sensor.
Parameters:
-----------
results_sensor : list
List of result dictionaries from train_and_evaluate_model; each must contain 'model', 'sensor', and 'y_pred'.
y_test : array-like
True labels for the test samples.
title : str
Title to display on each confusion-matrix figure.
Returns:
--------
None
This function displays a confusion matrix for each model in results_sensor.
Example
-------
>>> results_sensor1 = [
... {'model': 'SVM', 'sensor': 'Sensor A', 'y_pred': y_pred_svm},
... {'model': 'XGBoost', 'sensor': 'Sensor A', 'y_pred': y_pred_xgb}
... ]
>>> y_test = np.random.randint(0, 7, size=100) # Example true labels
>>> plot_confusion_matrix(results_sensor1, y_test, "Sensor A")
"""
# Iterate through each model result and plot confusion matrix
for i in results_sensor:
model = load(f"D:/thesis/models/{i['sensor']}/{i['model']}.joblib")
cm = confusion_matrix(y_test, i['y_pred']) # -> ndarray
# get the class labels
labels = model.classes_
# Plot
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
disp.plot(cmap=plt.cm.Blues) # You can change colormap
plt.title(f"{title}")
def calculate_label_percentages(labels):
"""
Calculate and print the percentage distribution of unique labels in a numpy array.
Parameters:
labels (np.array): Input array of labels.
Returns:
None
"""
# Count occurrences of each unique label
unique, counts = np.unique(labels, return_counts=True)
# Calculate percentages
percentages = (counts / len(labels)) * 100
# Build and print the result string
result = "\n".join([f"Label {label}: {percentage:.2f}%" for label, percentage in zip(unique, percentages)])
print(result)
def inference_model(
models, raw_file, column_question: int = None
):
"""
Perform inference using a trained machine learning model on a raw vibration data file with questioned column grid.
Parameters
----------
models : str
Path to an exported (joblib) model to load for inference.
raw_file : str
Path to the raw vibration data file to read.
column_question : int, optional
Index of the column grid in question, used to select the vibration data column.
Returns
-------
None
Prints the percentage distribution of the predicted labels via calculate_label_percentages.
Example
-------
>>> from sklearn.svm import SVC
>>> model = {"SVM": "models/sensor1/SVM.joblib", "SVM with PCA": "models/sensor1/SVM_with_PCA.joblib"}
>>> inference_model(model["SVM"], "zzzAD1.TXT", column_question=1)
"""
df = pd.read_csv(raw_file, delim_whitespace=True, skiprows=10, header=0, memory_map=True)
col_idx = []
for i in range(1,6):
idx = [i, i+5, i+10, i+15, i+20, i+25]
col_idx.append(idx)
vibration_data = df.iloc[:, column_question].values
# Perform STFT
from scipy.signal import stft
from scipy.signal.windows import hann # hann now lives in scipy.signal.windows
freq, times, Zxx = stft(
vibration_data,
fs=1024,
window=hann(1024),
nperseg=1024,
noverlap=1024-512
)
data = pd.DataFrame(np.abs(Zxx).T, columns=[f"Freq_{f:.2f}" for f in freq]) # use the frequency vector returned by stft so the column count matches
data = data.rename(columns={"Freq_0.00": "00"}) # To match the model input format
model = load(models) # Load the model from the provided path
return calculate_label_percentages(model.predict(data.iloc[:21,:]))
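Taken together, the helpers above form a short train-evaluate-plot loop. The sketch below assumes a module path and an STFT CSV directory; note that plot_confusion_matrix reloads each model from the hardcoded D:/thesis/models/<sensor>/<model>.joblib path, so the export directory must match it.

from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from src.ml.model_selection import create_ready_data, train_and_evaluate_model, plot_confusion_matrix

X, y = create_ready_data("D:/thesis/data/converted/stft/sensor_1")  # assumed STFT CSV folder
x_tr, x_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
res = train_and_evaluate_model(SVC(), "SVM", "Sensor A", x_tr, y_tr, x_te, y_te,
                               export="D:/thesis/models/Sensor A")  # export to where plotting reloads from
if res["success"]:
    print(f"{res['model']} accuracy: {res['accuracy']:.2f}%")
    plot_confusion_matrix([res], y_te, "Sensor A - SVM")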

code/src/process_stft.py
View File

@@ -1,9 +1,11 @@
import os
import pandas as pd
import numpy as np
from scipy.signal import stft, hann
from scipy.signal import stft
from scipy.signal.windows import hann
import glob
import multiprocessing # Added import for multiprocessing
from typing import Union, Tuple, List
# Define the base directory where DAMAGE_X folders are located
damage_base_path = 'D:/thesis/data/converted/raw'
@@ -19,32 +21,65 @@ for dir_path in output_dirs.values():
os.makedirs(dir_path, exist_ok=True)
# Define STFT parameters
window_size = 1024
hop_size = 512
window = hann(window_size)
Fs = 1024
# Number of damage cases (adjust as needed)
num_damage_cases = 6 # Change to 30 if you have 30 damage cases
# Number of test runs per damage case
num_test_runs = 5
# Function to perform STFT and return magnitude
def compute_stft(vibration_data):
def compute_stft(vibration_data: np.ndarray, return_param: bool = False) -> Union[pd.DataFrame, Tuple[pd.DataFrame, List[int]]]:
"""
Computes the Short-Time Fourier Transform (STFT) magnitude of the input vibration data.
Parameters
----------
vibration_data : numpy.ndarray
The input vibration data as a 1D NumPy array.
return_param : bool, optional
If True, the function returns additional STFT parameters (window size, hop size, and sampling frequency).
Defaults to False.
Returns
-------
pd.DataFrame
The transposed STFT magnitude, with frequencies as columns, if `return_param` is False.
tuple
If `return_param` is True, returns a tuple containing:
- pd.DataFrame: The transposed STFT magnitude, with frequencies as columns.
- List[int]: A list of the STFT parameters [window_size, hop_size, Fs].
"""
window_size = 1024
hop_size = 512
window = hann(window_size)
Fs = 1024
frequencies, times, Zxx = stft(
vibration_data,
fs=Fs,
window=window,
nperseg=window_size,
noverlap=window_size - hop_size
)
vibration_data,
fs=Fs,
window=window,
nperseg=window_size,
noverlap=window_size - hop_size
)
stft_magnitude = np.abs(Zxx)
return stft_magnitude.T # Transpose to have frequencies as columns
# Convert STFT result to DataFrame
df_stft = pd.DataFrame(
stft_magnitude.T,
columns=[f"Freq_{freq:.2f}" for freq in frequencies] # use the frequency vector so the column count matches
)
# breakpoint()
if return_param:
return df_stft, [window_size, hop_size, Fs]
else:
return df_stft
def process_damage_case(damage_num):
damage_folder = os.path.join(damage_base_path, f'DAMAGE_{damage_num}')
if damage_num == 0:
# Number of test runs per damage case
num_test_runs = 120
else:
num_test_runs = 5
# Check if the damage folder exists
if not os.path.isdir(damage_folder):
print(f"Folder {damage_folder} does not exist. Skipping...")
@@ -79,20 +114,24 @@ def process_damage_case(damage_num):
print(f"Unexpected number of columns in {file_path}. Expected 2, got {df.shape[1]}. Skipping...")
continue
# Extract vibration data (assuming the second column is sensor data)
vibration_data = df.iloc[:, 1].values
# Perform STFT
stft_magnitude = compute_stft(vibration_data)
# Convert STFT result to DataFrame
df_stft = pd.DataFrame(
stft_magnitude,
columns=[f"Freq_{freq:.2f}" for freq in np.linspace(0, Fs/2, stft_magnitude.shape[1])]
)
df_stft = compute_stft(vibration_data)
# keep 22 STFT frame samples for the first 45 test runs, otherwise 21
if damage_num == 0:
print(f"Processing damage_num = 0, test_num = {test_num}")
if test_num <= 45:
df_stft = df_stft.iloc[:22, :]
print(f"Reduced df_stft shape (22 samples): {df_stft.shape}")
else:
df_stft = df_stft.iloc[:21, :]
print(f"Reduced df_stft shape (21 samples): {df_stft.shape}")
# Append to the aggregated list
aggregated_stft.append(df_stft)
print(sum(df.shape[0] for df in aggregated_stft))
# Concatenate all STFT DataFrames vertically
if aggregated_stft:
@@ -105,11 +144,13 @@ def process_damage_case(damage_num):
)
# Save the aggregated STFT to CSV
df_aggregated.to_csv(output_file, index=False)
with open(output_file, 'w') as file:
file.write('sep=,\n')
df_aggregated.to_csv(file, index=False)
print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}")
else:
print(f"No STFT data aggregated for Sensor {sensor_num}, Damage {damage_num}.")
if __name__ == "__main__": # Added main guard for multiprocessing
with multiprocessing.Pool() as pool:
pool.map(process_damage_case, range(1, num_damage_cases + 1))
pool.map(process_damage_case, range(num_damage_cases + 1))
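A quick sketch of the refactored compute_stft on a synthetic signal; the module path is an assumption, and note that importing this module creates its output directories under the hardcoded D:/thesis paths.

import numpy as np
from src.process_stft import compute_stft  # assumed import path, matching inference.py

fs = 1024
t = np.arange(60 * fs) / fs  # 60 s synthetic recording sampled at 1024 Hz
signal = np.sin(2 * np.pi * 50 * t)  # 50 Hz test tone
df_stft, params = compute_stft(signal, return_param=True)
print(params)  # [1024, 512, 1024] = [window_size, hop_size, Fs]
print(df_stft.shape)  # (n_frames, 513); columns run Freq_0.00 ... Freq_512.00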

convert.py
View File

@@ -1,360 +0,0 @@
import pandas as pd
import os
import re
import sys
import numpy as np
from colorama import Fore, Style, init
from typing import TypedDict, Dict, List
from joblib import load
from pprint import pprint
# class DamageFilesIndices(TypedDict):
# damage_index: int
# files: list[int]
OriginalSingleDamageScenarioFilePath = str
DamageScenarioGroupIndex = int
OriginalSingleDamageScenario = pd.DataFrame
SensorIndex = int
VectorColumnIndex = List[SensorIndex]
VectorColumnIndices = List[VectorColumnIndex]
DamageScenarioGroup = List[OriginalSingleDamageScenario]
GroupDataset = List[DamageScenarioGroup]
class DamageFilesIndices(TypedDict):
damage_index: int
files: List[str]
def generate_damage_files_index(**kwargs) -> DamageFilesIndices:
prefix: str = kwargs.get("prefix", "zzzAD")
extension: str = kwargs.get("extension", ".TXT")
num_damage: int = kwargs.get("num_damage")
file_index_start: int = kwargs.get("file_index_start")
col: int = kwargs.get("col")
base_path: str = kwargs.get("base_path")
damage_scenarios = {}
a = file_index_start
b = col + 1
for i in range(1, num_damage + 1):
damage_scenarios[i] = range(a, b)
a += col
b += col
# return damage_scenarios
x = {}
for damage, files in damage_scenarios.items():
x[damage] = [] # Initialize each key with an empty list
for i, file_index in enumerate(files, start=1):
if base_path:
x[damage].append(
os.path.normpath(
os.path.join(base_path, f"{prefix}{file_index}{extension}")
)
)
# if not os.path.exists(file_path):
# print(Fore.RED + f"File {file_path} does not exist.")
# continue
else:
x[damage].append(f"{prefix}{file_index}{extension}")
return x
# file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
# df = pd.read_csv( file_path, sep="\t", skiprows=10) # Read with explicit column names
class DataProcessor:
def __init__(self, file_index: DamageFilesIndices, cache_path: str = None):
self.file_index = file_index
if cache_path:
self.data = load(cache_path)
else:
self.data = self._load_all_data()
def _extract_column_names(self, file_path: str) -> List[str]:
"""
Extracts column names from the header of the given file.
Assumes the 6th line contains column names.
:param file_path: Path to the data file.
:return: List of column names.
"""
with open(file_path, "r") as f:
header_lines = [next(f) for _ in range(12)]
# Extract column names from the 6th line
channel_line = header_lines[10].strip()
tokens = re.findall(r'"([^"]+)"', channel_line)
if not channel_line.startswith('"'):
first_token = channel_line.split()[0]
tokens = [first_token] + tokens
return tokens # Prepend 'Time' column if applicable
def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario:
"""
Loads a single data file into a pandas DataFrame.
:param file_path: Path to the data file.
:return: DataFrame containing the numerical data.
"""
col_names = self._extract_column_names(file_path)
df = pd.read_csv(
file_path, delim_whitespace=True, skiprows=11, header=None, memory_map=True
)
df.columns = col_names
return df
def _load_all_data(self) -> GroupDataset:
"""
Loads all data files based on the grouping dictionary and returns a nested list.
:return: A nested list of DataFrames where the outer index corresponds to group_idx - 1.
"""
data = []
# Find the maximum group index to determine the list size
max_group_idx = max(self.file_index.keys()) if self.file_index else 0
# Initialize empty lists
for _ in range(max_group_idx):
data.append([])
# Fill the list with data
for group_idx, file_list in self.file_index.items():
# Adjust index to be 0-based
list_idx = group_idx - 1
data[list_idx] = [self._load_dataframe(file) for file in file_list]
return data
def get_group_data(self, group_idx: int) -> List[pd.DataFrame]:
"""
Returns the list of DataFrames for the given group index.
:param group_idx: Index of the group.
:return: List of DataFrames.
"""
return self.data.get([group_idx, []])
def get_column_names(self, group_idx: int, file_idx: int = 0) -> List[str]:
"""
Returns the column names for the given group and file indices.
:param group_idx: Index of the group.
:param file_idx: Index of the file in the group.
:return: List of column names.
"""
if group_idx in self.data and len(self.data[group_idx]) > file_idx:
return self.data[group_idx][file_idx].columns.tolist()
return []
def get_data_info(self):
"""
Print information about the loaded data structure.
Adapted for when self.data is a List instead of a Dictionary.
"""
if isinstance(self.data, list):
# For each sublist in self.data, get the type names of all elements
pprint(
[
(
[type(item).__name__ for item in sublist]
if isinstance(sublist, list)
else type(sublist).__name__
)
for sublist in self.data
]
)
else:
pprint(
{
key: [type(df).__name__ for df in value]
for key, value in self.data.items()
}
if isinstance(self.data, dict)
else type(self.data).__name__
)
def _create_vector_column_index(self) -> VectorColumnIndices:
vector_col_idx: VectorColumnIndices = []
y = 0
for data_group in self.data: # len(data_group[i]) = 5
for j in data_group: # len(j[i]) =
c: VectorColumnIndex = [] # column vector c_{j}
x = 0
for _ in range(6): # TODO: range(6) should be dynamic and parameterized
c.append(x + y)
x += 5
vector_col_idx.append(c)
y += 1
return vector_col_idx
def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]:
"""
Create a vector column from the loaded data.
:param overwrite: Overwrite the original data with vector column-based data.
"""
idx = self._create_vector_column_index()
# if overwrite:
for i in range(len(self.data)):
for j in range(len(self.data[i])):
# Get the appropriate indices for slicing from idx
indices = idx[j]
# Get the current DataFrame
df = self.data[i][j]
# Keep the 'Time' column and select only specified 'Real' columns
# First, we add 1 to all indices to account for 'Time' being at position 0
real_indices = [index + 1 for index in indices]
# Create list with Time column index (0) and the adjusted Real indices
all_indices = [0] + real_indices
# Apply the slicing
self.data[i][j] = df.iloc[:, all_indices]
# TODO: if !overwrite:
def create_limited_sensor_vector_column(self, overwrite=True):
"""
Create a vector column from the loaded data.
:param overwrite: Overwrite the original data with vector column-based data.
"""
idx = self._create_vector_column_index()
# if overwrite:
for i in range(len(self.data)): # damage(s)
for j in range(len(self.data[i])): # col(s)
# Get the appropriate indices for slicing from idx
indices = idx[j]
# Get the current DataFrame
df = self.data[i][j]
# Keep the 'Time' column and select only the specified 'Real' columns
# First, add 1 to all indices to account for 'Time' being at position 0
real_indices = [index + 1 for index in indices]
# Create a list with the Time column index (0) and the first and last adjusted Real indices
all_indices = [0] + [real_indices[0]] + [real_indices[-1]]
# Apply the slicing
self.data[i][j] = df.iloc[:, all_indices]
# TODO: if !overwrite:
def export_to_csv(self, output_dir: str, file_prefix: str = "DAMAGE"):
"""
Export the processed data to CSV files in the required folder structure.
:param output_dir: Directory to save the CSV files.
:param file_prefix: Prefix for the output filenames.
"""
for group_idx, group in enumerate(self.data, start=1):
group_folder = os.path.join(output_dir, f"{file_prefix}_{group_idx}")
os.makedirs(group_folder, exist_ok=True)
for test_idx, df in enumerate(group, start=1):
# Ensure columns are named uniquely if duplicated
df = df.copy()
df.columns = ["Time", "Real_0", "Real_1"] # Rename
# Export first Real column
out1 = os.path.join(
group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_01.csv"
)
df[["Time", "Real_0"]].rename(columns={"Real_0": "Real"}).to_csv(
out1, index=False
)
# Export last Real column
out2 = os.path.join(
group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_02.csv"
)
df[["Time", "Real_1"]].rename(columns={"Real_1": "Real"}).to_csv(
out2, index=False
)
def create_damage_files(base_path, output_base, prefix):
# Initialize colorama
init(autoreset=True)
# Generate column labels based on expected duplication in input files
columns = ["Real"] + [
f"Real.{i}" for i in range(1, 30)
] # Explicitly setting column names
sensor_end_map = {
1: "Real.25",
2: "Real.26",
3: "Real.27",
4: "Real.28",
5: "Real.29",
}
# Define the damage scenarios and the corresponding original file indices
damage_scenarios = {
1: range(1, 6), # Damage 1 files from zzzAD1.csv to zzzAD5.csv
2: range(6, 11), # Damage 2 files from zzzAD6.csv to zzzAD10.csv
3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csv
4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv
5: range(21, 26), # Damage 5 files from zzzAD21.csv to zzzAD25.csv
6: range(26, 31), # Damage 6 files from zzzAD26.csv to zzzAD30.csv
}
damage_pad = len(str(len(damage_scenarios)))
test_pad = len(str(30))
for damage, files in damage_scenarios.items():
for i, file_index in enumerate(files, start=1):
# Load original data file
file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
df = pd.read_csv(
file_path, sep="\t", skiprows=10
) # Read with explicit column names
top_sensor = columns[i - 1]
print(top_sensor, type(top_sensor))
output_file_1 = os.path.join(
output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_01.csv"
)
print(f"Creating {output_file_1} from taking zzz{prefix}D{file_index}.TXT")
print("Taking datetime column on index 0...")
print(f"Taking `{top_sensor}`...")
os.makedirs(os.path.dirname(output_file_1), exist_ok=True)
df[["Time", top_sensor]].to_csv(output_file_1, index=False)
print(Fore.GREEN + "Done")
bottom_sensor = sensor_end_map[i]
output_file_2 = os.path.join(
output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_02.csv"
)
print(f"Creating {output_file_2} from taking zzz{prefix}D{file_index}.TXT")
print("Taking datetime column on index 0...")
print(f"Taking `{bottom_sensor}`...")
os.makedirs(os.path.dirname(output_file_2), exist_ok=True)
df[["Time", bottom_sensor]].to_csv(output_file_2, index=False)
print(Fore.GREEN + "Done")
print("---")
def main():
if len(sys.argv) < 2:
print("Usage: python convert.py <path_to_csv_files>")
sys.exit(1)
base_path = sys.argv[1]
output_base = sys.argv[2]
prefix = sys.argv[3] # Define output directory
# Create output folders if they don't exist
# for i in range(1, 7):
# os.makedirs(os.path.join(output_base, f'DAMAGE_{i}'), exist_ok=True)
create_damage_files(base_path, output_base, prefix)
print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.")
if __name__ == "__main__":
main()

View File

@@ -1,25 +1,52 @@
from convert import *
from data_preprocessing import *
from joblib import dump, load
# b = generate_damage_files_index(
# num_damage=6,
# file_index_start=1,
# col=5,
# base_path="D:/thesis/data/dataset_B",
# prefix="zzzBD",
# # undamage_file="zzzBU.TXT"
# )
# Example: Generate tuples with a special group of df0 at the beginning
special_groups_A = [
{'df_name': 'zzzAU.TXT', 'position': 0, 'size': 5} # Add at beginning
]
special_groups_B = [
{'df_name': 'zzzBU.TXT', 'position': 0, 'size': 5} # Add at beginning
]
# Generate the tuples with the special group
a_complement = [comp
for n in range(1, 31)
for comp in complement_pairs(n, prefix="zzzAD", extension="TXT")] # complement_pairs requires prefix and extension
a = generate_df_tuples(special_groups=a_complement, prefix="zzzAD")
# b_complement = [comp
# for n in range(1, 31)
# for comp in complement_pairs(n, prefix="zzzBD", extension="TXT")]
# b = generate_df_tuples(special_groups=b_complement, prefix="zzzBD")
# a = generate_damage_files_index(
# num_damage=6, file_index_start=1, col=5, base_path="D:/thesis/data/dataset_A"
# num_damage=6,
# file_index_start=1,
# col=5,
# base_path="D:/thesis/data/dataset_A",
# prefix="zzzAD",
# # undamage_file="zzzBU.TXT"
# )
b = generate_damage_files_index(
num_damage=6,
file_index_start=1,
col=5,
base_path="D:/thesis/data/dataset_B",
prefix="zzzBD",
)
# data_A = DataProcessor(file_index=a)
# # data.create_vector_column(overwrite=True)
# data_A.create_limited_sensor_vector_column(overwrite=True)
# data_A.export_to_csv("D:/thesis/data/converted/raw")
data_A = DataProcessor(file_index=a, base_path="D:/thesis/data/dataset_A", include_time=True)
# data_A.create_vector_column(overwrite=True)
# # data_A.create_limited_sensor_vector_column(overwrite=True)
data_A.export_to_csv("D:/thesis/data/converted/raw")
data_B = DataProcessor(file_index=b)
# data.create_vector_column(overwrite=True)
data_B.create_limited_sensor_vector_column(overwrite=True)
data_B.export_to_csv("D:/thesis/data/converted/raw_B")
# data_B = DataProcessor(file_index=b, base_path="D:/thesis/data/dataset_B", include_time=True)
# data_B.create_vector_column(overwrite=True)
# # data_B.create_limited_sensor_vector_column(overwrite=True)
# data_B.export_to_csv("D:/thesis/data/converted/raw_B")
# a = load("D:/cache.joblib")
# breakpoint()
# breakpoint()
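For orientation, the export_to_csv call above should produce a layout along these lines (a sketch inferred from export_to_csv; DAMAGE_0 holds the 120 complement-derived undamaged runs, DAMAGE_1 through DAMAGE_6 hold 5 runs each):

D:/thesis/data/converted/raw/
    DAMAGE_0/DAMAGE_0_TEST1_01.csv ... DAMAGE_0_TEST120_02.csv
    DAMAGE_1/DAMAGE_1_TEST1_01.csv ... DAMAGE_1_TEST5_02.csv
    ...
    DAMAGE_6/DAMAGE_6_TEST1_01.csv ... DAMAGE_6_TEST5_02.csv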

BIN
latex/figures/A4 - 4.png Normal file

Binary file not shown.

Size: 188 KiB