EmailMaestro

Mission: Automates email and communication workflows, crafting release notes, bug alerts, and announcements from commit messages or contextual data.
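The mission mentions crafting release notes from commit messages, but the page does not show how commits become an email body. One possible approach is sketched below, assuming commits follow a Conventional Commits style prefix (`feat:`, `fix:`). The names `SECTION_TITLES`, `group_commits`, and `draft_release_notes` are hypothetical and not part of EmailMaestro.

```python
from collections import defaultdict
from typing import Dict, List

# Hypothetical mapping from commit-type prefixes to section titles (an assumption).
SECTION_TITLES = {"feat": "Features", "fix": "Bug Fixes"}


def group_commits(commit_messages: List[str]) -> Dict[str, List[str]]:
    """Group commit subjects by section; unrecognized prefixes go to 'Other'."""
    sections: Dict[str, List[str]] = defaultdict(list)
    for message in commit_messages:
        prefix, sep, summary = message.partition(":")
        # Drop an optional scope such as "fix(api)" before looking up the type.
        commit_type = prefix.split("(")[0].strip().lower()
        if sep and commit_type in SECTION_TITLES:
            sections[SECTION_TITLES[commit_type]].append(summary.strip())
        else:
            sections["Other"].append(message.strip())
    return dict(sections)


def draft_release_notes(version: str, commit_messages: List[str]) -> str:
    """Render a plain-text release-notes body from grouped commits."""
    lines = [f"Release {version}", ""]
    for title, items in group_commits(commit_messages).items():
        lines.append(f"{title}:")
        lines.extend(f"  - {item}" for item in items)
        lines.append("")
    return "\n".join(lines).rstrip()
```

The returned string could be passed as a single draft to `send_emails`, or the grouped dictionary could serve as context data for a Jinja2 template.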

Parallel Email Dispatch with Error Handling & Templating

import logging
from typing import List, Dict
import concurrent.futures
import smtplib
from email.mime.text import MIMEText
from jinja2 import Template

logging.basicConfig(level=logging.INFO)

class EmailMaestroAgent:
    def __init__(self, smtp_server: str, smtp_port: int, username: str, password: str):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

    def draft_emails(self, context_data: List[Dict[str, object]], template_path: str) -> List[str]:
        """
        Generates personalized email bodies based on a Jinja2 template and context data.
        """
        with open(template_path, 'r', encoding='utf-8') as f:
            template_str = f.read()
        template = Template(template_str)

        drafted_emails = []
        for cdata in context_data:
            rendered = template.render(**cdata)
            drafted_emails.append(rendered)
        return drafted_emails

    def send_emails(self, drafted_emails: List[str], recipients: List[str], subject: str):
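        """
        Sends emails in parallel, pairing drafted_emails[i] with recipients[i].
        A failure for one recipient is logged and does not stop the other sends.
        Extra drafts or recipients beyond the shorter list are skipped.
        """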
        def send_single_email(body: str, recipient: str):
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = self.username
            msg['To'] = recipient

            try:
                with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server:
                    server.login(self.username, self.password)
                    server.send_message(msg)
                logging.info(f"Email sent to {recipient}")
            except Exception as e:
                logging.error(f"Failed to send email to {recipient}: {str(e)}")

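        if len(drafted_emails) != len(recipients):
            logging.warning(
                "Got %d drafts for %d recipients; unmatched items will be skipped",
                len(drafted_emails), len(recipients),
            )

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            # Pair each draft with its recipient; zip() stops at the shorter list.
            futures = [
                executor.submit(send_single_email, draft, recipient)
                for draft, recipient in zip(drafted_emails, recipients)
            ]
            concurrent.futures.wait(futures)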

# Usage:
# recipients_list = ["dev1@example.com", "dev2@example.com", ...]
# context_data_list = [
#     {'bug_id': 1234, 'severity': 'High', 'description': 'NullPointerException in Payment Service'},
#     {'bug_id': 5678, 'severity': 'Low', 'description': 'UI color mismatch on Settings page'},
#     ...
# ]
# agent = EmailMaestroAgent(smtp_server="smtp.mailserver.com", smtp_port=465, username="me", password="secret")
# drafted = agent.draft_emails(context_data_list, template_path="bug_alert_template.j2")
# agent.send_emails(drafted, recipients_list, subject="[URGENT] Bug Alert")
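The listing above logs each failure but gives the caller no way to see which recipients were missed. A minimal sketch of one way to collect per-recipient outcomes follows. It assumes a `send` callable that raises on failure, unlike `send_single_email`, which catches its own exceptions. `dispatch_with_results` and `fake_send` are hypothetical names, and `fake_send` only simulates an SMTP error so the sketch runs without a mail server.

```python
import concurrent.futures
from typing import Callable, Dict, List, Optional


def dispatch_with_results(
    send: Callable[[str, str], None],
    drafts: List[str],
    recipients: List[str],
    max_workers: int = 8,
) -> Dict[str, Optional[str]]:
    """Run send(body, recipient) in parallel.

    Returns a dict mapping each recipient to None on success or to the
    error message on failure. Duplicate recipients share a single entry.
    """
    results: Dict[str, Optional[str]] = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_recipient = {
            executor.submit(send, body, recipient): recipient
            for body, recipient in zip(drafts, recipients)
        }
        # as_completed() yields each future once its send call has finished.
        for future in concurrent.futures.as_completed(future_to_recipient):
            recipient = future_to_recipient[future]
            error = future.exception()
            results[recipient] = None if error is None else str(error)
    return results


def fake_send(body: str, recipient: str) -> None:
    """Stand-in sender (an assumption): fails for addresses starting with 'bad'."""
    if recipient.startswith("bad"):
        raise ConnectionError("simulated SMTP failure")
```

A caller could retry only the recipients whose value is not `None`, or include them in a summary report.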

Last updated 4 months ago