"""Web routes for mu4web.""" from email.message import EmailMessage from urllib.parse import urlencode from datetime import datetime import html from html.parser import HTMLParser import re from flask_login import ( LoginManager, login_required, login_user, current_user, logout_user, ) from typing import ( Any, Optional, cast, ) import sqlite3 from .mu import get_mail from . import mu from . import message_db from .html_render import HTML, render_document from .user.local import LocalUser from .user.pam import PamUser from .maildir import find_maildirs, serialize_maildir from .tree import fetch_relation_tree, Tree import flask from flask import ( Flask, request, redirect, url_for, flash, get_flashed_messages ) from .components import ( attachement_tree, dl, flashed_messages, header_format, include_stylesheet, login_page, login_prompt, search_field, user_info, ) from .util import MutableString # # A few operations depend on the index of attachements. These index # are all pre-order traversal indexes of the attachement tree, which # is also the order message.walk() returns them in. # login_manager = LoginManager() # -------------------------------------------------- app = Flask(__name__, instance_relative_config=True) # Default configuration values app.config.update( DEFAULT_DIRECTION='falling', DEFAULT_SORT_COLUMN='date', MESSAGE_HEADERS=[ 'from', 'subject', 'to', 'cc', 'date', ] ) app.config.from_pyfile('settings.py') login_manager.init_app(app) def fix_id(id: str) -> str: """Update an ID gotten through a GET parameter into something mu likes.""" return ''.join(id).replace(' ', '+') @login_manager.user_loader def load_user(user_id): """ Find the user with the given id, and return its session object. :param user_id: The string id of the user. .. todo:: Return None on invalid id. """ # return User.get(user_id) return LocalUser(user_id) @app.route('/') def index(): """ Return index page, mail page, or redirect to login page. If the user isn't logged in, then the user is redirected to the login page. Otherwise, if the ``id`` param is set, show the email with that message id, and finally show the index page if nothing else matched. """ if not current_user.is_authenticated: return redirect(url_for('login_page_', returnto=request.path)) if id := request.args.get('id'): fixed_id = fix_id(id) response = response_for(fixed_id, cast(EmailMessage, get_mail(fixed_id))) else: response = index_page() return response @app.route('/search') @login_required def search_page_(): """ Search page response. :param q: :param by: """ direction = request.args.get('direction', app.config['DEFAULT_DIRECTION']) if direction not in ('rising', 'falling'): direction = app.config['DEFAULT_DIRECTION'] return search_page(request.args.get('q', ''), request.args.get('by', None), direction) def multipart_page(msg_id: str, attachement: EmailMessage, attachement_idx: int) -> str: """ Build HTML response for a multi-part attachement. Multi part attachements are simply containers, and can't directly be opened. Instead, build a tree of all components. :param msg_id: Message ID of the mail in question :param attachement: The attachement to work with, must be multipart :param attachement_idx: Index of attachement in top level mail. Needed for links to work. """ tree, _ = attachement_tree(msg_id, attachement, attachement_idx) body: list[HTML] = [('a', {'href': '/?' + urlencode({'id': msg_id})}, 'Återvänd till brev'), ('ul', tree), ] return render_document(page_base(title='Multipart', body=body)) def attachement_response(attachement: EmailMessage) -> flask.Response: """ Build a response for a given attachement. Gets content type and encoding from the attachements headers. """ response = flask.Response() response.charset = attachement.get_content_charset() or 'application/binary' response.mimetype = attachement.get_content_type() # does get_content do stuff depending on content-type? # Check how to explicitly get the raw bytes. response.set_data(attachement.get_content()) return response @app.route('/raw') @login_required def raw_message(): """ Get the "raw" bytes of an email. Looks up a message from the mu database, and then returns the data with no formatting, and a content type of message/rfc822. :param id: Message id """ msg_id = request.args.get('id', '') filename = mu.find_file(msg_id) if not filename: return 'No message with that id', 404 return flask.send_file(filename, mimetype='message/rfc822') class IMGParser(HTMLParser): """ Rewrites HTML image tags to be safer/have more functionality. Should only be fed the image tag. Everything else is assumed to be directly copied externaly. :param result: A mutable string which should be appended with the (possibly changed) img tag. :param msg_id: The Email Message ID field, used to construct some links. """ rx = re.compile('cid:(.*)') def __init__(self, result: MutableString, msg_id: str) -> None: super().__init__() self.result = result self.msg_id = msg_id def handle_starttag(self, # noqa: D102 tag: str, attrs: list[tuple[str, Optional[str]]] ) -> None: # TODO this will also get called for self closing tags # (), which will drop that slash. FIX if tag == 'img': # - Expand img tags with CID: url's to point to our server. # These should be safe (from a tracking perspective) since # they are downloaded as part of the mail. # - Other images are blocked, a piece of javascript is # later added to unblock them on click self.result += '' elif tag == 'script': # Keep script tag contents, but change it to text. I'm not # sure how many try to inject javascript into their # emails, but we don't want any of it. args = ' '.join(f'{key}={value}' for (key, value) in attrs) self.result += '
' + html.escape(f'" return str(result) else: return attachement_response(attachement) # Returns specific content item from message with id # RFC 2392 # https://www.rfc-editor.org/rfc/rfc2392 # Possibly change this to actually use that form of URI:s @app.route('/cid', methods=['GET']) def cid(): """ Get contents of a cid. :param id: A message id. :param cid: A given content id. :return: A response with the given object, with correct content-type headers. """ msg_id = request.args.get('id') cid = request.args.get('cid') if not msg_id: return "Message id required", 404 if not cid: return "CID required", 404 mail = cast(EmailMessage, get_mail(msg_id)) # .walk(), since attachement may be a few steps down in the # multipart/* tree for attachment in mail.walk(): if attachment.get('content-id') == f'<{cid}>': return attachement_response(attachment) return "Object not found", 404 @app.route('/login', methods=['GET']) def login_page_(): """Login page.""" returnto = request.args.get('returnto') if current_user.is_authenticated: # Redirect away already logged in users if returnto: return redirect(returnto) else: return redirect(url_for('index')) else: # Give the login prompt to non-logged in users. body = login_page(returnto) return render_document(page_base(title='Login', body=body)) @app.route('/login', methods=['POST']) def login_form(): """Login a user.""" resp = redirect(request.args.get('returnto', url_for('index'))) username = request.form['username'] password = request.form['password'] remember = bool(request.form.get('remember', False)) user = PamUser(username) if user.validate(password): login_user(user, remember=remember) else: flash('Invalid username or password') return resp @app.route('/logout', methods=['POST']) @login_required def logout_form(): """Logout the currently logged in user.""" logout_user() return redirect(url_for('index')) @app.errorhandler(500) def internal_server_error(e: Any) -> tuple[str, int]: """Fallback error page for 500 errors.""" return ("error page", 500) if __name__ == '__main__': app.run(debug=True, port=8090)