From cb8065767d6c71915731bba271a5e4219cf45162 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20H=C3=B6rnquist?= Date: Wed, 26 Oct 2022 16:12:40 +0200 Subject: Introduce types. --- main.py | 203 ++++++++++++++++++++++++++++++++++++++++++++++++------------ password.py | 59 +++++++++++------- 2 files changed, 201 insertions(+), 61 deletions(-) diff --git a/main.py b/main.py index afd74f5..1ee89de 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,8 @@ import subprocess import email +import email.message from email.message import EmailMessage +from email.headerregistry import Address from subprocess import PIPE import html from email.parser import BytesParser @@ -9,14 +11,28 @@ from urllib.parse import urlparse, urlencode, parse_qs from http.cookies import BaseCookie import http.cookies import password +from password import Passwords from uuid import uuid4 +import xml.dom.minidom +import xml.dom +import os +from typing import ( + TypeAlias, + Union, + Callable, + Optional, + cast, +) from http.server import HTTPServer, BaseHTTPRequestHandler +HTML: TypeAlias = Union[tuple, list, Callable[[], str], None, + str, int, float] + parser = BytesParser(policy=email.policy.default) -def get_mail(id): +def get_mail(id: str) -> email.message.Message: cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}', '--fields', 'l'], stdout=PIPE) @@ -27,22 +43,21 @@ def get_mail(id): return mail -def header_format(key, value): +def mailto(addr: str) -> HTML: + return ('a', {'href': f'mailto:{addr}'}, addr) + + +def format_email(addr: Address) -> list[HTML]: + mail_addr = f'{addr.username}@{addr.domain}' + return [addr.display_name, ' <', mailto(mail_addr), '>'] + + +def header_format(key: str, value) -> HTML: if key in ['to', 'cc', 'bcc']: - lst = [] - for addr in value.addresses: - mail_addr = f'{addr.username}@{addr.domain}' - addr = ' <', ('a', {'href': f'mailto:{mail_addr}'}, mail_addr), '>' - lst.append(('li', str(addr.display_name)) + addr) - return ('ul', lst) + return ('ul', *[('li', *format_email(addr)) + for addr in value.addresses]) elif key == 'from': - value = value.addresses[0] - mail_addr = f'{value.username}@{value.domain}' - return [str(value.display_name), - ' <', - ('a', {'href': f'mailto:{mail_addr}'}, mail_addr), - '>', - ] + return format_email(value.addresses[0]) elif key == 'in-reply-to': # type(value) == email.headerregistry._UnstructuredHeader id = str(value).strip("<>") @@ -54,7 +69,7 @@ def header_format(key, value): standalones = ['hr', 'br', 'meta'] -def _render_document(document): +def _render_document(document: HTML) -> str: if type(document) == tuple: tag, *body = document if body and type(body[0]) == dict: @@ -74,16 +89,18 @@ def _render_document(document): return str(document()) elif type(document) == list: return ''.join(_render_document(e) for e in document) + elif document is None: + return '' else: # strings, and everything else return html.escape(str(document)) -def render_document(document): +def render_document(document: HTML) -> str: return '\n' + _render_document(document) -style = lambda: """ +style: HTML = lambda: """ nav { display: block; width: 100%; @@ -110,14 +127,15 @@ style = lambda: """ """ -def attachement_tree(mail): +def attachement_tree(mail: EmailMessage) -> HTML: ct = mail.get_content_type() fn = mail.get_filename() children = [] for child in mail.iter_parts(): - children.append(attachement_tree(child)) + children.append(attachement_tree(cast(EmailMessage, child))) + content: HTML if children: content = ('ul', *children) else: @@ -132,7 +150,7 @@ def attachement_tree(mail): # -------------------------------------------------- -def login_page(returnto=None): +def login_page(returnto: Optional[str] = None) -> HTML: return ('form', {'action': '/login', 'method': 'POST'}, ('input', {'name': 'username', 'placeholder': 'Username'}), ('input', {'type': 'password', @@ -146,17 +164,19 @@ def login_page(returnto=None): ) -def user_info(username): +def user_info(username: str) -> HTML: return [('span', username), ('form', {'action': '/logout', 'method': 'POST'}, ('input', {'type': 'submit', 'value': 'Logga ut'}))] -def login_prompt(): +def login_prompt() -> HTML: return ('a', {'href': '/login'}, 'Logga in') -def page_base(title=None, body=[], username=None): +def page_base(title: Optional[str] = None, + body: HTML = [], + username: Optional[str] = None) -> HTML: return ('html', ('head', ('meta', {'charset': 'utf-8'}), @@ -170,9 +190,9 @@ def page_base(title=None, body=[], username=None): body)) -def response_for(id, username=None): +def response_for(id: str, username: Optional[str] = None) -> str: - mail = get_mail(id) + mail = cast(EmailMessage, get_mail(id)) headers = {} for (key, value) in mail.items(): @@ -186,11 +206,14 @@ def response_for(id, username=None): ('dd', header_format(h.lower(), x))] body_part = mail.get_body(preferencelist=('html', 'plain')) + if not body_part: + raise ValueError("No suitable body in email") ct = body_part.get_content_type() + body: HTML if ct == 'text/html': - body = lambda: body_part.get_content() + body = lambda: cast(EmailMessage, body_part).get_content() else: - body = ('pre', body_part.get_content()) + body = ('pre', cast(EmailMessage, body_part).get_content()) if t := headers.get('subject'): title = f'Mail — {t}' @@ -210,6 +233,91 @@ def response_for(id, username=None): return html_str +def search_field(q: str) -> HTML: + return ('form', {'action': '/search', 'method': 'GET'}, + ('label', {'for': 'search'}, + 'Mu Search Query'), + ('textarea', {'id': 'search', 'name': 'q'}, + q), + ('input', {'type': 'Submit'})) + + +class MuError(Exception): + codes = { + 1: 'General Error', + 2: 'No Matches', + 4: 'Database is corrupted' + } + + def __init__(self, returncode: int): + self.returncode: int = returncode + self.msg: str = MuError.codes.get(returncode, 'Unknown Error') + + def __repr__(self): + return f'MuError({self.returncode}, "{self.msg}")' + + +def mu_search(query, sortfield='subject', reverse=False): + if not query: + raise ValueError('Query required for mu_search') + cmdline = ['mu', 'find', + '--format=xml', + query, + '--sortfield', sortfield] + if reverse: + cmdline.append('--reverse') + cmd = subprocess.Popen(cmdline, stdout=subprocess.PIPE) + dom = xml.dom.minidom.parse(cmd.stdout) + if returncode := cmd.wait(): + raise MuError(returncode) + + message_list = [] + messages = dom.childNodes[0] + assert messages.localName == 'messages' + for message in messages.childNodes: + msg_dict = {} + if message.nodeType != xml.dom.Node.ELEMENT_NODE: + continue + for kv in message.childNodes: + if kv.nodeType != xml.dom.Node.ELEMENT_NODE: + continue + msg_dict[kv.localName] = kv.childNodes[0].data + message_list.append(msg_dict) + + return message_list + + +def search_result(q, by, reverse): + + keys = ['From', 'To', 'Subject', 'Date', 'Size', 'Maildir', 'Msgid'] + + rows = mu_search(q, by, reverse) + body = [] + for row in rows: + rowdata = ['tr'] + for key in keys: + rowdata.append(row[key]) + body.append(rowdata) + + return ('table', + ('thead', + ('tr', + [('th', m) for m in keys])), + ('tbody', + body + )) + + +def search_page(q, by, username=None): + main_body = [search_field(q)] + + if q: + main_body.append(search_result(q, by, False)) + + return render_document(page_base(title='Serach', + body=main_body, + username=username)) + def index_page(username): ids = [ @@ -230,27 +338,28 @@ def index_page(username): )) -valid_session_cookies = {} +valid_session_cookies: dict[str, str] = {} -def validate_session_cookie(cookie): +def validate_session_cookie(cookie: http.cookies.Morsel) -> Optional[str]: return valid_session_cookies.get(cookie.value) -def remove_session_cookie(cookie): +def remove_session_cookie(cookie: http.cookies.Morsel) -> http.cookies.Morsel: if valid_session_cookies.get(cookie.value): del valid_session_cookies[cookie.value] cookie.set(cookie.key, '', '') - cookie.expire = 0 + # TODO how to expire cookie + # cookie.expires = 0 return cookie -passwords = password.Passwords('passwords.json') +passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json')) -def new_session_cookie(username): +def new_session_cookie(username: str) -> http.cookies.Morsel: global valid_session_cookies - m = http.cookies.Morsel() + m: http.cookies.Morsel = http.cookies.Morsel() unique = str(uuid4()) valid_session_cookies[unique] = username m.set('session', unique, unique) @@ -277,14 +386,32 @@ class Handler(BaseHTTPRequestHandler): else: if id := query.get('id'): print("id =", id) - response = response_for(''.join(id).replace(' ', '+'), logged_in) + response = response_for(''.join(id).replace(' ', '+'), + logged_in) self.send_response(200) else: response = index_page(logged_in) self.send_response(200) response = response.encode('UTF-8') - self.send_header('Content-Type', 'text/html') + self.send_header('Content-Type', 'text/html; charset=UTF-8') + self.send_header('Content-Length', len(response)) + self.end_headers() + self.wfile.write(response) + + elif url.path == '/search': + if not logged_in: + self.send_response(307) + q = urlencode({'returnto': self.path}) + self.send_header('location', '/login?' + q) + self.end_headers() + else: + response = search_page(query.get('q'), + query.get('by'), + logged_in) + self.send_response(200) + response = response.encode('UTF-8') + self.send_header('Content-Type', 'text/html; charset=UTF-8') self.send_header('Content-Length', len(response)) self.end_headers() self.wfile.write(response) @@ -295,7 +422,7 @@ class Handler(BaseHTTPRequestHandler): self.send_response(200) content = render_document(page_base(title='Login', body=body)) content = content.encode('UTF-8') - self.send_header('Content-Type', 'text/html') + self.send_header('Content-Type', 'text/html; charset=UTF-8') self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) diff --git a/password.py b/password.py index 5e1f37e..16192be 100755 --- a/password.py +++ b/password.py @@ -3,59 +3,73 @@ import hashlib import json import os import random +from typing import ( + TypedDict, +) -def gen_salt(length=10): + +def gen_salt(length: int = 10) -> str: # TODO is this a sufficient source of randomness return bytearray(random.randint(0, 256) for _ in range(length)).hex() + # Manual list of entries, to stop someone from executing arbitrary # code by modyfying password database hash_methods = { - 'sha256': hashlib.sha256 - } + 'sha256': hashlib.sha256 +} + + +class PasswordEntry(TypedDict): + hash: str + salt: str + # One of the keys of hash_methods + method: str + class Passwords: - def __init__(self, fname): + def __init__(self, fname: os.PathLike): self.fname = fname + self.db: dict[str, PasswordEntry] try: with open(fname) as f: self.db = json.load(f) - except: + except Exception: self.db = {} - def save(self): + def save(self) -> None: try: - with open(self.fname + '.tmp', 'w') as f: + with open(os.fspath(self.fname) + '.tmp', 'w') as f: json.dump(self.db, f) f.write('\n') - os.rename(self.fname + '.tmp', self.fname) - except e: - print('Saving password failed {e}') + os.rename(os.fspath(self.fname) + '.tmp', self.fname) + except Exception as e: + print(f'Saving password failed {e}') - def add(self, username, password): + def add(self, username: str, password: str) -> None: if cur := self.db.get(username): salt = cur['salt'] hashed = hashlib.sha256((salt + password).encode('UTF-8')) self.db[username] = { - 'hash': hashed.hexdigest(), - 'salt': salt, - 'method': 'sha256', - } + 'hash': hashed.hexdigest(), + 'salt': salt, + 'method': 'sha256', + } else: salt = gen_salt() hashed = hashlib.sha256((salt + password).encode('UTF-8')) self.db[username] = { - 'hash': hashed.hexdigest(), - 'salt': salt, - 'method': 'sha256' - } + 'hash': hashed.hexdigest(), + 'salt': salt, + 'method': 'sha256' + } - def validate(self, username, password): + def validate(self, username: str, password: str) -> bool: # These shall fail when key is missing data = self.db[username] proc = hash_methods[data['method']] - return data['hash'] == proc((data['salt'] + password).encode('UTF-8')).hexdigest() - + digest = proc((data['salt'] + password).encode('UTF-8')).hexdigest() + return data['hash'] == digest if __name__ == '__main__': @@ -77,7 +91,6 @@ if __name__ == '__main__': args = parser.parse_args() - passwords = Passwords(args.file) if args.cmd == 'add': passwords.add(args.username, args.password) -- cgit v1.2.3