Source code for pymailai.client

"""Email client implementation."""

import asyncio
import email
import logging
from email.message import EmailMessage
from typing import AsyncGenerator, Optional, cast

import aioimaplib
import aiosmtplib

from pymailai.base_client import BaseEmailClient
from pymailai.config import EmailConfig
from pymailai.message import EmailData

logger = logging.getLogger(__name__)


[docs] class EmailClient(BaseEmailClient): """Asynchronous email client for IMAP and SMTP operations."""
[docs] def __init__(self, config: EmailConfig): """Initialize email client with configuration.""" self.config = config self._imap: Optional[aioimaplib.IMAP4_SSL] = None self._smtp: Optional[aiosmtplib.SMTP] = None
async def _connect_smtp(self) -> None: """Establish connection to SMTP server.""" if self._smtp: try: await self._smtp.quit() except Exception: pass self._smtp = None # Connect to SMTP with appropriate security if self.config.smtp_port == 465: # Port 465 uses implicit SSL/TLS self._smtp = aiosmtplib.SMTP( hostname=self.config.smtp_server, port=self.config.smtp_port, timeout=self.config.timeout, use_tls=True, # Immediate TLS for port 465 ) await self._smtp.connect() logger.debug( "Connected to SMTP server %s:%s with SSL/TLS", self.config.smtp_server, self.config.smtp_port, ) else: # Ports 25 and 587 start unencrypted self._smtp = aiosmtplib.SMTP( hostname=self.config.smtp_server, port=self.config.smtp_port, timeout=self.config.timeout, use_tls=False, ) await self._smtp.connect() logger.debug( "Connected to SMTP server %s:%s", self.config.smtp_server, self.config.smtp_port, ) # Use STARTTLS for port 587 if self.config.smtp_port == 587: await self._smtp.starttls() await self._smtp.login(self.config.email, self.config.password) logger.debug("Logged in to SMTP server as %s", self.config.email) async def _ensure_smtp_connection(self) -> None: """Ensure SMTP connection is active and reconnect if needed.""" try: if not self._smtp: await self._connect_smtp() else: # Test connection with NOOP try: await self._smtp.noop() except Exception as e: logger.warning("SMTP connection lost, reconnecting: %s", e) await self._connect_smtp() except Exception as e: logger.error("Failed to ensure SMTP connection: %s", e) raise async def _connect_imap(self) -> None: """Establish connection to IMAP server.""" if self._imap: try: await self._imap.logout() except Exception: pass self._imap = None # Connect to IMAP self._imap = aioimaplib.IMAP4_SSL( host=self.config.imap_server, port=self.config.imap_port, timeout=self.config.timeout, ) await self._imap.wait_hello_from_server() logger.debug( "Connected to IMAP server %s:%s", self.config.imap_server, self.config.imap_port, ) await self._imap.login(self.config.email, self.config.password) logger.debug("Logged in to IMAP server as %s", self.config.email) await self._imap.select(self.config.folder) logger.debug("Selected IMAP folder: %s", self.config.folder) async def _ensure_imap_connection(self) -> None: """Ensure IMAP connection is active and reconnect if needed.""" try: if not self._imap: await self._connect_imap() else: # Test connection with NOOP try: imap = cast(aioimaplib.IMAP4_SSL, self._imap) await asyncio.wait_for(imap.noop(), timeout=self.config.timeout) except Exception as e: logger.warning("IMAP connection lost, reconnecting: %s", e) await self._connect_imap() except Exception as e: logger.error("Failed to ensure IMAP connection: %s", e) raise
[docs] async def connect(self) -> None: """Establish connections to IMAP and SMTP servers.""" await self._connect_imap() await self._connect_smtp()
[docs] async def disconnect(self) -> None: """Close connections to email servers.""" if self._imap: try: await self._imap.logout() logger.debug("Disconnected from IMAP server") except Exception as e: logger.warning("Error disconnecting from IMAP: %s", e) self._imap = None if self._smtp: try: await self._smtp.quit() logger.debug("Disconnected from SMTP server") except Exception as e: logger.warning("Error disconnecting from SMTP: %s", e) self._smtp = None
[docs] async def fetch_new_messages(self) -> AsyncGenerator[EmailData, None]: """Fetch new unread messages from the IMAP server.""" max_retries = 3 retry_count = 0 while retry_count < max_retries: try: await self._ensure_imap_connection() imap = cast(aioimaplib.IMAP4_SSL, self._imap) # Search for unread messages with timeout _, data = await asyncio.wait_for( imap.search("UNSEEN"), timeout=self.config.timeout ) break except Exception as e: retry_count += 1 if retry_count == max_retries: logger.error( f"Failed to fetch messages after {max_retries} attempts: {e}" ) raise wait_time = min(2**retry_count, 30) logger.warning( f"Error fetching messages (attempt {retry_count}/{max_retries}), " f"retrying in {wait_time} seconds: {e}" ) await asyncio.sleep(wait_time) message_numbers = data[0].decode().split() logger.debug("Found %d unread messages", len(message_numbers)) for num in message_numbers: fetch_retries = 0 while fetch_retries < max_retries: try: await self._ensure_imap_connection() imap = cast(aioimaplib.IMAP4_SSL, self._imap) # Format message number for IMAP - ensure it's a valid message set _, msg_data = await asyncio.wait_for( imap.fetch(num, "(RFC822)"), timeout=self.config.timeout ) if not msg_data or not msg_data[0]: logger.warning("No data returned for message %s", num) break break except Exception as e: fetch_retries += 1 if fetch_retries == max_retries: logger.error( f"Failed to fetch message {num} after {max_retries} attempts: {e}" ) break wait_time = min(2**fetch_retries, 30) logger.warning( f"Error fetching message {num} (attempt {fetch_retries}/{max_retries}), " f"retrying in {wait_time} seconds: {e}" ) await asyncio.sleep(wait_time) if fetch_retries == max_retries: continue # When using RFC822, the email data is always the second element in the response email_body = msg_data[1] if not email_body: logger.warning( "Could not find email body in message data for message %s", num ) continue # Parse email message try: email_message = email.message_from_bytes( email_body, policy=email.policy.default ) if not isinstance(email_message, EmailMessage): # Convert Message to EmailMessage if needed temp_message = EmailMessage() temp_message.set_content(email_message.as_string()) email_message = temp_message except Exception as e: logger.error(f"Failed to parse email message: {str(e)}") continue # Convert to our EmailData format email_data = EmailData.from_email_message(email_message) yield email_data
[docs] async def send_message(self, message: EmailData, max_retries: int = 3) -> None: """Send an email message via SMTP with automatic reconnection and retries. Args: message: The email message to send max_retries: Maximum number of retry attempts (default: 3) """ retries = 0 last_error = None while retries <= max_retries: try: await self._ensure_smtp_connection() logger.debug("Sending message to %s", ", ".join(message.to_addresses)) email_message = message.to_email_message() if self._smtp: await self._smtp.send_message(email_message) logger.debug("Message sent successfully") return except Exception as e: last_error = e retries += 1 if retries <= max_retries: wait_time = min( 2**retries, 30 ) # Exponential backoff, max 30 seconds logger.warning( "Failed to send message (attempt %d/%d), retrying in %d seconds: %s", retries, max_retries, wait_time, e, ) await asyncio.sleep(wait_time) logger.error("Failed to send message after %d attempts", max_retries) raise last_error or RuntimeError("Failed to send message")
[docs] async def mark_as_read(self, message_id: str) -> None: """Mark a message as read using its Message-ID.""" max_retries = 3 retry_count = 0 while retry_count < max_retries: try: await self._ensure_imap_connection() imap = cast(aioimaplib.IMAP4_SSL, self._imap) # Search for the message by Message-ID with timeout _, data = await asyncio.wait_for( imap.search(f'HEADER "Message-ID" "{message_id}"'), timeout=self.config.timeout, ) message_numbers = data[0].decode().split() if message_numbers: logger.debug("Marking message %s as read", message_id) # Mark the message as seen using the raw message number with timeout await asyncio.wait_for( imap.store(message_numbers[0], "+FLAGS", "\\Seen"), timeout=self.config.timeout, ) return except Exception as e: retry_count += 1 if retry_count < max_retries: wait_time = min(2**retry_count, 30) logger.warning( f"Error marking message as read (attempt {retry_count}/{max_retries}), " f"retrying in {wait_time} seconds: {e}" ) await asyncio.sleep(wait_time) else: logger.error( f"Failed to mark message as read after {max_retries} attempts: {e}" )
async def __aenter__(self) -> "EmailClient": """Async context manager entry.""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """Async context manager exit.""" await self.disconnect()
[docs] async def query_messages( self, query_params: dict ) -> AsyncGenerator[EmailData, None]: """Query messages using IMAP search criteria. Args: query_params: Dictionary containing query parameters: - after_date: Optional[str] - Messages after this date (YYYY-MM-DD) - before_date: Optional[str] - Messages before this date (YYYY-MM-DD) - subject: Optional[str] - Subject line contains this text - from_address: Optional[str] - Sender email address - to_address: Optional[str] - Recipient email address - label: Optional[str] - Message folder (defaults to INBOX) - unread_only: Optional[bool] - Only unread messages if True - include_body: Optional[bool] - Include message body in results """ max_retries = 3 retry_count = 0 while retry_count < max_retries: try: await self._ensure_imap_connection() imap = cast(aioimaplib.IMAP4_SSL, self._imap) # Build IMAP search criteria search_criteria = [] if query_params.get("after_date"): search_criteria.append(f'SINCE "{query_params["after_date"]}"') if query_params.get("before_date"): search_criteria.append(f'BEFORE "{query_params["before_date"]}"') if query_params.get("subject"): search_criteria.append(f'SUBJECT "{query_params["subject"]}"') if query_params.get("from_address"): search_criteria.append(f'FROM "{query_params["from_address"]}"') if query_params.get("to_address"): search_criteria.append(f'TO "{query_params["to_address"]}"') if query_params.get("unread_only"): search_criteria.append("UNSEEN") # If no criteria specified, search all messages search_command = " ".join(search_criteria) if search_criteria else "ALL" logger.debug(f"Executing IMAP search: {search_command}") # Switch to requested folder/label if specified if query_params.get("label"): await imap.select(query_params["label"]) # Execute search with timeout _, data = await asyncio.wait_for( imap.search(search_command), timeout=self.config.timeout ) break except Exception as e: retry_count += 1 if retry_count == max_retries: logger.error( f"Failed to search messages after {max_retries} attempts: {e}" ) raise wait_time = min(2**retry_count, 30) logger.warning( f"Error searching messages (attempt {retry_count}/{max_retries}), " f"retrying in {wait_time} seconds: {e}" ) await asyncio.sleep(wait_time) message_numbers = data[0].decode().split() logger.debug(f"Found {len(message_numbers)} matching messages") for num in message_numbers: fetch_retries = 0 while fetch_retries < max_retries: try: await self._ensure_imap_connection() imap = cast(aioimaplib.IMAP4_SSL, self._imap) # Fetch message data _, msg_data = await asyncio.wait_for( imap.fetch( num, "(RFC822)" if query_params.get("include_body") else "(RFC822.HEADER)", ), timeout=self.config.timeout, ) if not msg_data or not msg_data[0]: logger.warning(f"No data returned for message {num}") break break except Exception as e: fetch_retries += 1 if fetch_retries == max_retries: logger.error( f"Failed to fetch message {num} after {max_retries} attempts: {e}" ) break wait_time = min(2**fetch_retries, 30) logger.warning( f"Error fetching message {num} (attempt {fetch_retries}/{max_retries}), " f"retrying in {wait_time} seconds: {e}" ) await asyncio.sleep(wait_time) if fetch_retries == max_retries: continue # When using RFC822, the email data is always the second element in the response email_body = msg_data[1] if not email_body: logger.warning( f"Could not find email body in message data for message {num}" ) continue # Parse email message try: email_message = email.message_from_bytes( email_body, policy=email.policy.default ) if not isinstance(email_message, EmailMessage): # Convert Message to EmailMessage if needed temp_message = EmailMessage() temp_message.set_content(email_message.as_string()) email_message = temp_message except Exception as e: logger.error(f"Failed to parse email message: {str(e)}") continue # Convert to our EmailData format email_data = EmailData.from_email_message(email_message) yield email_data