EmailMaestro
Mission: Automates email and communication workflows, crafting release notes, bug alerts, and announcements from commit messages or contextual data.
Parallel Email Dispatch with Error Handling & Templating
import logging
from typing import List, Dict
import concurrent.futures
import smtplib
from email.mime.text import MIMEText
from jinja2 import Template
logging.basicConfig(level=logging.INFO)
class EmailMaestroAgent:
    """Draft templated emails and dispatch them in parallel over SMTP-over-SSL.

    The agent stores the SMTP connection settings it is constructed with.
    ``draft_emails`` renders one body per context dict from a Jinja2 template
    file, and ``send_emails`` delivers each body to its matching recipient,
    opening a separate authenticated ``smtplib.SMTP_SSL`` connection per
    message.
    """

    def __init__(self, smtp_server: str, smtp_port: int, username: str,
                 password: str, timeout: float = 30.0):
        """Store the SMTP connection settings.

        Args:
            smtp_server: Host passed to ``smtplib.SMTP_SSL``.
            smtp_port: Port passed to ``smtplib.SMTP_SSL``.
            username: Login name; also used as the ``From`` header.
            password: Login password.
            timeout: Seconds allowed for each blocking socket operation on
                the SMTP connection, so a stalled server cannot block a
                worker thread forever.
        """
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.timeout = timeout

    def draft_emails(self, context_data: List[Dict[str, object]], template_path: str) -> List[str]:
        """Render one email body per context dict from a Jinja2 template file.

        Args:
            context_data: One mapping per email; each mapping is expanded into
                keyword arguments for the template, so its keys must be
                strings.
            template_path: Path of the template file, read as UTF-8.

        Returns:
            The rendered bodies, in the same order as ``context_data``.

        Raises:
            OSError: If the template file cannot be opened or read.
        """
        with open(template_path, 'r', encoding='utf-8') as f:
            template = Template(f.read())
        # The template is compiled once and reused for every context.
        return [template.render(**cdata) for cdata in context_data]

    def send_emails(self, drafted_emails: List[str], recipients: List[str], subject: str,
                    max_workers: int = 8) -> List[str]:
        """Send each drafted email to the recipient at the same position, in parallel.

        A failure for one recipient is logged and does not stop the others.
        Failures while building the message are handled the same way as
        failures while talking to the SMTP server.

        Args:
            drafted_emails: Message bodies.
            recipients: Destination addresses; ``recipients[i]`` receives
                ``drafted_emails[i]``.
            subject: Subject header used for every message.
            max_workers: Maximum number of concurrent sender threads.

        Returns:
            The recipients whose email could not be sent, in input order
            (empty when everything succeeded or there was nothing to send).
        """
        drafts = list(drafted_emails)
        targets = list(recipients)
        if len(drafts) != len(targets):
            # Only the paired prefix is sent; warn instead of dropping the
            # remaining drafts/recipients without a trace.
            logging.warning(
                "Got %d drafted emails for %d recipients; only the first %d will be sent",
                len(drafts), len(targets), min(len(drafts), len(targets)),
            )
        pairs = list(zip(drafts, targets))
        if not pairs:
            return []

        def send_single_email(body: str, recipient: str) -> bool:
            """Build and send one message; return True on success."""
            try:
                # Message construction is inside the try so that a bad body or
                # header is logged like any other per-recipient failure rather
                # than being lost inside an uninspected future.
                msg = MIMEText(body)
                msg['Subject'] = subject
                msg['From'] = self.username
                msg['To'] = recipient
                with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port,
                                      timeout=self.timeout) as server:
                    server.login(self.username, self.password)
                    server.send_message(msg)
                    logging.info("Email sent to %s", recipient)
            except Exception as e:
                # Deliberately broad: one bad recipient must not abort the batch.
                logging.error("Failed to send email to %s: %s", recipient, e)
                return False
            return True

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(send_single_email, body, recipient)
                for body, recipient in pairs
            ]
        # Leaving the ``with`` block waits for every submitted task to finish.
        return [
            recipient
            for (_, recipient), future in zip(pairs, futures)
            if not future.result()
        ]
# Usage:
# recipients_list = ["dev1@example.com", "dev2@example.com", ...]
# context_data_list = [
# {'bug_id': 1234, 'severity': 'High', 'description': 'NullPointerException in Payment Service'},
# {'bug_id': 5678, 'severity': 'Low', 'description': 'UI color mismatch on Settings page'},
# ...
# ]
# agent = EmailMaestroAgent(smtp_server="smtp.mailserver.com", smtp_port=465, username="me", password="secret")
# drafted = agent.draft_emails(context_data_list, template_path="bug_alert_template.j2")
# agent.send_emails(drafted, recipients_list, subject="[URGENT] Bug Alert")
Last updated