feat(data): Enhance DataProcessor to support dynamic base path and improve data loading with error handling and memory efficiency

This commit is contained in:
nuluh
2025-06-16 17:35:27 +07:00
parent 60ff4e0fa9
commit 24c1484300

View File

@@ -104,13 +104,28 @@ def generate_df_tuples(total_dfs=30, group_size=5, prefix="zzzAD", ext="TXT", fi
class DataProcessor: class DataProcessor:
def __init__(self, file_index: DamageFilesIndices, cache_path: str = None): def __init__(self, file_index, cache_path: str = None, base_path: str = None):
self.file_index = file_index self.file_index = file_index
self.base_path = base_path
if cache_path: if cache_path:
self.data = load(cache_path) self.data = load(cache_path)
else: else:
self.data = self._load_all_data() self.data = self.load_data()
def load_data(self):
    """
    Load every indexed data file and replace its index entry with the data.

    Iterates ``self.file_index`` (a list of groups, each group a list of
    ``(filename, column_indices)`` tuples), reads each whitespace-delimited
    file found under ``self.base_path``, and overwrites the tuple in place
    with a DataFrame holding only the requested columns.

    Per-file failures are reported and skipped (best-effort loading), so a
    single bad file does not abort the whole run.

    :return: ``self.file_index`` after in-place population, so callers such
             as ``self.data = self.load_data()`` receive the loaded data
             instead of ``None`` (the previous behavior was a bug).
    """
    # base_path defaults to None in __init__; fall back to the CWD so
    # os.path.join does not raise TypeError.
    base = self.base_path or ""
    for group_idx, group in enumerate(self.file_index):
        for entry_idx, entry in enumerate(group):  # entry: ('zzzAD1.TXT', [1, 26])
            file_path = os.path.join(base, entry[0])
            col_indices = entry[1]
            try:
                # sep=r"\s+" is the non-deprecated equivalent of
                # delim_whitespace=True; data starts after 10 preamble lines.
                df = pd.read_csv(file_path, sep=r"\s+", skiprows=10,
                                 header=0, memory_map=True)
                # Keep only the requested columns; .copy() drops the
                # reference to the full DataFrame so it can be freed.
                self.file_index[group_idx][entry_idx] = df.iloc[:, col_indices].copy()
                print(f"Processed {file_path}, extracted columns: {col_indices}")
            except Exception as e:
                # Deliberately broad: loading is best-effort per file.
                print(f"Error processing {file_path}: {str(e)}")
    return self.file_index
def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario: def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario:
""" """
Loads a single data file into a pandas DataFrame. Loads a single data file into a pandas DataFrame.
@@ -118,7 +133,7 @@ class DataProcessor:
:param file_path: Path to the data file. :param file_path: Path to the data file.
:return: DataFrame containing the numerical data. :return: DataFrame containing the numerical data.
""" """
df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True) df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True, nrows=1)
return df return df
def _load_all_data(self) -> GroupDataset: def _load_all_data(self) -> GroupDataset:
@@ -141,6 +156,7 @@ class DataProcessor:
# Fill the list with data # Fill the list with data
for group_idx, file_list in self.file_index.items(): for group_idx, file_list in self.file_index.items():
group_idx -= 1 # adjust due to undamage file
data[group_idx] = [self._load_dataframe(file) for file in file_list] data[group_idx] = [self._load_dataframe(file) for file in file_list]
return data return data
@@ -204,7 +220,7 @@ class DataProcessor:
x += 5 x += 5
vector_col_idx.append(c) vector_col_idx.append(c)
y += 1 y += 1
return vector_col_idx return vector_col_idx # TODO: refactor this so that it returns just from first data_group without using for loops through the self.data that seems unnecessary
def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]: def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]:
""" """
@@ -212,26 +228,16 @@ class DataProcessor:
:param overwrite: Overwrite the original data with vector column-based data. :param overwrite: Overwrite the original data with vector column-based data.
""" """
idx = self._create_vector_column_index() idxs = self._create_vector_column_index()
# if overwrite: for i, group in enumerate(self.data):
for i in range(len(self.data)): # add 1 to all indices to account for 'Time' being at position 0
for j in range(len(self.data[i])): for j, df in enumerate(group):
# Get the appropriate indices for slicing from idx idx = [_ + 1 for _ in idxs[j]]
indices = idx[j] # slice out the desired columns, copy into a fresh DataFrame,
# then overwrite self.data[i][j] with it
self.data[i][j] = df.iloc[:, idx].copy()
# Get the current DataFrame # TODO: if !overwrite:
df = self.data[i][j]
# Keep the 'Time' column and select only specified 'Real' columns
# First, we add 1 to all indices to account for 'Time' being at position 0
real_indices = [index + 1 for index in indices]
# Create list with Time column index (0) and the adjusted Real indices
all_indices = [0] + real_indices
# Apply the slicing
self.data[i][j] = df.iloc[:, all_indices]
# TODO: if !overwrite:
def create_limited_sensor_vector_column(self, overwrite=True): def create_limited_sensor_vector_column(self, overwrite=True):
""" """