aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHugo Hörnquist <hugo@lysator.liu.se>2022-10-26 16:12:40 +0200
committerHugo Hörnquist <hugo@lysator.liu.se>2022-10-26 16:12:40 +0200
commitcb8065767d6c71915731bba271a5e4219cf45162 (patch)
tree48240b6e61962164213b37057d5ec9f26f972035
parentFix UID's with + in them. (diff)
downloadmu4web-cb8065767d6c71915731bba271a5e4219cf45162.tar.gz
mu4web-cb8065767d6c71915731bba271a5e4219cf45162.tar.xz
Introduce types.
-rw-r--r--main.py203
-rwxr-xr-xpassword.py59
2 files changed, 201 insertions, 61 deletions
diff --git a/main.py b/main.py
index afd74f5..1ee89de 100644
--- a/main.py
+++ b/main.py
@@ -1,6 +1,8 @@
import subprocess
import email
+import email.message
from email.message import EmailMessage
+from email.headerregistry import Address
from subprocess import PIPE
import html
from email.parser import BytesParser
@@ -9,14 +11,28 @@ from urllib.parse import urlparse, urlencode, parse_qs
from http.cookies import BaseCookie
import http.cookies
import password
+from password import Passwords
from uuid import uuid4
+import xml.dom.minidom
+import xml.dom
+import os
+from typing import (
+ TypeAlias,
+ Union,
+ Callable,
+ Optional,
+ cast,
+)
from http.server import HTTPServer, BaseHTTPRequestHandler
+HTML: TypeAlias = Union[tuple, list, Callable[[], str], None,
+ str, int, float]
+
parser = BytesParser(policy=email.policy.default)
-def get_mail(id):
+def get_mail(id: str) -> email.message.Message:
cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}',
'--fields', 'l'],
stdout=PIPE)
@@ -27,22 +43,21 @@ def get_mail(id):
return mail
-def header_format(key, value):
+def mailto(addr: str) -> HTML:
+ return ('a', {'href': f'mailto:{addr}'}, addr)
+
+
+def format_email(addr: Address) -> list[HTML]:
+ mail_addr = f'{addr.username}@{addr.domain}'
+ return [addr.display_name, ' <', mailto(mail_addr), '>']
+
+
+def header_format(key: str, value) -> HTML:
if key in ['to', 'cc', 'bcc']:
- lst = []
- for addr in value.addresses:
- mail_addr = f'{addr.username}@{addr.domain}'
- addr = ' <', ('a', {'href': f'mailto:{mail_addr}'}, mail_addr), '>'
- lst.append(('li', str(addr.display_name)) + addr)
- return ('ul', lst)
+ return ('ul', *[('li', *format_email(addr))
+ for addr in value.addresses])
elif key == 'from':
- value = value.addresses[0]
- mail_addr = f'{value.username}@{value.domain}'
- return [str(value.display_name),
- ' <',
- ('a', {'href': f'mailto:{mail_addr}'}, mail_addr),
- '>',
- ]
+ return format_email(value.addresses[0])
elif key == 'in-reply-to':
# type(value) == email.headerregistry._UnstructuredHeader
id = str(value).strip("<>")
@@ -54,7 +69,7 @@ def header_format(key, value):
standalones = ['hr', 'br', 'meta']
-def _render_document(document):
+def _render_document(document: HTML) -> str:
if type(document) == tuple:
tag, *body = document
if body and type(body[0]) == dict:
@@ -74,16 +89,18 @@ def _render_document(document):
return str(document())
elif type(document) == list:
return ''.join(_render_document(e) for e in document)
+ elif document is None:
+ return ''
else:
# strings, and everything else
return html.escape(str(document))
-def render_document(document):
+def render_document(document: HTML) -> str:
return '<!doctype html>\n' + _render_document(document)
-style = lambda: """
+style: HTML = lambda: """
nav {
display: block;
width: 100%;
@@ -110,14 +127,15 @@ style = lambda: """
"""
-def attachement_tree(mail):
+def attachement_tree(mail: EmailMessage) -> HTML:
ct = mail.get_content_type()
fn = mail.get_filename()
children = []
for child in mail.iter_parts():
- children.append(attachement_tree(child))
+ children.append(attachement_tree(cast(EmailMessage, child)))
+ content: HTML
if children:
content = ('ul', *children)
else:
@@ -132,7 +150,7 @@ def attachement_tree(mail):
# --------------------------------------------------
-def login_page(returnto=None):
+def login_page(returnto: Optional[str] = None) -> HTML:
return ('form', {'action': '/login', 'method': 'POST'},
('input', {'name': 'username', 'placeholder': 'Username'}),
('input', {'type': 'password',
@@ -146,17 +164,19 @@ def login_page(returnto=None):
)
-def user_info(username):
+def user_info(username: str) -> HTML:
return [('span', username),
('form', {'action': '/logout', 'method': 'POST'},
('input', {'type': 'submit', 'value': 'Logga ut'}))]
-def login_prompt():
+def login_prompt() -> HTML:
return ('a', {'href': '/login'}, 'Logga in')
-def page_base(title=None, body=[], username=None):
+def page_base(title: Optional[str] = None,
+ body: HTML = [],
+ username: Optional[str] = None) -> HTML:
return ('html',
('head',
('meta', {'charset': 'utf-8'}),
@@ -170,9 +190,9 @@ def page_base(title=None, body=[], username=None):
body))
-def response_for(id, username=None):
+def response_for(id: str, username: Optional[str] = None) -> str:
- mail = get_mail(id)
+ mail = cast(EmailMessage, get_mail(id))
headers = {}
for (key, value) in mail.items():
@@ -186,11 +206,14 @@ def response_for(id, username=None):
('dd', header_format(h.lower(), x))]
body_part = mail.get_body(preferencelist=('html', 'plain'))
+ if not body_part:
+ raise ValueError("No suitable body in email")
ct = body_part.get_content_type()
+ body: HTML
if ct == 'text/html':
- body = lambda: body_part.get_content()
+ body = lambda: cast(EmailMessage, body_part).get_content()
else:
- body = ('pre', body_part.get_content())
+ body = ('pre', cast(EmailMessage, body_part).get_content())
if t := headers.get('subject'):
title = f'Mail — {t}'
@@ -210,6 +233,91 @@ def response_for(id, username=None):
return html_str
+def search_field(q: str) -> HTML:
+ return ('form', {'action': '/search', 'method': 'GET'},
+ ('label', {'for': 'search'},
+ 'Mu Search Query'),
+ ('textarea', {'id': 'search', 'name': 'q'},
+ q),
+ ('input', {'type': 'Submit'}))
+
+
+class MuError(Exception):
+ codes = {
+ 1: 'General Error',
+ 2: 'No Matches',
+ 4: 'Database is corrupted'
+ }
+
+ def __init__(self, returncode: int):
+ self.returncode: int = returncode
+ self.msg: str = MuError.codes.get(returncode, 'Unknown Error')
+
+ def __repr__(self):
+ return f'MuError({self.returncode}, "{self.msg}")'
+
+
+def mu_search(query, sortfield='subject', reverse=False):
+ if not query:
+ raise ValueError('Query required for mu_search')
+ cmdline = ['mu', 'find',
+ '--format=xml',
+ query,
+ '--sortfield', sortfield]
+ if reverse:
+ cmdline.append('--reverse')
+ cmd = subprocess.Popen(cmdline, stdout=subprocess.PIPE)
+ dom = xml.dom.minidom.parse(cmd.stdout)
+ if returncode := cmd.wait():
+ raise MuError(returncode)
+
+ message_list = []
+ messages = dom.childNodes[0]
+ assert messages.localName == 'messages'
+ for message in messages.childNodes:
+ msg_dict = {}
+ if message.nodeType != xml.dom.Node.ELEMENT_NODE:
+ continue
+ for kv in message.childNodes:
+ if kv.nodeType != xml.dom.Node.ELEMENT_NODE:
+ continue
+ msg_dict[kv.localName] = kv.childNodes[0].data
+ message_list.append(msg_dict)
+
+ return message_list
+
+
+def search_result(q, by, reverse):
+
+ keys = ['From', 'To', 'Subject', 'Date', 'Size', 'Maildir', 'Msgid']
+
+ rows = mu_search(q, by, reverse)
+ body = []
+ for row in rows:
+ rowdata = ['tr']
+ for key in keys:
+ rowdata.append(row[key])
+ body.append(rowdata)
+
+ return ('table',
+ ('thead',
+ ('tr',
+ [('th', m) for m in keys])),
+ ('tbody',
+ body
+ ))
+
+
+def search_page(q, by, username=None):
+ main_body = [search_field(q)]
+
+ if q:
+ main_body.append(search_result(q, by, False))
+
+ return render_document(page_base(title='Serach',
+ body=main_body,
+ username=username))
+
def index_page(username):
ids = [
@@ -230,27 +338,28 @@ def index_page(username):
))
-valid_session_cookies = {}
+valid_session_cookies: dict[str, str] = {}
-def validate_session_cookie(cookie):
+def validate_session_cookie(cookie: http.cookies.Morsel) -> Optional[str]:
return valid_session_cookies.get(cookie.value)
-def remove_session_cookie(cookie):
+def remove_session_cookie(cookie: http.cookies.Morsel) -> http.cookies.Morsel:
if valid_session_cookies.get(cookie.value):
del valid_session_cookies[cookie.value]
cookie.set(cookie.key, '', '')
- cookie.expire = 0
+ # TODO how to expire cookie
+ # cookie.expires = 0
return cookie
-passwords = password.Passwords('passwords.json')
+passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json'))
-def new_session_cookie(username):
+def new_session_cookie(username: str) -> http.cookies.Morsel:
global valid_session_cookies
- m = http.cookies.Morsel()
+ m: http.cookies.Morsel = http.cookies.Morsel()
unique = str(uuid4())
valid_session_cookies[unique] = username
m.set('session', unique, unique)
@@ -277,14 +386,32 @@ class Handler(BaseHTTPRequestHandler):
else:
if id := query.get('id'):
print("id =", id)
- response = response_for(''.join(id).replace(' ', '+'), logged_in)
+ response = response_for(''.join(id).replace(' ', '+'),
+ logged_in)
self.send_response(200)
else:
response = index_page(logged_in)
self.send_response(200)
response = response.encode('UTF-8')
- self.send_header('Content-Type', 'text/html')
+ self.send_header('Content-Type', 'text/html; charset=UTF-8')
+ self.send_header('Content-Length', len(response))
+ self.end_headers()
+ self.wfile.write(response)
+
+ elif url.path == '/search':
+ if not logged_in:
+ self.send_response(307)
+ q = urlencode({'returnto': self.path})
+ self.send_header('location', '/login?' + q)
+ self.end_headers()
+ else:
+ response = search_page(query.get('q'),
+ query.get('by'),
+ logged_in)
+ self.send_response(200)
+ response = response.encode('UTF-8')
+ self.send_header('Content-Type', 'text/html; charset=UTF-8')
self.send_header('Content-Length', len(response))
self.end_headers()
self.wfile.write(response)
@@ -295,7 +422,7 @@ class Handler(BaseHTTPRequestHandler):
self.send_response(200)
content = render_document(page_base(title='Login', body=body))
content = content.encode('UTF-8')
- self.send_header('Content-Type', 'text/html')
+ self.send_header('Content-Type', 'text/html; charset=UTF-8')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
diff --git a/password.py b/password.py
index 5e1f37e..16192be 100755
--- a/password.py
+++ b/password.py
@@ -3,59 +3,73 @@ import hashlib
import json
import os
import random
+from typing import (
+ TypedDict,
+)
-def gen_salt(length=10):
+
+def gen_salt(length: int = 10) -> str:
# TODO is this a sufficient source of randomness
return bytearray(random.randint(0, 256) for _ in range(length)).hex()
+
# Manual list of entries, to stop someone from executing arbitrary
# code by modyfying password database
hash_methods = {
- 'sha256': hashlib.sha256
- }
+ 'sha256': hashlib.sha256
+}
+
+
+class PasswordEntry(TypedDict):
+ hash: str
+ salt: str
+ # One of the keys of hash_methods
+ method: str
+
class Passwords:
- def __init__(self, fname):
+ def __init__(self, fname: os.PathLike):
self.fname = fname
+ self.db: dict[str, PasswordEntry]
try:
with open(fname) as f:
self.db = json.load(f)
- except:
+ except Exception:
self.db = {}
- def save(self):
+ def save(self) -> None:
try:
- with open(self.fname + '.tmp', 'w') as f:
+ with open(os.fspath(self.fname) + '.tmp', 'w') as f:
json.dump(self.db, f)
f.write('\n')
- os.rename(self.fname + '.tmp', self.fname)
- except e:
- print('Saving password failed {e}')
+ os.rename(os.fspath(self.fname) + '.tmp', self.fname)
+ except Exception as e:
+ print(f'Saving password failed {e}')
- def add(self, username, password):
+ def add(self, username: str, password: str) -> None:
if cur := self.db.get(username):
salt = cur['salt']
hashed = hashlib.sha256((salt + password).encode('UTF-8'))
self.db[username] = {
- 'hash': hashed.hexdigest(),
- 'salt': salt,
- 'method': 'sha256',
- }
+ 'hash': hashed.hexdigest(),
+ 'salt': salt,
+ 'method': 'sha256',
+ }
else:
salt = gen_salt()
hashed = hashlib.sha256((salt + password).encode('UTF-8'))
self.db[username] = {
- 'hash': hashed.hexdigest(),
- 'salt': salt,
- 'method': 'sha256'
- }
+ 'hash': hashed.hexdigest(),
+ 'salt': salt,
+ 'method': 'sha256'
+ }
- def validate(self, username, password):
+ def validate(self, username: str, password: str) -> bool:
# These shall fail when key is missing
data = self.db[username]
proc = hash_methods[data['method']]
- return data['hash'] == proc((data['salt'] + password).encode('UTF-8')).hexdigest()
-
+ digest = proc((data['salt'] + password).encode('UTF-8')).hexdigest()
+ return data['hash'] == digest
if __name__ == '__main__':
@@ -77,7 +91,6 @@ if __name__ == '__main__':
args = parser.parse_args()
-
passwords = Passwords(args.file)
if args.cmd == 'add':
passwords.add(args.username, args.password)