from email.message import EmailMessage from email.headerregistry import Address from urllib.parse import urlencode import password from password import Passwords import os from datetime import datetime from flask_login import ( LoginManager, login_required, login_user, current_user, logout_user, ) from typing import ( Optional, cast, ) from mu import mu_search, get_mail from html_render import HTML, render_document from flask import ( Flask, session, request, redirect, url_for, flash, get_flashed_messages ) login_manager = LoginManager() def mailto(addr: str) -> HTML: return ('a', {'href': f'mailto:{addr}'}, addr) def format_email(addr: Address) -> list[HTML]: mail_addr = f'{addr.username}@{addr.domain}' return [addr.display_name, ' <', mailto(mail_addr), '>'] def header_format(key: str, value) -> HTML: if key in ['to', 'cc', 'bcc']: return ('ul', *[('li', *format_email(addr)) for addr in value.addresses]) elif key == 'from': return format_email(value.addresses[0]) elif key == 'in-reply-to': # type(value) == email.headerregistry._UnstructuredHeader id = str(value).strip("<>") return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>'] else: return value style: HTML = lambda: """ nav { display: block; width: 100%; height: 4em; color: white; background-color: darkgrey; } dl { display: grid; grid-template-columns: 10ch auto; } dt { font-weight: bold; } dd { font-family: mono; font-size: 80%; } dd > * { margin: 0; } tr:nth-child(2n) { background-color: lightblue; } """ def attachement_tree(mail: EmailMessage) -> HTML: ct = mail.get_content_type() fn = mail.get_filename() children = [] for child in mail.iter_parts(): children.append(attachement_tree(cast(EmailMessage, child))) content: HTML if children: content = ('ul', *children) else: content = [] if fn: body = f'{ct} {fn}' else: body = str(ct) return ('li', body, content) # -------------------------------------------------- def login_page(returnto: Optional[str] = None) -> HTML: return ('form', {'action': '/login', 'method': 'POST'}, ('input', {'name': 'username', 'placeholder': 'Username'}), ('input', {'type': 'password', 'placeholder': 'Password', 'name': 'password'}), ('input', {'type': 'hidden', 'name': 'returnto', 'value': returnto}) if returnto else [], ('input', {'type': 'submit'}), ) def user_info(username: str) -> HTML: return [('span', username), ('form', {'action': '/logout', 'method': 'POST'}, ('input', {'type': 'submit', 'value': 'Logga ut'}))] def login_prompt() -> HTML: return ('a', {'href': '/login'}, 'Logga in') def flashed_messages() -> HTML: return ('ul', {'class': 'flashes'}, *[('li', msg) for msg in get_flashed_messages()]) def page_base(title: Optional[str] = None, body: HTML = []) -> HTML: return ('html', ('head', ('meta', {'charset': 'utf-8'}), ('title', title), ('style', style), ), ('body', ('nav', user_info(current_user.get_id()) if current_user.is_authenticated else login_prompt() ), flashed_messages(), body)) def response_for(id: str, username: Optional[str] = None) -> str: mail = cast(EmailMessage, get_mail(id)) headers = {} for (key, value) in mail.items(): headers[key.lower()] = value head = [] for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject', 'x-original-to', 'in-reply-to']: if x := headers.get(h.lower()): head += [('dt', h.title()), ('dd', header_format(h.lower(), x))] body_part = mail.get_body(preferencelist=('html', 'plain')) if not body_part: raise ValueError("No suitable body in email") ct = body_part.get_content_type() body: HTML if ct == 'text/html': body = lambda: cast(EmailMessage, body_part).get_content() else: body = ('pre', cast(EmailMessage, body_part).get_content()) if t := headers.get('subject'): title = f'Mail — {t}' else: title = 'Mail' main_body = [('dl', *head), ('hr',), ('main', body), ('hr',), ('ul', attachement_tree(mail)), ] html_str = render_document(page_base(title=title, body=main_body)) return html_str def search_field(q: str) -> HTML: return ('form', {'action': '/search', 'method': 'GET'}, ('label', {'for': 'search'}, 'Mu Search Query'), ('textarea', {'id': 'search', 'name': 'q'}, q), ('input', {'type': 'Submit'})) def search_result(q, by, reverse) -> HTML: # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid'] keys = ['from', 'to', 'subject', 'date'] rows = mu_search(q, by, reverse) body: list[tuple] = [] for row in rows: rowdata = [] for key in keys: data = row.get(key, None) if data and key == 'date': dt = datetime.fromtimestamp(int(data)) data = dt.strftime('%Y-%m-%d %H:%M') rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data))) body.append(('tr', rowdata)) # TODO print number of results return ('table', ('thead', ('tr', [('th', m.title()) for m in keys])), ('tbody', body )) def search_page(q, by): main_body = [search_field(q)] if q: main_body.append(search_result(q, by, False)) return render_document(page_base(title='Search', body=main_body)) def index_page(): ids = [ 'CAEzixGsw-4zJ8_ejK_vDgmcQ9s-MbBc-ho+HL4arV4a+ghOOPg@mail.gmail.com', 'CA+pcBt-gLb0GtbFOjJ5_7Q_WXtqApVPQ9w-3O7GH=VqCEQat6g@mail.gmail.com', ] body = [('h1', "Sample ID's"), ('ul', [('li', ('a', {'href': '?' + urlencode({'id': id})}, id)) for id in ids] ), ] return render_document(page_base(title='Mail index', body=body)) passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json')) app = Flask(__name__) login_manager.init_app(app) app.secret_key = 'THIS IS A RANDOM STRING' class User: def __init__(self, username: str): self._username = username self._authenticated = False # @property def is_authenticated(self): return self._authenticated # @property def is_active(self): return True # @property def is_anonymous(self): return False # @property def get_id(self): return self._username @login_manager.user_loader def load_user(user_id): # return User.get(user_id) return User(user_id) @app.route('/') def index(): if not current_user.is_authenticated: return redirect(url_for('login_page_', returnto=request.path)) if id := request.args.get('id'): print("id =", id) response = response_for(''.join(id).replace(' ', '+')) else: response = index_page() return response @app.route('/search') @login_required def search_page_(): return search_page(request.args.get('q'), request.args.get('by', None)) @app.route('/login', methods=['GET']) def login_page_(): body = login_page(request.args.get('returnto')) return render_document(page_base(title='Login', body=body)) @app.route('/login', methods=['POST']) def login_form(): resp = redirect(request.args.get('returnto', url_for('index'))) username = request.form['username'] password = request.form['password'] user = User(username) if passwords.validate(username, password): login_user(user) else: flash('Invalid username or password') return resp @app.route('/logout', methods=['POST']) @login_required def logout_form(): logout_user() return redirect(url_for('index')) if __name__ == '__main__': app.run(debug=True, port=8090)