Source code for pymailai.gmail_client

"""Gmail API client implementation."""

import base64
import logging
from datetime import datetime
from email import utils
from typing import AsyncGenerator

from pymailai.base_client import BaseEmailClient
from pymailai.message import EmailData

logger = logging.getLogger(__name__)

[docs] class GmailClient(BaseEmailClient): """Asynchronous Gmail client using the Gmail API."""
[docs] def __init__(self, service): """Initialize Gmail client with service.""" self.service = service
[docs] async def connect(self) -> None: """No connection needed for Gmail API.""""Gmail API client ready")
[docs] async def disconnect(self) -> None: """No disconnection needed for Gmail API.""""Gmail API client closed")
[docs] async def fetch_new_messages(self) -> AsyncGenerator[EmailData, None]: """Fetch new unread messages using Gmail API.""" try: # Search for unread messages results = ( self.service.users() .messages() .list(userId="me", q="is:unread -in:chats") .execute() ) messages = results.get("messages") if not messages:"No unread messages found") return"Found {len(messages)} unread messages") for message in messages: try: # Get full message details msg = ( self.service.users() .messages() .get(userId="me", id=message["id"], format="full") .execute() ) # Create EmailData directly with Gmail message ID headers = {h["name"]: h["value"] for h in msg["payload"]["headers"]} # Extract body body_text = None body_html = None def extract_parts(part): nonlocal body_text, body_html if part.get("mimeType") == "multipart/alternative": # Handle nested multipart/alternative for subpart in part.get("parts", []): extract_parts(subpart) elif part.get("mimeType") == "multipart/mixed": # Handle nested multipart/mixed for subpart in part.get("parts", []): extract_parts(subpart) elif part.get("mimeType") == "text/plain": data = part.get("body", {}).get("data", "") if data: body_text = base64.urlsafe_b64decode(data).decode() elif part.get("mimeType") == "text/html": data = part.get("body", {}).get("data", "") if data: body_html = base64.urlsafe_b64decode(data).decode() # Start extraction from the payload if msg["payload"].get("mimeType", "").startswith("multipart/"): extract_parts(msg["payload"]) else: # Single part message data = msg["payload"]["body"].get("data", "") if data: content = base64.urlsafe_b64decode(data).decode() if msg["payload"]["mimeType"] == "text/html": body_html = content else: body_text = content # Parse timestamp from headers or use message internal date date_str = headers.get("Date") if date_str: timestamp = utils.parsedate_to_datetime(date_str) else: # Use internal date (Unix timestamp in seconds) timestamp = datetime.fromtimestamp( int(msg["internalDate"]) / 1000 ) # Create EmailData with Gmail message ID email_data = EmailData( message_id=msg["id"], # Use Gmail message ID directly subject=headers.get("Subject", ""), from_address=headers.get("From", ""), to_addresses=[ addr.strip() for addr in headers.get("To", "").split(",") if addr.strip() ], cc_addresses=[ addr.strip() for addr in headers.get("Cc", "").split(",") if addr.strip() ], body_text=body_text or "", body_html=body_html, timestamp=timestamp, references=headers.get("References", ""), in_reply_to=headers.get("In-Reply-To", ""), ) yield email_data except Exception as e: logger.error(f"Failed to process message {message['id']}: {str(e)}") continue except Exception as e: logger.error(f"Failed to fetch messages: {str(e)}")
[docs] async def send_message(self, message: EmailData) -> None: """Send an email message via Gmail API.""" try: # Convert to EmailMessage email_message = message.to_email_message() # Encode the message encoded_message = base64.urlsafe_b64encode( email_message.as_bytes() ).decode() # Create the Gmail API message gmail_message = {"raw": encoded_message} # Send the message result = ( self.service.users() .messages() .send(userId="me", body=gmail_message) .execute() )"Message sent successfully with ID: {result.get('id')}") except Exception as e: logger.error(f"Failed to send message: {str(e)}") raise
[docs] async def mark_as_read(self, message_id: str) -> None: """Mark a message as read using Gmail API. Args: message_id: Gmail message ID to mark as read """ try: # Directly modify the message using its Gmail ID self.service.users().messages().modify( userId="me", id=message_id, body={"removeLabelIds": ["UNREAD"]}, ).execute()"Marked message {message_id} as read") except Exception as e: logger.error(f"Failed to mark message as read: {str(e)}")
async def __aenter__(self) -> "GmailClient": """Async context manager entry.""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: """Async context manager exit.""" await self.disconnect()