"""Email message data structures and utilities."""
from dataclasses import dataclass
from datetime import datetime
from email import utils
from email.message import EmailMessage
from typing import Any, Dict, List, Optional, Tuple, Union
from pymailai.markdown_converter import MarkdownConverter
[docs]
@dataclass
class EmailData:
"""Represents processed email data."""
message_id: str
subject: str
from_address: str
to_addresses: List[str]
cc_addresses: List[str]
body_text: str
body_html: Optional[str]
timestamp: datetime
references: Optional[Union[List[str], str]] = None
in_reply_to: Optional[str] = None
attachments: List[Dict[str, Any]] = None # type: ignore
def __post_init__(self) -> None:
"""Initialize empty lists for None values and ensure references is a list."""
self.attachments = [] if self.attachments is None else self.attachments
# Convert references to List[str]
if self.references is None:
self.references = []
elif isinstance(self.references, str):
# Convert string to list
self.references = (
[ref.strip() for ref in self.references.split()]
if self.references.strip()
else []
)
elif isinstance(self.references, list):
# Already a list, no need to cast
pass
else:
raise ValueError(
f"References must be None, string, or list, not {type(self.references)}"
)
[docs]
@classmethod
def from_email_message(cls, msg: EmailMessage) -> "EmailData":
"""Create EmailData from an email.message.EmailMessage object."""
body_text_parts = []
body_html = None
attachments = []
# Process message parts
if msg.is_multipart():
for part in msg.walk():
if part.is_multipart():
continue
content_type = part.get_content_type()
# Handle attachments and inline images
disposition = part.get("Content-Disposition", "")
if "attachment" in disposition or content_type.startswith("image/"):
attachments.append(
{
"filename": part.get_filename(),
"content_type": content_type,
"payload": part.get_payload(decode=True),
}
)
elif content_type == "text/plain":
payload = part.get_payload(decode=True)
assert isinstance(payload, bytes) # type assertion for mypy
body_text_parts.append(payload.decode())
elif content_type == "text/html":
payload = part.get_payload(decode=True)
assert isinstance(payload, bytes) # type assertion for mypy
body_html = payload.decode()
else:
payload = msg.get_payload(decode=True)
assert isinstance(payload, bytes) # type assertion for mypy
body_text_parts.append(payload.decode())
# Combine all text parts
body_text = "\n".join(part for part in body_text_parts if part.strip())
return cls(
message_id=msg["Message-ID"] or "",
subject=msg["Subject"] or "",
from_address=msg["From"] or "",
to_addresses=[
addr.strip() for addr in (msg["To"] or "").split(",") if addr
],
cc_addresses=[
addr.strip() for addr in (msg["Cc"] or "").split(",") if addr
],
body_text=body_text,
body_html=body_html,
timestamp=datetime.fromtimestamp(
utils.mktime_tz(cls._get_valid_date_tuple(msg["Date"]))
),
references=[ref.strip() for ref in (msg["References"] or "").split()],
in_reply_to=msg["In-Reply-To"],
attachments=attachments,
)
@staticmethod
def _get_valid_date_tuple(
date_str: Optional[str],
) -> Tuple[int, int, int, int, int, int, int, int, int, Optional[int]]:
"""Get a valid date tuple from a date string, using current time as fallback."""
default_tuple = utils.parsedate_tz(utils.formatdate(localtime=True))
assert (
default_tuple is not None
) # formatdate() always returns a valid date string
if date_str is None:
return default_tuple
parsed = utils.parsedate_tz(date_str)
return parsed if parsed is not None else default_tuple
def _format_quoted_text(self, text: str, level: int = 1) -> str:
"""Format text with email-style quotation marks and attribution.
Args:
text: The text to quote
level: The quotation level (number of '>' characters to prepend)
Returns:
The quoted text with attribution and '>' characters prepended to each line
"""
# Format the date to a readable string
date_str = self.timestamp.strftime("%b %d, %Y, at %I:%M %p")
# Create attribution line
attribution = f"On {date_str}, {self.from_address} wrote:"
# Format the quoted text with proper indentation
prefix = ">" * level
quoted_lines = []
# Split text into lines and process each line
lines = text.splitlines()
i = 0
while i < len(lines):
line = lines[i]
# Check if this line starts an embedded quote
if (
line.startswith("On ")
and i + 1 < len(lines)
and lines[i + 1].startswith(">")
):
# Found an embedded quote, preserve its structure but add our quote level
quoted_lines.append(f"{prefix} {line}")
i += 1
while i < len(lines) and (
lines[i].startswith(">") or not lines[i].strip()
):
if lines[i].startswith(">"):
# Add our quote level to the existing quote
quoted_lines.append(f"{prefix}{lines[i]}")
else:
# Empty line within quote
quoted_lines.append(prefix)
i += 1
continue
# Regular line
if line.strip():
quoted_lines.append(f"{prefix} {line}")
else:
quoted_lines.append(prefix)
i += 1
# Combine attribution with quoted text
return f"{attribution}\n{prefix}\n" + "\n".join(quoted_lines)
[docs]
def create_reply(
self, reply_text: str, include_history: bool = True, quote_level: int = 1
) -> "EmailData":
"""Create a reply EmailData object with proper threading fields set.
Args:
reply_text: The text of the reply message
include_history: Whether to include quoted message history
quote_level: The quotation level for the previous message
Returns:
A new EmailData object configured as a reply to this message
"""
# Build references list
if isinstance(self.references, str):
# Convert string references to list before copying
new_references = (
[ref.strip() for ref in self.references.split()]
if self.references.strip()
else []
)
else:
# None or List[str]
new_references = [] if self.references is None else self.references.copy()
if self.message_id:
new_references.append(self.message_id)
# Format body text with quotations if including history
body_text = reply_text
if include_history:
quoted = self._format_quoted_text(self.body_text, quote_level)
body_text = f"{reply_text}\n\n{quoted}"
# Create reply email data
return EmailData(
message_id="", # Will be set by email server
subject=f"Re: {self.subject}"
if not self.subject.startswith("Re: ")
else self.subject,
from_address="", # Should be set by caller
to_addresses=[self.from_address],
cc_addresses=self.cc_addresses,
body_text=body_text,
body_html=None, # HTML version would need to be generated separately
timestamp=datetime.now(),
references=new_references,
in_reply_to=self.message_id,
attachments=[],
)
[docs]
def to_email_message(self) -> EmailMessage:
"""Convert EmailData to an email.message.EmailMessage object."""
msg = EmailMessage()
msg["Subject"] = self.subject
msg["From"] = self.from_address
msg["To"] = ", ".join(self.to_addresses)
if self.cc_addresses:
msg["Cc"] = ", ".join(self.cc_addresses)
if self.in_reply_to:
msg["In-Reply-To"] = self.in_reply_to
if self.references:
msg["References"] = " ".join(self.references)
# Convert markdown to HTML if no HTML content is provided and text appears to be markdown
html_content = self.body_html
if not html_content and any(
marker in self.body_text for marker in ["```", "#", "**", "__", ">", "-"]
):
converter = MarkdownConverter()
html_content = converter.convert(self.body_text)
elif html_content:
# Use existing HTML content as is
html_content = self.body_html
# Start with mixed if we have attachments
if self.attachments:
msg.make_mixed()
# Create content part
content = EmailMessage()
if html_content:
content.make_alternative()
content.add_alternative(self.body_text, subtype="plain")
content.add_alternative(html_content, subtype="html")
else:
content.set_content(self.body_text)
msg.attach(content)
# Add attachments
for attachment in self.attachments:
msg.add_attachment(
attachment["payload"],
maintype=attachment["content_type"].split("/")[0],
subtype=attachment["content_type"].split("/")[1],
filename=attachment["filename"],
)
else:
# No attachments
if html_content:
msg.make_alternative()
msg.add_alternative(self.body_text, subtype="plain")
msg.add_alternative(html_content, subtype="html")
else:
msg.set_content(self.body_text)
return msg