aboutsummaryrefslogtreecommitdiff
path: root/mu4web
diff options
context:
space:
mode:
authorHugo Hörnquist <hugo@lysator.liu.se>2022-11-29 02:51:29 +0100
committerHugo Hörnquist <hugo@lysator.liu.se>2022-11-29 02:51:29 +0100
commit4587e5e124afcb13c81dea68e373420670d076b4 (patch)
tree91416ac5cb53dce9d80b466f66206a05c5a25a40 /mu4web
parentAdd makefile for ctags file. (diff)
downloadmu4web-4587e5e124afcb13c81dea68e373420670d076b4.tar.gz
mu4web-4587e5e124afcb13c81dea68e373420670d076b4.tar.xz
Properly specify version requirements, and add directory layout.
Diffstat (limited to 'mu4web')
-rw-r--r--mu4web/__init__.py1
-rw-r--r--mu4web/html_render.py71
-rw-r--r--mu4web/main.py350
-rw-r--r--mu4web/mu.py128
-rwxr-xr-xmu4web/password.py120
5 files changed, 670 insertions, 0 deletions
diff --git a/mu4web/__init__.py b/mu4web/__init__.py
new file mode 100644
index 0000000..72bdd01
--- /dev/null
+++ b/mu4web/__init__.py
@@ -0,0 +1 @@
+VERSION = "0.1"
diff --git a/mu4web/html_render.py b/mu4web/html_render.py
new file mode 100644
index 0000000..99140ad
--- /dev/null
+++ b/mu4web/html_render.py
@@ -0,0 +1,71 @@
+import html
+from typing import (
+ Callable,
+ TypeAlias,
+ Union,
+)
+
+HTML: TypeAlias = Union[tuple,
+ list['HTML'],
+ Callable[[], str],
+ None, str, int, float]
+
+
+standalones = ['hr', 'br', 'meta']
+"""Tags which can't have a closing tag."""
+
+
+def _render_document(document: HTML) -> str:
+ if isinstance(document, tuple):
+ tag, *body = document
+ if body and isinstance(body[0], dict):
+ print(body[0])
+ attributes = ' '.join(f'{a}="{html.escape(b)}"'
+ for a, b in body[0].items())
+ body = body[1:]
+ start = f'<{tag} {attributes}>'
+ else:
+ start = f'<{tag}>'
+
+ if tag in standalones:
+ return start
+ else:
+ if body:
+ items = ''.join(_render_document(b) for b in body)
+ else:
+ items = ''
+ return start + f'{items}</{tag}>'
+ elif callable(document):
+ return str(document())
+ elif isinstance(document, list):
+ return ''.join(_render_document(e) for e in document)
+ elif document is None:
+ return ''
+ else:
+ # strings, and everything else
+ return html.escape(str(document))
+
+
+def render_document(document: HTML) -> str:
+ """
+ Render an HTML structure to an Html string.
+
+ The following Python types are converted as follows:
+ - Tuples
+ - The first value becomes the tags name
+ - The second value, if a dictionary, becomes the tags attributes
+ - All following values (including the second if not a dictionary)
+ gets individually passed to render_document.
+ - Lists
+ Each element gets passed to render_document
+ - Callable[[], str]
+ Gets called, and its output is included verbatim. Useful for
+ including strings which shouldn't be escaped.
+ - str
+ Gets escaped, and included
+ - int, float
+ Gets included as their default string representation.
+ - None
+ Becomes an empty string
+ """
+ return '<!doctype html>\n' + _render_document(document)
diff --git a/mu4web/main.py b/mu4web/main.py
new file mode 100644
index 0000000..7c3d7bd
--- /dev/null
+++ b/mu4web/main.py
@@ -0,0 +1,350 @@
+from email.message import EmailMessage
+from email.headerregistry import Address
+from urllib.parse import urlencode
+import password
+from password import Passwords
+import os
+from datetime import datetime
+from flask_login import (
+ LoginManager,
+ login_required,
+ login_user,
+ current_user,
+ logout_user,
+)
+from typing import (
+ Optional,
+ cast,
+)
+from mu import mu_search, get_mail
+from html_render import HTML, render_document
+
+
+from flask import (
+ Flask,
+ session,
+ request,
+ redirect,
+ url_for,
+ flash,
+ get_flashed_messages
+)
+
+login_manager = LoginManager()
+
+
+def mailto(addr: str) -> HTML:
+ return ('a', {'href': f'mailto:{addr}'}, addr)
+
+
+def format_email(addr: Address) -> list[HTML]:
+ mail_addr = f'{addr.username}@{addr.domain}'
+ return [addr.display_name, ' <', mailto(mail_addr), '>']
+
+
+def header_format(key: str, value) -> HTML:
+ if key in ['to', 'cc', 'bcc']:
+ return ('ul', *[('li', *format_email(addr))
+ for addr in value.addresses])
+ elif key == 'from':
+ return format_email(value.addresses[0])
+ elif key == 'in-reply-to':
+ # type(value) == email.headerregistry._UnstructuredHeader
+ id = str(value).strip("<>")
+ return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>']
+ else:
+ return value
+
+
+style: HTML = lambda: """
+ nav {
+ display: block;
+ width: 100%;
+ height: 4em;
+ color: white;
+ background-color: darkgrey;
+ }
+
+ dl {
+ display: grid;
+ grid-template-columns: 10ch auto;
+ }
+ dt {
+ font-weight: bold;
+ }
+ dd {
+ font-family: mono;
+ font-size: 80%;
+ }
+ dd > * {
+ margin: 0;
+ }
+
+ tr:nth-child(2n) {
+ background-color: lightblue;
+ }
+
+ """
+
+
+def attachement_tree(mail: EmailMessage) -> HTML:
+ ct = mail.get_content_type()
+ fn = mail.get_filename()
+
+ children = []
+ for child in mail.iter_parts():
+ children.append(attachement_tree(cast(EmailMessage, child)))
+
+ content: HTML
+ if children:
+ content = ('ul', *children)
+ else:
+ content = []
+
+ if fn:
+ body = f'{ct} {fn}'
+ else:
+ body = str(ct)
+ return ('li', body, content)
+
+# --------------------------------------------------
+
+
+def login_page(returnto: Optional[str] = None) -> HTML:
+ return ('form', {'action': '/login', 'method': 'POST'},
+ ('input', {'name': 'username', 'placeholder': 'Username'}),
+ ('input', {'type': 'password',
+ 'placeholder': 'Password',
+ 'name': 'password'}),
+ ('input', {'type': 'hidden',
+ 'name': 'returnto',
+ 'value': returnto})
+ if returnto else [],
+ ('input', {'type': 'submit'}),
+ )
+
+
+def user_info(username: str) -> HTML:
+ return [('span', username),
+ ('form', {'action': '/logout', 'method': 'POST'},
+ ('input', {'type': 'submit', 'value': 'Logga ut'}))]
+
+
+def login_prompt() -> HTML:
+ return ('a', {'href': '/login'}, 'Logga in')
+
+
+def flashed_messages() -> HTML:
+ return ('ul', {'class': 'flashes'},
+ *[('li', msg) for msg in get_flashed_messages()])
+
+
+def page_base(title: Optional[str] = None,
+ body: HTML = []) -> HTML:
+ return ('html',
+ ('head',
+ ('meta', {'charset': 'utf-8'}),
+ ('title', title),
+ ('style', style),
+ ),
+ ('body',
+ ('nav',
+ user_info(current_user.get_id()) if current_user.is_authenticated else login_prompt()
+ ),
+ flashed_messages(),
+ body))
+
+
+def response_for(id: str, username: Optional[str] = None) -> str:
+
+ mail = cast(EmailMessage, get_mail(id))
+
+ headers = {}
+ for (key, value) in mail.items():
+ headers[key.lower()] = value
+
+ head = []
+ for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject', 'x-original-to',
+ 'in-reply-to']:
+ if x := headers.get(h.lower()):
+ head += [('dt', h.title()),
+ ('dd', header_format(h.lower(), x))]
+
+ body_part = mail.get_body(preferencelist=('html', 'plain'))
+ if not body_part:
+ raise ValueError("No suitable body in email")
+ ct = body_part.get_content_type()
+ body: HTML
+ if ct == 'text/html':
+ body = lambda: cast(EmailMessage, body_part).get_content()
+ else:
+ body = ('pre', cast(EmailMessage, body_part).get_content())
+
+ if t := headers.get('subject'):
+ title = f'Mail — {t}'
+ else:
+ title = 'Mail'
+
+ main_body = [('dl', *head),
+ ('hr',),
+ ('main', body),
+ ('hr',),
+ ('ul', attachement_tree(mail)),
+ ]
+ html_str = render_document(page_base(title=title,
+ body=main_body))
+
+ return html_str
+
+
+def search_field(q: str) -> HTML:
+ return ('form', {'action': '/search', 'method': 'GET'},
+ ('label', {'for': 'search'},
+ 'Mu Search Query'),
+ ('textarea', {'id': 'search', 'name': 'q'},
+ q),
+ ('input', {'type': 'Submit'}))
+
+
+def search_result(q, by, reverse) -> HTML:
+
+ # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid']
+ keys = ['from', 'to', 'subject', 'date']
+
+ rows = mu_search(q, by, reverse)
+ body: list[tuple] = []
+ for row in rows:
+ rowdata = []
+ for key in keys:
+ data = row.get(key, None)
+ if data and key == 'date':
+ dt = datetime.fromtimestamp(int(data))
+ data = dt.strftime('%Y-%m-%d %H:%M')
+ rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data)))
+ body.append(('tr', rowdata))
+
+
+ # TODO print number of results
+
+ return ('table',
+ ('thead',
+ ('tr',
+ [('th', m.title()) for m in keys])),
+ ('tbody',
+ body
+ ))
+
+
+def search_page(q, by):
+ main_body = [search_field(q)]
+
+ if q:
+ main_body.append(search_result(q, by, False))
+
+ return render_document(page_base(title='Search',
+ body=main_body))
+
+
+def index_page():
+ ids = [
+ 'CAEzixGsw-4zJ8_ejK_vDgmcQ9s-MbBc-ho+HL4arV4a+ghOOPg@mail.gmail.com',
+ 'CA+pcBt-gLb0GtbFOjJ5_7Q_WXtqApVPQ9w-3O7GH=VqCEQat6g@mail.gmail.com',
+ ]
+
+ body = [('h1', "Sample ID's"),
+ ('ul',
+ [('li', ('a', {'href': '?' + urlencode({'id': id})}, id))
+ for id in ids]
+ ),
+ ]
+
+ return render_document(page_base(title='Mail index',
+ body=body))
+
+
+passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json'))
+
+
+app = Flask(__name__)
+login_manager.init_app(app)
+app.secret_key = 'THIS IS A RANDOM STRING'
+
+
+class User:
+ def __init__(self, username: str):
+ self._username = username
+ self._authenticated = False
+
+ # @property
+ def is_authenticated(self):
+ return self._authenticated
+
+ # @property
+ def is_active(self):
+ return True
+
+ # @property
+ def is_anonymous(self):
+ return False
+
+ # @property
+ def get_id(self):
+ return self._username
+
+
+
+
+@login_manager.user_loader
+def load_user(user_id):
+ # return User.get(user_id)
+ return User(user_id)
+
+
+@app.route('/')
+def index():
+ if not current_user.is_authenticated:
+ return redirect(url_for('login_page_', returnto=request.path))
+ if id := request.args.get('id'):
+ print("id =", id)
+ response = response_for(''.join(id).replace(' ', '+'))
+ else:
+ response = index_page()
+ return response
+
+
+@app.route('/search')
+@login_required
+def search_page_():
+ return search_page(request.args.get('q'),
+ request.args.get('by', None))
+
+
+@app.route('/login', methods=['GET'])
+def login_page_():
+ body = login_page(request.args.get('returnto'))
+ return render_document(page_base(title='Login', body=body))
+
+
+@app.route('/login', methods=['POST'])
+def login_form():
+ resp = redirect(request.args.get('returnto', url_for('index')))
+
+ username = request.form['username']
+ password = request.form['password']
+ user = User(username)
+ if passwords.validate(username, password):
+ login_user(user)
+ else:
+ flash('Invalid username or password')
+ return resp
+
+
+@app.route('/logout', methods=['POST'])
+@login_required
+def logout_form():
+ logout_user()
+ return redirect(url_for('index'))
+
+
+if __name__ == '__main__':
+ app.run(debug=True, port=8090)
diff --git a/mu4web/mu.py b/mu4web/mu.py
new file mode 100644
index 0000000..d52a362
--- /dev/null
+++ b/mu4web/mu.py
@@ -0,0 +1,128 @@
+"""
+Wrapper for the `mu` command line.
+"""
+
+import email.message
+import email.policy
+from email.parser import BytesParser
+import subprocess
+from subprocess import PIPE
+import xml.dom.minidom
+import xml.dom
+
+from typing import (
+ Literal,
+ Optional,
+ Union,
+)
+
+parser = BytesParser(policy=email.policy.default)
+
+
+def get_mail(id: str) -> email.message.Message:
+ """
+ Lookup email by Message-ID.
+
+ [Raises]
+ MuError
+ """
+ cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}',
+ '--fields', 'l'],
+ stdout=PIPE)
+ filename = cmd.stdout.decode('UTF-8').strip()
+
+ if cmd.returncode != 0:
+ raise MuError(cmd.returncode)
+
+ with open(filename, "rb") as f:
+ mail = parser.parse(f)
+ return mail
+
+
+class MuError(Exception):
+ codes = {
+ 1: 'General Error',
+ 2: 'No Matches',
+ 4: 'Database is corrupted'
+ }
+
+ def __init__(self, returncode: int):
+ self.returncode: int = returncode
+ self.msg: str = MuError.codes.get(returncode, 'Unknown Error')
+
+ def __repr__(self):
+ return f'MuError({self.returncode}, "{self.msg}")'
+
+ def __str__(self):
+ return repr(self)
+
+
+Sortfield = Union[Literal['cc'],
+ Literal['c'],
+ Literal['bcc'],
+ Literal['h'],
+ Literal['date'],
+ Literal['d'],
+ Literal['from'],
+ Literal['f'],
+ Literal['maildir'],
+ Literal['m'],
+ Literal['msgid'],
+ Literal['i'],
+ Literal['prio'],
+ Literal['p'],
+ Literal['subject'],
+ Literal['s'],
+ Literal['to'],
+ Literal['t'],
+ Literal['list'],
+ Literal['v']]
+
+
+
+def mu_search(query, sortfield: Optional[Sortfield] = 'subject', reverse: bool = False) -> list[dict[str, str]]:
+ """
+ [Parameters]
+ query - Search query as per mu-find(1).
+ sortfield - Field to sort the values by
+ reverse - If the sort should be reversed
+
+ [Returns]
+ >>> {'from': 'Hugo Hörnquist <hugo@example.com>',
+ 'date': '1585678375',
+ 'size': '377',
+ 'msgid': 'SAMPLE-ID@localhost',
+ 'path': '/home/hugo/.local/var/mail/INBOX/cur/filename',
+ 'maildir': '/INBOX'
+ }
+ """
+
+ if not query:
+ raise ValueError('Query required for mu_search')
+ cmdline = ['mu', 'find',
+ '--format=xml',
+ query]
+ if sortfield:
+ cmdline.extend(['--sortfield', sortfield])
+ if reverse:
+ cmdline.append('--reverse')
+ print(cmdline)
+ cmd = subprocess.run(cmdline, capture_output=True)
+ if cmd.returncode != 0:
+ raise MuError(cmd.returncode)
+ dom = xml.dom.minidom.parseString(cmd.stdout.decode('UTF-8'))
+
+ message_list = []
+ messages = dom.childNodes[0]
+ assert messages.localName == 'messages'
+ for message in messages.childNodes:
+ msg_dict = {}
+ if message.nodeType != xml.dom.Node.ELEMENT_NODE:
+ continue
+ for kv in message.childNodes:
+ if kv.nodeType != xml.dom.Node.ELEMENT_NODE:
+ continue
+ msg_dict[kv.localName] = kv.childNodes[0].data
+ message_list.append(msg_dict)
+
+ return message_list
diff --git a/mu4web/password.py b/mu4web/password.py
new file mode 100755
index 0000000..ff0df21
--- /dev/null
+++ b/mu4web/password.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+
+"""
+Simple password store, backed by a JSON file.
+
+Also contains an entry point for managing the store.
+"""
+
+import hashlib
+import json
+import os
+import random
+from typing import (
+ TypedDict,
+)
+
+
+def gen_salt(length: int = 10) -> str:
+ # TODO is this a sufficient source of randomness
+ return bytearray(random.randint(0, 256) for _ in range(length)).hex()
+
+
+# Manual list of entries, to stop someone from executing arbitrary
+# code by modyfying password database
+hash_methods = {
+ 'sha256': hashlib.sha256
+}
+
+
+class PasswordEntry(TypedDict):
+ hash: str
+ salt: str
+ # One of the keys of hash_methods
+ method: str
+
+
+class Passwords:
+ """
+ Simple password store.
+
+ [Parameters]
+ fname - Path of json file to load and store from.
+ """
+ def __init__(self, fname: os.PathLike):
+ self.fname = fname
+ self.db: dict[str, PasswordEntry]
+ try:
+ with open(fname) as f:
+ self.db = json.load(f)
+ except Exception:
+ self.db = {}
+
+ def save(self) -> None:
+ """Dump current data to disk."""
+ try:
+ with open(os.fspath(self.fname) + '.tmp', 'w') as f:
+ json.dump(self.db, f)
+ f.write('\n')
+ os.rename(os.fspath(self.fname) + '.tmp', self.fname)
+ except Exception as e:
+ print(f'Saving password failed {e}')
+
+ def add(self, username: str, password: str) -> None:
+ """Add (or modify) entry in store."""
+ if cur := self.db.get(username):
+ salt = cur['salt']
+ hashed = hashlib.sha256((salt + password).encode('UTF-8'))
+ self.db[username] = {
+ 'hash': hashed.hexdigest(),
+ 'salt': salt,
+ 'method': 'sha256',
+ }
+ else:
+ salt = gen_salt()
+ hashed = hashlib.sha256((salt + password).encode('UTF-8'))
+ self.db[username] = {
+ 'hash': hashed.hexdigest(),
+ 'salt': salt,
+ 'method': 'sha256'
+ }
+
+ def validate(self, username: str, password: str) -> bool:
+ """Check if user exists, and if it has a correct password."""
+ # These shall fail when key is missing
+ data = self.db[username]
+ proc = hash_methods[data['method']]
+ digest = proc((data['salt'] + password).encode('UTF-8')).hexdigest()
+ return data['hash'] == digest
+
+
+def main():
+ import argparse
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument('--file', default='passwords.json')
+
+ subparsers = parser.add_subparsers(dest='cmd')
+
+ add_parser = subparsers.add_parser('add')
+ add_parser.add_argument('username')
+ add_parser.add_argument('password')
+
+ val_parser = subparsers.add_parser('validate')
+ val_parser.add_argument('username')
+ val_parser.add_argument('password')
+
+ args = parser.parse_args()
+
+ passwords = Passwords(args.file)
+ if args.cmd == 'add':
+ passwords.add(args.username, args.password)
+ passwords.save()
+ elif args.cmd == 'validate':
+ print(passwords.validate(args.username, args.password))
+ else:
+ parser.print_help()
+
+
+if __name__ == '__main__':
+ main()