# Source code for pymailai.gmail_client

"""Gmail API client implementation."""

import base64
import html
import logging
from datetime import datetime, timezone
from email import utils
from email.message import Message
from typing import AsyncGenerator, Dict, List, Optional, Tuple

from pymailai.base_client import BaseEmailClient
from pymailai.email_reply import ReplyBuilder
from pymailai.message import EmailData
from pymailai.text_processor import TextProcessor

# Module-level logger named after this module's dotted import path.
logger = logging.getLogger(__name__)


class GmailClient(BaseEmailClient):
    """Asynchronous Gmail client using the Gmail API.

    Wraps an already-authorized Gmail API ``service`` object (presumably built
    with ``googleapiclient.discovery.build("gmail", "v1", ...)`` by the
    caller) and exposes the ``BaseEmailClient`` interface on top of it.

    NOTE: every ``.execute()`` call below is synchronous, so each Gmail API
    request blocks the event loop while it is in flight.
    """

    # Characters that force a display name to be quoted when an address is
    # re-serialized (RFC 5322 ``specials`` minus ".", which the obsolete
    # phrase syntax allows unquoted and which is common in real names).
    _ADDRESS_SPECIALS = frozenset('()<>[]:;@\\,"')

    # (query_params key, Gmail search operator) pairs used by query_messages.
    _QUERY_OPERATORS = (
        ("after_date", "after"),
        ("before_date", "before"),
        ("subject", "subject"),
        ("from_address", "from"),
        ("to_address", "to"),
        ("label", "label"),
    )

    # Headers requested when query_messages fetches metadata only.
    _METADATA_HEADERS = (
        "Subject",
        "From",
        "To",
        "Cc",
        "Date",
        "References",
        "In-Reply-To",
    )

    def __init__(self, service):
        """Initialize Gmail client with service.

        Args:
            service: Gmail API service object; every request is issued via
                ``service.users()`` with ``userId="me"``.
        """
        self.service = service

    async def connect(self) -> None:
        """No connection needed for Gmail API."""
        logger.info("Gmail API client ready")

    async def disconnect(self) -> None:
        """No disconnection needed for Gmail API."""
        logger.info("Gmail API client closed")

    async def __aenter__(self) -> "GmailClient":
        """Async context manager entry."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""
        await self.disconnect()

    # ------------------------------------------------------------------
    # Fetching unread mail
    # ------------------------------------------------------------------

    async def fetch_new_messages(self) -> AsyncGenerator[EmailData, None]:
        """Fetch new unread messages using Gmail API.

        For each unread, non-chat message the whole thread is downloaded and
        rendered into one EmailData:

        * metadata (subject, addresses, references, timestamp) comes from the
          thread's last message;
        * the body contains every message of the thread, the last one first;
        * ``message_id`` is the id of the *unread* message, so it can be
          passed straight to :meth:`mark_as_read`.

        Errors are logged rather than raised: if listing fails the generator
        simply ends; if one message fails it is skipped.

        NOTE: only the first page of ``messages.list`` results is processed
        (``nextPageToken`` is not followed).

        Yields:
            One EmailData per unread message.
        """
        try:
            results = (
                self.service.users()
                .messages()
                .list(userId="me", q="is:unread -in:chats")
                .execute()
            )
        except Exception as e:
            logger.exception("Failed to fetch messages: %s", e)
            return

        messages = results.get("messages")
        if not messages:
            logger.info("No unread messages found")
            return

        logger.info("Found %d unread messages", len(messages))
        for message in messages:
            try:
                email_data = self._fetch_thread_as_email(message)
            except Exception as e:
                logger.exception(
                    "Failed to process message %s: %s", message.get("id"), e
                )
                continue
            # Yield outside the try so exceptions thrown into the generator
            # by the consumer are not logged as processing failures.
            yield email_data

    def _fetch_thread_as_email(self, message: dict) -> EmailData:
        """Download the thread containing ``message`` and render it as EmailData."""
        message_id = message["id"]

        # messages.list entries already carry threadId; fetch only if absent.
        thread_id = message.get("threadId")
        if not thread_id:
            thread_id = (
                self.service.users()
                .messages()
                .get(userId="me", id=message_id, format="minimal")
                .execute()["threadId"]
            )

        thread = (
            self.service.users().threads().get(userId="me", id=thread_id).execute()
        )
        thread_messages = thread["messages"]
        body_text, body_html = self._render_thread(thread_messages)

        # Metadata and the timestamp fallback both come from the last message,
        # so the resulting EmailData is internally consistent.
        last_msg = thread_messages[-1]
        headers = self._header_map(last_msg["payload"])
        return self._build_email_data(
            message_id,
            headers,
            body_text,
            body_html,
            self._parse_timestamp(headers, last_msg),
        )

    def _render_thread(
        self, thread_messages: List[dict]
    ) -> Tuple[Optional[str], Optional[str]]:
        """Combine every message of a thread into (text, html) bodies.

        Parts are collected in the order the thread resource lists them and
        joined in reverse, so the thread's last message appears first. For
        multi-message threads each text part is formatted with ReplyBuilder
        and each HTML part is wrapped in an ``email-quote`` div.

        Returns:
            ``(body_text, body_html)``; ``body_html`` is None when no message
            contributed HTML.
        """
        text_parts: List[str] = []
        html_parts: List[str] = []
        is_conversation = len(thread_messages) > 1

        for thread_msg in thread_messages:
            payload = thread_msg["payload"]
            headers = self._header_map(payload)
            msg_text, msg_html = self._extract_message_content(payload)

            if msg_text:
                processed_text = TextProcessor.process_text_with_quotes(msg_text)
                if is_conversation:
                    processed_text = ReplyBuilder.build_reply_body(
                        original_text=processed_text,
                        reply_text="",
                        subject=headers.get("subject", ""),
                        timestamp=self._parse_timestamp(headers, thread_msg),
                        from_address=headers.get("from", ""),
                    )
                text_parts.append(processed_text)

            if msg_html:
                if is_conversation:
                    msg_html = f'<div class="email-quote">{msg_html}</div>'
                html_parts.append(msg_html)
            elif msg_text and payload.get("mimeType", "").startswith("multipart/"):
                # Only multipart messages get a text-to-HTML fallback. The text
                # is untrusted, so escape it before embedding it in markup.
                html_parts.append(f"<pre>{html.escape(msg_text)}</pre>")

        body_text = TextProcessor.combine_text_parts(list(reversed(text_parts)))
        body_html = "<br><br>".join(reversed(html_parts)) if html_parts else None
        return body_text, body_html

    # ------------------------------------------------------------------
    # Sending / flagging
    # ------------------------------------------------------------------

    async def send_message(self, message: EmailData) -> None:
        """Send an email message via Gmail API.

        Args:
            message: Email to send; serialized via ``message.to_email_message()``.

        Raises:
            Exception: Any serialization or API error is logged and re-raised.
        """
        try:
            email_message = message.to_email_message()
            # Gmail's "raw" field is the full RFC 2822 message, base64url-encoded.
            encoded_message = base64.urlsafe_b64encode(
                email_message.as_bytes()
            ).decode()
            result = (
                self.service.users()
                .messages()
                .send(userId="me", body={"raw": encoded_message})
                .execute()
            )
            logger.info("Message sent successfully with ID: %s", result.get("id"))
        except Exception as e:
            logger.error("Failed to send message: %s", e)
            raise

    async def mark_as_read(self, message_id: str) -> None:
        """Mark a message as read using Gmail API.

        Removes the ``UNREAD`` label from the message. Errors are logged with
        their traceback and not re-raised.

        Args:
            message_id: Gmail message ID to mark as read
        """
        try:
            self.service.users().messages().modify(
                userId="me",
                id=message_id,
                body={"removeLabelIds": ["UNREAD"]},
            ).execute()
            logger.info("Marked message %s as read", message_id)
        except Exception as e:
            logger.exception("Failed to mark message as read: %s", e)

    # ------------------------------------------------------------------
    # Querying
    # ------------------------------------------------------------------

    async def query_messages(
        self, query_params: dict
    ) -> AsyncGenerator[EmailData, None]:
        """Query messages using Gmail API with specified parameters.

        Args:
            query_params: Optional keys ``after_date``, ``before_date``,
                ``subject``, ``from_address``, ``to_address`` and ``label``
                (each mapped to the matching Gmail search operator),
                ``unread_only`` (adds ``is:unread``) and ``include_body``
                (fetch full messages and extract their text/HTML bodies).

        Errors are logged rather than raised: if the search fails the
        generator simply ends; if one message fails it is skipped.

        NOTE: only the first page of ``messages.list`` results is processed
        (``nextPageToken`` is not followed).

        Yields:
            One EmailData per matching message.
        """
        try:
            include_body = bool(query_params.get("include_body"))
            q = self._build_query(query_params)
            logger.info("Executing Gmail query: %s", q)
            results = self.service.users().messages().list(userId="me", q=q).execute()
        except Exception as e:
            logger.exception("Failed to query messages: %s", e)
            return

        messages = results.get("messages", [])
        if not messages:
            logger.info("No messages found matching query")
            return

        logger.info("Found %d matching messages", len(messages))
        for message in messages:
            try:
                email_data = self._fetch_single_message(message["id"], include_body)
            except Exception as e:
                logger.exception(
                    "Failed to process message %s: %s", message.get("id"), e
                )
                continue
            yield email_data

    @classmethod
    def _build_query(cls, query_params: dict) -> str:
        """Translate ``query_params`` into a Gmail search string."""
        query_parts = []
        for key, operator in cls._QUERY_OPERATORS:
            value = query_params.get(key)
            if not value:
                continue
            term = f"{value}"
            if operator == "subject" and any(ch.isspace() for ch in term):
                # Group multi-word subjects so every word is matched against
                # the subject line, not only the first one.
                term = f"({term})"
            query_parts.append(f"{operator}:{term}")
        if query_params.get("unread_only"):
            query_parts.append("is:unread")
        return " ".join(query_parts)

    def _fetch_single_message(self, message_id: str, include_body: bool) -> EmailData:
        """Fetch one message (full or metadata-only) and convert it to EmailData."""
        msg = (
            self.service.users()
            .messages()
            .get(
                userId="me",
                id=message_id,
                format="full" if include_body else "metadata",
                metadataHeaders=list(self._METADATA_HEADERS),
            )
            .execute()
        )
        headers = self._header_map(msg["payload"])

        body_text: Optional[str] = None
        body_html: Optional[str] = None
        if include_body:
            body_text, body_html = self._extract_message_content(msg["payload"])

        return self._build_email_data(
            message_id,
            headers,
            body_text,
            body_html,
            self._parse_timestamp(headers, msg),
        )

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _header_map(payload: dict) -> Dict[str, str]:
        """Map lower-cased header names to values (last occurrence wins).

        Lower-casing makes lookups insensitive to senders writing e.g. ``CC``
        instead of ``Cc``.
        """
        return {h["name"].lower(): h["value"] for h in payload.get("headers", [])}

    @staticmethod
    def _parse_timestamp(headers: Dict[str, str], gmail_message: dict) -> datetime:
        """Return the message time from its Date header, else ``internalDate``.

        Args:
            headers: Lower-cased header map of the message.
            gmail_message: Gmail message resource providing ``internalDate``.
        """
        date_str = headers.get("date")
        if date_str:
            try:
                return utils.parsedate_to_datetime(date_str)
            except (TypeError, ValueError, IndexError):
                logger.warning(
                    "Unparseable Date header %r; falling back to internalDate",
                    date_str,
                )
        # internalDate is epoch *milliseconds*. Make the result UTC-aware so it
        # does not mix naive and (usually aware) parsed Date values.
        return datetime.fromtimestamp(
            int(gmail_message["internalDate"]) / 1000, tz=timezone.utc
        )

    @classmethod
    def _parse_address_list(cls, value: str) -> List[str]:
        """Split an address header into individual address strings.

        Uses RFC 5322 parsing rather than splitting on "," so display names
        such as ``"Doe, John" <john@example.com>`` stay intact.
        """
        addresses = []
        for name, addr in utils.getaddresses([value]):
            if not addr:
                continue
            if not name:
                addresses.append(addr)
                continue
            if any(ch in cls._ADDRESS_SPECIALS for ch in name):
                name = f'"{utils.quote(name)}"'
            addresses.append(f"{name} <{addr}>")
        return addresses

    @classmethod
    def _build_email_data(
        cls,
        message_id: str,
        headers: Dict[str, str],
        body_text: Optional[str],
        body_html: Optional[str],
        timestamp: datetime,
    ) -> EmailData:
        """Assemble an EmailData from a lower-cased header map and bodies."""
        return EmailData(
            message_id=message_id,
            subject=headers.get("subject", ""),
            from_address=headers.get("from", ""),
            to_addresses=cls._parse_address_list(headers.get("to", "")),
            cc_addresses=cls._parse_address_list(headers.get("cc", "")),
            body_text=body_text or "",
            body_html=body_html,
            timestamp=timestamp,
            references=headers.get("references", "").split(),
            in_reply_to=headers.get("in-reply-to", ""),
        )

    @staticmethod
    def _part_charset(part: dict) -> Optional[str]:
        """Return the charset declared in a part's Content-Type header, if any."""
        for header in part.get("headers", []):
            if header.get("name", "").lower() == "content-type":
                parsed = Message()
                parsed["Content-Type"] = header.get("value", "")
                return parsed.get_content_charset()
        return None

    @classmethod
    def _decode_part(cls, part: dict) -> Optional[str]:
        """Decode a part's base64url body using its declared charset.

        Returns None when the part carries no inline data. Undecodable bytes
        are replaced rather than aborting the whole message.
        """
        data = part.get("body", {}).get("data", "")
        if not data:
            return None
        # Re-pad defensively in case the base64url padding was stripped.
        raw = base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))
        charset = cls._part_charset(part) or "utf-8"
        try:
            return raw.decode(charset, errors="replace")
        except LookupError:
            # Unknown charset label: fall back to UTF-8.
            return raw.decode("utf-8", errors="replace")

    def _extract_message_content(
        self, payload: dict
    ) -> Tuple[Optional[str], Optional[str]]:
        """Extract text and HTML content from a Gmail message payload.

        Multipart payloads are searched depth-first; the first non-empty
        ``text/plain`` and first non-empty ``text/html`` parts win. Parts with
        a filename (attachments) and other MIME types are ignored.

        Args:
            payload: Gmail message payload dictionary

        Returns:
            Tuple of (plain_text_content, html_content)
        """
        mime_type = payload.get("mimeType", "")

        if mime_type.startswith("multipart/"):
            text_body: Optional[str] = None
            html_body: Optional[str] = None
            for subpart in payload.get("parts", []):
                sub_text, sub_html = self._extract_message_content(subpart)
                if sub_text and not text_body:
                    text_body = sub_text
                if sub_html and not html_body:
                    html_body = sub_html
            return text_body, html_body

        # Named parts are attachments, and non-text parts must not be decoded
        # as text (binary data would raise or produce garbage).
        if payload.get("filename") or mime_type not in ("text/plain", "text/html"):
            return None, None

        content = self._decode_part(payload)
        if not content:
            return None, None
        if mime_type == "text/plain":
            return content, None
        return None, content