From 58a316d9c80e5eff37865292b4b05686c3651503 Mon Sep 17 00:00:00 2001 From: nuluh Date: Fri, 21 Mar 2025 15:58:50 +0700 Subject: [PATCH 1/2] feat(data): implement damage files index generation and data processing Closes #38 --- data/QUGS/convert.py | 274 ++++++++++++++++++++++++++++++++++++++++--- data/QUGS/test.py | 7 ++ 2 files changed, 267 insertions(+), 14 deletions(-) create mode 100644 data/QUGS/test.py diff --git a/data/QUGS/convert.py b/data/QUGS/convert.py index 1ec5b95..c850ce8 100644 --- a/data/QUGS/convert.py +++ b/data/QUGS/convert.py @@ -1,25 +1,263 @@ import pandas as pd import os +import re import sys from colorama import Fore, Style, init +from typing import TypedDict, Dict, List +from joblib import load +from pprint import pprint + +# class DamageFilesIndex(TypedDict): +# damage_index: int +# files: list[int] +DamageFilesIndex = Dict[int, List[str]] + + +def generate_damage_files_index(**kwargs) -> DamageFilesIndex: + prefix = kwargs.get("prefix", "zzzAD") + extension = kwargs.get("extension", ".TXT") + num_damage = kwargs.get("num_damage") + file_index_start = kwargs.get("file_index_start") + col = kwargs.get("col") + base_path = kwargs.get("base_path") + + damage_scenarios = {} + a = file_index_start + b = col + 1 + for i in range(1, num_damage + 1): + damage_scenarios[i] = range(a, b) + a += col + b += col + + # return damage_scenarios + + x = {} + for damage, files in damage_scenarios.items(): + x[damage] = [] # Initialize each key with an empty list + for i, file_index in enumerate(files, start=1): + if base_path: + x[damage].append( + os.path.normpath( + os.path.join(base_path, f"{prefix}{file_index}{extension}") + ) + ) + # if not os.path.exists(file_path): + # print(Fore.RED + f"File {file_path} does not exist.") + # continue + else: + x[damage].append(f"{prefix}{file_index}{extension}") + return x + + # file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT") + # df = pd.read_csv( file_path, sep="\t", skiprows=10) # Read with explicit column names + + +class DataProcessor: + def __init__(self, file_index: Dict[int, List[str]], cache_path: str = None): + self.file_index = file_index + if cache_path: + self.data = load(cache_path) + else: + self.data = self._load_all_data() + + def _extract_column_names(self, file_path: str) -> List[str]: + """ + Extracts column names from the header of the given file. + Assumes the 6th line contains column names. + + :param file_path: Path to the data file. + :return: List of column names. + """ + with open(file_path, "r") as f: + header_lines = [next(f) for _ in range(12)] + + # Extract column names from the 6th line + channel_line = header_lines[10].strip() + tokens = re.findall(r'"([^"]+)"', channel_line) + if not channel_line.startswith('"'): + first_token = channel_line.split()[0] + tokens = [first_token] + tokens + + return tokens # Prepend 'Time' column if applicable + + def _load_dataframe(self, file_path: str) -> pd.DataFrame: + """ + Loads a single data file into a pandas DataFrame. + + :param file_path: Path to the data file. + :return: DataFrame containing the numerical data. + """ + col_names = self._extract_column_names(file_path) + df = pd.read_csv( + file_path, delim_whitespace=True, skiprows=11, header=None, memory_map=True + ) + df.columns = col_names + return df + + def _load_all_data(self) -> List[List[pd.DataFrame]]: + """ + Loads all data files based on the grouping dictionary and returns a nested list. + + :return: A nested list of DataFrames where the outer index corresponds to group_idx - 1. + """ + data = [] + # Find the maximum group index to determine the list size + max_group_idx = max(self.file_index.keys()) if self.file_index else 0 + + # Initialize empty lists + for _ in range(max_group_idx): + data.append([]) + + # Fill the list with data + for group_idx, file_list in self.file_index.items(): + # Adjust index to be 0-based + list_idx = group_idx - 1 + data[list_idx] = [self._load_dataframe(file) for file in file_list] + + return data + + def get_group_data(self, group_idx: int) -> List[pd.DataFrame]: + """ + Returns the list of DataFrames for the given group index. + + :param group_idx: Index of the group. + :return: List of DataFrames. + """ + return self.data.get([group_idx, []]) + + def get_column_names(self, group_idx: int, file_idx: int = 0) -> List[str]: + """ + Returns the column names for the given group and file indices. + + :param group_idx: Index of the group. + :param file_idx: Index of the file in the group. + :return: List of column names. + """ + if group_idx in self.data and len(self.data[group_idx]) > file_idx: + return self.data[group_idx][file_idx].columns.tolist() + return [] + + def get_data_info(self): + """ + Print information about the loaded data structure. + Adapted for when self.data is a List instead of a Dictionary. + """ + if isinstance(self.data, list): + # For each sublist in self.data, get the type names of all elements + pprint( + [ + ( + [type(item).__name__ for item in sublist] + if isinstance(sublist, list) + else type(sublist).__name__ + ) + for sublist in self.data + ] + ) + else: + pprint( + { + key: [type(df).__name__ for df in value] + for key, value in self.data.items() + } + if isinstance(self.data, dict) + else type(self.data).__name__ + ) + + def _create_vector_column_index(self): + vector_col_idx = [] + y = 0 + for data_group in self.data: # len(data_group[i]) = 5 + for j in data_group: # len(j[i]) = + c = [] # column vector c_{j} + x = 0 + for _ in range(6): # TODO: range(6) should be dynamic and parameterized + c.append(x + y) + x += 5 + vector_col_idx.append(c) + y += 1 + return vector_col_idx + + def create_vector_column(self, overwrite=True): + """ + Create a vector column from the loaded data. + + :param overwrite: Overwrite the original data with vector column-based data. + """ + idx = self._create_vector_column_index() + # if overwrite: + for i in range(len(self.data)): + for j in range(len(self.data[i])): + # Get the appropriate indices for slicing from idx + indices = idx[j] + + # Get the current DataFrame + df = self.data[i][j] + + # Keep the 'Time' column and select only specified 'Real' columns + # First, we add 1 to all indices to account for 'Time' being at position 0 + real_indices = [index + 1 for index in indices] + + # Create list with Time column index (0) and the adjusted Real indices + all_indices = [0] + real_indices + + # Apply the slicing + self.data[i][j] = df.iloc[:, all_indices] + # TODO: if !overwrite: + + def create_limited_sensor_vector_column(self, overwrite=True): + """ + Create a vector column from the loaded data. + + :param overwrite: Overwrite the original data with vector column-based data. + """ + idx = self._create_vector_column_index() + # if overwrite: + for i in range(len(self.data)): + for j in range(len(self.data[i])): + # Get the appropriate indices for slicing from idx + indices = idx[j] + + # Get the current DataFrame + df = self.data[i][j] + + # Keep the 'Time' column and select only specified 'Real' columns + # First, we add 1 to all indices to account for 'Time' being at position 0 + real_indices = [index + 1 for index in indices] + + # Create list with Time column index (0) and the adjusted Real indices + all_indices = [0] + [real_indices[0]] + [real_indices[-1]] + + # Apply the slicing + self.data[i][j] = df.iloc[:, all_indices] + # TODO: if !overwrite: + def create_damage_files(base_path, output_base, prefix): # Initialize colorama init(autoreset=True) - - # Generate column labels based on expected duplication in input files - columns = ['Real'] + [f'Real.{i}' for i in range(1, 30)] # Explicitly setting column names - sensor_end_map = {1: 'Real.25', 2: 'Real.26', 3: 'Real.27', 4: 'Real.28', 5: 'Real.29'} + # Generate column labels based on expected duplication in input files + columns = ["Real"] + [ + f"Real.{i}" for i in range(1, 30) + ] # Explicitly setting column names + + sensor_end_map = { + 1: "Real.25", + 2: "Real.26", + 3: "Real.27", + 4: "Real.28", + 5: "Real.29", + } # Define the damage scenarios and the corresponding original file indices damage_scenarios = { 1: range(1, 6), # Damage 1 files from zzzAD1.csv to zzzAD5.csv 2: range(6, 11), # Damage 2 files from zzzAD6.csv to zzzAD10.csv - 3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csvs - 4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv + 3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csvs + 4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv 5: range(21, 26), # Damage 5 files from zzzAD21.csv to zzzAD25.csv - 6: range(26, 31) # Damage 6 files from zzzAD26.csv to zzzAD30.csv + 6: range(26, 31), # Damage 6 files from zzzAD26.csv to zzzAD30.csv } damage_pad = len(str(len(damage_scenarios))) test_pad = len(str(30)) @@ -27,29 +265,36 @@ def create_damage_files(base_path, output_base, prefix): for damage, files in damage_scenarios.items(): for i, file_index in enumerate(files, start=1): # Load original data file - file_path = os.path.join(base_path, f'zzz{prefix}D{file_index}.TXT') - df = pd.read_csv(file_path, sep='\t', skiprows=10) # Read with explicit column names + file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT") + df = pd.read_csv( + file_path, sep="\t", skiprows=10 + ) # Read with explicit column names - top_sensor = columns[i-1] + top_sensor = columns[i - 1] print(top_sensor, type(top_sensor)) - output_file_1 = os.path.join(output_base, f'DAMAGE_{damage}', f'DAMAGE{damage}_TEST{i}_01.csv') + output_file_1 = os.path.join( + output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_01.csv" + ) print(f"Creating {output_file_1} from taking zzz{prefix}D{file_index}.TXT") print("Taking datetime column on index 0...") print(f"Taking `{top_sensor}`...") os.makedirs(os.path.dirname(output_file_1), exist_ok=True) - df[['Time', top_sensor]].to_csv(output_file_1, index=False) + df[["Time", top_sensor]].to_csv(output_file_1, index=False) print(Fore.GREEN + "Done") bottom_sensor = sensor_end_map[i] - output_file_2 = os.path.join(output_base, f'DAMAGE_{damage}', f'DAMAGE{damage}_TEST{i}_02.csv') + output_file_2 = os.path.join( + output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_02.csv" + ) print(f"Creating {output_file_2} from taking zzz{prefix}D{file_index}.TXT") print("Taking datetime column on index 0...") print(f"Taking `{bottom_sensor}`...") os.makedirs(os.path.dirname(output_file_2), exist_ok=True) - df[['Time', bottom_sensor]].to_csv(output_file_2, index=False) + df[["Time", bottom_sensor]].to_csv(output_file_2, index=False) print(Fore.GREEN + "Done") print("---") + def main(): if len(sys.argv) < 2: print("Usage: python convert.py ") @@ -66,5 +311,6 @@ def main(): create_damage_files(base_path, output_base, prefix) print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.") + if __name__ == "__main__": main() diff --git a/data/QUGS/test.py b/data/QUGS/test.py new file mode 100644 index 0000000..2345fed --- /dev/null +++ b/data/QUGS/test.py @@ -0,0 +1,7 @@ +from convert import * +from joblib import dump + +a = generate_damage_files_index( + num_damage=6, file_index_start=1, col=5, base_path="D:/thesis/data/dataset_A" +) +dump(DataProcessor(file_index=a), "D:/cache.joblib") From ff64f3a3ab231addb15e18d88353192e2d733415 Mon Sep 17 00:00:00 2001 From: nuluh Date: Sat, 22 Mar 2025 19:48:50 +0700 Subject: [PATCH 2/2] refactor(data): update type annotations for damage files index and related classes. Need better implementation --- data/QUGS/convert.py | 44 ++++++++++++++++++++++++++++---------------- data/QUGS/test.py | 11 ++++++----- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/data/QUGS/convert.py b/data/QUGS/convert.py index c850ce8..85aa0ab 100644 --- a/data/QUGS/convert.py +++ b/data/QUGS/convert.py @@ -7,19 +7,31 @@ from typing import TypedDict, Dict, List from joblib import load from pprint import pprint -# class DamageFilesIndex(TypedDict): +# class DamageFilesIndices(TypedDict): # damage_index: int # files: list[int] -DamageFilesIndex = Dict[int, List[str]] +OriginalSingleDamageScenarioFilePath = str +DamageScenarioGroupIndex = int +OriginalSingleDamageScenario = pd.DataFrame +SensorIndex = int +VectorColumnIndex = List[SensorIndex] +VectorColumnIndices = List[VectorColumnIndex] +DamageScenarioGroup = List[OriginalSingleDamageScenario] +GroupDataset = List[DamageScenarioGroup] -def generate_damage_files_index(**kwargs) -> DamageFilesIndex: - prefix = kwargs.get("prefix", "zzzAD") - extension = kwargs.get("extension", ".TXT") - num_damage = kwargs.get("num_damage") - file_index_start = kwargs.get("file_index_start") - col = kwargs.get("col") - base_path = kwargs.get("base_path") +class DamageFilesIndices(TypedDict): + damage_index: int + files: List[str] + + +def generate_damage_files_index(**kwargs) -> DamageFilesIndices: + prefix: str = kwargs.get("prefix", "zzzAD") + extension: str = kwargs.get("extension", ".TXT") + num_damage: int = kwargs.get("num_damage") + file_index_start: int = kwargs.get("file_index_start") + col: int = kwargs.get("col") + base_path: str = kwargs.get("base_path") damage_scenarios = {} a = file_index_start @@ -53,7 +65,7 @@ def generate_damage_files_index(**kwargs) -> DamageFilesIndex: class DataProcessor: - def __init__(self, file_index: Dict[int, List[str]], cache_path: str = None): + def __init__(self, file_index: DamageFilesIndices, cache_path: str = None): self.file_index = file_index if cache_path: self.data = load(cache_path) @@ -80,7 +92,7 @@ class DataProcessor: return tokens # Prepend 'Time' column if applicable - def _load_dataframe(self, file_path: str) -> pd.DataFrame: + def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario: """ Loads a single data file into a pandas DataFrame. @@ -94,7 +106,7 @@ class DataProcessor: df.columns = col_names return df - def _load_all_data(self) -> List[List[pd.DataFrame]]: + def _load_all_data(self) -> GroupDataset: """ Loads all data files based on the grouping dictionary and returns a nested list. @@ -164,12 +176,12 @@ class DataProcessor: else type(self.data).__name__ ) - def _create_vector_column_index(self): - vector_col_idx = [] + def _create_vector_column_index(self) -> VectorColumnIndices: + vector_col_idx: VectorColumnIndices = [] y = 0 for data_group in self.data: # len(data_group[i]) = 5 for j in data_group: # len(j[i]) = - c = [] # column vector c_{j} + c: VectorColumnIndex = [] # column vector c_{j} x = 0 for _ in range(6): # TODO: range(6) should be dynamic and parameterized c.append(x + y) @@ -178,7 +190,7 @@ class DataProcessor: y += 1 return vector_col_idx - def create_vector_column(self, overwrite=True): + def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]: """ Create a vector column from the loaded data. diff --git a/data/QUGS/test.py b/data/QUGS/test.py index 2345fed..95f2d8c 100644 --- a/data/QUGS/test.py +++ b/data/QUGS/test.py @@ -1,7 +1,8 @@ from convert import * -from joblib import dump +from joblib import dump, load -a = generate_damage_files_index( - num_damage=6, file_index_start=1, col=5, base_path="D:/thesis/data/dataset_A" -) -dump(DataProcessor(file_index=a), "D:/cache.joblib") +# a = generate_damage_files_index( +# num_damage=6, file_index_start=1, col=5, base_path="D:/thesis/data/dataset_A" +# ) +# dump(DataProcessor(file_index=a), "D:/cache.joblib") +a = load("D:/cache.joblib")