diff options
author | Hugo Hörnquist <hugo@lysator.liu.se> | 2022-04-07 17:02:37 +0200 |
---|---|---|
committer | Hugo Hörnquist <hugo@lysator.liu.se> | 2022-04-07 17:02:37 +0200 |
commit | 2e8b74f9aa2fbfe4f17611719cd648125bf92ddc (patch) | |
tree | b2c57e4ea12eb73f7b54275c0a51a47909d24724 /main.py | |
download | mu4web-2e8b74f9aa2fbfe4f17611719cd648125bf92ddc.tar.gz mu4web-2e8b74f9aa2fbfe4f17611719cd648125bf92ddc.tar.xz |
Initial commit.
Diffstat (limited to 'main.py')
-rw-r--r-- | main.py | 350 |
1 files changed, 350 insertions, 0 deletions
@@ -0,0 +1,350 @@ +import subprocess +import email +from email.message import EmailMessage +from subprocess import PIPE +import html +from email.parser import BytesParser +import email.policy +from urllib.parse import urlparse, urlencode, parse_qs +from http.cookies import BaseCookie +import http.cookies +import password +from uuid import uuid4 + +from http.server import HTTPServer, BaseHTTPRequestHandler + +parser = BytesParser(policy=email.policy.default) + +def get_mail(id): + cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}', + '--fields', 'l'], + stdout=PIPE) + filename = cmd.stdout.decode('UTF-8').strip() + + with open(filename, "rb") as f: + mail = parser.parse(f) + return mail + + +def header_format(key, value): + if key in ['to', 'cc', 'bcc']: + lst = [] + for addr in value.addresses: + mail_addr = f'{addr.username}@{addr.domain}' + lst.append(('li', + str(addr.display_name), + ' <', + ('a', {'href': f'mail:{mail_addr}'}, mail_addr), + '>', + )) + return ('ul', lst) + elif key == 'from': + value = value.addresses[0] + mail_addr = f'{value.username}@{value.domain}' + return [str(value.display_name), + ' <', + ('a', {'href': f'mail:{mail_addr}'}, mail_addr), + '>', + ] + elif key == 'in-reply-to': + # type(value) == email.headerregistry._UnstructuredHeader + id = str(value).strip("<>") + return ['<', ('a', {'href': '?'+urlencode({'id': id})}, id), '>'] + else: + return value + +standalones = ['hr', 'br', 'meta'] + +def _render_document(document): + if type(document) == tuple: + tag, *body = document + if body and type(body[0]) == dict: + print(body[0]) + attributes = ' '.join(f'{a}="{html.escape(b)}"' + for (a, b) in body[0].items()) + body = body[1:] + start = f'<{tag} {attributes}>' + else: + start = f'<{tag}>' + if tag in standalones: + return start + else: + items = ''.join(_render_document(b) for b in body) + return start + f'{items}</{tag}>' + elif callable(document): + return str(document()) + elif type(document) == list: + return ''.join(_render_document(e) for e in document) + else: + # strings, and everything else + return html.escape(str(document)) + +def render_document(document): + return '<!doctype html>\n' + _render_document(document) + +style = lambda: """ + + nav { + display: block; + width: 100%; + height: 4em; + color: white; + background-color: darkgrey; + } + + dl { + display: grid; + grid-template-columns: 10ch auto; + } + dt { + font-weight: bold; + } + dd { + font-family: mono; + font-size: 80%; + } + dd > * { + margin: 0; + } + + """ +def attachement_tree(mail): + ct = mail.get_content_type() + fn = mail.get_filename() + + children = [] + for child in mail.iter_parts(): + children.append(attachement_tree(child)) + + if children: + content = ('ul', *children) + else: + content = [] + + if fn: + body = f'{ct} {fn}' + else: + body = str(ct) + return ('li', body, content) + +# -------------------------------------------------- + + +def login_page(returnto=None): + return ('form', { 'action': '/login', 'method': 'POST' }, + ('input', {'name': 'username', 'placeholder': 'Username'}), + ('input', {'type': 'password', + 'placeholder': 'Password', + 'name': 'password'}), + ('input', {'type': 'hidden', + 'name': 'returnto', + 'value': returnto}) + if returnto else [], + ('input', {'type': 'submit'}), + ) + +def user_info(username): + return [('span', username), + ('form', {'action': '/logout', 'method': 'POST'}, + ('input', {'type': 'submit', 'value': 'Logga ut'}))] + +def login_prompt(): + return ('a', {'href': '/login'}, 'Logga in') + +def page_base(title=None, body=[], username=None): + return ('html', + ('head', + ('meta', { 'charset': 'utf-8' }), + ('title', title), + ('style', style), + ), + ('body', + ('nav', + user_info(username) if username else login_prompt() + ), + body)) + + +def response_for(id, username=None): + + mail = get_mail(id) + + + headers = {} + for (key, value) in mail.items(): + headers[key.lower()] = value + + head = [] + for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject', 'x-original-to', + 'in-reply-to']: + if x := headers.get(h.lower()): + head += [('dt', h.title()), + ('dd', header_format(h.lower(), x))] + +# print() + body_part = mail.get_body(preferencelist=('html','plain')) + ct = body_part.get_content_type() + if ct == 'text/html': + body = lambda: body_part.get_content() + else: + body = ('pre', body_part.get_content()) + + if t := headers.get('subject'): + title = f'Mail — {t}' + else: + title = 'Mail' + + main_body = [ ('dl', *head), + ('hr',), + ('main', body), + ('hr',), + ('ul', attachement_tree(mail)),] + html_str = render_document( + page_base(title=title, body=main_body, username=username)) + + return html_str + + + + +def index_page(username): + ids = [ + 'CAEzixGsw-4zJ8_ejK_vDgmcQ9s-MbBc-ho+HL4arV4a+ghOOPg@mail.gmail.com', + 'CA+pcBt-gLb0GtbFOjJ5_7Q_WXtqApVPQ9w-3O7GH=VqCEQat6g@mail.gmail.com', + ] + + body = [ + ('h1', "Sample ID's"), + ('ul', + [('li', ('a', {'href': '?'+urlencode({'id': id})}, id)) + for id in ids] + ), + ] + + return render_document( + page_base( + title='Mail index', + body=body, + username=username + )) + +valid_session_cookies = {} + +def validate_session_cookie(cookie): + return valid_session_cookies.get(cookie.value) + +def remove_session_cookie(cookie): + if valid_session_cookies.get(cookie.value): + del valid_session_cookies[cookie.value] + cookie.set(cookie.key, '', '') + cookie.expire = 0 + return cookie + +passwords = password.Passwords('passwords.json') + +def new_session_cookie(username): + global valid_session_cookies + m = http.cookies.Morsel() + unique = str(uuid4()) + valid_session_cookies[unique] = username + m.set('session', unique, unique) + return m + +class Handler(BaseHTTPRequestHandler): + def do_GET(self): + url = urlparse(self.path) + query = parse_qs(url.query) + # print(type(self.headers)) + + cookies = BaseCookie(self.headers.get('Cookie')) + logged_in = None + if c := cookies.get('session'): + logged_in = validate_session_cookie(c) + + if url.path == '/': + if not logged_in: + self.send_response(307) + q = urlencode({'returnto': self.path}) + self.send_header('location', '/login?' + q) + self.end_headers() + else: + if id := query.get('id'): + response = response_for(''.join(id), logged_in) + self.send_response(200) + else: + response = index_page(logged_in) + self.send_response(200) + + response = response.encode('UTF-8') + self.send_header('Content-Type', 'text/html') + self.send_header('Content-Length', len(response)) + self.end_headers() + self.wfile.write(response) + + elif url.path == '/login': + if not logged_in: + body = login_page(''.join(query.get('returnto'))) + self.send_response(200) + content = render_document(page_base(title='Login', body=body)) + content = content.encode('UTF-8') + self.send_header('Content-Type', 'text/html') + self.send_header('Content-Length', len(content)) + self.end_headers() + self.wfile.write(content) + else: + # TODO do something sensible here + pass + + + def do_POST(self): + url = urlparse(self.path) + query = parse_qs(url.query) + cookies = BaseCookie(self.headers.get('Cookie')) + logged_in = None + if c := cookies.get('session'): + logged_in = validate_session_cookie(c) + print(cookies) + print(valid_session_cookies) + + if url.path == '/login': + cl = content_length = self.headers.get('content-length') + data = parse_qs(self.rfile.read(int(cl))) + username = b''.join(data[b'username']).decode('UTF-8') + password = b''.join(data[b'password']).decode('UTF-8') + if passwords.validate(username, password): + cookie = new_session_cookie(username) + self.send_response(302) + self.send_header('set-cookie', cookie.OutputString()) + if ret := data.get(b'returnto'): + returnto = b''.join(ret).decode('UTF-8') + self.send_header('location', returnto) + else: + self.send_header('location', '/') + else: + self.send_response(302) + + self.end_headers() + + if url.path == '/logout': + if not logged_in: + self.send_response(302) + self.send_header('Location', '/') + self.end_headers() + return + cookie = remove_session_cookie(cookies.get('session')) + self.send_response(302) + self.send_header('set-cookie', cookie) + # TODO use the referer header? + self.send_header('Location', '/') + self.end_headers() + + +if __name__ == '__main__': + server = HTTPServer(('0', 8090), Handler) + + try: + server.serve_forever() + except KeyboardInterrupt: + pass + + server.server_close() |