CodeSynth
Mission: Automate code generation, structural refactoring, and performance optimization.
Complex Python Example with Parallel Task Execution and Logging
import math
import logging
import concurrent.futures
from typing import Dict, Any, List
logging.basicConfig(level=logging.INFO)
# === Original Code with Performance Bottlenecks and Minimal Error Handling ===
def calculate_employee_compensation(employees: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Calculates compensation for each employee based on custom rate multipliers,
overhead tasks, and historical performance. This version is naive, can be slow,
and lacks concurrency optimizations.
"""
results = []
for emp in employees:
base = emp.get('base_salary', 50000)
overhead_tasks = emp.get('tasks', [])
overhead_sum = 0
for task in overhead_tasks:
overhead_sum += math.sqrt(len(task) * 20) # naive overhead calculation
# Performance multiplier is poorly handled
performance_score = emp.get('performance_score', 0.0)
total_comp = base + overhead_sum + (performance_score * 1.15)
results.append({
'id': emp.get('id', 'unknown'),
'compensation': round(total_comp, 2)
})
return results
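The naive version recomputes the same overhead for every occurrence of a task name. A quick, illustrative timing sketch makes that cost visible; the `employees_demo` data below is made up for this example, and absolute numbers will vary by machine:

import timeit

# 200 employees, each with 50 copies of the same task name, so the naive
# version evaluates the identical sqrt 10,000 times per call.
employees_demo = [
    {'id': i, 'base_salary': 55000, 'tasks': ['reporting'] * 50, 'performance_score': 1.0}
    for i in range(200)
]

elapsed = timeit.timeit(lambda: calculate_employee_compensation(employees_demo), number=10)
print(f"naive version: {elapsed:.3f}s for 10 runs")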
# === CodeSynth's Proposed Advanced Refactor ===
def calculate_employee_compensation_refactored(employees: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Refactored version:
1) Leverages concurrency to process employees in parallel.
2) Includes robust error handling and validations (e.g., ensuring base_salary is numeric).
3) Uses improved overhead computation with caching for repeated tasks.
4) Logs key milestones for better observability.
"""
if not isinstance(employees, list):
logging.error("Expected 'employees' to be a list.")
return []
results = []
# Cache overhead calculations for repeated tasks to reduce time complexity
overhead_cache = {}
def process_employee(emp: Dict[str, Any]) -> Dict[str, Any]:
try:
base = float(emp.get('base_salary', 50000))
overhead_tasks = emp.get('tasks', [])
perf_score = float(emp.get('performance_score', 0))
overhead_sum = 0
for task in overhead_tasks:
if task not in overhead_cache:
overhead_cache[task] = math.sqrt(len(task) * 20)
overhead_sum += overhead_cache[task]
total_comp = base + overhead_sum + (perf_score * 1.15)
employee_id = emp.get('id', 'unknown')
logging.info(f"Processed employee ID {employee_id} with overhead {round(overhead_sum, 2)}.")
return {'id': employee_id, 'compensation': round(total_comp, 2)}
except (ValueError, TypeError) as e:
logging.warning(f"Skipping employee due to data error: {str(e)} => {emp}")
return {'id': 'invalid', 'compensation': 0}
# Utilize ThreadPoolExecutor for parallel processing
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future_to_emp = {executor.submit(process_employee, emp): emp for emp in employees}
for future in concurrent.futures.as_completed(future_to_emp):
result = future.result()
results.append(result)
return results
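One caveat worth noting: overhead_cache is shared by all worker threads. CPython's GIL keeps individual dict reads and writes safe, but two threads can still race to compute the same entry redundantly. If that matters, a lock-guarded helper is one option; this is a standalone sketch, and cached_overhead is a name introduced here, not part of the function above:

import threading

overhead_cache: Dict[str, float] = {}
cache_lock = threading.Lock()

def cached_overhead(task: str) -> float:
    # Fast path: read without the lock; slow path: recheck under the lock
    # so only one thread computes a missing entry.
    value = overhead_cache.get(task)
    if value is None:
        with cache_lock:
            value = overhead_cache.get(task)
            if value is None:
                value = math.sqrt(len(task) * 20)
                overhead_cache[task] = value
    return value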
# Example usage:
# employees_data = [
# {'id': 101, 'base_salary': 60000, 'tasks': ['reporting', 'analysis'], 'performance_score': 2.5},
# {'id': 102, 'base_salary': 52000, 'tasks': ['data_entry', 'meetings'], 'performance_score': 1.2},
# {'id': 103, 'base_salary': 75000, 'tasks': ['architecture', 'design'], 'performance_score': 3.1},
# ]
# refined_results = calculate_employee_compensation_refactored(employees_data)
# print(refined_results)
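Note that as_completed yields futures in completion order, so the returned list may not match the input order. Callers that need results aligned with their input can use executor.map, which preserves ordering. A minimal sketch, assuming process_employee were hoisted to module scope; the name calculate_compensation_ordered is ours, not part of the code above:

def calculate_compensation_ordered(employees: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # executor.map returns results in input order, unlike as_completed
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(process_employee, employees))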