Merge pull request #98 from nuluh/feat/53-feat-include-undamaged-node-classification

Closes #53
This commit was merged in pull request #98.
This commit is contained in:
Rifqi D. Panuluh
2025-06-18 09:06:04 +07:00
committed by GitHub
7 changed files with 445 additions and 217 deletions

View File

@@ -1,4 +1,7 @@
{
"python.analysis.extraPaths": ["./code/src/features"],
"python.analysis.extraPaths": [
"./code/src/features",
"${workspaceFolder}/code/src"
],
"jupyter.notebookFileRoot": "${workspaceFolder}/code"
}

View File

@@ -17,8 +17,8 @@
"metadata": {},
"outputs": [],
"source": [
"sensor1 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_1/DAMAGE_1_TEST1_01.csv',sep=',')\n",
"sensor2 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_1/DAMAGE_1_TEST1_02.csv',sep=',')"
"sensor1 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_1/DAMAGE_0_TEST1_01.csv',sep=',')\n",
"sensor2 = pd.read_csv('D:/thesis/data/converted/raw/DAMAGE_1/DAMAGE_0_TEST1_02.csv',sep=',')"
]
},
{
@@ -101,13 +101,16 @@
"source": [
"# Combined Plot for sensor 1 and sensor 2 from data1 file in which motor is operated at 800 rpm\n",
"\n",
"plt.plot(df1['s2'], label='sensor 2')\n",
"plt.plot(df1['s1'], label='sensor 1', alpha=0.5)\n",
"plt.plot(df1['s2'], label='Sensor 1', color='C1', alpha=0.6)\n",
"plt.plot(df1['s1'], label='Sensor 2', color='C0', alpha=0.6)\n",
"plt.xlabel(\"Number of samples\")\n",
"plt.ylabel(\"Amplitude\")\n",
"plt.title(\"Raw vibration signal\")\n",
"plt.ylim(-7.5, 5)\n",
"plt.legend()\n",
"plt.locator_params(axis='x', nbins=8)\n",
"plt.ylim(-1, 1) # Adjust range as needed\n",
"plt.grid(True, linestyle='--', alpha=0.5)\n",
"plt.show()"
]
},
@@ -334,9 +337,44 @@
"metadata": {},
"outputs": [],
"source": [
"# len(ready_data1a)\n",
"# plt.pcolormesh(ready_data1[0])\n",
"ready_data1a[0].max().max()"
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from mpl_toolkits.mplot3d import Axes3D\n",
"\n",
"# Assuming ready_data1a[0] is a DataFrame or 2D array\n",
"spectrogram_data = ready_data1a[0].values # Convert to NumPy array if it's a DataFrame\n",
"\n",
"# Get the dimensions of the spectrogram\n",
"num_frequencies, num_time_frames = spectrogram_data.shape\n",
"\n",
"# Create frequency and time arrays\n",
"frequencies = np.arange(num_frequencies) # Replace with actual frequency values if available\n",
"time_frames = np.arange(num_time_frames) # Replace with actual time values if available\n",
"\n",
"# Create a meshgrid for plotting\n",
"T, F = np.meshgrid(time_frames, frequencies)\n",
"\n",
"# Create a 3D plot\n",
"fig = plt.figure(figsize=(12, 8))\n",
"ax = fig.add_subplot(111, projection='3d')\n",
"\n",
"# Plot the surface\n",
"surf = ax.plot_surface(T, F, spectrogram_data, cmap='bwr', edgecolor='none')\n",
"\n",
"# Add labels and a color bar\n",
"ax.set_xlabel('Time Frames')\n",
"ax.set_ylabel('Frequency [Hz]')\n",
"ax.set_zlabel('Magnitude')\n",
"ax.set_title('3D Spectrogram')\n",
"# Resize the z-axis (shrink it)\n",
"z_min, z_max = 0, 0.1 # Replace with your desired range\n",
"ax.set_zlim(z_min, z_max)\n",
"ax.get_proj = lambda: np.dot(Axes3D.get_proj(ax), np.diag([1, 1, 0.5, 1])) # Shrink z-axis by 50%\n",
"ax.set_facecolor('white')\n",
"fig.colorbar(surf, ax=ax, shrink=0.5, aspect=10)\n",
"\n",
"# Show the plot\n",
"plt.show()"
]
},
{
@@ -345,13 +383,32 @@
"metadata": {},
"outputs": [],
"source": [
"from cmcrameri import cm\n",
"# Create a figure and subplots\n",
"fig, axes = plt.subplots(2, 3, figsize=(15, 8), sharex=True, sharey=True)\n",
"\n",
"# Flatten the axes array for easier iteration\n",
"axes = axes.flatten()\n",
"\n",
"# Loop through each subplot and plot the data\n",
"for i in range(6):\n",
" plt.pcolormesh(ready_data1a[i], cmap=\"jet\", vmax=0.03, vmin=0.0)\n",
" plt.colorbar() \n",
" plt.title(f'STFT Magnitude for case {i} sensor 1')\n",
" plt.xlabel(f'Frequency [Hz]')\n",
" plt.ylabel(f'Time [sec]')\n",
" plt.show()"
" pcm = axes[i].pcolormesh(ready_data1a[i].transpose(), cmap='bwr', vmax=0.03, vmin=0.0)\n",
" axes[i].set_title(f'Case {i} Sensor A', fontsize=12)\n",
"\n",
"# Add a single color bar for all subplots\n",
"# Use the first `pcolormesh` object (or any valid one) for the color bar\n",
"cbar = fig.colorbar(pcm, ax=axes, orientation='vertical')\n",
"# cbar.set_label('Magnitude')\n",
"\n",
"# Set shared labels\n",
"fig.text(0.5, 0.04, 'Time Frames', ha='center', fontsize=12)\n",
"fig.text(0.04, 0.5, 'Frequency [Hz]', va='center', rotation='vertical', fontsize=12)\n",
"\n",
"# Adjust layout\n",
"# plt.tight_layout(rect=[0.05, 0.05, 1, 1]) # Leave space for shared labels\n",
"plt.subplots_adjust(left=0.1, right=0.75, top=0.9, bottom=0.1, wspace=0.2, hspace=0.2)\n",
"\n",
"plt.show()"
]
},
{
@@ -576,6 +633,16 @@
"X2a, y = create_ready_data('D:/thesis/data/converted/raw/sensor2')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"X1a.iloc[-1,:]\n",
"# y[2565]"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -621,23 +688,12 @@
"metadata": {},
"outputs": [],
"source": [
"def train_and_evaluate_model(model, model_name, sensor_label, x_train, y_train, x_test, y_test):\n",
" model.fit(x_train, y_train)\n",
" y_pred = model.predict(x_test)\n",
" accuracy = accuracy_score(y_test, y_pred) * 100\n",
" return {\n",
" \"model\": model_name,\n",
" \"sensor\": sensor_label,\n",
" \"accuracy\": accuracy\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.ml.model_selection import train_and_evaluate_model\n",
"from sklearn.svm import SVC\n",
"from sklearn.pipeline import make_pipeline\n",
"from sklearn.preprocessing import StandardScaler\n",
"from sklearn.svm import SVC\n",
"from sklearn.decomposition import PCA\n",
"# Define models for sensor1\n",
"models_sensor1 = {\n",
" # \"Random Forest\": RandomForestClassifier(),\n",
@@ -646,12 +702,18 @@
" # \"KNN\": KNeighborsClassifier(),\n",
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
" \"SVM\": SVC(),\n",
" \"XGBoost\": XGBClassifier()\n",
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
" StandardScaler(),\n",
" PCA(n_components=10),\n",
" SVC(kernel='rbf')\n",
" ),\n",
"\n",
" # \"XGBoost\": XGBClassifier()\n",
"}\n",
"\n",
"results_sensor1 = []\n",
"for name, model in models_sensor1.items():\n",
" res = train_and_evaluate_model(model, name, \"sensor1\", x_train1, y_train, x_test1, y_test)\n",
" res = train_and_evaluate_model(model, name, \"sensor1\", x_train1, y_train, x_test1, y_test, export='D:/thesis/models/sensor1')\n",
" results_sensor1.append(res)\n",
" print(f\"{name} on sensor1: Accuracy = {res['accuracy']:.2f}%\")\n"
]
@@ -669,12 +731,17 @@
" # \"KNN\": KNeighborsClassifier(),\n",
" # \"LDA\": LinearDiscriminantAnalysis(),\n",
" \"SVM\": SVC(),\n",
" \"XGBoost\": XGBClassifier()\n",
" \"SVM with StandardScaler and PCA\": make_pipeline(\n",
" StandardScaler(),\n",
" PCA(n_components=10),\n",
" SVC(kernel='rbf')\n",
" ),\n",
" # \"XGBoost\": XGBClassifier()\n",
"}\n",
"\n",
"results_sensor2 = []\n",
"for name, model in models_sensor2.items():\n",
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train, x_test2, y_test)\n",
" res = train_and_evaluate_model(model, name, \"sensor2\", x_train2, y_train, x_test2, y_test, export='D:/thesis/models/sensor2')\n",
" results_sensor2.append(res)\n",
" print(f\"{name} on sensor2: Accuracy = {res['accuracy']:.2f}%\")\n"
]
@@ -787,6 +854,8 @@
"source": [
"from sklearn.metrics import accuracy_score, classification_report\n",
"# 4. Validate on Dataset B\n",
"from joblib import load\n",
"svm_model = load('D:/thesis/models/sensor1/SVM.joblib')\n",
"y_pred_svm = svm_model.predict(X1b)\n",
"\n",
"# 5. Evaluate\n",
@@ -794,6 +863,30 @@
"print(classification_report(y, y_pred_svm))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Model sensor 1 to predict sensor 2 data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.metrics import accuracy_score, classification_report\n",
"# 4. Validate on Dataset B\n",
"from joblib import load\n",
"svm_model = load('D:/thesis/models/sensor1/SVM.joblib')\n",
"y_pred_svm = svm_model.predict(X2b)\n",
"\n",
"# 5. Evaluate\n",
"print(\"Accuracy on Dataset B:\", accuracy_score(y, y_pred_svm))\n",
"print(classification_report(y, y_pred_svm))"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -853,7 +946,7 @@
"# Plot\n",
"disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)\n",
"disp.plot(cmap=plt.cm.Blues) # You can change colormap\n",
"plt.title(\"SVM Sensor1 CM Train w/ Dataset A Val w/ Dataset B\")\n",
"plt.title(\"SVM Sensor1 CM Train w/ Dataset A Val w/ Dataset B from Sensor2 readings\")\n",
"plt.show()"
]
},
@@ -871,14 +964,14 @@
"outputs": [],
"source": [
"# 1. Predict sensor 1 on Dataset A\n",
"y_train_pred = svm_model.predict(x_train1)\n",
"y_test_pred = svm_model.predict(x_test1)\n",
"\n",
"# 2. Import confusion matrix tools\n",
"from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# 3. Create and plot confusion matrix\n",
"cm_train = confusion_matrix(y_train, y_train_pred)\n",
"cm_train = confusion_matrix(y_test, y_test_pred)\n",
"labels = svm_model.classes_\n",
"\n",
"disp = ConfusionMatrixDisplay(confusion_matrix=cm_train, display_labels=labels)\n",

View File

@@ -25,9 +25,9 @@ def create_ready_data(
"""
ready_data = []
for file in os.listdir(stft_data_path):
ready_data.append(pd.read_csv(os.path.join(stft_data_path, file)))
ready_data.append(pd.read_csv(os.path.join(stft_data_path, file), skiprows=1))
y_data = [i for i in range(len(ready_data))]
y_data = [i for i in range(len(ready_data))] # TODO: Should be replaced with actual desired labels
# Combine all dataframes in ready_data into a single dataframe
if ready_data: # Check if the list is not empty
@@ -55,3 +55,101 @@ def create_ready_data(
y = np.array([])
return X, y
def train_and_evaluate_model(
    model, model_name, sensor_label, x_train, y_train, x_test, y_test, export=None
):
    """
    Train a machine learning model, evaluate its performance, and optionally export it.

    This function trains the provided model on the training data, evaluates its
    performance on test data using accuracy score, and can save the trained model
    to disk if an export path is provided. Each stage (fit, predict, score) is
    wrapped separately so the returned error message names the stage that failed.

    Parameters
    ----------
    model : estimator object
        The machine learning model to train. Must expose ``fit`` and ``predict``.
    model_name : str
        Name of the model, used for the export filename and in the returned results.
    sensor_label : str
        Label identifying which sensor's data the model is being trained on.
    x_train : array-like or pandas.DataFrame
        The training input samples.
    y_train : array-like
        The target values for training.
    x_test : array-like or pandas.DataFrame
        The test input samples.
    y_test : array-like
        The target values for testing.
    export : str, optional
        Directory path where the trained model should be saved. If None, the
        model won't be saved. An export failure is reported but is not fatal.

    Returns
    -------
    dict
        Dictionary always containing:
        - 'model': model_name (str)
        - 'sensor': sensor_label (str)
        - 'success': True if fit/predict/score all completed (bool)
        Present only on success:
        - 'accuracy': accuracy percentage (float)
        Present only on failure:
        - 'error': message naming the failed stage (str)
        Present only if the optional export failed:
        - 'export_error': exception text from the export attempt (str)

    Example
    -------
    >>> from sklearn.svm import SVC
    >>> from sklearn.model_selection import train_test_split
    >>> X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2)
    >>> result = train_and_evaluate_model(
    ...     SVC(),
    ...     "SVM",
    ...     "sensor1",
    ...     X_train,
    ...     y_train,
    ...     X_test,
    ...     y_test,
    ...     export="models/sensor1"
    ... )
    >>> print(f"Model accuracy: {result['accuracy']:.2f}%")
    """
    from sklearn.metrics import accuracy_score

    result = {"model": model_name, "sensor": sensor_label, "success": False}

    # Train the model. A dedicated try-block ensures only genuine fit()
    # failures are reported as training errors (previously the outer try
    # spanned the whole body, so any late failure was labeled "Training error").
    try:
        model.fit(x_train, y_train)
    except Exception as e:
        result["error"] = f"Training error: {str(e)}"
        return result

    # Predict on the held-out test set
    try:
        y_pred = model.predict(x_test)
    except Exception as e:
        result["error"] = f"Prediction error: {str(e)}"
        return result

    # Calculate accuracy as a percentage
    try:
        result["accuracy"] = accuracy_score(y_test, y_pred) * 100
    except Exception as e:
        result["error"] = f"Accuracy calculation error: {str(e)}"
        return result

    # Export model if requested; a failure here is a warning, not an error
    if export:
        try:
            import joblib

            full_path = os.path.join(export, f"{model_name}.joblib")
            # Ensure the target directory itself exists (clearer than
            # deriving it back out of full_path with os.path.dirname)
            os.makedirs(export, exist_ok=True)
            joblib.dump(model, full_path)
            print(f"Model saved to {full_path}")
        except Exception as e:
            print(f"Warning: Failed to export model to {export}: {str(e)}")
            result["export_error"] = str(e)
            # Continue despite export error

    result["success"] = True
    return result

View File

@@ -6,7 +6,7 @@ import glob
import multiprocessing # Added import for multiprocessing
# Define the base directory where DAMAGE_X folders are located
damage_base_path = 'D:/thesis/data/converted/raw'
damage_base_path = 'D:/thesis/data/converted/raw_B'
# Define output directories for each sensor
output_dirs = {
@@ -105,6 +105,8 @@ def process_damage_case(damage_num):
)
# Save the aggregated STFT to CSV
with open(output_file, 'w') as file:
file.write('sep=,\n')
df_aggregated.to_csv(output_file, index=False)
print(f"Saved aggregated STFT for Sensor {sensor_num}, Damage {damage_num} to {output_file}")
else:
@@ -112,4 +114,4 @@ def process_damage_case(damage_num):
if __name__ == "__main__": # Added main guard for multiprocessing
with multiprocessing.Pool() as pool:
pool.map(process_damage_case, range(1, num_damage_cases + 1))
pool.map(process_damage_case, range(0, num_damage_cases + 1))

View File

@@ -26,73 +26,109 @@ class DamageFilesIndices(TypedDict):
files: List[str]
def generate_damage_files_index(**kwargs) -> DamageFilesIndices:
prefix: str = kwargs.get("prefix", "zzzAD")
extension: str = kwargs.get("extension", ".TXT")
num_damage: int = kwargs.get("num_damage")
file_index_start: int = kwargs.get("file_index_start")
col: int = kwargs.get("col")
base_path: str = kwargs.get("base_path")
def generate_df_tuples(total_dfs=30, group_size=5, prefix="zzzAD", ext="TXT", first_col_start=1, last_col_offset=25,
special_groups=None, group=True):
"""
Generate a structured list of tuples containing DataFrame references and column indices.
damage_scenarios = {}
a = file_index_start
b = col + 1
for i in range(1, num_damage + 1):
damage_scenarios[i] = range(a, b)
a += col
b += col
Parameters:
-----------
total_dfs : int, default 30
Total number of DataFrames to include in the tuples
group_size : int, default 5
Number of DataFrames in each group (determines the pattern repeat)
prefix : str, default "zzzAD"
Prefix for the generated file names (e.g. "zzzAD1.TXT")
first_col_start : int, default 1
Starting value for the first column index (1-indexed)
last_col_offset : int, default 25
Offset added to each group's first column index to get the matching last column index
special_groups : list of dict, optional
List of special groups to insert, each dict should contain:
- 'df_name': The DataFrame name to use for all tuples in this group
- 'position': Where to insert this group (0 for beginning)
- 'size': Size of this group (default: same as group_size)
# return damage_scenarios
Returns:
--------
list
List of tuples, where each tuple contains (df_name, [first_col, last_col])
"""
tuples = []
# Add regular groups
for i in range(1, total_dfs + 1):
# for _ in range(group_size): # group tuple
# temporary list to hold tuples for this group
# list = []
# Calculate the position within the group (1 to group_size)
position_in_group = ((i - 1) % group_size) + 1
# Calculate column indices based on position in group
first_col = first_col_start + position_in_group - 1
last_col = first_col + last_col_offset
# Create the tuple with DataFrame reference and column indices
df_name = f"{prefix}{i}.{ext}"
tuples.append((df_name, [first_col, last_col]))
if group:
# Group tuples into sublists of group_size
grouped_tuples = []
for i in range(0, len(tuples), group_size):
grouped_tuples.append(tuples[i:i + group_size])
tuples = grouped_tuples
# tuples.append(list)
# Add special groups at specified positions (other than beginning)
if special_groups:
for group in special_groups:
position = group.get('position', 0) # default value is 0 if not specified
df_name = group['df_name']
size = group.get('size', group_size)
# Create the special group tuples
special_tuples = []
for i in range(size):
first_col = first_col_start + i
last_col = first_col + last_col_offset
special_tuples.append((df_name, [first_col, last_col]))
tuples.insert(position, special_tuples)
return tuples
x = {}
for damage, files in damage_scenarios.items():
x[damage] = [] # Initialize each key with an empty list
for i, file_index in enumerate(files, start=1):
if base_path:
x[damage].append(
os.path.normpath(
os.path.join(base_path, f"{prefix}{file_index}{extension}")
)
)
# if not os.path.exists(file_path):
# print(Fore.RED + f"File {file_path} does not exist.")
# continue
else:
x[damage].append(f"{prefix}{file_index}{extension}")
return x
# file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
# df = pd.read_csv( file_path, sep="\t", skiprows=10) # Read with explicit column names
# df = pd.read_csv(file_path, sep="\t", skiprows=10) # Read with explicit column names
class DataProcessor:
def __init__(self, file_index: DamageFilesIndices, cache_path: str = None):
def __init__(self, file_index, cache_path: str = None, base_path: str = None, include_time: bool = False):
self.file_index = file_index
self.base_path = base_path
self.include_time = include_time
if cache_path:
self.data = load(cache_path)
else:
self.data = self._load_all_data()
self.data = self.load_data()
def _extract_column_names(self, file_path: str) -> List[str]:
"""
Extracts column names from the header of the given file.
Assumes the 11th line contains column names.
def load_data(self):
for idxs, group in enumerate(self.file_index):
for idx, tuple in enumerate(group):
file_path = os.path.join(self.base_path, tuple[0]) # ('zzzAD1.TXT')
if self.include_time:
col_indices = [0] + tuple[1] # [1, 26] + [0] -> [0, 1, 26]
else:
col_indices = tuple[1] # [1, 26]
try:
# Read the CSV file
df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True)
self.file_index[idxs][idx] = df.iloc[:, col_indices].copy() # Extract the specified columns
:param file_path: Path to the data file.
:return: List of column names.
"""
with open(file_path, "r") as f:
header_lines = [next(f) for _ in range(12)]
# Extract column names from the 6th line
channel_line = header_lines[10].strip()
tokens = re.findall(r'"([^"]+)"', channel_line)
if not channel_line.startswith('"'):
first_token = channel_line.split()[0]
tokens = [first_token] + tokens
return tokens # Prepend 'Time' column if applicable
print(f"Processed {file_path}, extracted columns: {col_indices}")
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
def _load_dataframe(self, file_path: str) -> OriginalSingleDamageScenario:
"""
Loads a single data file into a pandas DataFrame.
@@ -100,11 +136,7 @@ class DataProcessor:
:param file_path: Path to the data file.
:return: DataFrame containing the numerical data.
"""
col_names = self._extract_column_names(file_path)
df = pd.read_csv(
file_path, delim_whitespace=True, skiprows=11, header=None, memory_map=True
)
df.columns = col_names
df = pd.read_csv(file_path, delim_whitespace=True, skiprows=10, header=0, memory_map=True, nrows=1)
return df
def _load_all_data(self) -> GroupDataset:
@@ -115,7 +147,11 @@ class DataProcessor:
"""
data = []
# Find the maximum group index to determine the list size
max_group_idx = max(self.file_index.keys()) if self.file_index else 0
max_group_idx = len(self.file_index) if self.file_index else 0
# Handle case when file_index is empty
if max_group_idx == 0:
raise ValueError("No file index provided; file_index is empty.")
# Initialize empty lists
for _ in range(max_group_idx):
@@ -123,10 +159,8 @@ class DataProcessor:
# Fill the list with data
for group_idx, file_list in self.file_index.items():
# Adjust index to be 0-based
list_idx = group_idx - 1
data[list_idx] = [self._load_dataframe(file) for file in file_list]
group_idx -= 1 # adjust due to undamage file
data[group_idx] = [self._load_dataframe(file) for file in file_list]
return data
def get_group_data(self, group_idx: int) -> List[pd.DataFrame]:
@@ -182,14 +216,14 @@ class DataProcessor:
y = 0
for data_group in self.data: # len(data_group[i]) = 5
for j in data_group: # len(j[i]) =
c: VectorColumnIndex = [] # column vector c_{j}
c: VectorColumnIndex = []
x = 0
for _ in range(6): # TODO: range(6) should be dynamic and parameterized
c.append(x + y)
x += 5
vector_col_idx.append(c)
y += 1
return vector_col_idx
return vector_col_idx # TODO: refactor this so that it returns just from first data_group without using for loops through the self.data that seems unnecessary
def create_vector_column(self, overwrite=True) -> List[List[List[pd.DataFrame]]]:
"""
@@ -197,25 +231,15 @@ class DataProcessor:
:param overwrite: Overwrite the original data with vector column-based data.
"""
idx = self._create_vector_column_index()
# if overwrite:
for i in range(len(self.data)):
for j in range(len(self.data[i])):
# Get the appropriate indices for slicing from idx
indices = idx[j]
idxs = self._create_vector_column_index()
for i, group in enumerate(self.data):
# add 1 to all indices to account for 'Time' being at position 0
for j, df in enumerate(group):
idx = [_ + 1 for _ in idxs[j]]
# slice out the desired columns, copy into a fresh DataFrame,
# then overwrite self.data[i][j] with it
self.data[i][j] = df.iloc[:, idx].copy()
# Get the current DataFrame
df = self.data[i][j]
# Keep the 'Time' column and select only specified 'Real' columns
# First, we add 1 to all indices to account for 'Time' being at position 0
real_indices = [index + 1 for index in indices]
# Create list with Time column index (0) and the adjusted Real indices
all_indices = [0] + real_indices
# Apply the slicing
self.data[i][j] = df.iloc[:, all_indices]
# TODO: if !overwrite:
def create_limited_sensor_vector_column(self, overwrite=True):
@@ -252,91 +276,79 @@ class DataProcessor:
:param output_dir: Directory to save the CSV files.
:param file_prefix: Prefix for the output filenames.
"""
for group_idx, group in enumerate(self.data, start=1):
for group_idx, group in enumerate(self.file_index, start=0):
group_folder = os.path.join(output_dir, f"{file_prefix}_{group_idx}")
os.makedirs(group_folder, exist_ok=True)
for test_idx, df in enumerate(group, start=1):
# Ensure columns are named uniquely if duplicated
df = df.copy()
df.columns = ["Time", "Real_0", "Real_1"] # Rename
out1 = os.path.join(group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_01.csv")
cols_to_export = [0, 1] if self.include_time else [1]
df.iloc[:, cols_to_export].to_csv(out1, index=False)
# Export first Real column
out1 = os.path.join(
group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_01.csv"
)
df[["Time", "Real_0"]].rename(columns={"Real_0": "Real"}).to_csv(
out1, index=False
)
out2 = os.path.join(group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_02.csv")
cols_to_export = [0, 2] if self.include_time else [2]
df.iloc[:, cols_to_export].to_csv(out2, index=False)
# Export last Real column
out2 = os.path.join(
group_folder, f"{file_prefix}_{group_idx}_TEST{test_idx}_02.csv"
)
df[["Time", "Real_1"]].rename(columns={"Real_1": "Real"}).to_csv(
out2, index=False
)
# def create_damage_files(base_path, output_base, prefix):
# # Initialize colorama
# init(autoreset=True)
# # Generate column labels based on expected duplication in input files
# columns = ["Real"] + [
# f"Real.{i}" for i in range(1, 30)
# ] # Explicitly setting column names
def create_damage_files(base_path, output_base, prefix):
# Initialize colorama
init(autoreset=True)
# sensor_end_map = {
# 1: "Real.25",
# 2: "Real.26",
# 3: "Real.27",
# 4: "Real.28",
# 5: "Real.29",
# }
# Generate column labels based on expected duplication in input files
columns = ["Real"] + [
f"Real.{i}" for i in range(1, 30)
] # Explicitly setting column names
# # Define the damage scenarios and the corresponding original file indices
# damage_scenarios = {
# 1: range(1, 6), # Damage 1 files from zzzAD1.csv to zzzAD5.csv
# 2: range(6, 11), # Damage 2 files from zzzAD6.csv to zzzAD10.csv
# 3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csvs
# 4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv
# 5: range(21, 26), # Damage 5 files from zzzAD21.csv to zzzAD25.csv
# 6: range(26, 31), # Damage 6 files from zzzAD26.csv to zzzAD30.csv
# }
# damage_pad = len(str(len(damage_scenarios)))
# test_pad = len(str(30))
sensor_end_map = {
1: "Real.25",
2: "Real.26",
3: "Real.27",
4: "Real.28",
5: "Real.29",
}
# for damage, files in damage_scenarios.items():
# for i, file_index in enumerate(files, start=1):
# # Load original data file
# file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
# df = pd.read_csv(
# file_path, sep="\t", skiprows=10
# ) # Read with explicit column names
# Define the damage scenarios and the corresponding original file indices
damage_scenarios = {
1: range(1, 6), # Damage 1 files from zzzAD1.csv to zzzAD5.csv
2: range(6, 11), # Damage 2 files from zzzAD6.csv to zzzAD10.csv
3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csv
4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv
5: range(21, 26), # Damage 5 files from zzzAD21.csv to zzzAD25.csv
6: range(26, 31), # Damage 6 files from zzzAD26.csv to zzzAD30.csv
}
damage_pad = len(str(len(damage_scenarios)))
test_pad = len(str(30))
# top_sensor = columns[i - 1]
# print(top_sensor, type(top_sensor))
# output_file_1 = os.path.join(
# output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_01.csv"
# )
# print(f"Creating {output_file_1} from taking zzz{prefix}D{file_index}.TXT")
# print("Taking datetime column on index 0...")
# print(f"Taking `{top_sensor}`...")
# os.makedirs(os.path.dirname(output_file_1), exist_ok=True)
# df[["Time", top_sensor]].to_csv(output_file_1, index=False)
# print(Fore.GREEN + "Done")
for damage, files in damage_scenarios.items():
for i, file_index in enumerate(files, start=1):
# Load original data file
file_path = os.path.join(base_path, f"zzz{prefix}D{file_index}.TXT")
df = pd.read_csv(
file_path, sep="\t", skiprows=10
) # Read with explicit column names
top_sensor = columns[i - 1]
print(top_sensor, type(top_sensor))
output_file_1 = os.path.join(
output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_01.csv"
)
print(f"Creating {output_file_1} from taking zzz{prefix}D{file_index}.TXT")
print("Taking datetime column on index 0...")
print(f"Taking `{top_sensor}`...")
os.makedirs(os.path.dirname(output_file_1), exist_ok=True)
df[["Time", top_sensor]].to_csv(output_file_1, index=False)
print(Fore.GREEN + "Done")
bottom_sensor = sensor_end_map[i]
output_file_2 = os.path.join(
output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_02.csv"
)
print(f"Creating {output_file_2} from taking zzz{prefix}D{file_index}.TXT")
print("Taking datetime column on index 0...")
print(f"Taking `{bottom_sensor}`...")
os.makedirs(os.path.dirname(output_file_2), exist_ok=True)
df[["Time", bottom_sensor]].to_csv(output_file_2, index=False)
print(Fore.GREEN + "Done")
print("---")
# bottom_sensor = sensor_end_map[i]
# output_file_2 = os.path.join(
# output_base, f"DAMAGE_{damage}", f"DAMAGE{damage}_TEST{i}_02.csv"
# )
# print(f"Creating {output_file_2} from taking zzz{prefix}D{file_index}.TXT")
# print("Taking datetime column on index 0...")
# print(f"Taking `{bottom_sensor}`...")
# os.makedirs(os.path.dirname(output_file_2), exist_ok=True)
# df[["Time", bottom_sensor]].to_csv(output_file_2, index=False)
# print(Fore.GREEN + "Done")
# print("---")
def main():

View File

@@ -1,25 +1,45 @@
from convert import *
from joblib import dump, load
# b = generate_damage_files_index(
# num_damage=6,
# file_index_start=1,
# col=5,
# base_path="D:/thesis/data/dataset_B",
# prefix="zzzBD",
# # undamage_file="zzzBU.TXT"
# )
# Example: Generate tuples with a special group of df0 at the beginning
special_groups_A = [
{'df_name': 'zzzAU.TXT', 'position': 0, 'size': 5} # Add at beginning
]
special_groups_B = [
{'df_name': 'zzzBU.TXT', 'position': 0, 'size': 5} # Add at beginning
]
# Generate the tuples with the special group
# a = generate_df_tuples(special_groups=special_groups_A)
b = generate_df_tuples(special_groups=special_groups_B, prefix="zzzBD")
# a = generate_damage_files_index(
# num_damage=6, file_index_start=1, col=5, base_path="D:/thesis/data/dataset_A"
# num_damage=6,
# file_index_start=1,
# col=5,
# base_path="D:/thesis/data/dataset_A",
# prefix="zzzAD",
# # undamage_file="zzzBU.TXT"
# )
b = generate_damage_files_index(
num_damage=6,
file_index_start=1,
col=5,
base_path="D:/thesis/data/dataset_B",
prefix="zzzBD",
)
# data_A = DataProcessor(file_index=a)
# # data.create_vector_column(overwrite=True)
# data_A.create_limited_sensor_vector_column(overwrite=True)
# data_A = DataProcessor(file_index=a, base_path="D:/thesis/data/dataset_A", include_time=True)
# data_A.create_vector_column(overwrite=True)
# # data_A.create_limited_sensor_vector_column(overwrite=True)
# data_A.export_to_csv("D:/thesis/data/converted/raw")
data_B = DataProcessor(file_index=b)
# data.create_vector_column(overwrite=True)
data_B.create_limited_sensor_vector_column(overwrite=True)
data_B = DataProcessor(file_index=b, base_path="D:/thesis/data/dataset_B", include_time=True)
# data_B.create_vector_column(overwrite=True)
# # data_B.create_limited_sensor_vector_column(overwrite=True)
data_B.export_to_csv("D:/thesis/data/converted/raw_B")
# a = load("D:/cache.joblib")
# breakpoint()

BIN
latex/figures/A4 - 4.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 188 KiB