"""
Index message trees and metadata.

``mu`` is good for searching, but suboptimal for advanced queries.
Instead, dump all messages in an sqlite database, along with their
relations to each other, and some basic metadata to allow a quick
join.
"""

from email.message import EmailMessage
from email.parser import BytesParser
from email.utils import parsedate_to_datetime
from mu4web.util import find
from socket import gethostname
from datetime import datetime, timezone
import email.message
import email.policy
import os
import os.path
import pathlib
import re
import sqlite3
import time
import traceback
import uuid
from dataclasses import dataclass, field
from typing import (
    Iterator,
    TypeAlias,
    Optional,
)

from .util import cwd, Lists
from . import mu
from . import message_db, cache_dir


# (message id, path of the file containing it, mtime of that file)
MailEntry: TypeAlias = tuple[str, bytes, int | float]
# (referencing message id, referenced message id, kind of reference)
# The last element is one of:
# Literal['references', 'in-reply-to', 'attach']]
MailRelation: TypeAlias = tuple[str, str, str]

parser = BytesParser(policy=email.policy.default)


def create_maildir(dir: str) -> None:
    """
    Create a maildir directory.

    Creates ``cur``, ``new``, and ``tmp`` below ``dir``, including any
    missing parent directories. Already existing directories are left
    alone.

    :param dir:
        Path of the maildir to create.
    """
    for sub in ['cur', 'new', 'tmp']:
        pathlib.Path(os.path.join(dir, sub)).mkdir(parents=True,
                                                   exist_ok=True)


def parse_msg_ids(s: str) -> list[str]:
    """
    Extract all msg-id's from a string.

    This is per RFC 2822, it should be extended to allow comments,
    since the standard defined msg-id as::

        msg-id = [CFWS] "<" id-left "@" id-right ">" [CFWS]

    Where CFWS is either whitespace, or parenthesis delimited
    comments, which can contain things looking like message id's.

    :param s:
        Header value to search.
    :returns:
        Every angle-bracket delimited id found in ``s``, without the
        brackets, in order of appearance.
    """
    return re.findall(r'<([^>]+)>', s)


def extract_relations(msg: email.message.Message) -> list[MailRelation]:
    """
    Find all messages the given message directly references.

    This checks the "References" and "In-Reply-To" headers, but
    doesn't check any attachments.

    :param msg:
        Message to inspect. It must have a ``Message-ID`` header
        containing at least one id, otherwise this raises (``TypeError``
        for a missing header, ``IndexError`` for one without an id).
    :returns:
        One ``(msg_id, referenced_id, header_name)`` tuple per id found
        in the two headers.
    """
    relations: list[MailRelation] = []
    msg_id = parse_msg_ids(msg['message-id'])[0]
    for h in ['references', 'in-reply-to']:
        if val := msg.get(h):
            for ref_id in parse_msg_ids(val):
                relations.append((msg_id, ref_id, h))
    return relations


def handle_entry(msg: email.message.Message,
                 file: bytes,
                 mtime: int | float) -> Lists[MailEntry, MailRelation]:
    """
    Find all messages the given message reference in any way.

    This includes direct reference in the message header, but also
    attached emails, and their headers.

    :param msg:
        The message to scan for references.
    :param file:
        Path to the file on disk which contains this message.
    :param mtime:
        Modification time of ``file``; recorded verbatim together with
        the message id and path.
    :returns:
        A pair of lists, the first containing information about each
        encountered message, while the second contains information
        about all referenced message.
    """
    data = Lists[MailEntry, MailRelation]()

    msg_id = parse_msg_ids(msg['message-id'])[0]
    data[0].append((msg_id, file, mtime))

    # Fetch direct references
    data[1].extend(extract_relations(msg))

    # For each attachement, which is another email message.
    referenced: list[email.message.Message] = []
    for attach in msg.walk():
        if attach.get_content_type() != 'message/rfc822':
            continue

        # NOTE(review): the source this was reconstructed from had lost
        # its line structure, leaving two consecutive ``continue``
        # statements here. The second one is placed at loop level,
        # which means scanning of attached messages is currently
        # switched off and everything below in this loop is
        # unreachable. Confirm against the intended behaviour before
        # removing either the ``continue`` or the dead code.
        continue

        # Forwarded messages seem to always have the form:
        # - A short meta header
        # - a blank line
        # - Original headers and body as first attachment, coded
        #   as text/plain.
        for sub_message in attach.walk():
            if sub_message['message-id']:
                referenced.append(sub_message)
            elif sub_message.get_content_type() == 'text/plain':
                # TODO check type better
                parsed = parser.parsebytes(bytes((sub_message).get_body()))  # type: ignore
                if parsed['message-id']:
                    # referenced.append(parsed)
                    pass

    for ref in referenced:
        other_id = parse_msg_ids(ref['message-id'])[0]
        data[1].append((msg_id, other_id, 'attach'))
        data += handle_entry(ref, file, mtime)

    return data


def fetch_meta(cur: sqlite3.Cursor) -> None:
    """
    Update message metadata in the database.

    Locate all the messages without cached metadata, collect it, and
    update the database with that data.

    The metadata collected contains (at least):

    * Subject
    * From
    * Date

    Messages which can't be opened or parsed are reported on stdout and
    skipped; since nothing is stored for them they are tried again on
    the next call. Nothing is committed here, that is left to the
    caller.

    :param cur:
        Cursor into the message database.
    """
    q = """
    SELECT DISTINCT msgId, path FROM message
    LEFT JOIN message_data ON message.msgId = message_data.id
    WHERE message_data.id IS NULL
    """

    cur.execute(f"SELECT count(1) FROM ({q})")
    (count,) = cur.fetchone()

    cur.execute(q)
    records = []
    for idx, (msg_id, path) in enumerate(cur.fetchall()):
        # "\033[K" clears the rest of the line, "\r" returns to its
        # start, so the counter is redrawn in place.
        print(f'{idx}/{count}', end='\033[K\r')
        try:
            with open(path, 'rb') as f:
                msg = parser.parse(f)
                records.append((
                    msg_id,
                    msg['subject'],
                    msg['from'],
                    msg['date']))
        except Exception as e:
            # Best effort: one unreadable message must not stop the
            # metadata collection for all the others.
            print(f'\nErr: {e}')

    cur.executemany("""
    INSERT INTO message_data (id, subject, `from`, date)
    VALUES (?, ?, ?, ?)
    """, records)


def deliver(maildir: str, msg: EmailMessage) -> None:
    """
    "Deliver" a message to a mailbox.

    Place the given message into the given maildir.

    Technically doesn't follow the `maildir spec`_, since the delivery
    identifiers doesn't use the "standards" overly complicated
    uniqueness algorithm, and instead opt for UUIDv4. But at least
    `mutt`_ doesn't care.

    The message is first written to ``tmp`` and then renamed into
    ``new``, so a partially written file never shows up in ``new``.

    :param maildir:
        Maildir in which to place the message in. The ``cur``,
        ``new``, and ``tmp`` directories needs to be created
        beforehand.
    :param msg:
        The message to deliver.

    .. _maildir spec: http://cr.yp.to/proto/maildir.html
    .. _mutt: http://www.mutt.org/
    """
    # The maildir spec asks for "/" and ":" in the hostname to be
    # replaced by their octal escapes, "\057" and "\072". Both
    # replacements therefore need the ``:o`` format.
    hostname = (gethostname()
                .replace('/', fr"\0{ord('/'):o}")
                .replace(':', fr"\0{ord(':'):o}"))

    basename = '.'.join([str(int(time.clock_gettime(time.CLOCK_REALTIME))),
                         str(uuid.uuid4()),
                         hostname])

    with cwd(maildir):
        with open(f'tmp/{basename}', 'wb') as f:
            f.write(msg.as_bytes(policy=email.policy.SMTPUTF8))
        os.rename(f'tmp/{basename}', f'new/{basename}:2,')


def find_emails(maildir: str) -> Iterator[bytes]:
    """
    Iterate over all email messages in all maildirs.

    Find all files directly in maildirs, where maildirs are defined as
    directories containing ``new``, ``cur``, and ``tmp`` directories.

    In practice a file is yielded when the directory it lies directly
    in is named ``new``, ``cur``, or ``tmp``.

    :param maildir:
        Directory to search below.
    :returns:
        Paths of the found files, as bytes.
    """
    for file in find(maildir, type='f'):
        if os.path.basename(os.path.dirname(file)) not in {b'new', b'cur', b'tmp'}:
            continue
        yield file


def create_tables(cur: sqlite3.Cursor) -> None:
    """
    Create SQL tables.

    Creates the tables ``message``, ``msg_reference``, and
    ``message_data``, and the view ``unique_messages``. Everything is
    created with ``IF NOT EXISTS``, so calling this on an already
    initialized database does nothing.

    :param cur:
        Cursor into the message database.
    """
    cur.execute("""
    -- Messages message present in the database.
    -- Note that the same msgId might appear multiple times, while path is unique.
    CREATE TABLE IF NOT EXISTS message
    ( msgId TEXT NOT NULL
    , path TEXT NOT NULL
    , mtime DATETIME NOT NULL
    )
    """)

    # `entry` references `ref` by means of `how`
    cur.execute("""
    -- How any given message relates to any other message.
    CREATE TABLE IF NOT EXISTS msg_reference
    ( entry TEXT NOT NULL
    , ref TEXT NOT NULL
    , how TEXT
    , FOREIGN KEY (entry) REFERENCES message(msgId)
    )
    """)

    cur.execute("""
    -- Extra data extracted from the email.
    -- Could also be queried through ``mu``, but this allows *much*
    -- faster lookup of the data we almost always want.
    CREATE TABLE IF NOT EXISTS message_data
    ( id TEXT -- PRIMARY KEY REFERENCES message(msgId)
    , subject TEXT
    , `from` TEXT
    , date DATETIME
    )
    """)

    cur.execute("""
    -- Easy access to the number of unique messages in the database.
    -- Mostly for debug purposes.
    CREATE VIEW IF NOT EXISTS unique_messages AS
    SELECT COUNT(1) AS amount FROM (SELECT * FROM message GROUP BY msgId)
    """)


def setup_relation_data(db_path: str, maildir: str) -> None:
    """
    Create database for mail relations, and populate it.

    Creates a cache database for how each mail relates to each other,
    and then populates it.

    Files whose modification time isn't newer than the one already
    recorded for their path are skipped, so running this again on an
    unchanged maildir adds nothing. Files which have changed are
    inserted again; rows already present for them are not removed.

    Messages which fail to be parsed or scanned don't abort the run.
    Instead, an error report is delivered for each of them to a fresh
    maildir below ``cache_dir``, and a summary is printed at the end.

    :param db_path:
        Path to the sqlite database. Created if missing.
    :param maildir:
        Root directory below which to look for messages.
    """
    con = sqlite3.connect(db_path)
    try:
        cur = con.cursor()

        err_maildir = os.path.join(
            cache_dir,
            "error",
            datetime.now().isoformat('T', 'seconds'))

        create_maildir(err_maildir)

        # Scan the disk once, and use the result both for the progress
        # counter and for the actual work.
        files = list(find_emails(maildir))
        total = len(files)

        create_tables(cur)

        cur.execute("SELECT path, mtime FROM message")
        mtimes = {path: mtime for path, mtime in cur}

        data: Lists[MailEntry, MailRelation] = Lists()

        failures: int = 0

        # For every mail in our mail directories
        for idx, file in enumerate(files):
            print(f' {idx}/{total}, {file!r}', end='\033[K\r')

            current_mtime = os.stat(file).st_mtime
            last_mtime = mtimes.get(file, 0)

            if not (current_mtime > last_mtime):
                continue

            with open(file, 'rb') as f:
                msg = None
                try:
                    msg = parser.parse(f)
                    data += handle_entry(msg, file, current_mtime)
                except Exception as e:
                    failures += 1
                    errmsg = EmailMessage()
                    # Header values may not contain line breaks;
                    # setting one which does would raise from inside
                    # this handler and abort the whole run.
                    errmsg['Subject'] = (str(e)
                                         .replace('\r', ' ')
                                         .replace('\n', ' '))
                    fname = file.decode('UTF-8', errors='ignore')
                    errmsg.add_attachment(
                        f"File: {fname}\n\n" + traceback.format_exc(),
                        filename="Python error report")
                    if msg:
                        # maintype='message', subtype='rfc822'
                        errmsg.add_attachment(msg)
                    try:
                        deliver(err_maildir, errmsg)
                    except Exception as e:
                        print(f"\nFailed delivering error {e}")

        # The same entry or relation may have been collected more than
        # once, only store each of them once.
        entries = list(set(data[0]))
        relations = list(set(data[1]))

        print(f"\nInserting {len(entries)} entries and {len(relations)} references")
        if failures > 0:
            print(f"Failed on {failures} entries")
            print(f"See the maildir at '{err_maildir}' for report")

        cur.execute("PRAGMA FOREIGN_KEYS = OFF")

        cur.executemany("INSERT INTO message (msgId, path, mtime) VALUES (?, ?, ?)", entries)
        cur.executemany("INSERT INTO msg_reference (entry, ref, how) VALUES (?, ?, ?)", relations)

        con.commit()

        print()
        fetch_meta(cur)

        con.commit()

        cur.execute("CREATE INDEX IF NOT EXISTS message_id ON message (msgId)")
        cur.execute("CREATE INDEX IF NOT EXISTS msg_ref_from ON msg_reference (entry)")
        cur.execute("CREATE INDEX IF NOT EXISTS msg_ref_to ON msg_reference (ref)")

        con.commit()
    finally:
        # The connection was previously never closed. Anything not yet
        # committed at this point is discarded, which only happens if
        # something above raised.
        con.close()


# --------------------------------------------------


@dataclass
class TreeEntry:
    """
    A flat message tree entry.

    These are the entries initially queried from the database, and
    then later embedded in the "real" tree nodes as data containers.

    This represents parts of an email message, limited to the parts we
    want. This can be extended later, if more fields are needed.

    :param entry:
        Message id of the entry
    :param ref:
        The message id of the parent of this entry. Used for
        constructing the final tree.
    :param subject:
        The message subject of this entry.
    :param from_:
        The sender of this entry.
    :param date:
        The date of this entry.
    """

    entry: str
    ref: str
    subject: str
    from_: str
    date: datetime


@dataclass(kw_only=True)
class Tree:
    """
    An email message tree.

    :param data:
        Contents of this node. See TreeEntry for details
    :param parent:
        Direct parent of this node. A value of ``None`` indicates that
        this is the root of a tree.
    :param children:
        List of children of this node.
    """

    data: TreeEntry
    parent: Optional[str] = None
    children: list['Tree'] = field(default_factory=list)


def _edge_list_to_tree(lst: list[TreeEntry]) -> Optional[Tree]:
    """
    Build a tree structure from a list of edge pairs.

    Currently this function is overly specific, requiring very
    specifically formatted structures. In short, it transforms a list
    of nodes, which each contains an ``entry`` (for the nodes id) and
    ``ref`` (for its parents id), and returns the realized tree.

    .. todo::

        This fails if the root isn't present in the database.

    :param lst:
        The nodes to link together. If several have the same
        ``entry``, the last one wins.
    :returns:
        The node with an empty ``ref`` (the last such node, if there
        are several), or ``None`` if no node has an empty ``ref``.
    """
    nodes: dict[str, Tree] = {}
    root: Optional[Tree] = None
    for e in lst:
        nodes[e.entry] = Tree(data=e)

    for entry in nodes.values():
        if not entry.data.ref:
            root = entry
        parent = nodes.get(entry.data.ref)
        if not parent:
            # Either the root, or a node whose parent wasn't part of
            # the input. Neither can be attached anywhere.
            continue
        parent.children.append(entry)

    return root


def _parse_date(value: str) -> datetime:
    """
    Parse the value of a ``Date`` header into an aware datetime.

    Unlike a fixed ``strptime`` format, this doesn't depend on the
    current locale, and it accepts the variations found in real mail,
    such as a trailing ``(CET)`` comment or a missing weekday.

    :param value:
        The header value.
    :returns:
        The point in time the header denotes, always with a timezone.
    :raises ValueError:
        If ``value`` can't be parsed as a date.
    """
    parsed = parsedate_to_datetime(value)
    if parsed.tzinfo is None:
        # An offset of "-0000" yields a naive datetime. Treat it as
        # UTC, which is also what strptime's %z did with it.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def fetch_relation_tree(cur: sqlite3.Cursor,
                        msg: EmailMessage) -> Optional[Tree]:
    """
    Build a tree of referenced emails.

    For a given email, find all other messages in its conversation
    tree, and return that as a tree. Only ``in-reply-to`` relations
    are followed.

    :param cur:
        Cursor into the message database.
    :param msg:
        Any message in the conversation. Only its ``Message-ID`` is
        used.
    :returns:
        The root of the conversation tree, or ``None`` if no root
        could be determined.
    :raises ValueError:
        If the stored date of any message in the tree is missing or
        can't be parsed.
    """
    msg_id = parse_msg_ids(msg['message-id'])[0]

    res = cur.execute("""
        WITH RECURSIVE
        -- Go up until we find the message trees root
        parents(entry, ref, depth) AS
            (SELECT '', ?, 0
             UNION
             SELECT DISTINCT m.entry, m.ref, r.depth + 1
             FROM parents r
             LEFT JOIN msg_reference m ON r.ref = m.entry
             WHERE m.how = 'in-reply-to')
        , root(entry) AS
            (SELECT ref FROM parents ORDER BY depth DESC LIMIT 1)
        -- Then go down, finding all messages in the tree
        , children(entry, ref, how, depth) AS
            (SELECT entry, '', '*base*', 0 FROM root
             UNION
             SELECT m.entry, r.entry, m.how, depth + 1
             FROM msg_reference m
             LEFT JOIN children r ON m.ref = r.entry
             WHERE m.ref = r.entry AND m.how = 'in-reply-to')
        SELECT entry
             , ref
             , subject
             , `from`
             , date
        FROM children c
        -- Attach some information about the message.
        LEFT JOIN message_data m ON c.entry = m.id
        GROUP BY entry
        """, (msg_id,))

    nodes: list[TreeEntry] = []
    for (entry, ref, subject, from_, date) in res.fetchall():
        # print('entry:', (entry, ref, subject, from_, date))
        nodes.append(
            TreeEntry(entry, ref, subject, from_, _parse_date(date)))

    return _edge_list_to_tree(nodes)


# --------------------------------------------------


def main() -> None:
    """Entry point for building cache."""
    maildir = mu.info()['maildir']
    db_path = message_db
    setup_relation_data(db_path, maildir)


if __name__ == '__main__':  # pragma: no cover
    main()