[FEAT] Redesign convert.py #38

Closed
opened 2025-03-17 13:48:09 +00:00 by nuluh · 0 comments
nuluh commented 2025-03-17 13:48:09 +00:00 (Migrated from github.com)

Problem Statement

The current convert.py is difficult to model or illustrate as a flow diagram.

Proposed Solution

import pandas as pd
import os
import sys
from colorama import Fore, Style, init
from typing import List, Dict, Tuple, Callable, Optional
from functools import partial, reduce
import pathlib

# Pure function to generate column names
def generate_column_names() -> List[str]:
    """Return the 30 measurement column names: 'Real', 'Real.1', ..., 'Real.29'.

    These match the duplicate-column names pandas assigns when a file
    repeats the 'Real' header.
    """
    return ['Real' if n == 0 else f'Real.{n}' for n in range(30)]

# Pure function to create damage scenario mapping
def create_damage_scenarios() -> Dict[int, range]:
    """Map each damage scenario (1-6) to its five consecutive file indices.

    Scenario d covers file indices 5*(d-1)+1 through 5*d, i.e.:
    1 -> 1..5, 2 -> 6..10, 3 -> 11..15, 4 -> 16..20, 5 -> 21..25, 6 -> 26..30.
    Each index selects one input file (see create_file_path for the naming).
    """
    return {scenario: range(5 * scenario - 4, 5 * scenario + 1)
            for scenario in range(1, 7)}

# Pure function to create sensor mapping
def create_sensor_end_map() -> Dict[int, str]:
    """Map test number (1-5) to its bottom-sensor column ('Real.25'..'Real.29')."""
    return {test_num: f'Real.{24 + test_num}' for test_num in range(1, 6)}

# Pure function to create file path
def create_file_path(base_path: str, prefix: str, file_index: int) -> str:
    """Return the input file path 'zzz{prefix}D{file_index}.TXT' under base_path."""
    filename = f'zzz{prefix}D{file_index}.TXT'
    return os.path.join(base_path, filename)

# Pure function to create output file paths
def create_output_paths(output_base: str, damage: int, test_num: int) -> Tuple[str, str]:
    """Return the pair of output CSV paths for one (damage, test) combination.

    Both files live in '<output_base>/DAMAGE_<damage>/' and are named
    'DAMAGE<damage>_TEST<test_num>_01.csv' and '..._02.csv'.
    """
    directory = os.path.join(output_base, f'DAMAGE_{damage}')
    stem = f'DAMAGE{damage}_TEST{test_num}'
    return (os.path.join(directory, f'{stem}_01.csv'),
            os.path.join(directory, f'{stem}_02.csv'))

# Pure function to load dataframe
def load_dataframe(file_path: str) -> pd.DataFrame:
    """Read a tab-separated data file, skipping the first 10 header lines.

    Note: this performs disk I/O; it is deterministic for a fixed file,
    but not pure in the strict sense.
    """
    frame = pd.read_csv(file_path, sep='\t', skiprows=10)
    return frame

# Pure function to extract columns from dataframe
def extract_columns(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Return a new frame with only the 'Time' column and the named sensor column."""
    return df.loc[:, ['Time', column_name]]

# Pure function that defines a processing task (returns a function)
def create_processing_task(
    base_path: str, 
    output_base: str, 
    prefix: str, 
    damage: int, 
    file_index: int, 
    test_num: int, 
    top_sensor: str, 
    bottom_sensor: str
) -> Callable[[], List[Tuple[str, pd.DataFrame, str]]]:
    """Build a deferred processing task for a single input file.

    The returned zero-argument callable, when invoked, reads the input
    file, extracts the two sensor columns, and returns a list of
    (output_path, dataframe, log_message) operations for the caller
    to execute. Disk I/O happens only when the task is called.
    """
    def task() -> List[Tuple[str, pd.DataFrame, str]]:
        source = create_file_path(base_path, prefix, file_index)
        destinations = create_output_paths(output_base, damage, test_num)
        sensors = (top_sensor, bottom_sensor)

        # Reading from disk happens here, not at task-creation time.
        raw = load_dataframe(source)

        # Pair each output file with its sensor column, preserving order:
        # (_01.csv, top_sensor) then (_02.csv, bottom_sensor).
        return [
            (destination,
             extract_columns(raw, sensor),
             f"Processing {source} -> {destination} (column {sensor})")
            for destination, sensor in zip(destinations, sensors)
        ]

    return task

# Pure function to generate all processing tasks
def generate_tasks(base_path: str, output_base: str, prefix: str) -> List[Callable[[], List[Tuple[str, pd.DataFrame, str]]]]:
    """Build the full list of deferred tasks (one per input file, 30 total).

    Pure: no I/O happens here; each returned callable performs its own
    reads when executed.
    """
    columns = generate_column_names()
    sensor_end_map = create_sensor_end_map()

    return [
        create_processing_task(
            base_path, output_base, prefix,
            damage, file_index, test_num,
            columns[test_num - 1],        # top sensor: 'Real' .. 'Real.4'
            sensor_end_map[test_num],     # bottom sensor: 'Real.25' .. 'Real.29'
        )
        for damage, files in create_damage_scenarios().items()
        for test_num, file_index in enumerate(files, start=1)
    ]

# Effect function to ensure directory exists
def ensure_directory(path: str) -> None:
    """Create the parent directory of *path* (a file path) if it doesn't exist."""
    parent = pathlib.Path(path).parent
    parent.mkdir(parents=True, exist_ok=True)

# Effect function to save dataframe to file
def save_dataframe(output_file: str, df: pd.DataFrame) -> None:
    """Write *df* to *output_file* as CSV (no index), creating parent dirs as needed."""
    pathlib.Path(output_file).parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_file, index=False)

# Effect function to execute a task
def execute_task(task: Callable[[], List[Tuple[str, pd.DataFrame, str]]]) -> None:
    """Run one deferred task: log and persist each of its output operations."""
    for destination, frame, message in task():
        print(message)
        save_dataframe(destination, frame)
        print(Fore.GREEN + "Done")
    print("---")

# Effect function to execute all tasks
def execute_tasks(tasks: List[Callable[[], List[Tuple[str, pd.DataFrame, str]]]]) -> None:
    """Run every task in order, then print a summary line.

    All side effects (colorama init, disk writes, console output) are
    confined to this call chain.
    """
    init(autoreset=True)  # Side effect: initialize colorama
    for pending in tasks:
        execute_task(pending)
    print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.")

def main() -> None:
    """CLI entry point: validate arguments, build the task list, execute it."""
    if len(sys.argv) < 4:
        print("Usage: python convert.py <path_to_csv_files> <output_path> <prefix>")
        sys.exit(1)

    base_path, output_base, prefix = sys.argv[1:4]

    # Pure phase: describe every operation up front.
    tasks = generate_tasks(base_path, output_base, prefix)

    # Effect phase: all I/O happens here.
    execute_tasks(tasks)

if __name__ == "__main__":
    main()

Alternatives Considered

No response

Component

Data Processing

Priority

Medium (nice to have)

Implementation Ideas

No response

Expected Benefits

This approach will make the code more readable and easier to understand by clearly showing the data flow through the transformation pipeline.

Additional Context

No response

### Problem Statement Current `convert.py` seems quite hard to model/illustrate into flow diagram. ### Proposed Solution ```py import pandas as pd import os import sys from colorama import Fore, Style, init from typing import List, Dict, Tuple, Callable, Optional from functools import partial, reduce import pathlib # Pure function to generate column names def generate_column_names() -> List[str]: return ['Real'] + [f'Real.{i}' for i in range(1, 30)] # Pure function to create damage scenario mapping def create_damage_scenarios() -> Dict[int, range]: return { 1: range(1, 6), # Damage 1 files from zzzAD1.csv to zzzAD5.csv 2: range(6, 11), # Damage 2 files from zzzAD6.csv to zzzAD10.csv 3: range(11, 16), # Damage 3 files from zzzAD11.csv to zzzAD15.csvs 4: range(16, 21), # Damage 4 files from zzzAD16.csv to zzzAD20.csv 5: range(21, 26), # Damage 5 files from zzzAD21.csv to zzzAD25.csv 6: range(26, 31) # Damage 6 files from zzzAD26.csv to zzzAD30.csv } # Pure function to create sensor mapping def create_sensor_end_map() -> Dict[int, str]: return {1: 'Real.25', 2: 'Real.26', 3: 'Real.27', 4: 'Real.28', 5: 'Real.29'} # Pure function to create file path def create_file_path(base_path: str, prefix: str, file_index: int) -> str: return os.path.join(base_path, f'zzz{prefix}D{file_index}.TXT') # Pure function to create output file paths def create_output_paths(output_base: str, damage: int, test_num: int) -> Tuple[str, str]: output_dir = os.path.join(output_base, f'DAMAGE_{damage}') output_file_1 = os.path.join(output_dir, f'DAMAGE{damage}_TEST{test_num}_01.csv') output_file_2 = os.path.join(output_dir, f'DAMAGE{damage}_TEST{test_num}_02.csv') return output_file_1, output_file_2 # Pure function to load dataframe def load_dataframe(file_path: str) -> pd.DataFrame: return pd.read_csv(file_path, sep='\t', skiprows=10) # Pure function to extract columns from dataframe def extract_columns(df: pd.DataFrame, column_name: str) -> pd.DataFrame: return df[['Time', column_name]] # Pure 
function that defines a processing task (returns a function) def create_processing_task( base_path: str, output_base: str, prefix: str, damage: int, file_index: int, test_num: int, top_sensor: str, bottom_sensor: str ) -> Callable[[], List[Tuple[str, pd.DataFrame, str]]]: """Returns a function that when called will process a single file and return tasks to be executed""" def task() -> List[Tuple[str, pd.DataFrame, str]]: file_path = create_file_path(base_path, prefix, file_index) output_file_1, output_file_2 = create_output_paths(output_base, damage, test_num) # Load the data (this is pure in the sense that it doesn't modify state, though it does read from disk) df = load_dataframe(file_path) # Extract the relevant columns (pure transformation) df1 = extract_columns(df, top_sensor) df2 = extract_columns(df, bottom_sensor) # Return operations to be performed return [ (output_file_1, df1, f"Processing {file_path} -> {output_file_1} (column {top_sensor})"), (output_file_2, df2, f"Processing {file_path} -> {output_file_2} (column {bottom_sensor})") ] return task # Pure function to generate all processing tasks def generate_tasks(base_path: str, output_base: str, prefix: str) -> List[Callable[[], List[Tuple[str, pd.DataFrame, str]]]]: damage_scenarios = create_damage_scenarios() sensor_end_map = create_sensor_end_map() columns = generate_column_names() tasks = [] for damage, files in damage_scenarios.items(): for i, file_index in enumerate(files, start=1): top_sensor = columns[i-1] bottom_sensor = sensor_end_map[i] task = create_processing_task( base_path, output_base, prefix, damage, file_index, i, top_sensor, bottom_sensor ) tasks.append(task) return tasks # Effect function to ensure directory exists def ensure_directory(path: str) -> None: pathlib.Path(path).parent.mkdir(parents=True, exist_ok=True) # Effect function to save dataframe to file def save_dataframe(output_file: str, df: pd.DataFrame) -> None: ensure_directory(output_file) df.to_csv(output_file, 
index=False) # Effect function to execute a task def execute_task(task: Callable[[], List[Tuple[str, pd.DataFrame, str]]]) -> None: operations = task() for output_file, df, log_message in operations: print(log_message) save_dataframe(output_file, df) print(Fore.GREEN + "Done") print("---") # Effect function to execute all tasks def execute_tasks(tasks: List[Callable[[], List[Tuple[str, pd.DataFrame, str]]]]) -> None: init(autoreset=True) # Side effect: initialize colorama for task in tasks: execute_task(task) print(Fore.YELLOW + Style.BRIGHT + "All files have been created successfully.") def main() -> None: if len(sys.argv) < 4: print("Usage: python convert.py <path_to_csv_files> <output_path> <prefix>") sys.exit(1) base_path = sys.argv[1] output_base = sys.argv[2] prefix = sys.argv[3] # Define the complete sequence of operations tasks = generate_tasks(base_path, output_base, prefix) # Execute tasks (side effects isolated here) execute_tasks(tasks) if __name__ == "__main__": main() ``` ### Alternatives Considered _No response_ ### Component Data Processing ### Priority Medium (nice to have) ### Implementation Ideas _No response_ ### Expected Benefits This approach will make the code more readable and easier to understand by clearly showing the data flow through the transformation pipeline. ### Additional Context _No response_
Sign in to join this conversation.
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: nuluh/thesis#38