diff options
author | Hugo Hörnquist <hugo@lysator.liu.se> | 2022-11-29 02:51:29 +0100 |
---|---|---|
committer | Hugo Hörnquist <hugo@lysator.liu.se> | 2022-11-29 02:51:29 +0100 |
commit | 4587e5e124afcb13c81dea68e373420670d076b4 (patch) | |
tree | 91416ac5cb53dce9d80b466f66206a05c5a25a40 /mu4web | |
parent | Add makefile for ctags file. (diff) | |
download | mu4web-4587e5e124afcb13c81dea68e373420670d076b4.tar.gz mu4web-4587e5e124afcb13c81dea68e373420670d076b4.tar.xz |
Properly specify version requirements, and add directory layout.
Diffstat (limited to 'mu4web')
-rw-r--r-- | mu4web/__init__.py | 1 | ||||
-rw-r--r-- | mu4web/html_render.py | 71 | ||||
-rw-r--r-- | mu4web/main.py | 350 | ||||
-rw-r--r-- | mu4web/mu.py | 128 | ||||
-rwxr-xr-x | mu4web/password.py | 120 |
5 files changed, 670 insertions, 0 deletions
diff --git a/mu4web/__init__.py b/mu4web/__init__.py new file mode 100644 index 0000000..72bdd01 --- /dev/null +++ b/mu4web/__init__.py @@ -0,0 +1 @@ +VERSION = "0.1" diff --git a/mu4web/html_render.py b/mu4web/html_render.py new file mode 100644 index 0000000..99140ad --- /dev/null +++ b/mu4web/html_render.py @@ -0,0 +1,71 @@ +import html +from typing import ( + Callable, + TypeAlias, + Union, +) + +HTML: TypeAlias = Union[tuple, + list['HTML'], + Callable[[], str], + None, str, int, float] + + +standalones = ['hr', 'br', 'meta'] +"""Tags which can't have a closing tag.""" + + +def _render_document(document: HTML) -> str: + if isinstance(document, tuple): + tag, *body = document + if body and isinstance(body[0], dict): + print(body[0]) + attributes = ' '.join(f'{a}="{html.escape(b)}"' + for a, b in body[0].items()) + body = body[1:] + start = f'<{tag} {attributes}>' + else: + start = f'<{tag}>' + + if tag in standalones: + return start + else: + if body: + items = ''.join(_render_document(b) for b in body) + else: + items = '' + return start + f'{items}</{tag}>' + elif callable(document): + return str(document()) + elif isinstance(document, list): + return ''.join(_render_document(e) for e in document) + elif document is None: + return '' + else: + # strings, and everything else + return html.escape(str(document)) + + +def render_document(document: HTML) -> str: + """ + Render an HTML structure to an Html string. + + The following Python types are converted as follows: + - Tuples + - The first value becomes the tags name + - The second value, if a dictionary, becomes the tags attributes + - All following values (including the second if not a dictionary) + gets individually passed to render_document. + - Lists + Each element gets passed to render_document + - Callable[[], str] + Gets called, and its output is included verbatim. Useful for + including strings which shouldn't be escaped. + - str + Gets escaped, and included + - int, float + Gets included as their default string representation. + - None + Becomes an empty string + """ + return '<!doctype html>\n' + _render_document(document) diff --git a/mu4web/main.py b/mu4web/main.py new file mode 100644 index 0000000..7c3d7bd --- /dev/null +++ b/mu4web/main.py @@ -0,0 +1,350 @@ +from email.message import EmailMessage +from email.headerregistry import Address +from urllib.parse import urlencode +import password +from password import Passwords +import os +from datetime import datetime +from flask_login import ( + LoginManager, + login_required, + login_user, + current_user, + logout_user, +) +from typing import ( + Optional, + cast, +) +from mu import mu_search, get_mail +from html_render import HTML, render_document + + +from flask import ( + Flask, + session, + request, + redirect, + url_for, + flash, + get_flashed_messages +) + +login_manager = LoginManager() + + +def mailto(addr: str) -> HTML: + return ('a', {'href': f'mailto:{addr}'}, addr) + + +def format_email(addr: Address) -> list[HTML]: + mail_addr = f'{addr.username}@{addr.domain}' + return [addr.display_name, ' <', mailto(mail_addr), '>'] + + +def header_format(key: str, value) -> HTML: + if key in ['to', 'cc', 'bcc']: + return ('ul', *[('li', *format_email(addr)) + for addr in value.addresses]) + elif key == 'from': + return format_email(value.addresses[0]) + elif key == 'in-reply-to': + # type(value) == email.headerregistry._UnstructuredHeader + id = str(value).strip("<>") + return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>'] + else: + return value + + +style: HTML = lambda: """ + nav { + display: block; + width: 100%; + height: 4em; + color: white; + background-color: darkgrey; + } + + dl { + display: grid; + grid-template-columns: 10ch auto; + } + dt { + font-weight: bold; + } + dd { + font-family: mono; + font-size: 80%; + } + dd > * { + margin: 0; + } + + tr:nth-child(2n) { + background-color: lightblue; + } + + """ + + +def attachement_tree(mail: EmailMessage) -> HTML: + ct = mail.get_content_type() + fn = mail.get_filename() + + children = [] + for child in mail.iter_parts(): + children.append(attachement_tree(cast(EmailMessage, child))) + + content: HTML + if children: + content = ('ul', *children) + else: + content = [] + + if fn: + body = f'{ct} {fn}' + else: + body = str(ct) + return ('li', body, content) + +# -------------------------------------------------- + + +def login_page(returnto: Optional[str] = None) -> HTML: + return ('form', {'action': '/login', 'method': 'POST'}, + ('input', {'name': 'username', 'placeholder': 'Username'}), + ('input', {'type': 'password', + 'placeholder': 'Password', + 'name': 'password'}), + ('input', {'type': 'hidden', + 'name': 'returnto', + 'value': returnto}) + if returnto else [], + ('input', {'type': 'submit'}), + ) + + +def user_info(username: str) -> HTML: + return [('span', username), + ('form', {'action': '/logout', 'method': 'POST'}, + ('input', {'type': 'submit', 'value': 'Logga ut'}))] + + +def login_prompt() -> HTML: + return ('a', {'href': '/login'}, 'Logga in') + + +def flashed_messages() -> HTML: + return ('ul', {'class': 'flashes'}, + *[('li', msg) for msg in get_flashed_messages()]) + + +def page_base(title: Optional[str] = None, + body: HTML = []) -> HTML: + return ('html', + ('head', + ('meta', {'charset': 'utf-8'}), + ('title', title), + ('style', style), + ), + ('body', + ('nav', + user_info(current_user.get_id()) if current_user.is_authenticated else login_prompt() + ), + flashed_messages(), + body)) + + +def response_for(id: str, username: Optional[str] = None) -> str: + + mail = cast(EmailMessage, get_mail(id)) + + headers = {} + for (key, value) in mail.items(): + headers[key.lower()] = value + + head = [] + for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject', 'x-original-to', + 'in-reply-to']: + if x := headers.get(h.lower()): + head += [('dt', h.title()), + ('dd', header_format(h.lower(), x))] + + body_part = mail.get_body(preferencelist=('html', 'plain')) + if not body_part: + raise ValueError("No suitable body in email") + ct = body_part.get_content_type() + body: HTML + if ct == 'text/html': + body = lambda: cast(EmailMessage, body_part).get_content() + else: + body = ('pre', cast(EmailMessage, body_part).get_content()) + + if t := headers.get('subject'): + title = f'Mail — {t}' + else: + title = 'Mail' + + main_body = [('dl', *head), + ('hr',), + ('main', body), + ('hr',), + ('ul', attachement_tree(mail)), + ] + html_str = render_document(page_base(title=title, + body=main_body)) + + return html_str + + +def search_field(q: str) -> HTML: + return ('form', {'action': '/search', 'method': 'GET'}, + ('label', {'for': 'search'}, + 'Mu Search Query'), + ('textarea', {'id': 'search', 'name': 'q'}, + q), + ('input', {'type': 'Submit'})) + + +def search_result(q, by, reverse) -> HTML: + + # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid'] + keys = ['from', 'to', 'subject', 'date'] + + rows = mu_search(q, by, reverse) + body: list[tuple] = [] + for row in rows: + rowdata = [] + for key in keys: + data = row.get(key, None) + if data and key == 'date': + dt = datetime.fromtimestamp(int(data)) + data = dt.strftime('%Y-%m-%d %H:%M') + rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data))) + body.append(('tr', rowdata)) + + + # TODO print number of results + + return ('table', + ('thead', + ('tr', + [('th', m.title()) for m in keys])), + ('tbody', + body + )) + + +def search_page(q, by): + main_body = [search_field(q)] + + if q: + main_body.append(search_result(q, by, False)) + + return render_document(page_base(title='Search', + body=main_body)) + + +def index_page(): + ids = [ + 'CAEzixGsw-4zJ8_ejK_vDgmcQ9s-MbBc-ho+HL4arV4a+ghOOPg@mail.gmail.com', + 'CA+pcBt-gLb0GtbFOjJ5_7Q_WXtqApVPQ9w-3O7GH=VqCEQat6g@mail.gmail.com', + ] + + body = [('h1', "Sample ID's"), + ('ul', + [('li', ('a', {'href': '?' + urlencode({'id': id})}, id)) + for id in ids] + ), + ] + + return render_document(page_base(title='Mail index', + body=body)) + + +passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json')) + + +app = Flask(__name__) +login_manager.init_app(app) +app.secret_key = 'THIS IS A RANDOM STRING' + + +class User: + def __init__(self, username: str): + self._username = username + self._authenticated = False + + # @property + def is_authenticated(self): + return self._authenticated + + # @property + def is_active(self): + return True + + # @property + def is_anonymous(self): + return False + + # @property + def get_id(self): + return self._username + + + + +@login_manager.user_loader +def load_user(user_id): + # return User.get(user_id) + return User(user_id) + + +@app.route('/') +def index(): + if not current_user.is_authenticated: + return redirect(url_for('login_page_', returnto=request.path)) + if id := request.args.get('id'): + print("id =", id) + response = response_for(''.join(id).replace(' ', '+')) + else: + response = index_page() + return response + + +@app.route('/search') +@login_required +def search_page_(): + return search_page(request.args.get('q'), + request.args.get('by', None)) + + +@app.route('/login', methods=['GET']) +def login_page_(): + body = login_page(request.args.get('returnto')) + return render_document(page_base(title='Login', body=body)) + + +@app.route('/login', methods=['POST']) +def login_form(): + resp = redirect(request.args.get('returnto', url_for('index'))) + + username = request.form['username'] + password = request.form['password'] + user = User(username) + if passwords.validate(username, password): + login_user(user) + else: + flash('Invalid username or password') + return resp + + +@app.route('/logout', methods=['POST']) +@login_required +def logout_form(): + logout_user() + return redirect(url_for('index')) + + +if __name__ == '__main__': + app.run(debug=True, port=8090) diff --git a/mu4web/mu.py b/mu4web/mu.py new file mode 100644 index 0000000..d52a362 --- /dev/null +++ b/mu4web/mu.py @@ -0,0 +1,128 @@ +""" +Wrapper for the `mu` command line. +""" + +import email.message +import email.policy +from email.parser import BytesParser +import subprocess +from subprocess import PIPE +import xml.dom.minidom +import xml.dom + +from typing import ( + Literal, + Optional, + Union, +) + +parser = BytesParser(policy=email.policy.default) + + +def get_mail(id: str) -> email.message.Message: + """ + Lookup email by Message-ID. + + [Raises] + MuError + """ + cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}', + '--fields', 'l'], + stdout=PIPE) + filename = cmd.stdout.decode('UTF-8').strip() + + if cmd.returncode != 0: + raise MuError(cmd.returncode) + + with open(filename, "rb") as f: + mail = parser.parse(f) + return mail + + +class MuError(Exception): + codes = { + 1: 'General Error', + 2: 'No Matches', + 4: 'Database is corrupted' + } + + def __init__(self, returncode: int): + self.returncode: int = returncode + self.msg: str = MuError.codes.get(returncode, 'Unknown Error') + + def __repr__(self): + return f'MuError({self.returncode}, "{self.msg}")' + + def __str__(self): + return repr(self) + + +Sortfield = Union[Literal['cc'], + Literal['c'], + Literal['bcc'], + Literal['h'], + Literal['date'], + Literal['d'], + Literal['from'], + Literal['f'], + Literal['maildir'], + Literal['m'], + Literal['msgid'], + Literal['i'], + Literal['prio'], + Literal['p'], + Literal['subject'], + Literal['s'], + Literal['to'], + Literal['t'], + Literal['list'], + Literal['v']] + + + +def mu_search(query, sortfield: Optional[Sortfield] = 'subject', reverse: bool = False) -> list[dict[str, str]]: + """ + [Parameters] + query - Search query as per mu-find(1). + sortfield - Field to sort the values by + reverse - If the sort should be reversed + + [Returns] + >>> {'from': 'Hugo Hörnquist <hugo@example.com>', + 'date': '1585678375', + 'size': '377', + 'msgid': 'SAMPLE-ID@localhost', + 'path': '/home/hugo/.local/var/mail/INBOX/cur/filename', + 'maildir': '/INBOX' + } + """ + + if not query: + raise ValueError('Query required for mu_search') + cmdline = ['mu', 'find', + '--format=xml', + query] + if sortfield: + cmdline.extend(['--sortfield', sortfield]) + if reverse: + cmdline.append('--reverse') + print(cmdline) + cmd = subprocess.run(cmdline, capture_output=True) + if cmd.returncode != 0: + raise MuError(cmd.returncode) + dom = xml.dom.minidom.parseString(cmd.stdout.decode('UTF-8')) + + message_list = [] + messages = dom.childNodes[0] + assert messages.localName == 'messages' + for message in messages.childNodes: + msg_dict = {} + if message.nodeType != xml.dom.Node.ELEMENT_NODE: + continue + for kv in message.childNodes: + if kv.nodeType != xml.dom.Node.ELEMENT_NODE: + continue + msg_dict[kv.localName] = kv.childNodes[0].data + message_list.append(msg_dict) + + return message_list diff --git a/mu4web/password.py b/mu4web/password.py new file mode 100755 index 0000000..ff0df21 --- /dev/null +++ b/mu4web/password.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 + +""" +Simple password store, backed by a JSON file. + +Also contains an entry point for managing the store. +""" + +import hashlib +import json +import os +import random +from typing import ( + TypedDict, +) + + +def gen_salt(length: int = 10) -> str: + # TODO is this a sufficient source of randomness + return bytearray(random.randint(0, 256) for _ in range(length)).hex() + + +# Manual list of entries, to stop someone from executing arbitrary +# code by modyfying password database +hash_methods = { + 'sha256': hashlib.sha256 +} + + +class PasswordEntry(TypedDict): + hash: str + salt: str + # One of the keys of hash_methods + method: str + + +class Passwords: + """ + Simple password store. + + [Parameters] + fname - Path of json file to load and store from. + """ + def __init__(self, fname: os.PathLike): + self.fname = fname + self.db: dict[str, PasswordEntry] + try: + with open(fname) as f: + self.db = json.load(f) + except Exception: + self.db = {} + + def save(self) -> None: + """Dump current data to disk.""" + try: + with open(os.fspath(self.fname) + '.tmp', 'w') as f: + json.dump(self.db, f) + f.write('\n') + os.rename(os.fspath(self.fname) + '.tmp', self.fname) + except Exception as e: + print(f'Saving password failed {e}') + + def add(self, username: str, password: str) -> None: + """Add (or modify) entry in store.""" + if cur := self.db.get(username): + salt = cur['salt'] + hashed = hashlib.sha256((salt + password).encode('UTF-8')) + self.db[username] = { + 'hash': hashed.hexdigest(), + 'salt': salt, + 'method': 'sha256', + } + else: + salt = gen_salt() + hashed = hashlib.sha256((salt + password).encode('UTF-8')) + self.db[username] = { + 'hash': hashed.hexdigest(), + 'salt': salt, + 'method': 'sha256' + } + + def validate(self, username: str, password: str) -> bool: + """Check if user exists, and if it has a correct password.""" + # These shall fail when key is missing + data = self.db[username] + proc = hash_methods[data['method']] + digest = proc((data['salt'] + password).encode('UTF-8')).hexdigest() + return data['hash'] == digest + + +def main(): + import argparse + parser = argparse.ArgumentParser() + + parser.add_argument('--file', default='passwords.json') + + subparsers = parser.add_subparsers(dest='cmd') + + add_parser = subparsers.add_parser('add') + add_parser.add_argument('username') + add_parser.add_argument('password') + + val_parser = subparsers.add_parser('validate') + val_parser.add_argument('username') + val_parser.add_argument('password') + + args = parser.parse_args() + + passwords = Passwords(args.file) + if args.cmd == 'add': + passwords.add(args.username, args.password) + passwords.save() + elif args.cmd == 'validate': + print(passwords.validate(args.username, args.password)) + else: + parser.print_help() + + +if __name__ == '__main__': + main() |