aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHugo Hörnquist <hugo@lysator.liu.se>2022-04-07 17:02:37 +0200
committerHugo Hörnquist <hugo@lysator.liu.se>2022-04-07 17:02:37 +0200
commit2e8b74f9aa2fbfe4f17611719cd648125bf92ddc (patch)
treeb2c57e4ea12eb73f7b54275c0a51a47909d24724
downloadmu4web-2e8b74f9aa2fbfe4f17611719cd648125bf92ddc.tar.gz
mu4web-2e8b74f9aa2fbfe4f17611719cd648125bf92ddc.tar.xz
Initial commit.
-rw-r--r--main.py350
-rwxr-xr-xpassword.py81
2 files changed, 431 insertions, 0 deletions
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..ce73c39
--- /dev/null
+++ b/main.py
@@ -0,0 +1,350 @@
+import subprocess
+import email
+from email.message import EmailMessage
+from subprocess import PIPE
+import html
+from email.parser import BytesParser
+import email.policy
+from urllib.parse import urlparse, urlencode, parse_qs
+from http.cookies import BaseCookie
+import http.cookies
+import password
+from uuid import uuid4
+
+from http.server import HTTPServer, BaseHTTPRequestHandler
+
+parser = BytesParser(policy=email.policy.default)
+
+def get_mail(id):
+ cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}',
+ '--fields', 'l'],
+ stdout=PIPE)
+ filename = cmd.stdout.decode('UTF-8').strip()
+
+ with open(filename, "rb") as f:
+ mail = parser.parse(f)
+ return mail
+
+
+def header_format(key, value):
+ if key in ['to', 'cc', 'bcc']:
+ lst = []
+ for addr in value.addresses:
+ mail_addr = f'{addr.username}@{addr.domain}'
+ lst.append(('li',
+ str(addr.display_name),
+ ' <',
+ ('a', {'href': f'mail:{mail_addr}'}, mail_addr),
+ '>',
+ ))
+ return ('ul', lst)
+ elif key == 'from':
+ value = value.addresses[0]
+ mail_addr = f'{value.username}@{value.domain}'
+ return [str(value.display_name),
+ ' <',
+ ('a', {'href': f'mail:{mail_addr}'}, mail_addr),
+ '>',
+ ]
+ elif key == 'in-reply-to':
+ # type(value) == email.headerregistry._UnstructuredHeader
+ id = str(value).strip("<>")
+ return ['<', ('a', {'href': '?'+urlencode({'id': id})}, id), '>']
+ else:
+ return value
+
+standalones = ['hr', 'br', 'meta']
+
+def _render_document(document):
+ if type(document) == tuple:
+ tag, *body = document
+ if body and type(body[0]) == dict:
+ print(body[0])
+ attributes = ' '.join(f'{a}="{html.escape(b)}"'
+ for (a, b) in body[0].items())
+ body = body[1:]
+ start = f'<{tag} {attributes}>'
+ else:
+ start = f'<{tag}>'
+ if tag in standalones:
+ return start
+ else:
+ items = ''.join(_render_document(b) for b in body)
+ return start + f'{items}</{tag}>'
+ elif callable(document):
+ return str(document())
+ elif type(document) == list:
+ return ''.join(_render_document(e) for e in document)
+ else:
+ # strings, and everything else
+ return html.escape(str(document))
+
+def render_document(document):
+ return '<!doctype html>\n' + _render_document(document)
+
+style = lambda: """
+
+ nav {
+ display: block;
+ width: 100%;
+ height: 4em;
+ color: white;
+ background-color: darkgrey;
+ }
+
+ dl {
+ display: grid;
+ grid-template-columns: 10ch auto;
+ }
+ dt {
+ font-weight: bold;
+ }
+ dd {
+ font-family: mono;
+ font-size: 80%;
+ }
+ dd > * {
+ margin: 0;
+ }
+
+ """
+def attachement_tree(mail):
+ ct = mail.get_content_type()
+ fn = mail.get_filename()
+
+ children = []
+ for child in mail.iter_parts():
+ children.append(attachement_tree(child))
+
+ if children:
+ content = ('ul', *children)
+ else:
+ content = []
+
+ if fn:
+ body = f'{ct} {fn}'
+ else:
+ body = str(ct)
+ return ('li', body, content)
+
+# --------------------------------------------------
+
+
+def login_page(returnto=None):
+ return ('form', { 'action': '/login', 'method': 'POST' },
+ ('input', {'name': 'username', 'placeholder': 'Username'}),
+ ('input', {'type': 'password',
+ 'placeholder': 'Password',
+ 'name': 'password'}),
+ ('input', {'type': 'hidden',
+ 'name': 'returnto',
+ 'value': returnto})
+ if returnto else [],
+ ('input', {'type': 'submit'}),
+ )
+
+def user_info(username):
+ return [('span', username),
+ ('form', {'action': '/logout', 'method': 'POST'},
+ ('input', {'type': 'submit', 'value': 'Logga ut'}))]
+
+def login_prompt():
+ return ('a', {'href': '/login'}, 'Logga in')
+
+def page_base(title=None, body=[], username=None):
+ return ('html',
+ ('head',
+ ('meta', { 'charset': 'utf-8' }),
+ ('title', title),
+ ('style', style),
+ ),
+ ('body',
+ ('nav',
+ user_info(username) if username else login_prompt()
+ ),
+ body))
+
+
+def response_for(id, username=None):
+
+ mail = get_mail(id)
+
+
+ headers = {}
+ for (key, value) in mail.items():
+ headers[key.lower()] = value
+
+ head = []
+ for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject', 'x-original-to',
+ 'in-reply-to']:
+ if x := headers.get(h.lower()):
+ head += [('dt', h.title()),
+ ('dd', header_format(h.lower(), x))]
+
+# print()
+ body_part = mail.get_body(preferencelist=('html','plain'))
+ ct = body_part.get_content_type()
+ if ct == 'text/html':
+ body = lambda: body_part.get_content()
+ else:
+ body = ('pre', body_part.get_content())
+
+ if t := headers.get('subject'):
+ title = f'Mail — {t}'
+ else:
+ title = 'Mail'
+
+ main_body = [ ('dl', *head),
+ ('hr',),
+ ('main', body),
+ ('hr',),
+ ('ul', attachement_tree(mail)),]
+ html_str = render_document(
+ page_base(title=title, body=main_body, username=username))
+
+ return html_str
+
+
+
+
+def index_page(username):
+ ids = [
+ 'CAEzixGsw-4zJ8_ejK_vDgmcQ9s-MbBc-ho+HL4arV4a+ghOOPg@mail.gmail.com',
+ 'CA+pcBt-gLb0GtbFOjJ5_7Q_WXtqApVPQ9w-3O7GH=VqCEQat6g@mail.gmail.com',
+ ]
+
+ body = [
+ ('h1', "Sample ID's"),
+ ('ul',
+ [('li', ('a', {'href': '?'+urlencode({'id': id})}, id))
+ for id in ids]
+ ),
+ ]
+
+ return render_document(
+ page_base(
+ title='Mail index',
+ body=body,
+ username=username
+ ))
+
+valid_session_cookies = {}
+
+def validate_session_cookie(cookie):
+ return valid_session_cookies.get(cookie.value)
+
+def remove_session_cookie(cookie):
+ if valid_session_cookies.get(cookie.value):
+ del valid_session_cookies[cookie.value]
+ cookie.set(cookie.key, '', '')
+ cookie.expire = 0
+ return cookie
+
+passwords = password.Passwords('passwords.json')
+
+def new_session_cookie(username):
+ global valid_session_cookies
+ m = http.cookies.Morsel()
+ unique = str(uuid4())
+ valid_session_cookies[unique] = username
+ m.set('session', unique, unique)
+ return m
+
+class Handler(BaseHTTPRequestHandler):
+ def do_GET(self):
+ url = urlparse(self.path)
+ query = parse_qs(url.query)
+ # print(type(self.headers))
+
+ cookies = BaseCookie(self.headers.get('Cookie'))
+ logged_in = None
+ if c := cookies.get('session'):
+ logged_in = validate_session_cookie(c)
+
+ if url.path == '/':
+ if not logged_in:
+ self.send_response(307)
+ q = urlencode({'returnto': self.path})
+ self.send_header('location', '/login?' + q)
+ self.end_headers()
+ else:
+ if id := query.get('id'):
+ response = response_for(''.join(id), logged_in)
+ self.send_response(200)
+ else:
+ response = index_page(logged_in)
+ self.send_response(200)
+
+ response = response.encode('UTF-8')
+ self.send_header('Content-Type', 'text/html')
+ self.send_header('Content-Length', len(response))
+ self.end_headers()
+ self.wfile.write(response)
+
+ elif url.path == '/login':
+ if not logged_in:
+ body = login_page(''.join(query.get('returnto')))
+ self.send_response(200)
+ content = render_document(page_base(title='Login', body=body))
+ content = content.encode('UTF-8')
+ self.send_header('Content-Type', 'text/html')
+ self.send_header('Content-Length', len(content))
+ self.end_headers()
+ self.wfile.write(content)
+ else:
+ # TODO do something sensible here
+ pass
+
+
+ def do_POST(self):
+ url = urlparse(self.path)
+ query = parse_qs(url.query)
+ cookies = BaseCookie(self.headers.get('Cookie'))
+ logged_in = None
+ if c := cookies.get('session'):
+ logged_in = validate_session_cookie(c)
+ print(cookies)
+ print(valid_session_cookies)
+
+ if url.path == '/login':
+ cl = content_length = self.headers.get('content-length')
+ data = parse_qs(self.rfile.read(int(cl)))
+ username = b''.join(data[b'username']).decode('UTF-8')
+ password = b''.join(data[b'password']).decode('UTF-8')
+ if passwords.validate(username, password):
+ cookie = new_session_cookie(username)
+ self.send_response(302)
+ self.send_header('set-cookie', cookie.OutputString())
+ if ret := data.get(b'returnto'):
+ returnto = b''.join(ret).decode('UTF-8')
+ self.send_header('location', returnto)
+ else:
+ self.send_header('location', '/')
+ else:
+ self.send_response(302)
+
+ self.end_headers()
+
+ if url.path == '/logout':
+ if not logged_in:
+ self.send_response(302)
+ self.send_header('Location', '/')
+ self.end_headers()
+ return
+ cookie = remove_session_cookie(cookies.get('session'))
+ self.send_response(302)
+ self.send_header('set-cookie', cookie)
+ # TODO use the referer header?
+ self.send_header('Location', '/')
+ self.end_headers()
+
+
+if __name__ == '__main__':
+ server = HTTPServer(('0', 8090), Handler)
+
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ pass
+
+ server.server_close()
diff --git a/password.py b/password.py
new file mode 100755
index 0000000..d5bc046
--- /dev/null
+++ b/password.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+import hashlib
+import json
+import os
+import random
+
+def gen_salt(length=10):
+ # TODO is this a sufficient source of randomness
+ return bytearray(random.randint(0, 256) for _ in range(length)).hex()
+
+# Manual list of entries, to stop someone from executing arbitrary
+# code by modyfying password database
+hash_methods = {
+ 'sha256': hashlib.sha256
+ }
+
+class Passwords:
+ def __init__(self, fname):
+ self.fname = fname
+ try:
+ with open(fname) as f:
+ self.db = json.load(f)
+ except:
+ self.db = {}
+
+ def save(self):
+ try:
+ with open(self.fname + '.tmp', 'w') as f:
+ json.dump(self.db, f)
+ f.write('\n')
+ os.rename(self.fname + '.tmp', self.fname)
+ except e:
+ print('Saving password failed {e}')
+
+ def add(self, username, password):
+ if cur := self.db.get(username):
+ salt = cur['salt']
+ hashed = hashlib.sha256((salt + password).encode('UTF-8'))
+ self.db[username] = {
+ 'hash': hashed.hexdigest(),
+ 'salt': salt,
+ 'method': 'sha256',
+ }
+ else:
+ salt = gen_salt()
+ hashed = hashlib.sha256((salt + password).encode('UTF-8'))
+ self.db[username] = {
+ 'hash': hashed.hexdigest(),
+ 'salt': salt,
+ 'method': 'sha256'
+ }
+
+ def validate(self, username, password):
+ # These shall fail when key is missing
+ data = self.db[username]
+ proc = hash_methods[data['method']]
+ return data['hash'] == proc((data['salt'] + password).encode('UTF-8')).hexdigest()
+
+
+
+if __name__ == '__main__':
+
+ import argparse
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--file', default='passwords.json')
+ subparsers = parser.add_subparsers(dest='cmd')
+ add_parser = subparsers.add_parser('add')
+ add_parser.add_argument('username')
+ add_parser.add_argument('password')
+ val_parser = subparsers.add_parser('validate')
+ val_parser.add_argument('username')
+ val_parser.add_argument('password')
+ args = parser.parse_args()
+
+
+ passwords = Passwords(args.file)
+ if args.cmd == 'add':
+ passwords.add(args.username, args.password)
+ passwords.save()
+ elif args.cmd == 'validate':
+ print(passwords.validate(args.username, args.password))