"""Web routes for mu4web.""" from email.message import EmailMessage from urllib.parse import urlencode from datetime import datetime import html from html.parser import HTMLParser import re from flask_login import ( LoginManager, login_required, login_user, current_user, logout_user, ) from typing import ( Any, Optional, cast, ) import sqlite3 from .mu import get_mail from . import mu from . import message_db from .html_render import HTML, render_document from .user.local import LocalUser from .user.pam import PamUser from .maildir import find_maildirs, serialize_maildir from .tree import fetch_relation_tree, Tree import flask from flask import ( Flask, request, redirect, url_for, flash, get_flashed_messages ) from .components import ( attachement_tree, dl, flashed_messages, header_format, include_stylesheet, login_page, login_prompt, search_field, user_info, ) from .util import MutableString # # A few operations depend on the index of attachements. These index # are all pre-order traversal indexes of the attachement tree, which # is also the order message.walk() returns them in. # login_manager = LoginManager() # -------------------------------------------------- app = Flask(__name__, instance_relative_config=True) # Default configuration values app.config.update( DEFAULT_DIRECTION='falling', DEFAULT_SORT_COLUMN='date', MESSAGE_HEADERS=[ 'from', 'subject', 'to', 'cc', 'date', ] ) app.config.from_pyfile('settings.py') login_manager.init_app(app) def fix_id(id: str) -> str: """Update an ID gotten through a GET parameter into something mu likes.""" return ''.join(id).replace(' ', '+') @login_manager.user_loader def load_user(user_id): """ Find the user with the given id, and return its session object. :param user_id: The string id of the user. .. todo:: Return None on invalid id. """ # return User.get(user_id) return LocalUser(user_id) @app.route('/') def index(): """ Return index page, mail page, or redirect to login page. If the user isn't logged in, then the user is redirected to the login page. Otherwise, if the ``id`` param is set, show the email with that message id, and finally show the index page if nothing else matched. """ if not current_user.is_authenticated: return redirect(url_for('login_page_', returnto=request.path)) if id := request.args.get('id'): fixed_id = fix_id(id) response = response_for(fixed_id, cast(EmailMessage, get_mail(fixed_id))) else: response = index_page() return response @app.route('/search') @login_required def search_page_(): """ Search page response. :param q: :param by: """ direction = request.args.get('direction', app.config['DEFAULT_DIRECTION']) if direction not in ('rising', 'falling'): direction = app.config['DEFAULT_DIRECTION'] return search_page(request.args.get('q', ''), request.args.get('by', None), direction) def multipart_page(msg_id: str, attachement: EmailMessage, attachement_idx: int) -> str: """ Build HTML response for a multi-part attachement. Multi part attachements are simply containers, and can't directly be opened. Instead, build a tree of all components. :param msg_id: Message ID of the mail in question :param attachement: The attachement to work with, must be multipart :param attachement_idx: Index of attachement in top level mail. Needed for links to work. """ tree, _ = attachement_tree(msg_id, attachement, attachement_idx) body: list[HTML] = [('a', {'href': '/?' + urlencode({'id': msg_id})}, 'Återvänd till brev'), ('ul', tree), ] return render_document(page_base(title='Multipart', body=body)) def attachement_response(attachement: EmailMessage) -> flask.Response: """ Build a response for a given attachement. Gets content type and encoding from the attachements headers. """ response = flask.Response() response.charset = attachement.get_content_charset() or 'application/binary' response.mimetype = attachement.get_content_type() # does get_content do stuff depending on content-type? # Check how to explicitly get the raw bytes. response.set_data(attachement.get_content()) return response @app.route('/raw') @login_required def raw_message(): """ Get the "raw" bytes of an email. Looks up a message from the mu database, and then returns the data with no formatting, and a content type of message/rfc822. :param id: Message id """ msg_id = request.args.get('id', '') filename = mu.find_file(msg_id) if not filename: return 'No message with that id', 404 return flask.send_file(filename, mimetype='message/rfc822') class IMGParser(HTMLParser): """ Rewrites HTML image tags to be safer/have more functionality. Should only be fed the image tag. Everything else is assumed to be directly copied externaly. :param result: A mutable string which should be appended with the (possibly changed) img tag. :param msg_id: The Email Message ID field, used to construct some links. """ rx = re.compile('cid:(.*)') def __init__(self, result: MutableString, msg_id: str) -> None: super().__init__() self.result = result self.msg_id = msg_id def handle_starttag(self, # noqa: D102 tag: str, attrs: list[tuple[str, Optional[str]]] ) -> None: # TODO this will also get called for self closing tags # (), which will drop that slash. FIX if tag == 'img': # - Expand img tags with CID: url's to point to our server. # These should be safe (from a tracking perspective) since # they are downloaded as part of the mail. # - Other images are blocked, a piece of javascript is # later added to unblock them on click self.result += '') + '' elif tag == 'a': # Add target="_parent" to all anchors. This causes links # in iframe:s (where the content will probably be shown) # to open in the current (top level) page, instead of # inside the iframe. args = ' '.join(f'{html.escape(key)}={html.escape(value or key)}' for (key, value) in [*attrs, ('target', '_parent')]) self.result += f'' else: assert False, 'Should never be reached' def page_base(title: Optional[str] = None, body: HTML = []) -> HTML: """ Build base layout for almost all pages. The base contents of our html page, from the tag and down. :param title: Local pagetitle, will be suffixed with site suffix. :param body: Contents of the page. Will work without this, but the page would lack any actual contents. """ if title: full_title = f'{title} — Mu4Web' else: full_title = 'Mu4Web' return ('html', {'lang': 'sv'}, ('head', ('meta', {'charset': 'utf-8'}), ('meta', {'name': 'viewport', 'content': 'width=device-width, initial-scale=0.5'}), ('title', full_title), include_stylesheet(url_for('static', filename='style.css')), ), ('body', ('nav', ('menu', ('li', ('h1', ('a', {'href': '/'}, 'Mu4Web'))), ('hr',), ('li', ('form', {'action': '/search', 'method': 'GET'}, ('input', {'type': 'text', 'placeholder': 'Sök...', 'name': 'q'}), ('input', {'type': 'Submit', 'value': 'Sök'}))), ('li', user_info(current_user.get_id()) if current_user.is_authenticated else login_prompt()) )), ('main', flashed_messages(get_flashed_messages()), body), ('footer', ('menu', ('li', ('a', {'href': 'https://www.djcbsoftware.nl/code/mu/'}, 'mu')), ('li', ('a', {'href': 'https://git.hornquist.se/mu4web'}, 'Source')), )))) def search_page(q: str, by: Optional[str], direction: str) -> str: """Return rendered HTML for search page.""" main_body = [search_field(q)] # TODO pagination # Mu handles the search without problem, but python is slow to # build the table, and the browser has problem rendering it if q: main_body.append(search_result(q, by, direction)) return render_document(page_base(title='Sökning', body=main_body)) def index_page() -> str: """Return rendered HTML for index page.""" data = mu.info() maildirs = find_maildirs(data['maildir'] + '/') entries = serialize_maildir(maildirs) rows = [] for key, value in data.items(): rows.append(('tr', ('td', key), ('td', value))) body: HTML = [('div', ('table', ('tbody', rows))), ('div', entries), ] return render_document(page_base(title='E-postindex', body=body)) def search_result(q: str, by: Optional[str], direction: str) -> HTML: """ Search database for query, and build resulting HTML body. :param q: Mu search query. :param by: Parameter to sort by. :param direction: Direction to sort results in. One of 'rising' or 'falling'. """ assert direction in ('rising', 'falling') # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid'] keys = ['from', 'to', 'subject', 'date'] if not by: by = app.config['DEFAULT_SORT_COLUMN'] rows = mu.search(q, by, direction == 'falling') body: list[HTML] = [] for row in rows: rowdata: list[HTML] = [] for key in keys: data = row.get(key, None) if data and key == 'date': dt = datetime.fromtimestamp(int(data)) data = dt.strftime('%Y-%m-%d %H:%M') rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid'], 'title': data or '' }, data))) body.append(('tr', rowdata)) if len(rows) == 0: return "Inga träffar" else: heads: list[HTML] = [] for m in keys: link_body = m.title() params = {'q': q, 'by': m} if m == by: link_body += ' ' if direction == 'rising': link_body += '▲' params['direction'] = 'falling' else: link_body += '▼' params['direction'] = 'rising' heads.append(('th', ('a', {'href': '?' + urlencode(params)}, link_body))) return ('div', ('p', f"{len(rows)} träffar"), ('table', ('thead', ('tr', *heads )), ('tbody', body))) def tree_to_html(current_id: str, tree: Tree) -> HTML: """ Format the given tree as HTML. Currently this is specific to a specific kind of trees. """ body: list[HTML] if current_id == tree.data.entry: body = [ f"{tree.data.date:%Y-%m-%d %H:%M} {tree.data.from_}", ] else: body = [ ('a', {'href': '?' + urlencode({'id': tree.data.entry})}, f"{tree.data.date:%Y-%m-%d %H:%M} {tree.data.from_}", # tree.data.subject, ) ] if tree.children: body += [('ul', [tree_to_html(current_id, c) for c in tree.children])] return ('li', body) def response_for(id: str, mail: EmailMessage) -> str: """ Build response page for an email or a tree. :param id: The message id of the root message :param mail: Either the root component of a mail, or a sub-component of type message/rfc822. """ # Setup headers headers = {} for (key, value) in mail.items(): headers[key.lower()] = value header_entries = [] for h in app.config['MESSAGE_HEADERS']: if x := headers.get(h.lower()): header_entries.append((h.title(), header_format(h.lower(), x))) header_list = dl(header_entries) full_headers = ('details', ('summary', 'Alla mailhuvuden'), dl((key.title(), value) for (key, value) in mail.items())) # Setup title if t := headers.get('subject'): title = f'Mail — {t}' else: title = 'Mail' # Setup body body: list[HTML] = [] # Manual walk to preserve attachement index for idx, at in enumerate(mail.walk()): # body.append(('h2', at.get_content_type())) if at.is_multipart(): continue elif at.get_content_type() == 'text/html': # ct = at.get_content_type() url = '/part?' + urlencode({'id': id, 'idx': idx}) body.append(('iframe', {'src': url, 'height': '300', })) elif at.get_content_type() == 'text/plain': items: list[HTML] = [] for line in at.get_content().split('\n'): if m := re.match(r'^((> *)+)', line): for depth, str in enumerate(re.findall('> *', m[1])): items.append(('span', {'class': f'quote-marker quote-{depth % 5}'}, str)) items.append(line[m.end():]) items.append("\n") else: items.append(line) items.append("\n") body.append(('pre', items)) elif at.get_content_type() == 'application/octet-stream': url = '/part?' + urlencode({'id': id, 'idx': idx}) body.append(('a', {'href': url, 'download': at.get_filename() or ''}, at.get_filename() or at.get_content_type())) else: url = '/part?' + urlencode({'id': id, 'idx': idx}) body.append(('a', {'href': url}, at.get_filename() or at.get_content_type())) con = sqlite3.connect(message_db) cur = con.cursor() relation_tree_data = fetch_relation_tree(cur, mail) relation_tree: HTML if not relation_tree_data: relation_tree = '' else: relation_tree = ('ul', tree_to_html(id, relation_tree_data)) # Setup attachements tree, idx = attachement_tree(id, mail) main_body: list[HTML] = [header_list, full_headers, ('hr',), relation_tree, ('hr',), ('main', body), ('hr',), ('a', {'href': '/raw?' + urlencode({'id': id})}, 'Råa bitar'), ('ul', tree), ] html_str = render_document(page_base(title=title, body=main_body)) return html_str @app.route('/part') @login_required def attachement_part_page(): """ Page for a specific attachment. :param id: Message id for the message we want. :param idx: Optional numeric index of the attachement of the message, counted as the lines appear when printing the tree, one line at a time.:: 0. ├── a 1. │   ├── b 2. │   └── c 3. └── d Defaults to 0 (the root element). :param raw: Optional boolean parameter, which if set, returns the attachement verabtim as stored on disk, instead of rendered into HTML. """ msg_id = request.args.get('id') raw = request.args.get('raw') if not msg_id: return "Message id required", 404 attachement_idx = int(request.args.get('idx', 0)) mail = cast(EmailMessage, get_mail(msg_id)) attachement = list(mail.walk())[attachement_idx] if attachement.get_content_type() == 'message/rfc822': return response_for(fix_id(msg_id), attachement) elif attachement.is_multipart(): return multipart_page(msg_id, attachement, attachement_idx) elif not raw and attachement.get_content_type() == 'text/html': # Rewrites for HTML for different reasons # 1. Expand CID links to something we can handle # 2. add a.target = '_parent' to force links to open in # current tab (instead of keeping in iframe) # 3. Block external resources from loading per default # These should come with some form of toggle for turning them # on or off result = MutableString() # Content encoding here? source = attachement.get_content() parser = IMGParser(result, msg_id) idx = 0 for m in re.finditer(r'< *(a|img|script)[^>]*>', source): result += source[idx:m.start()] idx = m.end() parser.feed(m[0]) result += source[idx:] # This script adds an onclick event for each image we blocked # above, which unblocks it. # TODO this "fails" for images wrapped in anchor tags, since # the anchor tag has priority. url = url_for('static', filename='enable_images.js') result += f"\n" return str(result) else: return attachement_response(attachement) # Returns specific content item from message with id # RFC 2392 # https://www.rfc-editor.org/rfc/rfc2392 # Possibly change this to actually use that form of URI:s @app.route('/cid', methods=['GET']) def cid(): """ Get contents of a cid. :param id: A message id. :param cid: A given content id. :return: A response with the given object, with correct content-type headers. """ msg_id = request.args.get('id') cid = request.args.get('cid') if not msg_id: return "Message id required", 404 if not cid: return "CID required", 404 mail = cast(EmailMessage, get_mail(msg_id)) # .walk(), since attachement may be a few steps down in the # multipart/* tree for attachment in mail.walk(): if attachment.get('content-id') == f'<{cid}>': return attachement_response(attachment) return "Object not found", 404 @app.route('/login', methods=['GET']) def login_page_(): """Login page.""" returnto = request.args.get('returnto') if current_user.is_authenticated: # Redirect away already logged in users if returnto: return redirect(returnto) else: return redirect(url_for('index')) else: # Give the login prompt to non-logged in users. body = login_page(returnto) return render_document(page_base(title='Login', body=body)) @app.route('/login', methods=['POST']) def login_form(): """Login a user.""" resp = redirect(request.args.get('returnto', url_for('index'))) username = request.form['username'] password = request.form['password'] remember = bool(request.form.get('remember', False)) user = PamUser(username) if user.validate(password): login_user(user, remember=remember) else: flash('Invalid username or password') return resp @app.route('/logout', methods=['POST']) @login_required def logout_form(): """Logout the currently logged in user.""" logout_user() return redirect(url_for('index')) @app.errorhandler(500) def internal_server_error(e: Any) -> tuple[str, int]: """Fallback error page for 500 errors.""" return ("error page", 500) if __name__ == '__main__': app.run(debug=True, port=8090)