diff options
Diffstat (limited to 'mu4web/main.py')
-rw-r--r-- | mu4web/main.py | 227 |
1 files changed, 193 insertions, 34 deletions
diff --git a/mu4web/main.py b/mu4web/main.py index 5d2d975..e56a0df 100644 --- a/mu4web/main.py +++ b/mu4web/main.py @@ -1,3 +1,5 @@ +"""Web routes for mu4web.""" + from email.message import EmailMessage from email.headerregistry import Address from urllib.parse import urlencode @@ -13,6 +15,7 @@ from flask_login import ( logout_user, ) from typing import ( + Any, Optional, cast, ) @@ -33,7 +36,6 @@ from flask import ( get_flashed_messages ) - # # A few operations depend on the index of attachements. These index # are all pre-order traversal indexes of the attachement tree, which @@ -44,7 +46,7 @@ login_manager = LoginManager() def mailto(addr: str) -> HTML: - """Constructs a mailto anchor element.""" + """Construct a mailto anchor element.""" return ('a', {'href': f'mailto:{addr}'}, addr) @@ -54,7 +56,7 @@ def format_email(addr: Address) -> list[HTML]: return [addr.display_name, ' <', mailto(mail_addr), '>'] -def header_format(key: str, value) -> HTML: +def header_format(key: str, value: Any) -> HTML: """Format email headers to HTML.""" if key in ['to', 'cc', 'bcc']: return ('ul', *[('li', *format_email(addr)) @@ -66,10 +68,14 @@ def header_format(key: str, value) -> HTML: id = str(value).strip("<>") return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>'] else: - return value + # assert type(value) == str, f"Weird value in header {value!r}" + return str(value) -def attachement_tree(id: str, mail: EmailMessage, idx=0) -> tuple[HTML, int]: +def attachement_tree(id: str, + mail: EmailMessage, + idx: int = 0) -> tuple[HTML, int]: + """Construct a tree of all attachements for the given mail.""" ct = mail.get_content_type() fn = mail.get_filename() @@ -102,6 +108,7 @@ def attachement_tree(id: str, mail: EmailMessage, idx=0) -> tuple[HTML, int]: def login_page(returnto: Optional[str] = None) -> HTML: + """HTML form for the login page.""" return ('form', {'action': '/login', 'method': 'POST', 'class': 'loginform'}, ('label', {'for': 'username'}, 'Användarnamn'), ('input', {'id': 'username', 'name': 'username', 'placeholder': 'Användarnamn'}), @@ -121,21 +128,29 @@ def login_page(returnto: Optional[str] = None) -> HTML: def user_info(username: str) -> HTML: + """ + Return user info for top bar. + + Includes the users name, and a button for logging out. + """ return [('span', username), ('form', {'action': '/logout', 'method': 'POST'}, - ('input', {'type': 'submit', 'value': 'Logga ut'}))] + ('input', {'type': 'submit', 'value': 'Logga ut'}))] def login_prompt() -> HTML: + """Return link to the login page.""" return ('a', {'href': '/login'}, 'Logga in') def flashed_messages() -> HTML: + """Return Flasks flashed messages, formatted as a list.""" return ('ul', {'class': 'flashes'}, *[('li', msg) for msg in get_flashed_messages()]) -def include_stylesheet(path): +def include_stylesheet(path: str) -> HTML: + """Return HTML for including a stylesheet inside the <head>.""" return ('link', {'type': 'text/css', 'rel': 'stylesheet', 'href': path}) @@ -143,12 +158,29 @@ def include_stylesheet(path): def page_base(title: Optional[str] = None, body: HTML = []) -> HTML: + """ + Build base layout for almost all pages. + + The base contents of our html page, from the <html/> tag and down. + + :param title: + Local pagetitle, will be suffixed with site suffix. + + :param body: + Contents of the page. Will work without this, but the page + would lack any actual contents. + """ + if title: + full_title = f'{title} — Mu4Web' + else: + full_title = 'Mu4Web' + return ('html', {'lang': 'sv'}, ('head', ('meta', {'charset': 'utf-8'}), ('meta', {'name': 'viewport', 'content': 'width=device-width, initial-scale=0.5'}), - ('title', title, ' — Mu4Web'), + ('title', full_title), include_stylesheet(url_for('static', filename='style.css')), ), ('body', @@ -177,11 +209,19 @@ def page_base(title: Optional[str] = None, )))) -def response_for(id: str) -> str: - - mail = cast(EmailMessage, get_mail(id)) +def response_for(id: str, mail: EmailMessage) -> str: + """ + Build response page for an email or a tree. + :param id: + The message id of the root message + :param mail: + Either the root component of a mail, or a sub-component of + type message/rfc822. + """ + # Setup headers headers = {} + print("mail", mail) for (key, value) in mail.items(): headers[key.lower()] = value @@ -200,11 +240,13 @@ def response_for(id: str) -> str: ('summary', 'Alla mailhuvuden'), ('dl', *all_heads)) + # Setup title if t := headers.get('subject'): title = f'Mail — {t}' else: title = 'Mail' + # Setup body body: list[HTML] = [] # Manual walk to preserve attachement index for idx, at in enumerate(mail.walk()): @@ -229,6 +271,7 @@ def response_for(id: str) -> str: body.append(('a', {'href': url}, at.get_filename() or at.get_content_type())) + # Setup attachements tree, idx = attachement_tree(id, mail) main_body: list[HTML] = [('dl', *head), @@ -247,6 +290,7 @@ def response_for(id: str) -> str: def search_field(q: str) -> HTML: + """Build large search form for search page.""" return ('form', {'id': 'searchform', 'action': '/search', 'method': 'GET'}, @@ -261,6 +305,18 @@ def search_field(q: str) -> HTML: def search_result(q: str, by: Optional[str], direction: str) -> HTML: + """ + Search database for query, and build resulting HTML body. + + :param q: + Mu search query. + + :param by: + Parameter to sort by. + + :param direction: + Direction to sort results in. One of 'rising' or 'falling'. + """ assert direction in ('rising', 'falling') # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid'] @@ -269,16 +325,16 @@ def search_result(q: str, by: Optional[str], direction: str) -> HTML: by = app.config['DEFAULT_SORT_COLUMN'] rows = mu.search(q, by, direction == 'falling') - body: list[tuple] = [] + body: list[HTML] = [] for row in rows: - rowdata = [] + rowdata: list[HTML] = [] for key in keys: data = row.get(key, None) if data and key == 'date': dt = datetime.fromtimestamp(int(data)) data = dt.strftime('%Y-%m-%d %H:%M') - rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data))) - body.append(('tr', rowdata)) + rowdata.append(['td', ('a', {'href': '/?id=' + row['msgid']}, data)]) + body.append(['tr', rowdata]) if len(rows) == 0: return "Inga träffar" @@ -311,6 +367,7 @@ def search_result(q: str, by: Optional[str], direction: str) -> HTML: def search_page(q: str, by: Optional[str], direction: str) -> str: + """Return rendered HTML for search page.""" main_body = [search_field(q)] # TODO pagination @@ -323,7 +380,8 @@ def search_page(q: str, by: Optional[str], body=main_body)) -def index_page(): +def index_page() -> str: + """Return rendered HTML for index page.""" data = mu.info() maildirs = find_maildirs(data['maildir'] + '/') @@ -334,9 +392,9 @@ def index_page(): rows.append(('tr', ('td', key), ('td', value))) - body = [('div', ('table', ('tbody', rows))), - ('div', entries), - ] + body: HTML = [('div', ('table', ('tbody', rows))), + ('div', entries), + ] return render_document(page_base(title='E-postindex', body=body)) @@ -361,18 +419,41 @@ app.config.from_pyfile('settings.py') login_manager.init_app(app) +def fix_id(id: str) -> str: + """Update an ID gotten through a GET parameter into something mu likes.""" + return ''.join(id).replace(' ', '+') + + @login_manager.user_loader def load_user(user_id): + """ + Find the user with the given id, and return its session object. + + :param user_id: + The string id of the user. + + .. todo:: + + Return None on invalid id. + """ # return User.get(user_id) return LocalUser(user_id) @app.route('/') def index(): + """ + Return index page, mail page, or redirect to login page. + + If the user isn't logged in, then the user is redirected to the login page. + Otherwise, if the ``id`` param is set, show the email with that message id, + and finally show the index page if nothing else matched. + """ if not current_user.is_authenticated: return redirect(url_for('login_page_', returnto=request.path)) if id := request.args.get('id'): - response = response_for(''.join(id).replace(' ', '+')) + fixed_id = fix_id(id) + response = response_for(fixed_id, cast(EmailMessage, get_mail(fixed_id))) else: response = index_page() return response @@ -381,6 +462,12 @@ def index(): @app.route('/search') @login_required def search_page_(): + """ + Search page response. + + :param q: + :param by: + """ direction = request.args.get('direction', app.config['DEFAULT_DIRECTION']) if direction not in ('rising', 'falling'): direction = app.config['DEFAULT_DIRECTION'] @@ -413,9 +500,9 @@ def multipart_page(msg_id: str, body=body)) -def attachement_response(attachement: EmailMessage): +def attachement_response(attachement: EmailMessage) -> flask.Response: """ - Builds a response for a given attachement. + Build a response for a given attachement. Gets content type and encoding from the attachements headers. """ @@ -432,6 +519,15 @@ def attachement_response(attachement: EmailMessage): @app.route('/raw') @login_required def raw_message(): + """ + Get the "raw" bytes of an email. + + Looks up a message from the mu database, and then returns the data + with no formatting, and a content type of message/rfc822. + + :param id: + Message id + """ msg_id = request.args.get('id', '') filename = mu.find_file(msg_id) if not filename: @@ -440,39 +536,53 @@ def raw_message(): class MutableString: - def __init__(self): + """ + A mutatable string. + + Strings are immutable by default in python. This works almost + exactly like a regular string, but ``+=`` actually changes the + object in place. + """ + + def __init__(self) -> None: self.str = '' - def __iadd__(self, other): + def __iadd__(self, other: str) -> 'MutableString': self.str += other return self - def __repr__(self): + def __repr__(self) -> str: return f'MutableString("{self.str}")' - def __str__(self): + def __str__(self) -> str: return self.str class IMGParser(HTMLParser): """ Rewrites HTML image tags to be safer/have more functionality. + Should only be fed the image tag. Everything else is assumed to be directly copied externaly. - [Parameters] - result - A mutable string which should be appended with the - (possibly changed) img tag. - msg_id - The Email Message ID field, used to construct some links. + :param result: + A mutable string which should be appended with the (possibly + changed) img tag. + :param msg_id: + The Email Message ID field, used to construct some links. """ + rx = re.compile('cid:(.*)') - def __init__(self, result, msg_id): + def __init__(self, result: MutableString, msg_id: str) -> None: super().__init__() self.result = result self.msg_id = msg_id - def handle_starttag(self, tag, attrs): + def handle_starttag(self, # noqa: D102 + tag: str, + attrs: list[tuple[str, Optional[str]]] + ) -> None: # TODO this will also get called for self closing tags # (<img/>), which will drop that slash. FIX @@ -484,6 +594,8 @@ class IMGParser(HTMLParser): # later added to unblock them on click self.result += '<img ' for key, value in attrs: + if not value: + value = key if key == 'src': if m := IMGParser.rx.match(value): params = urlencode({'id': self.msg_id, 'cid': m[1]}) @@ -521,7 +633,7 @@ class IMGParser(HTMLParser): # in iframe:s (where the content will probably be shown) # to open in the current (top level) page, instead of # inside the iframe. - args = ' '.join(f'{html.escape(key)}={html.escape(value)}' + args = ' '.join(f'{html.escape(key)}={html.escape(value or key)}' for (key, value) in [*attrs, ('target', '_parent')]) self.result += f'<a {args}>' @@ -533,6 +645,29 @@ class IMGParser(HTMLParser): @app.route('/part') @login_required def attachement_part_page(): + """ + Page for a specific attachment. + + :param id: + Message id for the message we want. + + :param idx: + Optional numeric index of the attachement of the message, + counted as the lines appear when printing the tree, one line + at a time.:: + + 0. ├── a + 1. │ ├── b + 2. │ └── c + 3. └── d + + Defaults to 0 (the root element). + + :param raw: + Optional boolean parameter, which if set, returns the + attachement verabtim as stored on disk, instead of rendered + into HTML. + """ msg_id = request.args.get('id') raw = request.args.get('raw') if not msg_id: @@ -541,7 +676,10 @@ def attachement_part_page(): mail = cast(EmailMessage, get_mail(msg_id)) attachement = list(mail.walk())[attachement_idx] - if attachement.is_multipart(): + if attachement.get_content_type() == 'message/rfc822': + return response_for(fix_id(msg_id), attachement) + + elif attachement.is_multipart(): return multipart_page(msg_id, attachement, attachement_idx) @@ -586,6 +724,18 @@ def attachement_part_page(): # Possibly change this to actually use that form of URI:s @app.route('/cid', methods=['GET']) def cid(): + """ + Get contents of a cid. + + :param id: + A message id. + :param cid: + A given content id. + + :return: + A response with the given object, with correct content-type + headers. + """ msg_id = request.args.get('id') cid = request.args.get('cid') if not msg_id: @@ -605,6 +755,7 @@ def cid(): @app.route('/login', methods=['GET']) def login_page_(): + """Login page.""" returnto = request.args.get('returnto') if current_user.is_authenticated: # Redirect away already logged in users @@ -620,6 +771,7 @@ def login_page_(): @app.route('/login', methods=['POST']) def login_form(): + """Login a user.""" resp = redirect(request.args.get('returnto', url_for('index'))) username = request.form['username'] @@ -636,9 +788,16 @@ def login_form(): @app.route('/logout', methods=['POST']) @login_required def logout_form(): + """Logout the currently logged in user.""" logout_user() return redirect(url_for('index')) +@app.errorhandler(500) +def internal_server_error(e: Any) -> tuple[str, int]: + """Fallback error page for 500 errors.""" + return ("error page", 500) + + if __name__ == '__main__': app.run(debug=True, port=8090) |