Source code for pymailai.client

"""Email client implementation."""

import email
import email.policy
import logging
from email.message import EmailMessage
from typing import AsyncGenerator, List, Optional

import aioimaplib
import aiosmtplib

from pymailai.base_client import BaseEmailClient
from pymailai.config import EmailConfig
from pymailai.message import EmailData

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class EmailClient(BaseEmailClient):
    """Asynchronous email client for IMAP and SMTP operations.

    The client holds one IMAP connection (for reading and flagging messages)
    and one SMTP connection (for sending). It can be used as an async context
    manager, which calls :meth:`connect` on entry and :meth:`disconnect` on
    exit.
    """

    def __init__(self, config: EmailConfig):
        """Initialize email client with configuration.

        Args:
            config: Server, credential, folder and timeout settings used by
                :meth:`connect`.
        """
        self.config = config
        # Both connections stay ``None`` until ``connect()`` succeeds.
        self._imap: Optional[aioimaplib.IMAP4_SSL] = None
        self._smtp: Optional[aiosmtplib.SMTP] = None

    async def connect(self) -> None:
        """Establish connections to IMAP and SMTP servers.

        If any step fails, every connection opened so far is closed again
        before the exception is re-raised, so a failed ``connect()`` (or a
        failed ``async with`` entry) does not leave a half-open client behind.

        Raises:
            Exception: Whatever the underlying IMAP/SMTP library raised.
        """
        try:
            await self._connect_imap()
            await self._connect_smtp()
        except Exception:
            # ``__aexit__`` is not invoked when ``__aenter__`` raises, so the
            # cleanup has to happen here.
            await self.disconnect()
            raise

    async def _connect_imap(self) -> None:
        """Open the IMAP connection, log in and select the configured folder."""
        self._imap = aioimaplib.IMAP4_SSL(
            host=self.config.imap_server,
            port=self.config.imap_port,
            timeout=self.config.timeout,
        )
        await self._imap.wait_hello_from_server()
        logger.debug(
            "Connected to IMAP server %s:%s",
            self.config.imap_server,
            self.config.imap_port,
        )

        await self._imap.login(self.config.email, self.config.password)
        logger.debug("Logged in to IMAP server as %s", self.config.email)

        await self._imap.select(self.config.folder)
        logger.debug("Selected IMAP folder: %s", self.config.folder)

    async def _connect_smtp(self) -> None:
        """Open the SMTP connection with port-appropriate security and log in."""
        # Port 465 uses implicit SSL/TLS; ports 25 and 587 start unencrypted.
        implicit_tls = self.config.smtp_port == 465
        self._smtp = aiosmtplib.SMTP(
            hostname=self.config.smtp_server,
            port=self.config.smtp_port,
            timeout=self.config.timeout,
            use_tls=implicit_tls,
        )
        await self._smtp.connect()

        if implicit_tls:
            logger.debug(
                "Connected to SMTP server %s:%s with SSL/TLS",
                self.config.smtp_server,
                self.config.smtp_port,
            )
        else:
            logger.debug(
                "Connected to SMTP server %s:%s",
                self.config.smtp_server,
                self.config.smtp_port,
            )
            # Use STARTTLS for port 587.
            # NOTE(review): newer aiosmtplib releases may negotiate STARTTLS
            # automatically inside connect(); confirm the explicit call is
            # still valid for the pinned version.
            if self.config.smtp_port == 587:
                await self._smtp.starttls()

        await self._smtp.login(self.config.email, self.config.password)
        logger.debug("Logged in to SMTP server as %s", self.config.email)

    async def disconnect(self) -> None:
        """Close connections to email servers.

        Shutdown is best effort: errors while logging out are logged as
        warnings and never raised, and both connection attributes are reset
        to ``None`` regardless.
        """
        if self._imap:
            try:
                await self._imap.logout()
                logger.debug("Disconnected from IMAP server")
            except Exception as e:
                logger.warning("Error disconnecting from IMAP: %s", e)
            self._imap = None

        if self._smtp:
            try:
                await self._smtp.quit()
                logger.debug("Disconnected from SMTP server")
            except Exception as e:
                logger.warning("Error disconnecting from SMTP: %s", e)
            self._smtp = None

    async def _search(self, criteria: str) -> List[str]:
        """Run an IMAP SEARCH and return the matching message numbers.

        Callers must have checked that ``self._imap`` is connected.

        Args:
            criteria: Raw IMAP search criteria string.

        Returns:
            Message sequence numbers as strings; empty when the search fails
            or matches nothing.
        """
        status, data = await self._imap.search(criteria)
        if status != "OK":
            logger.error("IMAP search failed with status %s", status)
            return []
        if not data or not data[0]:
            return []
        # Keep only numeric tokens so a stray status/completion line is never
        # mistaken for message numbers.
        return [token for token in data[0].decode().split() if token.isdigit()]

    @staticmethod
    def _escape_quoted(value: str) -> str:
        """Escape ``value`` for use inside an IMAP quoted string.

        CR and LF are dropped (they cannot appear in a quoted string), and
        backslashes and double quotes are backslash-escaped.
        """
        single_line = value.replace("\r", "").replace("\n", "")
        return single_line.replace("\\", "\\\\").replace('"', '\\"')

    async def fetch_new_messages(self) -> AsyncGenerator[EmailData, None]:
        """Fetch new unread messages from the IMAP server.

        Messages that cannot be fetched or parsed are logged and skipped so
        that one bad message does not stop the iteration.

        Yields:
            One ``EmailData`` per successfully parsed unread message.

        Raises:
            RuntimeError: If the client is not connected to IMAP.
        """
        if not self._imap:
            raise RuntimeError("Not connected to IMAP server")

        # Search for unread messages
        message_numbers = await self._search("UNSEEN")
        logger.debug("Found %d unread messages", len(message_numbers))

        for num in message_numbers:
            try:
                _, msg_data = await self._imap.fetch(num, "(RFC822)")
            except Exception as e:
                logger.error("Failed to fetch message %s: %s", num, e)
                continue

            if not msg_data or not msg_data[0]:
                logger.warning("No data returned for message %s", num)
                continue

            # When using RFC822, the email data is the second element in the
            # response. A shorter response (e.g. only a completion line)
            # carries no body and must not raise IndexError here.
            email_body = msg_data[1] if len(msg_data) > 1 else None
            if not email_body:
                logger.warning(
                    "Could not find email body in message data for message %s", num
                )
                continue

            # Parse email message
            try:
                email_message = email.message_from_bytes(
                    email_body, policy=email.policy.default
                )
                if not isinstance(email_message, EmailMessage):
                    # Convert Message to EmailMessage if needed
                    temp_message = EmailMessage()
                    temp_message.set_content(email_message.as_string())
                    email_message = temp_message
            except Exception as e:
                logger.error("Failed to parse email message: %s", e)
                continue

            # Convert to our EmailData format
            yield EmailData.from_email_message(email_message)

    async def send_message(self, message: EmailData) -> None:
        """Send an email message via SMTP.

        Args:
            message: Message to send; converted with ``to_email_message()``.

        Raises:
            RuntimeError: If the client is not connected to SMTP.
        """
        if not self._smtp:
            raise RuntimeError("Not connected to SMTP server")

        logger.debug("Sending message to %s", ", ".join(message.to_addresses))
        email_message = message.to_email_message()
        await self._smtp.send_message(email_message)
        logger.debug("Message sent successfully")

    async def mark_as_read(self, message_id: str) -> None:
        """Mark a message as read using its Message-ID.

        Only the first message matching the Message-ID header is flagged.
        Failures while storing the flag are logged, not raised.

        Args:
            message_id: Value of the ``Message-ID`` header to look up.

        Raises:
            RuntimeError: If the client is not connected to IMAP.
        """
        if not self._imap:
            raise RuntimeError("Not connected to IMAP server")

        # The Message-ID comes from received mail, so escape it before
        # embedding it in the quoted search string.
        quoted_id = self._escape_quoted(message_id)
        message_numbers = await self._search(f'HEADER "Message-ID" "{quoted_id}"')
        if not message_numbers:
            logger.debug("No message found with Message-ID %s", message_id)
            return

        try:
            logger.debug("Marking message %s as read", message_id)
            # Mark the message as seen using the raw message number
            await self._imap.store(message_numbers[0], "+FLAGS", "\\Seen")
        except Exception as e:
            logger.error("Failed to mark message %s as read: %s", message_id, e)

    async def __aenter__(self) -> "EmailClient":
        """Async context manager entry."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.disconnect()