{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Mockup Data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To test the `FeatureExtractor` class from the `time_domain_features.py` script with a simple mockup dataset of 5 to 10 data points directly in a Python notebook."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Importing Modules\n",
"\n",
"Use relative imports or modify the path to include the directory where the module is stored. In this example, we'll simulate the relative import setup."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Mockup Data\n",
"\n",
"Create a small dataset with 5 to 10 data points. Simulate importing the `FeatureExtractor` from its relative path in the notebooks directory."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Build a small mockup signal to exercise the feature extractor\n",
"np.random.seed(42)  # fixed seed for reproducibility\n",
"mock_data = np.random.randn(10)  # ten standard-normal samples\n",
"\n",
"# Wrap in a DataFrame to mimic the processed-data input format\n",
"mock_df = pd.DataFrame({'SampleData': mock_data})\n",
"mock_df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Visualize Data Points"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# Plotting the data points\n",
"plt.figure(figsize=(8, 6))\n",
"plt.plot(mock_df.index, mock_df['SampleData'], marker='o', color='blue', label='Data Points')\n",
"plt.title('Scatter Plot of Data Points')\n",
"plt.xlabel('Time')\n",
"plt.ylabel('SampleData')\n",
"plt.legend()\n",
"plt.grid(True)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Print Time-domain Features (Single Mockup Data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import sys\n",
"import os\n",
"# Assuming the src directory is one level up from the notebooks directory\n",
"sys.path.append('../src/features')\n",
"from time_domain_features import FeatureExtractor\n",
"\n",
"\n",
"# Extract features\n",
"extracted = FeatureExtractor(mock_df['SampleData'])\n",
"\n",
"# Format with pandas DataFrame\n",
"features = pd.DataFrame(extracted.features, index=[0])\n",
"features\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Print Time-domain Features (Multiple CSV Mockup Data)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Importing modules"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import sys\n",
"import os\n",
"import re\n",
"# Assuming the src directory is one level up from the notebooks directory\n",
"sys.path.append('../src/features')\n",
"from time_domain_features import ExtractTimeFeatures # use wrapper function instead of class for easy use\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### The function"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Helpers: pull numbers out of a filename; the first number is later\n",
"# used as the class label for the extracted features\n",
"def extract_numbers(filename):\n",
"    '''\n",
"    Extract numbers from a filename\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    filename : str\n",
"        The filename to extract numbers from\n",
"\n",
"    Returns\n",
"    -------\n",
"    list\n",
"        A list of extracted numbers: [damage_number, test_number, sensor_number]\n",
"    '''\n",
"    # Find all runs of one or more digits in the filename\n",
"    numbers = re.findall(r'\\d+', filename)\n",
"    # Convert the number strings to integers and return the list\n",
"    return [int(num) for num in numbers]\n",
"\n",
"def build_features(input_dir:str, sensor:int=None):\n",
"    '''\n",
"    Extract time-domain features from every test CSV under input_dir.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    input_dir : str\n",
"        Root directory containing one sub-folder per damage case\n",
"    sensor : int, optional\n",
"        If given, only process files ending in the suffix '_{sensor}.csv'\n",
"\n",
"    Returns\n",
"    -------\n",
"    pandas.DataFrame\n",
"        One row per processed CSV, plus 'label' and 'filename' columns\n",
"    '''\n",
"    all_features = []\n",
"    for nth_damage in os.listdir(input_dir):\n",
"        nth_damage_path = os.path.join(input_dir, nth_damage)\n",
"        print(f'Extracting features from damage folder {nth_damage_path}')\n",
"        if os.path.isdir(nth_damage_path):\n",
"            for nth_test in os.listdir(nth_damage_path):\n",
"                nth_test_path = os.path.join(nth_damage_path, nth_test)\n",
"                if sensor is not None:\n",
"                    # Skip files that lack the requested sensor suffix\n",
"                    if not nth_test.endswith(f'_{sensor}.csv'):\n",
"                        continue\n",
"                # One CSV -> one dictionary of time-domain features\n",
"                features = ExtractTimeFeatures(nth_test_path)\n",
"                features['label'] = extract_numbers(nth_test)[0]  # damage number as label\n",
"                features['filename'] = nth_test\n",
"                all_features.append(features)\n",
"\n",
"    # Collect all per-file dictionaries into a single DataFrame\n",
"    df = pd.DataFrame(all_features)\n",
"    return df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Execute the automation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_dir = \"../../data/raw\"\n",
"# Extract features\n",
"df1 = build_features(data_dir, sensor=1)\n",
"df2 = build_features(data_dir, sensor=2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Sensor 1 Extracted Features"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df1.head(5)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Sensor 2 Extracted Features"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df2.head(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Perform division"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Separate the label column by name (robust to column ordering)\n",
"label_column = df1['label']\n",
"\n",
"# Relative features: element-wise division of sensor-2 by sensor-1 values\n",
"# (the trailing 'label' and 'filename' columns are excluded)\n",
"df_relative = df2.iloc[:, :-2] / df1.iloc[:, :-2]\n",
"\n",
"# Add the label column back to the resulting DataFrame\n",
"df_relative['label'] = label_column\n",
"\n",
"# Append a suffix to every feature column name\n",
"suffix = '_rel'\n",
"df_relative.columns = [col + suffix if col != 'label' else col for col in df_relative.columns]\n",
"\n",
"# display() renders the frame even though info() follows as the last call\n",
"display(df_relative.head())\n",
"df_relative.info()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Subsetting DataFrame to see the pair plots due to many features"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# Assuming your DataFrame is named 'df'\n",
"\n",
"# Subsetting the DataFrame to include only the first 3 columns and the label\n",
"subset_df = df_relative[['Mean_rel', 'Max_rel', 'Peak (Pm)_rel', 'label']]\n",
"\n",
"# Plotting the pairplot\n",
"g = sns.pairplot(subset_df, hue='label', diag_kind='kde')\n",
"\n",
"# Adjusting the axis limits\n",
"for ax in g.axes.flatten():\n",
" ax.set_xlim(-10, 10) # Adjust these limits based on your data\n",
" ax.set_ylim(-10, 10) # Adjust these limits based on your data\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Standard Scaler"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.preprocessing import StandardScaler\n",
"\n",
"# Standardise the feature columns on a copy, so 'subset_df' (a slice of\n",
"# df_relative) is not mutated in place (avoids SettingWithCopyWarning)\n",
"features = subset_df.columns[:-1]  # every column except the label\n",
"scaled_std_df = subset_df.copy()\n",
"scaler = StandardScaler()\n",
"scaled_std_df[features] = scaler.fit_transform(scaled_std_df[features])\n",
"\n",
"# Pairplot of the standardised features, coloured by damage label\n",
"sns.pairplot(scaled_std_df, hue='label', diag_kind='kde')\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Min-Max Scaler"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.preprocessing import MinMaxScaler\n",
"\n",
"# Min-max scale the feature columns on a copy, leaving 'subset_df'\n",
"# untouched (avoids SettingWithCopyWarning on a slice of df_relative)\n",
"features = subset_df.columns[:-1]  # every column except the label\n",
"scaled_mm_df = subset_df.copy()\n",
"scaler = MinMaxScaler()\n",
"scaled_mm_df[features] = scaler.fit_transform(scaled_mm_df[features])\n",
"\n",
"# Pairplot of the min-max scaled features, coloured by damage label\n",
"sns.pairplot(scaled_mm_df, hue='label', diag_kind='kde')\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## QUGS Data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To test the `FeatureExtractor` class from the `time_domain_features.py` script with real data from QUGS that has been converted purposed for the thesis."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Importing Modules\n",
"\n",
"Use relative imports or modify the path to include the directory where the module is stored. In this example, we'll simulate the relative import setup."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Real DataFrame\n",
"\n",
"Create one DataFrame from one of the raw data file. Simulate importing the `FeatureExtractor` from its relative path in the notebooks directory."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Convert to DataFrame (simulating processed data input)\n",
"single_data_dir = \"D:/thesis/data/converted/raw/DAMAGE_2/D2_TEST05_01.csv\"\n",
"df = pd.read_csv(single_data_dir)\n",
"df.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Absolute the data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df[df.columns[-1]] = df[df.columns[-1]].abs()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Visualize Data Points"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# Plotting the data points\n",
"plt.figure(figsize=(8, 6))\n",
"plt.plot(df['Time'], df[df.columns[-1]], marker='o', color='blue', label='Data Points')\n",
"plt.title('Scatter Plot of Data Points')\n",
"plt.xlabel('Time')\n",
"plt.ylabel('Amp')\n",
"plt.legend()\n",
"plt.grid(True)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Downsampled Plot with Alpha Blending\n",
"\n",
"Reduce the number of data points by sampling a subset of the data and use transparency to help visualize the density of overlapping points."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# Downsample the data by taking every nth point\n",
"n = 1 # Adjust this value as needed\n",
"downsampled_df = df.iloc[::n, :]\n",
"\n",
"# Plotting the downsampled data points with alpha blending\n",
"plt.figure(figsize=(8, 6))\n",
"plt.plot(downsampled_df['Time'], downsampled_df[downsampled_df.columns[-1]], alpha=0.5, color='blue', label='Data Points')\n",
"plt.title('Scatter Plot of Downsampled Data Points')\n",
"plt.xlabel('Time')\n",
"plt.ylabel('Amp')\n",
"plt.legend()\n",
"plt.grid(True)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Line Plot with Rolling Avg"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# Calculate the rolling average\n",
"window_size = 50 # Adjust this value as needed\n",
"rolling_avg = df[df.columns[-1]].rolling(window=window_size).mean()\n",
"\n",
"# Plotting the original data points and the rolling average\n",
"plt.figure(figsize=(8, 6))\n",
"plt.plot(df['Time'], df[df.columns[-1]], alpha=0.3, color='blue', label='Original Data')\n",
"plt.plot(df['Time'], rolling_avg, color='red', label='Rolling Average')\n",
"plt.title('Line Plot with Rolling Average')\n",
"plt.xlabel('Time')\n",
"plt.ylabel('Amp')\n",
"plt.legend()\n",
"plt.grid(True)\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Print Time-domain Features (Single CSV Real Data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import sys\n",
"import os\n",
"# Assuming the src directory is one level up from the notebooks directory\n",
"sys.path.append('../src/features')\n",
"from time_domain_features import FeatureExtractor\n",
"\n",
"\n",
"# Extract features\n",
"extracted = FeatureExtractor(df[df.columns[-1]])\n",
"\n",
"# Format with pandas DataFrame\n",
"features = pd.DataFrame(extracted.features, index=[0])\n",
"features\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Print Time-domain Features (Multiple CSV Real Data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import sys\n",
"import os\n",
"import re\n",
"# Assuming the src directory is one level up from the notebooks directory\n",
"sys.path.append('../src/features')\n",
"from time_domain_features import ExtractTimeFeatures # use wrapper function instead of class for easy use\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### The function"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Helpers: pull numbers out of a filename; the first number is later\n",
"# used as the class label for the extracted features\n",
"def extract_numbers(filename):\n",
"    '''\n",
"    Extract numbers from a filename\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    filename : str\n",
"        The filename to extract numbers from\n",
"\n",
"    Returns\n",
"    -------\n",
"    list\n",
"        A list of extracted numbers: [damage_number, test_number, sensor_number]\n",
"    '''\n",
"    # Find all runs of one or more digits in the filename\n",
"    numbers = re.findall(r'\\d+', filename)\n",
"    # Convert the number strings to integers and return the list\n",
"    return [int(num) for num in numbers]\n",
"\n",
"def build_features(input_dir:str, sensor:int=None, verbose:bool=False, absolute:bool=False):\n",
"    '''\n",
"    Extract time-domain features from every test CSV under input_dir.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    input_dir : str\n",
"        Root directory containing one sub-folder per damage case\n",
"    sensor : int, optional\n",
"        If given, only process files ending in the zero-padded suffix\n",
"        '_{sensor:02}.csv' (QUGS naming, e.g. 'D2_TEST05_01.csv')\n",
"    verbose : bool\n",
"        Print progress and each extracted feature dictionary\n",
"    absolute : bool\n",
"        Forwarded to ExtractTimeFeatures -- presumably rectifies the\n",
"        signal before feature extraction (confirm in time_domain_features)\n",
"    \n",
"    Returns\n",
"    -------\n",
"    pandas.DataFrame\n",
"        One row per processed CSV, plus 'label' and 'filename' columns\n",
"    '''\n",
"    all_features = []\n",
"    for nth_damage in os.listdir(input_dir):\n",
"        nth_damage_path = os.path.join(input_dir, nth_damage)\n",
"        if verbose:\n",
"            print(f'Extracting features from damage folder {nth_damage_path}')\n",
"        if os.path.isdir(nth_damage_path):\n",
"            for nth_test in os.listdir(nth_damage_path):\n",
"                nth_test_path = os.path.join(nth_damage_path, nth_test)\n",
"                if sensor is not None:\n",
"                    # Skip files that lack the zero-padded sensor suffix\n",
"                    if not nth_test.endswith(f'_{sensor:02}.csv'):\n",
"                        continue\n",
"                # One CSV -> one dictionary of time-domain features\n",
"                features = ExtractTimeFeatures(nth_test_path, absolute=absolute)\n",
"                if verbose:\n",
"                    print(features)\n",
"                features['label'] = extract_numbers(nth_test)[0]  # damage number as label\n",
"                features['filename'] = nth_test\n",
"                all_features.append(features)\n",
"\n",
"    # Collect all per-file dictionaries into a single DataFrame\n",
"    df = pd.DataFrame(all_features)\n",
"    return df"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Execute the automation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_dir = \"D:/thesis/data/converted/raw\"\n",
"# Extract features\n",
"df1 = build_features(data_dir, sensor=1, verbose=True, absolute=True)\n",
"df2 = build_features(data_dir, sensor=2, verbose=True, absolute=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df1.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# Assuming your DataFrame is named 'df'\n",
"\n",
"# Subsetting the DataFrame to include only the first 3 columns and the label\n",
"subset_df = df1[['Mean', 'Max', 'Peak (Pm)', 'label']]\n",
"\n",
"# Plotting the pairplot\n",
"g = sns.pairplot(subset_df, hue='label', diag_kind='kde')\n",
"\n",
"# Adjusting the axis limits\n",
"# for ax in g.axes.flatten():\n",
"# ax.set_xlim(-10, 10) # Adjust these limits based on your data\n",
"# ax.set_ylim(-10, 10) # Adjust these limits based on your data\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df2.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# Assuming your DataFrame is named 'df'\n",
"\n",
"# Subsetting the DataFrame to include only the first 3 columns and the label\n",
"subset_df = df2[['Mean', 'Max', 'Standard Deviation', 'Kurtosis', 'label']]\n",
"\n",
"# Plotting the pairplot\n",
"g = sns.pairplot(subset_df, hue='label', diag_kind='kde')\n",
"\n",
"# Adjusting the axis limits\n",
"# for ax in g.axes.flatten():\n",
"# ax.set_xlim(-10, 10) # Adjust these limits based on your data\n",
"# ax.set_ylim(-10, 10) # Adjust these limits based on your data\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Perform division"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Separate the label column by name (robust to column ordering)\n",
"label_column = df1['label']\n",
"\n",
"# Relative features: element-wise division of sensor-2 by sensor-1 values\n",
"# (the trailing 'label' and 'filename' columns are excluded)\n",
"df_relative = df2.iloc[:, :-2] / df1.iloc[:, :-2]\n",
"\n",
"# Add the label column back to the resulting DataFrame\n",
"df_relative['label'] = label_column\n",
"\n",
"# Append a suffix to every feature column name\n",
"suffix = '_rel'\n",
"df_relative.columns = [col + suffix if col != 'label' else col for col in df_relative.columns]\n",
"\n",
"# Last expression -> rich display of the resulting DataFrame\n",
"df_relative"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Subsetting DataFrame to see the pair plots due to many features"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"\n",
"# Assuming your DataFrame is named 'df'\n",
"\n",
"# Subsetting the DataFrame to include only the first 3 columns and the label\n",
"subset_df = df_relative[['Mean_rel', 'Max_rel', 'Peak (Pm)_rel', 'label']]\n",
"\n",
"# Plotting the pairplot\n",
"g = sns.pairplot(subset_df, hue='label', diag_kind='kde')\n",
"\n",
"# Adjusting the axis limits\n",
"# for ax in g.axes.flatten():\n",
"# ax.set_xlim(-10, 10) # Adjust these limits based on your data\n",
"# ax.set_ylim(-10, 10) # Adjust these limits based on your data\n",
"\n",
"plt.show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
}
},
"nbformat": 4,
"nbformat_minor": 2
}