361 lines
14 KiB
Python
361 lines
14 KiB
Python
import pandas as pd
|
|
import os
|
|
import re
|
|
import sys
|
|
import numpy as np
|
|
from colorama import Fore, Style, init
|
|
from typing import TypedDict, Dict, List
|
|
from joblib import load
|
|
from pprint import pprint
|
|
|
|
# class DamageFilesIndices(TypedDict):
|
|
# damage_index: int
|
|
# files: list[int]
|
|
OriginalSingleDamageScenarioFilePath = str
|
|
DamageScenarioGroupIndex = int
|
|
OriginalSingleDamageScenario = pd.DataFrame
|
|
SensorIndex = int
|
|
VectorColumnIndex = List[SensorIndex]
|
|
VectorColumnIndices = List[VectorColumnIndex]
|
|
DamageScenarioGroup = List[OriginalSingleDamageScenario]
|
|
GroupDataset = List[DamageScenarioGroup]
|
|
|
|
|
|
class DamageFilesIndices(TypedDict):
|
|
damage_index: int
|
|
files: List[str]
|
|
|
|
|
|
def generate_df_tuples(total_dfs=30, group_size=5, prefix="zzzAD", extension="TXT", first_col_start=1, last_col_offset=25,
|
|
special_groups=None, group=True):
|
|
"""
|
|
Generate a structured list of tuples containing DataFrame references and column indices.
|
|
|
|
Parameters:
|
|
-----------
|
|
total_dfs : int, default 30
|
|
Total number of DataFrames to include in the tuples
|
|
group_size : int, default 5
|
|
Number of DataFrames in each group (determines the pattern repeat)
|
|
prefix : str, default "df"
|
|
Prefix for DataFrame variable names
|
|
first_col_start : int, default 1
|
|
Starting value for the first column index (1-indexed)
|
|
last_col_offset : int, default 25
|
|
Offset to add to first_col_start to get the last column index
|
|
special_groups : list of dict, optional
|
|
List of special groups to insert, each dict should contain:
|
|
- 'df_name': The DataFrame name to use for all tuples in this group
|
|
- 'position': Where to insert this group (0 for beginning)
|
|
- 'size': Size of this group (default: same as group_size)
|
|
|
|
Returns:
|
|
--------
|
|
list
|
|
List of tuples, where each tuple contains (df_name, [first_col, last_col])
|
|
"""
|
|
result = []
|
|
if group:
|
|
# Group tuples into sublists of group_size
|
|
for g in range(6): # TODO: shouldnt be hardcoded
|
|
group = []
|
|
for i in range(1, 6): # TODO: shouldnt be hardcoded
|
|
n = g * 5 + i
|
|
bottom_end = i # 1, 2, 3, 4, 5
|
|
top_end = bottom_end + 25 # 26, 27, 28, 29, 30 # TODO: shouldnt be hardcoded
|
|
group.append((f"{prefix}{n}.{extension}", [bottom_end, top_end]))
|
|
result.append(group)
|
|
|
|
# Add special groups at specified positions (other than beginning)
|
|
if special_groups:
|
|
for group in special_groups:
|
|
position = group.get('position', 0) # default value is 0 if not specified
|
|
df_name = group['df_name']
|
|
size = group.get('size', group_size)
|
|
|
|
# Create the special group tuples
|
|
special_tuples = []
|
|
for i in range(size):
|
|
first_col = first_col_start + i
|
|
last_col = first_col + last_col_offset
|
|
special_tuples.append((df_name, [first_col, last_col]))
|
|
|
|
tuples.insert(position, special_tuples)
|
|
|
|
|
|
return tuples
|
|
|
|
|
|
# file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
|
|
# df = pd.read_csv(file_path, sep="\t", skiprows=10) # Read with explicit column names
|
|
|
|
|
|
class DataProcessor:
|
|
def __init__(self, file_index, cache_path: str = None, base_path: str = None, include_time: bool = False):
|
|
self.file_index = file_index
|
|
self.base_path = base_path
|
|
self.include_time = include_time
|
|
if cache_path:
|
|
self.data = load(cache_path)
|
|
else:
|
|
self.data = self.load_data()
|
|
|
|
def load_data(self):
|
|
for idxs, group in enumerate(self.file_index):
|
|
for idx, tuple in enumerate(group):
|
|
file_path = os.path.join(self.base_path, tuple[0]) # ('zzzAD1.TXT')
|
|
if self.include_time:
|
|
col_indices = [0] + tuple[1] # [1, 26] + [0] -> [0, 1, 26]
|
|
else:
|
|
col_indices = tuple[1] # [1, 26]
|
|
try:
|
|
# Read the CSV file
|
|
df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True)
|
|
self.file_index[idxs][idx] = df.iloc[:, col_indices].copy() # Extract the specified columns
|
|
|
|
print(f"Processed {file_path}, extracted columns: {col_indices}")
|
|
|
|
except Exception as e:
|
|
print(f"Error processing {file_path}: {str(e)}")
|
|
def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario:
|
|
"""
|
|
Loads a single data file into a pandas DataFrame.
|
|
|
|
:param file_path: Path to the data file.
|
|
:return: DataFrame containing the numerical data.
|
|
"""
|
|
df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True, nrows=1)
|
|
return df
|
|
|
|
def _load_all_data(self) -> GroupDataset:
|
|
"""
|
|
Loads all data files based on the grouping dictionary and returns a nested list.
|
|
|
|
:return: A nested list of DataFrames where the outer index corresponds to group_idx - 1.
|
|
"""
|
|
data = []
|
|
# Find the maximum group index to determine the list size
|
|
max_group_idx = len(self.file_index) if self.file_index else 0
|
|
|
|
# Handle case when file_index is empty
|
|
if max_group_idx == 0:
|
|
raise ValueError("No file index provided; file_index is empty.")
|
|
|
|
# Initialize empty lists
|
|
for _ in range(max_group_idx):
|
|
data.append([])
|
|
|
|
# Fill the list with data
|
|
for group_idx, file_list in self.file_index.items():
|
|
group_idx -= 1 # adjust due to undamage file
|
|
data[group_idx] = [self._load_dataframe(file) for file in file_list]
|
|
return data
|
|
|
|
def get_group_data(self, group_idx: int) -> List[pd.DataFrame]:
|
|
"""
|
|
Returns the list of DataFrames for the given group index.
|
|
|
|
:param group_idx: Index of the group.
|
|
:return: List of DataFrames.
|
|
"""
|
|
return self.data.get([group_idx, []])
|
|
|
|
def get_column_names(self, group_idx: int, file_idx: int = 0) -> List[str]:
|
|
"""
|
|
Returns the column names for the given group and file indices.
|
|
|
|
:param group_idx: Index of the group.
|
|
:param file_idx: Index of the file in the group.
|
|
:return: List of column names.
|
|
"""
|
|
if group_idx in self.data and len(self.data[group_idx]) > file_idx:
|
|
return self.data[group_idx][file_idx].columns.tolist()
|
|
return []
|
|
|
|
def get_data_info(self):
|
|
"""
|
|
Print information about the loaded data structure.
|
|
Adapted for when self.data is a List instead of a Dictionary.
|
|
"""
|
|
if isinstance(self.data, list):
|
|
# For each sublist in self.data, get the type names of all elements
|
|
pprint(
|
|
[
|
|
(
|
|
[type(item).__name__ for item in sublist]
|
|
if isinstance(sublist, list)
|
|
else type(sublist).__name__
|
|
)
|
|
for sublist in self.data
|
|
]
|
|
)
|
|
else:
|
|
pprint(
|
|
{
|
|
key: [type(df).__name__ for df in value]
|
|
for key, value in self.data.items()
|
|
}
|
|
if isinstance(self.data, dict)
|
|
else type(self.data).__name__
|
|
)
|
|
|
|
def _create_vector_column_index(self) -> VectorColumnIndices:
|
|
vector_col_idx: VectorColumnIndices = []
|
|
y = 0
|
|
for data_group in self.data: # len(data_group[i]) = 5
|
|
for j in data_group: # len(j[i]) =
|
|
c: VectorColumnIndex = []
|
|
x = 0
|
|
for _ in range(6): # TODO: range(6) should be dynamic and parameterized
|
|
c.append(x + y)
|
|
x += 5
|
|
vector_col_idx.append(c)
|
|
y += 1
|
|
return vector_col_idx # TODO: refactor this so that it returns just from first data_group without using for loops through the self.data that seems unnecessary
|
|
|
|
def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]:
|
|
"""
|
|
Create a vector column from the loaded data.
|
|
|
|
:param overwrite: Overwrite the original data with vector column-based data.
|
|
"""
|
|
idxs = self._create_vector_column_index()
|
|
for i, group in enumerate(self.data):
|
|
# add 1 to all indices to account for 'Time' being at position 0
|
|
for j, df in enumerate(group):
|
|
idx = [_ + 1 for _ in idxs[j]]
|
|
# slice out the desired columns, copy into a fresh DataFrame,
|
|
# then overwrite self.data[i][j] with it
|
|
self.data[i][j] = df.iloc[:, idx].copy()
|
|
|
|
# TODO: if !overwrite:
|
|
|
|
def create_limited_sensor_vector_column(self, overwrite=True):
|
|
"""
|
|
Create a vector column from the loaded data.
|
|
|
|
:param overwrite: Overwrite the original data with vector column-based data.
|
|
"""
|
|
idx = self._create_vector_column_index()
|
|
# if overwrite:
|
|
for i in range(len(self.data)): # damage(s)
|
|
for j in range(len(self.data[i])): # col(s)
|
|
# Get the appropriate indices for slicing from idx
|
|
indices = idx[j]
|
|
|
|
# Get the current DataFrame
|
|
df = self.data[i][j]
|
|
|
|
# Keep the 'Time' column and select only specifid 'Real' colmns
|
|
# First, we add 1 to all indices to acount for 'Time' being at positiion 0
|
|
real_indices = [index + 1 for index in indices]
|
|
|
|
# Create list with Time column index (0) and the adjustedd Real indices
|
|
all_indices = [0] + [real_indices[0]] + [real_indices[-1]]
|
|
|
|
# Apply the slicing
|
|
self.data[i][j] = df.iloc[:, all_indices]
|
|
# TODO: if !overwrite:
|
|
|
|
def export_to_csv(self, output_dir: str, file_prefix: str = "DAMAGE"):
|
|
"""
|
|
Export the processed data to CSV files in the required folder structure.
|
|
|
|
:param output_dir: Directory to save the CSV files.
|
|
:param file_prefix: Prefix for the output filenames.
|
|
"""
|
|
for group_idx, group in enumerate(self.file_index, start=0):
|
|
group_folder = os.path.join(output_dir, f"{file_prefix}_{group_idx}")
|
|
os.makedirs(group_folder, exist_ok=True)
|
|
|
|
for test_idx, df in enumerate(group, start=1):
|
|
out1 = os.path.join(group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_01.csv")
|
|
cols_to_export = [0, 1] if self.include_time else [1]
|
|
df.iloc[:, cols_to_export].to_csv(out1, index=False)
|
|
|
|
out2 = os.path.join(group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_02.csv")
|
|
cols_to_export = [0, 2] if self.include_time else [2]
|
|
df.iloc[:, cols_to_export].to_csv(out2, index=False)
|
|
|
|
# def create_damage_files(base_path, output_base, prefix):
|
|
# # Initialize colorama
|
|
# init(autoreset=True)
|
|
|
|
# # Generate column labels based on expected duplication in input files
|
|
# columns = ["Real"] + [
|
|
# f"Real.{i}" for i in range(1, 30)
|
|
# ] # Explicitly setting column names
|
|
|
|
# sensor_end_map = {
|
|
# 1: "Real.25",
|
|
# 2: "Real.26",
|
|
# 3: "Real.27",
|
|
# 4: "Real.28",
|
|
# 5: "Real.29",
|
|
# }
|
|
|
|
# # Define the damage scenarios and the corresponding original file indices
|
|
# damage_scenarios = {
|
|
# 1: range(1, 6), # Damage 1 files from zzzAD1.csv to zzzAD5.csv
|
|
# 2: range(6, 11), # Damage 2 files from zzzAD6.csv to zzzAD10.csv
|
|
# 3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csvs
|
|
# 4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv
|
|
# 5: range(21, 26), # Damage 5 files from zzzAD21.csv to zzzAD25.csv
|
|
# 6: range(26, 31), # Damage 6 files from zzzAD26.csv to zzzAD30.csv
|
|
# }
|
|
# damage_pad = len(str(len(damage_scenarios)))
|
|
# test_pad = len(str(30))
|
|
|
|
# for damage, files in damage_scenarios.items():
|
|
# for i, file_index in enumerate(files, start=1):
|
|
# # Load original data file
|
|
# file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
|
|
# df = pd.read_csv(
|
|
# file_path, sep="\t", skiprows=10
|
|
# ) # Read with explicit column names
|
|
|
|
# top_sensor = columns[i - 1]
|
|
# print(top_sensor, type(top_sensor))
|
|
# output_file_1 = os.path.join(
|
|
# output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_01.csv"
|
|
# )
|
|
# print(f"Creating {output_file_1} from taking zzz{prefix}D{file_index}.TXT")
|
|
# print("Taking datetime column on index 0...")
|
|
# print(f"Taking `{top_sensor}`...")
|
|
# os.makedirs(os.path.dirname(output_file_1), exist_ok=True)
|
|
# df[["Time", top_sensor]].to_csv(output_file_1, index=False)
|
|
# print(Fore.GREEN + "Done")
|
|
|
|
# bottom_sensor = sensor_end_map[i]
|
|
# output_file_2 = os.path.join(
|
|
# output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_02.csv"
|
|
# )
|
|
# print(f"Creating {output_file_2} from taking zzz{prefix}D{file_index}.TXT")
|
|
# print("Taking datetime column on index 0...")
|
|
# print(f"Taking `{bottom_sensor}`...")
|
|
# os.makedirs(os.path.dirname(output_file_2), exist_ok=True)
|
|
# df[["Time", bottom_sensor]].to_csv(output_file_2, index=False)
|
|
# print(Fore.GREEN + "Done")
|
|
# print("---")
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python convert.py <path_to_csv_files>")
|
|
sys.exit(1)
|
|
|
|
base_path = sys.argv[1]
|
|
output_base = sys.argv[2]
|
|
prefix = sys.argv[3] # Define output directory
|
|
|
|
# Create output folders if they don't exist
|
|
# for i in range(1, 7):
|
|
# os.makedirs(os.path.join(output_base, f'DAMAGE_{i}'), exist_ok=True)
|
|
|
|
create_damage_files(base_path, output_base, prefix)
|
|
print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|