import os
import re
import sys
from pprint import pprint
from typing import Dict, List, TypedDict

import numpy as np
import pandas as pd
from colorama import Fore, Style, init
from joblib import load

# Type aliases describing the nested data layout used throughout this module.
OriginalSingleDamageScenarioFilePath = str
DamageScenarioGroupIndex = int
OriginalSingleDamageScenario = pd.DataFrame
SensorIndex = int
VectorColumnIndex = List[SensorIndex]          # column indices forming one vector column
VectorColumnIndices = List[VectorColumnIndex]  # one index list per file in a group
DamageScenarioGroup = List[OriginalSingleDamageScenario]
GroupDataset = List[DamageScenarioGroup]


class DamageFilesIndices(TypedDict):
    """Maps a damage-scenario index to the data files belonging to it."""

    damage_index: int
    files: List[str]


def generate_df_tuples(total_dfs=30, group_size=5, prefix="zzzAD", ext="TXT",
                       first_col_start=1, last_col_offset=25,
                       special_groups=None, group=True):
    """
    Generate a structured list of tuples containing file names and column indices.

    Parameters
    ----------
    total_dfs : int, default 30
        Total number of files to include in the tuples.
    group_size : int, default 5
        Number of files in each group (determines the pattern repeat).
    prefix : str, default "zzzAD"
        Prefix for the file names.
    ext : str, default "TXT"
        File extension appended to each generated name.
    first_col_start : int, default 1
        Starting value for the first column index (1-indexed).
    last_col_offset : int, default 25
        Offset added to the first column index to get the last column index.
    special_groups : list of dict, optional
        Special groups to insert; each dict may contain:
        - 'df_name': file name used for every tuple in this group
        - 'position': where to insert this group (0 for beginning)
        - 'size': size of this group (default: ``group_size``)
    group : bool, default True
        When True, the flat tuple list is chunked into sublists of
        ``group_size`` before any special groups are inserted.

    Returns
    -------
    list
        Tuples of ``(file_name, [first_col, last_col])``; nested into
        groups when ``group`` is True.
    """
    tuples = []

    # Regular entries: the column window shifts by one for each position
    # within a group, then restarts for the next group.
    for i in range(1, total_dfs + 1):
        position_in_group = ((i - 1) % group_size) + 1
        first_col = first_col_start + position_in_group - 1
        last_col = first_col + last_col_offset
        df_name = f"{prefix}{i}.{ext}"
        tuples.append((df_name, [first_col, last_col]))

    if group:
        # Chunk the flat list into sublists of group_size.
        grouped_tuples = []
        for start in range(0, len(tuples), group_size):
            grouped_tuples.append(tuples[start:start + group_size])
        tuples = grouped_tuples

    # Insert special groups at their requested positions.
    if special_groups:
        for spec in special_groups:  # renamed from 'group' to avoid shadowing the parameter
            position = spec.get('position', 0)  # default position is 0 if not specified
            df_name = spec['df_name']
            size = spec.get('size', group_size)
            special_tuples = []
            for offset in range(size):
                first_col = first_col_start + offset
                last_col = first_col + last_col_offset
                special_tuples.append((df_name, [first_col, last_col]))
            tuples.insert(position, special_tuples)

    return tuples


class DataProcessor:
    """
    Loads sensor data files described by a nested file index and offers
    helpers to slice them into vector columns and export them as CSV.
    """

    def __init__(self, file_index, cache_path: str = None, base_path: str = None):
        """
        :param file_index: Nested list of (file_name, [col_indices]) tuples,
            as produced by :func:`generate_df_tuples`.
        :param cache_path: Optional joblib cache file; when given, the data
            is loaded from it instead of the raw files.
        :param base_path: Directory containing the raw data files.
        """
        self.file_index = file_index
        self.base_path = base_path
        if cache_path:
            self.data = load(cache_path)
        else:
            self.data = self.load_data()

    def load_data(self):
        """
        Read every file referenced by ``self.file_index`` and replace each
        (name, columns) tuple in place with the extracted DataFrame.

        :return: The populated nested list of DataFrames.
        """
        for group_pos, file_group in enumerate(self.file_index):
            for file_pos, spec in enumerate(file_group):
                file_path = os.path.join(self.base_path, spec[0])  # e.g. 'zzzAD1.TXT'
                col_indices = spec[1]  # e.g. [1, 26]
                try:
                    # sep=r"\s+" replaces the deprecated delim_whitespace=True.
                    df = pd.read_csv(file_path, sep=r"\s+", skiprows=10,
                                     header=0, memory_map=True)
                    # Extract the specified columns.
                    self.file_index[group_pos][file_pos] = df.iloc[:, col_indices].copy()
                    print(f"Processed {file_path}, extracted columns: {col_indices}")
                except Exception as e:
                    print(f"Error processing {file_path}: {str(e)}")
        # BUG FIX: the original returned None, leaving self.data = None.
        return self.file_index

    def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario:
        """
        Loads a single data file into a pandas DataFrame.

        :param file_path: Path to the data file.
        :return: DataFrame containing the numerical data (first row only).
        """
        df = pd.read_csv(file_path, sep=r"\s+", skiprows=10,
                         header=0, memory_map=True, nrows=1)
        return df

    def _load_all_data(self) -> GroupDataset:
        """
        Loads all data files based on the grouping dictionary and returns a
        nested list.

        NOTE(review): this helper expects ``self.file_index`` to be a dict
        mapping group index -> file list (unlike ``load_data``, which expects
        a nested list) — confirm against callers.

        :return: A nested list of DataFrames where the outer index
            corresponds to ``group_idx - 1``.
        """
        max_group_idx = len(self.file_index) if self.file_index else 0
        if max_group_idx == 0:
            raise ValueError("No file index provided; file_index is empty.")

        data = [[] for _ in range(max_group_idx)]
        for group_idx, file_list in self.file_index.items():
            group_idx -= 1  # adjust due to undamage file
            data[group_idx] = [self._load_dataframe(file) for file in file_list]
        return data

    def get_group_data(self, group_idx: int) -> List[pd.DataFrame]:
        """
        Returns the list of DataFrames for the given group index.

        :param group_idx: Index of the group.
        :return: List of DataFrames, or an empty list when the index is
            out of range.
        """
        # BUG FIX: original called self.data.get([group_idx, []]) — invalid
        # for both dict (single list argument) and list data.
        if isinstance(self.data, dict):
            return self.data.get(group_idx, [])
        if 0 <= group_idx < len(self.data):
            return self.data[group_idx]
        return []

    def get_column_names(self, group_idx: int, file_idx: int = 0) -> List[str]:
        """
        Returns the column names for the given group and file indices.

        :param group_idx: Index of the group.
        :param file_idx: Index of the file in the group.
        :return: List of column names (empty when indices are out of range).
        """
        # BUG FIX: 'group_idx in self.data' tested value membership on a
        # list; delegate to get_group_data for correct index handling.
        group = self.get_group_data(group_idx)
        if len(group) > file_idx:
            return group[file_idx].columns.tolist()
        return []

    def get_data_info(self):
        """
        Print information about the loaded data structure.
        Adapted for when self.data is a List instead of a Dictionary.
        """
        if isinstance(self.data, list):
            # For each sublist, show the type names of all elements.
            pprint(
                [
                    (
                        [type(item).__name__ for item in sublist]
                        if isinstance(sublist, list)
                        else type(sublist).__name__
                    )
                    for sublist in self.data
                ]
            )
        else:
            pprint(
                {
                    key: [type(df).__name__ for df in value]
                    for key, value in self.data.items()
                }
                if isinstance(self.data, dict)
                else type(self.data).__name__
            )

    def _create_vector_column_index(self) -> VectorColumnIndices:
        """
        Build the column-index lists used to assemble vector columns:
        the k-th file in a group gets indices [k, k+5, k+10, ..., k+25].
        """
        vector_col_idx: VectorColumnIndices = []
        y = 0
        for data_group in self.data:
            for _ in data_group:
                c: VectorColumnIndex = []
                x = 0
                for _ in range(6):  # TODO: range(6) should be dynamic and parameterized
                    c.append(x + y)
                    x += 5
                vector_col_idx.append(c)
                y += 1
        return vector_col_idx

    # TODO: refactor this so that it returns just from first data_group without
    # using for loops through the self.data that seems unnecessary
    def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]:
        """
        Create a vector column from the loaded data.

        :param overwrite: Overwrite the original data with vector
            column-based data.
        """
        idxs = self._create_vector_column_index()
        for i, group in enumerate(self.data):
            for j, df in enumerate(group):
                # Add 1 to all indices to account for 'Time' being at position 0.
                idx = [_ + 1 for _ in idxs[j]]
                # Slice out the desired columns, copy into a fresh DataFrame,
                # then overwrite self.data[i][j] with it.
                self.data[i][j] = df.iloc[:, idx].copy()
        # TODO: if !overwrite:

    def create_limited_sensor_vector_column(self, overwrite=True):
        """
        Create a limited (first/last sensor only) vector column from the
        loaded data.

        :param overwrite: Overwrite the original data with vector
            column-based data.
        """
        idx = self._create_vector_column_index()
        for i in range(len(self.data)):  # damage(s)
            for j in range(len(self.data[i])):  # col(s)
                # Get the appropriate indices for slicing from idx.
                indices = idx[j]
                df = self.data[i][j]
                # Keep the 'Time' column and select only specified 'Real'
                # columns; add 1 to all indices to account for 'Time' being
                # at position 0.
                real_indices = [index + 1 for index in indices]
                # Time column (0) plus the first and last adjusted Real indices.
                all_indices = [0] + [real_indices[0]] + [real_indices[-1]]
                self.data[i][j] = df.iloc[:, all_indices]
        # TODO: if !overwrite:

    def export_to_csv(self, output_dir: str, file_prefix: str = "DAMAGE"):
        """
        Export the processed data to CSV files in the required folder structure.

        :param output_dir: Directory to save the CSV files.
        :param file_prefix: Prefix for the output filenames.
        """
        for group_idx, group in enumerate(self.data, start=0):
            group_folder = os.path.join(output_dir, f"{file_prefix}_{group_idx}")
            os.makedirs(group_folder, exist_ok=True)
            for test_idx, df in enumerate(group, start=1):
                # Ensure columns are named uniquely if duplicated.
                df = df.copy()
                df.columns = ["Time", "Real_0", "Real_1"]  # Rename
                # Export first Real column.
                out1 = os.path.join(
                    group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_01.csv"
                )
                df[["Time", "Real_0"]].rename(columns={"Real_0": "Real"}).to_csv(
                    out1, index=False
                )
                # Export last Real column.
                out2 = os.path.join(
                    group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_02.csv"
                )
                df[["Time", "Real_1"]].rename(columns={"Real_1": "Real"}).to_csv(
                    out2, index=False
                )


def create_damage_files(base_path, output_base, prefix):
    """
    Split each raw tab-separated data file into per-damage, per-test CSVs
    containing the Time column plus one sensor column.

    BUG FIX: this function was entirely commented out while main() still
    called it, so every run raised NameError; restored from the commented
    implementation.

    :param base_path: Directory containing the raw zzz<prefix>D<n>.TXT files.
    :param output_base: Directory to create the DAMAGE_<n> folders in.
    :param prefix: Middle part of the raw file names.
    """
    # Initialize colorama.
    init(autoreset=True)
    # Column labels matching pandas' duplicate-name suffixing in the inputs.
    columns = ["Real"] + [f"Real.{i}" for i in range(1, 30)]
    sensor_end_map = {
        1: "Real.25",
        2: "Real.26",
        3: "Real.27",
        4: "Real.28",
        5: "Real.29",
    }
    # Damage scenarios and the corresponding original file indices.
    damage_scenarios = {
        1: range(1, 6),    # Damage 1 files from zzzAD1 to zzzAD5
        2: range(6, 11),   # Damage 2 files from zzzAD6 to zzzAD10
        3: range(11, 16),  # Damage 3 files from zzzAD11 to zzzAD15
        4: range(16, 21),  # Damage 4 files from zzzAD16 to zzzAD20
        5: range(21, 26),  # Damage 5 files from zzzAD21 to zzzAD25
        6: range(26, 31),  # Damage 6 files from zzzAD26 to zzzAD30
    }
    for damage, files in damage_scenarios.items():
        for i, file_index in enumerate(files, start=1):
            # Load original data file.
            file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
            df = pd.read_csv(file_path, sep="\t", skiprows=10)

            top_sensor = columns[i - 1]
            output_file_1 = os.path.join(
                output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_01.csv"
            )
            print(f"Creating {output_file_1} from taking zzz{prefix}D{file_index}.TXT")
            print("Taking datetime column on index 0...")
            print(f"Taking `{top_sensor}`...")
            os.makedirs(os.path.dirname(output_file_1), exist_ok=True)
            df[["Time", top_sensor]].to_csv(output_file_1, index=False)
            print(Fore.GREEN + "Done")

            bottom_sensor = sensor_end_map[i]
            output_file_2 = os.path.join(
                output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_02.csv"
            )
            print(f"Creating {output_file_2} from taking zzz{prefix}D{file_index}.TXT")
            print("Taking datetime column on index 0...")
            print(f"Taking `{bottom_sensor}`...")
            os.makedirs(os.path.dirname(output_file_2), exist_ok=True)
            df[["Time", bottom_sensor]].to_csv(output_file_2, index=False)
            print(Fore.GREEN + "Done")
            print("---")


def main():
    # BUG FIX: three positional arguments are read below, so require
    # len(sys.argv) >= 4 (the original checked < 2 and then crashed).
    if len(sys.argv) < 4:
        print("Usage: python convert.py <base_path> <output_base> <prefix>")
        sys.exit(1)
    base_path = sys.argv[1]
    output_base = sys.argv[2]
    prefix = sys.argv[3]

    create_damage_files(base_path, output_base, prefix)
    print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.")


if __name__ == "__main__":
    main()