"""Index, load and slice damage-scenario measurement files."""

import os
import re
import sys
from itertools import islice
from pprint import pprint
from typing import Callable, Dict, List, Optional, TypedDict

import pandas as pd
from colorama import Fore, Style, init
from joblib import load

# Maps a 1-based damage scenario number to the file names (or paths) that
# belong to that scenario.
DamageFilesIndex = Dict[int, List[str]]


def generate_damage_files_index(**kwargs) -> DamageFilesIndex:
    """
    Builds the mapping from damage scenario number to its data file names.

    Scenario ``k`` (1-based) receives ``col`` consecutive file indices; the
    first scenario starts at ``file_index_start`` and each following scenario
    continues where the previous one stopped.

    :param prefix: File name prefix (default ``"zzzAD"``).
    :param extension: File name extension (default ``".TXT"``).
    :param num_damage: Number of damage scenarios (required).
    :param file_index_start: Index of the first file (required).
    :param col: Number of files per damage scenario (required).
    :param base_path: Optional directory; when given, every entry is the
        normalized path ``base_path/<file name>`` instead of the bare name.
    :return: Dictionary ``{damage: [file, ...]}``.
    :raises TypeError: If a required keyword argument is missing.
    """
    prefix = kwargs.get("prefix", "zzzAD")
    extension = kwargs.get("extension", ".TXT")
    num_damage = kwargs.get("num_damage")
    file_index_start = kwargs.get("file_index_start")
    col = kwargs.get("col")
    base_path = kwargs.get("base_path")

    required = {
        "num_damage": num_damage,
        "file_index_start": file_index_start,
        "col": col,
    }
    missing = [name for name, value in required.items() if value is None]
    if missing:
        raise TypeError(
            "generate_damage_files_index() missing required keyword "
            f"argument(s): {', '.join(missing)}"
        )

    index: DamageFilesIndex = {}
    first = file_index_start
    for damage in range(1, num_damage + 1):
        # The end of the range is relative to the start index, so groups keep
        # exactly `col` files even when file_index_start is not 1.
        names = [f"{prefix}{n}{extension}" for n in range(first, first + col)]
        if base_path:
            names = [
                os.path.normpath(os.path.join(base_path, name)) for name in names
            ]
        index[damage] = names
        first += col
    return index


class DataProcessor:
    """
    Loads the files of a damage file index into pandas DataFrames and offers
    helpers to inspect them and to reduce them to selected columns.
    """

    # Number of header lines at the top of every data file. The last of these
    # lines holds the channel (column) names; numerical data starts after it.
    _HEADER_LINES = 11

    def __init__(
        self, file_index: Dict[int, List[str]], cache_path: Optional[str] = None
    ):
        """
        :param file_index: Mapping of 1-based group index to file paths.
        :param cache_path: Optional path of a joblib dump to load instead of
            parsing the files listed in ``file_index``.
        """
        self.file_index = file_index
        if cache_path:
            # NOTE(security): joblib.load is pickle-based and can execute
            # arbitrary code; only load cache files from a trusted source.
            self.data = load(cache_path)
        else:
            self.data = self._load_all_data()

    def _extract_column_names(self, file_path: str) -> List[str]:
        """
        Extracts column names from the header of the given file.
        Assumes the 11th line (the last header line) contains the column names.

        Every double-quoted token on that line is a column name. When the line
        does not start with a quote, its first whitespace-separated token is
        prepended as an additional (unquoted) column name.

        :param file_path: Path to the data file.
        :return: List of column names.
        :raises ValueError: If the file has fewer header lines than expected.
        """
        with open(file_path, "r") as f:
            header_lines = list(islice(f, self._HEADER_LINES))
        if len(header_lines) < self._HEADER_LINES:
            raise ValueError(
                f"{file_path}: expected at least {self._HEADER_LINES} header "
                f"lines, found {len(header_lines)}"
            )

        channel_line = header_lines[-1].strip()
        tokens = re.findall(r'"([^"]+)"', channel_line)
        if channel_line and not channel_line.startswith('"'):
            tokens.insert(0, channel_line.split()[0])
        return tokens

    def _load_dataframe(self, file_path: str) -> pd.DataFrame:
        """
        Loads a single data file into a pandas DataFrame.

        :param file_path: Path to the data file.
        :return: DataFrame containing the numerical data.
        """
        col_names = self._extract_column_names(file_path)
        # sep=r"\s+" is the supported spelling of the deprecated
        # delim_whitespace=True option.
        df = pd.read_csv(
            file_path,
            sep=r"\s+",
            skiprows=self._HEADER_LINES,
            header=None,
            memory_map=True,
        )
        df.columns = col_names
        return df

    def _load_all_data(self) -> List[List[pd.DataFrame]]:
        """
        Loads all data files based on the grouping dictionary and returns a nested list.

        :return: A nested list of DataFrames where the outer index corresponds to group_idx - 1.
        :raises ValueError: If a group index is smaller than 1.
        """
        max_group_idx = max(self.file_index.keys()) if self.file_index else 0
        data: List[List[pd.DataFrame]] = [[] for _ in range(max_group_idx)]

        for group_idx, file_list in self.file_index.items():
            if group_idx < 1:
                # A key below 1 would silently wrap around to the end of the list.
                raise ValueError(f"group index must be >= 1, got {group_idx}")
            data[group_idx - 1] = [self._load_dataframe(file) for file in file_list]
        return data

    def _lookup_group(self, group_idx: int) -> List[pd.DataFrame]:
        """
        Returns the entry of ``self.data`` addressed by ``group_idx`` or ``[]``.

        ``self.data`` is a list when built by ``_load_all_data`` (then
        ``group_idx`` is the 0-based position, i.e. group number - 1) but may
        be a dictionary when loaded from a cache (then it is the key).
        """
        if isinstance(self.data, dict):
            return self.data.get(group_idx, [])
        if isinstance(group_idx, int) and 0 <= group_idx < len(self.data):
            return self.data[group_idx]
        return []

    def get_group_data(self, group_idx: int) -> List[pd.DataFrame]:
        """
        Returns the list of DataFrames for the given group index.

        :param group_idx: Index of the group inside ``self.data`` (0-based
            position for list storage, key for dictionary storage).
        :return: List of DataFrames; empty when the group does not exist.
        """
        return self._lookup_group(group_idx)

    def get_column_names(self, group_idx: int, file_idx: int = 0) -> List[str]:
        """
        Returns the column names for the given group and file indices.

        :param group_idx: Index of the group inside ``self.data`` (0-based
            position for list storage, key for dictionary storage).
        :param file_idx: Index of the file in the group.
        :return: List of column names; empty when group or file does not exist.
        """
        group = self._lookup_group(group_idx)
        if -len(group) <= file_idx < len(group):
            return group[file_idx].columns.tolist()
        return []

    def get_data_info(self):
        """
        Print information about the loaded data structure.

        For list storage, prints the type names of the elements of every
        sublist; for dictionary storage, the type names per key; otherwise the
        type name of ``self.data`` itself.
        """
        if isinstance(self.data, list):
            summary = [
                (
                    [type(item).__name__ for item in sublist]
                    if isinstance(sublist, list)
                    else type(sublist).__name__
                )
                for sublist in self.data
            ]
        elif isinstance(self.data, dict):
            summary = {
                key: [type(df).__name__ for df in value]
                for key, value in self.data.items()
            }
        else:
            summary = type(self.data).__name__
        pprint(summary)

    def _create_vector_column_index(self):
        """
        Builds one list of column offsets per loaded file.

        The k-th file (counted across all groups, starting at 0) gets the six
        offsets ``[k, k + 5, k + 10, k + 15, k + 20, k + 25]``.

        :return: List of offset lists, one per loaded file.
        """
        vector_col_idx = []
        offset = 0
        for data_group in self.data:
            for _ in data_group:
                # TODO: the count (6) and the stride (5) should be dynamic and parameterized
                vector_col_idx.append([offset + 5 * step for step in range(6)])
                offset += 1
        return vector_col_idx

    def _apply_column_selection(
        self, pick: Callable[[List[int]], List[int]]
    ) -> None:
        """
        Replaces every loaded DataFrame by a column subset of itself.

        For the file at position ``j`` inside its group, the offsets at
        ``idx[j]`` are shifted by one (column 0 is kept as the first column and
        is expected to be 'Time'), filtered through ``pick`` and used for
        positional slicing.

        :param pick: Function choosing which shifted column positions to keep.
        """
        idx = self._create_vector_column_index()
        for group in self.data:
            for j, df in enumerate(group):
                real_indices = [index + 1 for index in idx[j]]
                group[j] = df.iloc[:, [0] + pick(real_indices)]

    def create_vector_column(self, overwrite=True):
        """
        Create a vector column from the loaded data.

        Keeps column 0 plus all columns selected for the file's position in
        its group.

        :param overwrite: Overwrite the original data with vector column-based data.
            Currently ignored: the data is always overwritten.
        """
        # TODO: if !overwrite:
        self._apply_column_selection(lambda real_indices: real_indices)

    def create_limited_sensor_vector_column(self, overwrite=True):
        """
        Create a vector column limited to two sensors from the loaded data.

        Keeps column 0 plus only the first and the last of the columns selected
        for the file's position in its group.

        :param overwrite: Overwrite the original data with vector column-based data.
            Currently ignored: the data is always overwritten.
        """
        # TODO: if !overwrite:
        self._apply_column_selection(
            lambda real_indices: [real_indices[0], real_indices[-1]]
        )


def _write_sensor_csv(df, sensor, output_file, source_name):
    """Write the 'Time' column and one sensor column of ``df`` to ``output_file``."""
    print(f"Creating {output_file} from taking {source_name}")
    print("Taking datetime column on index 0...")
    print(f"Taking `{sensor}`...")
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    df[["Time", sensor]].to_csv(output_file, index=False)
    print(Fore.GREEN + "Done")


def create_damage_files(base_path, output_base, prefix):
    """
    Splits the raw measurement files into per-damage, per-test CSV files.

    For every damage scenario and every test ``i`` (1-5) of that scenario the
    file ``zzz{prefix}D{n}.TXT`` is read from ``base_path`` and two CSV files
    are written to ``output_base/DAMAGE_{damage}/``: ``..._01.csv`` holding
    'Time' and the i-th sensor column, and ``..._02.csv`` holding 'Time' and
    the sensor column given by the end-sensor map.

    :param base_path: Directory containing the input ``.TXT`` files.
    :param output_base: Directory below which the output folders are created.
    :param prefix: Part of the input file name between ``zzz`` and ``D``.
    """
    # Initialize colorama
    init(autoreset=True)

    # Column labels of the sensor columns as they are expected in the loaded
    # DataFrame: "Real", "Real.1", ..., "Real.29".
    columns = ["Real"] + [f"Real.{i}" for i in range(1, 30)]

    # Second sensor column exported for test i.
    sensor_end_map = {
        1: "Real.25",
        2: "Real.26",
        3: "Real.27",
        4: "Real.28",
        5: "Real.29",
    }

    # Damage scenarios and the corresponding original file indices
    damage_scenarios = {
        1: range(1, 6),  # Damage 1: files D1.TXT to D5.TXT
        2: range(6, 11),  # Damage 2: files D6.TXT to D10.TXT
        3: range(11, 16),  # Damage 3: files D11.TXT to D15.TXT
        4: range(16, 21),  # Damage 4: files D16.TXT to D20.TXT
        5: range(21, 26),  # Damage 5: files D21.TXT to D25.TXT
        6: range(26, 31),  # Damage 6: files D26.TXT to D30.TXT
    }

    for damage, files in damage_scenarios.items():
        output_dir = os.path.join(output_base, f"DAMAGE_{damage}")
        for i, file_index in enumerate(files, start=1):
            source_name = f"zzz{prefix}D{file_index}.TXT"
            # Skip the 10 leading lines; the next line is used as header row.
            df = pd.read_csv(
                os.path.join(base_path, source_name), sep="\t", skiprows=10
            )

            _write_sensor_csv(
                df,
                columns[i - 1],
                os.path.join(output_dir, f"DAMAGE{damage}_TEST{i}_01.csv"),
                source_name,
            )
            _write_sensor_csv(
                df,
                sensor_end_map[i],
                os.path.join(output_dir, f"DAMAGE{damage}_TEST{i}_02.csv"),
                source_name,
            )
            print("---")


def main():
    """Command line entry point: ``convert.py <base_path> <output_base> <prefix>``."""
    # Three positional arguments are read below, so argv needs four entries.
    if len(sys.argv) < 4:
        print("Usage: python convert.py <base_path> <output_base> <prefix>")
        sys.exit(1)

    base_path, output_base, prefix = sys.argv[1:4]

    create_damage_files(base_path, output_base, prefix)
    print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.")


if __name__ == "__main__":
    main()