Source code for pymailai.agent

"""Email agent for AI-powered email processing."""

import asyncio
import logging
from typing import Any, Callable, Coroutine, Optional, Union, cast

from pymailai.base_client import BaseEmailClient
from pymailai.client import EmailClient
from pymailai.config import EmailConfig
from pymailai.message import EmailData

logger = logging.getLogger(__name__)

# Type alias for message handler
MessageHandler = Callable[[EmailData], Coroutine[Any, Any, Optional[EmailData]]]


[docs] class EmailAgent: """Process incoming emails and generate responses using AI."""
[docs] def __init__( self, config_or_client: Union[EmailConfig, BaseEmailClient], message_handler: Optional[MessageHandler] = None, ): """Initialize the email agent. Args: config_or_client: Either EmailConfig for IMAP/SMTP or a BaseEmailClient instance message_handler: Optional async callback for custom message processing """ self.config = ( config_or_client if isinstance(config_or_client, EmailConfig) else None ) self.message_handler = message_handler self._client = ( None if isinstance(config_or_client, EmailConfig) else config_or_client ) self._running = False self._task: Optional[asyncio.Task] = None
[docs] async def process_message(self, message: EmailData) -> Optional[EmailData]: """Process an incoming email message. This method can be overridden to implement custom processing logic. By default, it calls the message_handler if one was provided. Args: message: The incoming email message to process Returns: Optional response message to send """ if self.message_handler: return await self.message_handler(message) return None
async def _check_messages(self) -> None: """Poll for new messages and process them with error handling and retries.""" if not self._client: logger.error("Email client not initialized") return try: # Process messages as they come in async for message in self._client.fetch_new_messages(): # Define max_retries at the start of message processing max_retries = 3 response = None try: # Mark message as read immediately if configured if getattr(self.config, "mark_seen_immediately", True): for attempt in range(max_retries): try: await self._client.mark_as_read(message.message_id) break except Exception as e: if attempt == max_retries - 1: logger.error( f"Failed to mark message as read after {max_retries} attempts: {e}", # noqa E501 exc_info=True, ) raise wait_time = min(2**attempt, 30) logger.warning( f"Failed to mark message as read (attempt {attempt + 1}/{max_retries}), " # noqa E501 f"retrying in {wait_time} seconds: {e}" ) await asyncio.sleep(wait_time) # Process the message response = await self.process_message(message) # Send response if one was generated if response and self._client: try: await self._client.send_message(response) except Exception as e: logger.error(f"Failed to send response: {e}", exc_info=True) # Don't retry here as send_message already has retry logic except Exception as e: logger.error(f"Error processing message: {e}", exc_info=True) # Only mark as read if we haven't already done so if not getattr(self.config, "mark_seen_immediately", True): if self._client: for attempt in range(max_retries): try: await self._client.mark_as_read(message.message_id) break except Exception as mark_err: if attempt == max_retries - 1: logger.error( f"Failed to mark errored message as read after {max_retries} attempts: {mark_err}", # noqa E501 exc_info=True, ) break wait_time = min(2**attempt, 30) logger.warning( f"Failed to mark errored message as read (attempt {attempt + 1}/{max_retries}), " # noqa E501 f"retrying in {wait_time} seconds: {mark_err}" ) await asyncio.sleep(wait_time) except Exception as e: logger.error(f"Error fetching messages: {e}", exc_info=True) # Add delay before next fetch attempt to prevent rapid retries on persistent errors await asyncio.sleep(5) async def _run(self) -> None: """Run loop for the email agent with connection management.""" if not self._client and self.config: self._client = cast(BaseEmailClient, EmailClient(self.config)) while self._running: try: if not self._client: raise RuntimeError("Email client not initialized") async with self._client: while self._running: try: await self._check_messages() # Use config interval if available, otherwise default to 60 seconds await asyncio.sleep( self.config.check_interval if self.config else 60 ) except Exception as e: logger.error( f"Error in message check loop: {e}", exc_info=True ) # Add delay before retry to prevent rapid retries on persistent errors await asyncio.sleep(5) except Exception as e: logger.error(f"Connection error in agent run loop: {e}", exc_info=True) if self._running: # Wait before attempting to reconnect await asyncio.sleep(10) continue finally: self._client = None
[docs] async def start(self) -> None: """Start the email agent.""" if self._running: return self._running = True self._task = asyncio.create_task(self._run()) logger.info("Email agent started")
[docs] async def stop(self) -> None: """Stop the email agent.""" if not self._running: return self._running = False if self._task: try: await self._task except asyncio.CancelledError: pass # Task was cancelled, which is expected self._task = None logger.info("Email agent stopped")
[docs] async def __aenter__(self) -> "EmailAgent": """Async context manager entry.""" await self.start() return self
[docs] async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """Async context manager exit.""" await self.stop()