from email.message import EmailMessage from email.headerregistry import Address from urllib.parse import urlencode from datetime import datetime import html from html.parser import HTMLParser import re from flask_login import ( LoginManager, login_required, login_user, current_user, logout_user, ) from typing import ( Optional, cast, ) from mu import get_mail import mu from html_render import HTML, render_document from user.local import LocalUser from user.pam import PamUser from maildir import find_maildirs, serialize_maildir import flask from flask import ( Flask, request, redirect, url_for, flash, get_flashed_messages ) # # A few operations depend on the index of attachements. These index # are all pre-order traversal indexes of the attachement tree, which # is also the order message.walk() returns them in. # login_manager = LoginManager() def mailto(addr: str) -> HTML: """Constructs a mailto anchor element.""" return ('a', {'href': f'mailto:{addr}'}, addr) def format_email(addr: Address) -> list[HTML]: """Format an email address suitable for the headers of the message view.""" mail_addr = f'{addr.username}@{addr.domain}' return [addr.display_name, ' <', mailto(mail_addr), '>'] def header_format(key: str, value) -> HTML: """Format email headers to HTML.""" if key in ['to', 'cc', 'bcc']: return ('ul', *[('li', *format_email(addr)) for addr in value.addresses]) elif key == 'from': return format_email(value.addresses[0]) elif key == 'in-reply-to': # type(value) == email.headerregistry._UnstructuredHeader id = str(value).strip("<>") return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>'] else: return value def attachement_tree(id: str, mail: EmailMessage, idx=0) -> tuple[HTML, int]: ct = mail.get_content_type() fn = mail.get_filename() children = [] _idx = idx for child in mail.iter_parts(): tree, idx = attachement_tree(id, cast(EmailMessage, child), idx + 1) children.append(tree) content: HTML if children: content = ('ul', *children) else: content = [] if fn: body = f'{ct} {fn}' else: body = str(ct) download = {} if mail.get_content_type() == 'application/octet-stream': download['download'] = mail.get_filename() or '' return ('li', ('a', {'data-idx': str(_idx), 'href': '/part?' + urlencode({'id': id, 'idx': _idx}), **download, }, body), content), idx # -------------------------------------------------- def login_page(returnto: Optional[str] = None) -> HTML: return ('form', {'action': '/login', 'method': 'POST', 'class': 'loginform'}, ('label', {'for': 'username'}, 'Användarnamn'), ('input', {'id': 'username', 'name': 'username', 'placeholder': 'Användarnamn'}), ('label', {'for': 'password'}, 'Lösenord'), ('input', {'id': 'password', 'name': 'password', 'placeholder': 'Lösenord', 'type': 'password'}), ('div', ('input', {'id': 'remember', 'name': 'remember', 'type': 'checkbox'}), ('label', {'for': 'remember'}, 'Kom ihåg mig')), ('input', {'type': 'hidden', 'name': 'returnto', 'value': returnto}) if returnto else [], ('input', {'type': 'submit', 'value': 'Logga in'}), ) def user_info(username: str) -> HTML: return [('span', username), ('form', {'action': '/logout', 'method': 'POST'}, ('input', {'type': 'submit', 'value': 'Logga ut'}))] def login_prompt() -> HTML: return ('a', {'href': '/login'}, 'Logga in') def flashed_messages() -> HTML: return ('ul', {'class': 'flashes'}, *[('li', msg) for msg in get_flashed_messages()]) def include_stylesheet(path): return ('link', {'type': 'text/css', 'rel': 'stylesheet', 'href': path}) def page_base(title: Optional[str] = None, body: HTML = []) -> HTML: return ('html', {'lang': 'sv'}, ('head', ('meta', {'charset': 'utf-8'}), ('meta', {'name': 'viewport', 'content': 'width=device-width, initial-scale=0.5'}), ('title', title, ' — Mu4Web'), include_stylesheet('/static/style.css'), ), ('body', ('nav', ('menu', ('li', ('h1', ('a', {'href': '/'}, 'Mu4Web'))), ('hr',), ('li', ('form', {'action': '/search', 'method': 'GET'}, ('input', {'type': 'text', 'placeholder': 'Sök...', 'name': 'q'}), ('input', {'type': 'Submit', 'value': 'Sök'}))), ('li', user_info(current_user.get_id()) if current_user.is_authenticated else login_prompt()) )), ('main', flashed_messages(), body), ('footer', ('menu', ('li', ('a', {'href': 'https://www.djcbsoftware.nl/code/mu/'}, 'mu')), ('li', ('a', {'href': 'https://git.hornquist.se/mu4web'}, 'Source')), )))) def response_for(id: str) -> str: mail = cast(EmailMessage, get_mail(id)) headers = {} for (key, value) in mail.items(): headers[key.lower()] = value head = [] for h in app.config['MESSAGE_HEADERS']: if x := headers.get(h.lower()): head += [('dt', h.title()), ('dd', header_format(h.lower(), x))] all_heads = [] for key, value in mail.items(): all_heads += [('dt', key.title()), ('dd', value)] full_headers = ('details', ('summary', 'Alla mailhuvuden'), ('dl', *all_heads)) if t := headers.get('subject'): title = f'Mail — {t}' else: title = 'Mail' body: list[HTML] = [] # Manual walk to preserve attachement index for idx, at in enumerate(mail.walk()): # body.append(('h2', at.get_content_type())) if at.is_multipart(): continue elif at.get_content_type() == 'text/html': # ct = at.get_content_type() url = '/part?' + urlencode({'id': id, 'idx': idx}) body.append(('iframe', {'src': url, 'height': '300', })) elif at.get_content_type() == 'text/plain': body.append(('pre', at.get_content())) elif at.get_content_type() == 'application/octet-stream': url = '/part?' + urlencode({'id': id, 'idx': idx}) body.append(('a', {'href': url, 'download': at.get_filename() or ''}, at.get_filename() or at.get_content_type())) else: url = '/part?' + urlencode({'id': id, 'idx': idx}) body.append(('a', {'href': url}, at.get_filename() or at.get_content_type())) tree, idx = attachement_tree(id, mail) main_body: list[HTML] = [('dl', *head), full_headers, ('hr',), ('main', body), ('hr',), ('a', {'href': '/raw?' + urlencode({'id': id})}, 'Råa bitar'), ('ul', tree), ] html_str = render_document(page_base(title=title, body=main_body)) return html_str def search_field(q: str) -> HTML: return ('form', {'id': 'searchform', 'action': '/search', 'method': 'GET'}, ('label', {'for': 'search'}, 'Sökförfrågan till Mu'), ('input', {'id': 'search', 'type': 'text', 'placeholder': 'Sök...', 'name': 'q', 'value': q}), ('input', {'type': 'Submit', 'value': 'Sök'})) def search_result(q: str, by: Optional[str], direction: str) -> HTML: assert direction in ('rising', 'falling') # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid'] keys = ['from', 'to', 'subject', 'date'] if not by: by = app.config['DEFAULT_SORT_COLUMN'] rows = mu.search(q, by, direction == 'falling') body: list[tuple] = [] for row in rows: rowdata = [] for key in keys: data = row.get(key, None) if data and key == 'date': dt = datetime.fromtimestamp(int(data)) data = dt.strftime('%Y-%m-%d %H:%M') rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data))) body.append(('tr', rowdata)) if len(rows) == 0: return "Inga träffar" else: heads: list[HTML] = [] for m in keys: link_body = m.title() params = {'q': q, 'by': m} if m == by: link_body += ' ' if direction == 'rising': link_body += '▲' params['direction'] = 'falling' else: link_body += '▼' params['direction'] = 'rising' heads.append(('th', ('a', {'href': '?' + urlencode(params)}, link_body))) return ('div', ('p', f"{len(rows)} träffar"), ('table', ('thead', ('tr', *heads )), ('tbody', body))) def search_page(q: str, by: Optional[str], direction: str) -> str: main_body = [search_field(q)] # TODO pagination # Mu handles the search without problem, but python is slow to # build the table, and the browser has problem rendering it if q: main_body.append(search_result(q, by, direction)) return render_document(page_base(title='Sökning', body=main_body)) def index_page(): data = mu.info() maildirs = find_maildirs(data['maildir'] + '/') entries = serialize_maildir(maildirs) rows = [] for key, value in data.items(): rows.append(('tr', ('td', key), ('td', value))) body = [('div', ('table', ('tbody', rows))), ('div', entries), ] return render_document(page_base(title='E-postindex', body=body)) app = Flask(__name__, instance_relative_config=True) # Default configuration values app.config.update( DEFAULT_DIRECTION='falling', DEFAULT_SORT_COLUMN='date', MESSAGE_HEADERS=[ 'from', 'subject', 'to', 'cc', 'date', ] ) app.config.from_pyfile('settings.py') login_manager.init_app(app) @login_manager.user_loader def load_user(user_id): # return User.get(user_id) return LocalUser(user_id) @app.route('/') def index(): if not current_user.is_authenticated: return redirect(url_for('login_page_', returnto=request.path)) if id := request.args.get('id'): response = response_for(''.join(id).replace(' ', '+')) else: response = index_page() return response @app.route('/search') @login_required def search_page_(): direction = request.args.get('direction', app.config['DEFAULT_DIRECTION']) if direction not in ('rising', 'falling'): direction = app.config['DEFAULT_DIRECTION'] return search_page(request.args.get('q', ''), request.args.get('by', None), direction) def multipart_page(msg_id: str, attachement: EmailMessage, attachement_idx: int) -> str: """ Build HTML response for a multi-part attachement. Multi part attachements are simply containers, and can't directly be opened. Instead, build a tree of all components. [Parameters] msg_id - Message ID of the mail in question attachement - The attachement to work with, must be multipart attachement_idx - Index of attachement in top level mail. Needed for links to work. """ tree, _ = attachement_tree(msg_id, attachement, attachement_idx) body: list[HTML] = [('a', {'href': '/?' + urlencode({'id': msg_id})}, 'Återvänd till brev'), ('ul', tree), ] return render_document(page_base(title='Multipart', body=body)) def attachement_response(attachement: EmailMessage): """ Builds a response for a given attachement. Gets content type and encoding from the attachements headers. """ response = flask.Response() response.charset = attachement.get_content_charset() response.mimetype = attachement.get_content_type() # does get_content do stuff depending on content-type? # Check how to explicitly get the raw bytes. response.set_data(attachement.get_content()) return response @app.route('/raw') @login_required def raw_message(): msg_id = request.args.get('id', '') filename = mu.find_file(msg_id) if not filename: return 'No message with that id', 404 return flask.send_file(filename, mimetype='message/rfc822') class MutableString: def __init__(self): self.str = '' def __iadd__(self, other): self.str += other return self def __repr__(self): return f'MutableString("{self.str}")' def __str__(self): return self.str class IMGParser(HTMLParser): """ Rewrites HTML image tags to be safer/have more functionality. Should only be fed the image tag. Everything else is assumed to be directly copied externaly. [Parameters] result - A mutable string which should be appended with the (possibly changed) img tag. msg_id - The Email Message ID field, used to construct some links. """ rx = re.compile('cid:(.*)') def __init__(self, result, msg_id): super().__init__() self.result = result self.msg_id = msg_id def handle_starttag(self, tag, attrs): # TODO this will also get called for self closing tags # (), which will drop that slash. FIX if tag == 'img': # - Expand img tags with CID: url's to point to our server. # These should be safe (from a tracking perspective) since # they are downloaded as part of the mail. # - Other images are blocked, a piece of javascript is # later added to unblock them on click self.result += '') + '' elif tag == 'a': # Add target="_parent" to all anchors. This causes links # in iframe:s (where the content will probably be shown) # to open in the current (top level) page, instead of # inside the iframe. args = ' '.join(f'{html.escape(key)}={html.escape(value)}' for (key, value) in [*attrs, ('target', '_parent')]) self.result += f'' else: assert False, 'Should never be reached' @app.route('/part') @login_required def attachement_part_page(): msg_id = request.args.get('id') raw = request.args.get('raw') if not msg_id: return "Message id required", 404 attachement_idx = int(request.args.get('idx', 0)) mail = cast(EmailMessage, get_mail(msg_id)) attachement = list(mail.walk())[attachement_idx] if attachement.is_multipart(): return multipart_page(msg_id, attachement, attachement_idx) elif not raw and attachement.get_content_type() == 'text/html': # Rewrites for HTML for different reasons # 1. Expand CID links to something we can handle # 2. add a.target = '_parent' to force links to open in # current tab (instead of keeping in iframe) # 3. Block external resources from loading per default # These should come with some form of toggle for turning them # on or off result = MutableString() # Content encoding here? source = attachement.get_content() parser = IMGParser(result, msg_id) idx = 0 for m in re.finditer(r'< *(a|img|script)[^>]*>', source): result += source[idx:m.start()] idx = m.end() parser.feed(m[0]) result += source[idx:] # This script adds an onclick event for each image we blocked # above, which unblocks it. # TODO this "fails" for images wrapped in anchor tags, since # the anchor tag has priority. result += "\n" return str(result) else: return attachement_response(attachement) # Returns specific content item from message with id # RFC 2392 # https://www.rfc-editor.org/rfc/rfc2392 # Possibly change this to actually use that form of URI:s @app.route('/cid', methods=['GET']) def cid(): msg_id = request.args.get('id') cid = request.args.get('cid') if not msg_id: return "Message id required", 404 if not cid: return "CID required", 404 mail = cast(EmailMessage, get_mail(msg_id)) # .walk(), since attachement may be a few steps down in the # multipart/* tree for attachment in mail.walk(): if attachment.get('content-id') == f'<{cid}>': return attachement_response(attachment) return "Object not found", 404 @app.route('/login', methods=['GET']) def login_page_(): returnto = request.args.get('returnto') if current_user.is_authenticated: # Redirect away already logged in users if returnto: return redirect(returnto) else: return redirect(url_for('index')) else: # Give the login prompt to non-logged in users. body = login_page(returnto) return render_document(page_base(title='Login', body=body)) @app.route('/login', methods=['POST']) def login_form(): resp = redirect(request.args.get('returnto', url_for('index'))) username = request.form['username'] password = request.form['password'] remember = bool(request.form.get('remember', False)) user = PamUser(username) if user.validate(password): login_user(user, remember=remember) else: flash('Invalid username or password') return resp @app.route('/logout', methods=['POST']) @login_required def logout_form(): logout_user() return redirect(url_for('index')) if __name__ == '__main__': app.run(debug=True, port=8090)