"""Email agent for AI-powered email processing."""
import asyncio
import logging
from typing import Any, Callable, Coroutine, Optional, Union, cast
from pymailai.base_client import BaseEmailClient
from pymailai.client import EmailClient
from pymailai.config import EmailConfig
from pymailai.message import EmailData
logger = logging.getLogger(__name__)
# Type alias for message handler
MessageHandler = Callable[[EmailData], Coroutine[Any, Any, Optional[EmailData]]]
[docs]
class EmailAgent:
"""Process incoming emails and generate responses using AI."""
[docs]
def __init__(
self,
config_or_client: Union[EmailConfig, BaseEmailClient],
message_handler: Optional[MessageHandler] = None,
):
"""Initialize the email agent.
Args:
config_or_client: Either EmailConfig for IMAP/SMTP or a BaseEmailClient instance
message_handler: Optional async callback for custom message processing
"""
self.config = (
config_or_client if isinstance(config_or_client, EmailConfig) else None
)
self.message_handler = message_handler
self._client = (
None if isinstance(config_or_client, EmailConfig) else config_or_client
)
self._running = False
self._task: Optional[asyncio.Task] = None
[docs]
async def process_message(self, message: EmailData) -> Optional[EmailData]:
"""Process an incoming email message.
This method can be overridden to implement custom processing logic.
By default, it calls the message_handler if one was provided.
Args:
message: The incoming email message to process
Returns:
Optional response message to send
"""
if self.message_handler:
return await self.message_handler(message)
return None
async def _check_messages(self) -> None:
"""Poll for new messages and process them."""
if not self._client:
logger.error("Email client not initialized")
return
try:
# Process messages as they come in
async for message in self._client.fetch_new_messages():
try:
# Process the message
response = await self.process_message(message)
# Mark original message as read
await self._client.mark_as_read(message.message_id)
# Send response if one was generated
if response and self._client:
await self._client.send_message(response)
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
# Still mark as read to avoid reprocessing
if self._client:
await self._client.mark_as_read(message.message_id)
except Exception as e:
logger.error(f"Error fetching messages: {e}", exc_info=True)
async def _run(self) -> None:
"""Run loop for the email agent."""
if not self._client and self.config:
self._client = cast(BaseEmailClient, EmailClient(self.config))
try:
if not self._client:
raise RuntimeError("Email client not initialized")
async with self._client:
while self._running:
await self._check_messages()
# Use config interval if available, otherwise default to 60 seconds
await asyncio.sleep(
self.config.check_interval if self.config else 60
)
finally:
self._client = None
[docs]
async def start(self) -> None:
"""Start the email agent."""
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._run())
logger.info("Email agent started")
[docs]
async def stop(self) -> None:
"""Stop the email agent."""
if not self._running:
return
self._running = False
if self._task:
try:
await self._task
except asyncio.CancelledError:
pass # Task was cancelled, which is expected
self._task = None
logger.info("Email agent stopped")
[docs]
async def __aenter__(self) -> "EmailAgent":
"""Async context manager entry."""
await self.start()
return self
[docs]
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit."""
await self.stop()