"""Mu4Web: a small Flask web frontend for the ``mu`` mail indexer.

Pages are built as nested Python structures and turned into markup by
``html_render.render_document``.  As used in this module, a tuple is
``(tag, optional-attribute-dict, *children)`` and a list is a sequence of
sibling nodes (NOTE(review): this describes how the structures are used
here; confirm the details against ``html_render``).
"""

import os
import subprocess
from datetime import datetime
from email.headerregistry import Address
from email.message import EmailMessage
from typing import (
    Optional,
    cast,
)
from urllib.parse import urlencode

from flask import (
    Flask,
    session,
    request,
    redirect,
    url_for,
    flash,
    get_flashed_messages
)
from flask_login import (
    LoginManager,
    login_required,
    login_user,
    current_user,
    logout_user,
)

import mu
import password
from html_render import HTML, render_document
from mu import get_mail
from password import Passwords
from user.local import LocalUser
from user.pam import PamUser

login_manager = LoginManager()


def mailto(addr: str) -> HTML:
    """Return a ``mailto:`` link whose text is the address itself."""
    return ('a', {'href': f'mailto:{addr}'}, addr)


def format_email(addr: Address) -> list[HTML]:
    """Render an address as ``display name <linked address>``."""
    mail_addr = f'{addr.username}@{addr.domain}'
    return [addr.display_name, ' <', mailto(mail_addr), '>']


def header_format(key: str, value) -> HTML:
    """Render the value of one mail header.

    :param key: Lower-case header name.
    :param value: The header object taken from the parsed message.
    :returns: A list of addresses for to/cc/bcc, a single formatted
        address for from, a link to the parent mail for in-reply-to,
        and the unmodified value for everything else.
    """
    if key in ['to', 'cc', 'bcc']:
        return ('ul', *[('li', *format_email(addr))
                        for addr in value.addresses])
    elif key == 'from':
        addresses = value.addresses
        if not addresses:
            # A malformed From header may parse to no addresses at all;
            # show the raw header instead of failing with IndexError.
            return value
        return format_email(addresses[0])
    elif key == 'in-reply-to':
        # type(value) == email.headerregistry._UnstructuredHeader
        parent_id = str(value).strip("<>")
        return ['<',
                ('a', {'href': '?' + urlencode({'id': parent_id})},
                 parent_id),
                '>']
    else:
        return value


def attachement_tree(mail: EmailMessage) -> HTML:
    """Return a nested list item describing ``mail`` and its sub-parts.

    Each node shows the content type, followed by the file name when the
    part has one; sub-parts are rendered recursively in a nested list.
    """
    ct = mail.get_content_type()
    fn = mail.get_filename()

    children = [attachement_tree(cast(EmailMessage, child))
                for child in mail.iter_parts()]

    content: HTML = ('ul', *children) if children else []
    body = f'{ct} {fn}' if fn else str(ct)
    return ('li', body, content)

# --------------------------------------------------


def login_page(returnto: Optional[str] = None) -> HTML:
    """Return the login form.

    :param returnto: Page to return to after login.  When given it is
        carried along as a hidden form field.
    """
    return ('form', {'action': '/login', 'method': 'POST',
                     'class': 'loginform'},
            ('label', {'for': 'username'}, 'Användarnamn'),
            ('input', {'id': 'username', 'name': 'username',
                       'placeholder': 'Användarnamn'}),
            ('label', {'for': 'password'}, 'Lösenord'),
            ('input', {'type': 'password',
                       'placeholder': 'Lösenord',
                       'name': 'password'}),
            ('input', {'type': 'hidden',
                       'name': 'returnto',
                       'value': returnto})
            if returnto else [],
            ('input', {'type': 'submit', 'value': 'Logga in'}),
            )


def user_info(username: str) -> HTML:
    """Return the user name together with a log-out button."""
    return [('span', username),
            ('form', {'action': '/logout', 'method': 'POST'},
             ('input', {'type': 'submit', 'value': 'Logga ut'}))]


def login_prompt() -> HTML:
    """Return a link to the login page."""
    return ('a', {'href': '/login'}, 'Logga in')


def flashed_messages() -> HTML:
    """Return the pending flash messages as a list."""
    return ('ul', {'class': 'flashes'},
            *[('li', msg) for msg in get_flashed_messages()])


def include_stylesheet(path):
    """Return a ``link`` element that loads the stylesheet at ``path``."""
    return ('link', {'type': 'text/css',
                     'rel': 'stylesheet',
                     'href': path})


def page_base(title: Optional[str] = None,
              body: Optional[HTML] = None) -> HTML:
    """Wrap ``body`` in the common page skeleton.

    The skeleton consists of the document head, a navigation bar showing
    either the logged-in user or a login link, the flashed messages
    followed by ``body``, and a footer.

    :param title: Document title.
    :param body: Page content; defaults to an empty page.
    """
    if body is None:
        # Avoid sharing one mutable default list between calls.
        body = []

    if current_user.is_authenticated:
        account = user_info(current_user.get_id())
    else:
        account = login_prompt()

    return ('html',
            ('head',
             ('meta', {'charset': 'utf-8'}),
             ('title', title),
             include_stylesheet('/static/style.css'),
             ),
            ('body',
             # The two menu entries are siblings, matching the footer menu.
             # They used to be nested inside each other, which is not
             # valid list markup.
             ('nav', ('menu',
                      ('li', ('h1', ('a', {'href': '/'}, 'Mu4Web'))),
                      ('li', account),
                      )),
             ('main',
              flashed_messages(),
              body),
             ('footer',
              ('menu',
               ('li', ('a', {'href': 'https://www.djcbsoftware.nl/code/mu/'},
                       'mu')),
               ('li', ('a', {'href': 'https://git.hornquist.se/mu4web'},
                       'Source')),
               ))))


def response_for(id: str, username: Optional[str] = None) -> str:
    """Render the page showing a single mail.

    :param id: Message id of the mail to show.
    :param username: Currently unused.
    :returns: The rendered HTML document.
    :raises ValueError: If the mail has neither an HTML nor a plain-text
        body.
    """
    mail = cast(EmailMessage, get_mail(id))

    # Header names are case-insensitive; index them by lower-case name.
    headers = {}
    for (key, value) in mail.items():
        headers[key.lower()] = value

    head = []
    for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject',
              'x-original-to', 'in-reply-to']:
        if x := headers.get(h):
            head += [('dt', h.title()),
                     ('dd', header_format(h, x))]

    body_part = mail.get_body(preferencelist=('html', 'plain'))
    if not body_part:
        raise ValueError("No suitable body in email")

    ct = body_part.get_content_type()
    body: HTML
    if ct == 'text/html':
        # NOTE(review): the HTML body is handed to the renderer as a
        # callable, presumably so it is emitted without escaping.  Mail
        # content is untrusted; confirm that it is sanitised somewhere.
        body = lambda: cast(EmailMessage, body_part).get_content()
    else:
        body = ('pre', cast(EmailMessage, body_part).get_content())

    if t := headers.get('subject'):
        title = f'Mail — {t}'
    else:
        title = 'Mail'

    main_body = [('dl', *head),
                 ('hr',),
                 ('main', body),
                 ('hr',),
                 ('ul', attachement_tree(mail)),
                 ]
    return render_document(page_base(title=title, body=main_body))


def search_field(q: str) -> HTML:
    """Return the search form, pre-filled with the query ``q``."""
    return ('form', {'id': 'searchform',
                     'action': '/search',
                     'method': 'GET'},
            ('label', {'for': 'search'},
             'Mu Search Query'),
            ('input', {'id': 'search',
                       'type': 'text',
                       'placeholder': 'Sök...',
                       'name': 'q',
                       'value': q}),
            ('input', {'type': 'Submit',
                       'value': 'Sök'}))


def search_result(q, by, reverse) -> HTML:
    """Run a mu search and return the result as a table.

    :param q: The query, passed on to ``mu.search``.
    :param by: Sort field, passed on to ``mu.search``.
    :param reverse: Sort direction flag, passed on to ``mu.search``.
    :returns: A table with one row per hit, where every cell links to
        the mail, or the string ``"No results"`` when nothing matched.
    """
    # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid']
    keys = ['from', 'to', 'subject', 'date']

    rows = mu.search(q, by, reverse)
    if not rows:
        return "No results"

    body: list[tuple] = []
    for row in rows:
        # Message ids regularly contain '+' and may contain other
        # characters that are special in a URL, so the id has to be
        # escaped (same way as the in-reply-to link in header_format).
        href = '/?' + urlencode({'id': row['msgid']})
        rowdata = []
        for key in keys:
            data = row.get(key, None)
            if data and key == 'date':
                dt = datetime.fromtimestamp(int(data))
                data = dt.strftime('%Y-%m-%d %H:%M')
            rowdata.append(('td', ('a', {'href': href}, data)))
        body.append(('tr', rowdata))

    return ('div',
            ('p', f"{len(rows)} träffar"),
            ('table',
             ('thead',
              ('tr', [('th', m.title()) for m in keys])),
             ('tbody', body)))


def search_page(q, by):
    """Render the search page.

    :param q: The query, or ``None``/empty to show only the search form.
    :param by: Sort field for the results.
    """
    # search_field expects a string, also when no query has been given.
    main_body = [search_field(q or '')]
    if q:
        main_body.append(search_result(q, by, False))
    return render_document(page_base(title='Search', body=main_body))


def mu_info():
    """Return a two-column table of the key/value pairs in ``mu.info()``."""
    rows = [('tr', ('td', key), ('td', value))
            for key, value in mu.info().items()]
    return ('table', ('tbody', rows))


def find_maildirs(basedir) -> dict[str, list[list[str]]]:
    """Find all maildirs below ``basedir``, grouped by first component.

    Every directory that has a ``cur`` subdirectory counts as a maildir.
    ``basedir`` itself is never reported.

    :param basedir: Directory to search.
    :returns: A mapping from the first path component below ``basedir``
        to the remaining components of each maildir in that group.  A
        maildir located directly in the group directory contributes an
        empty list.
    """
    cmd = subprocess.run(['find', basedir,
                          '-type', 'd',
                          '-name', 'cur',
                          '-print0'],
                         capture_output=True)

    groups: dict[str, list[list[str]]] = {}
    # Group by first component
    for entry in cmd.stdout.split(b'\0'):
        if not entry:
            # -print0 terminates every path, so the last piece is empty.
            continue
        # Decode before doing any path arithmetic, so that a non-ASCII or
        # slash-terminated basedir cannot throw the prefix removal off.
        maildir = os.path.dirname(entry.decode('UTF-8'))
        relative = os.path.relpath(maildir, basedir)
        if relative == os.path.curdir:
            continue
        parts = relative.split(os.path.sep)
        groups.setdefault(parts[0], []).append(parts[1:])
    return groups


def index_page():
    """Render the start page: mu information plus a tree of maildirs."""
    data = mu.info()
    groups = find_maildirs(data['maildir'])

    entries = []
    for key, values in sorted(groups.items(), key=lambda p: p[0]):
        links = []
        for v in sorted(os.path.sep.join(value) for value in values):
            # A maildir located directly in the group directory has no
            # sub-path; link to the group itself instead of producing a
            # link without text that searches for "/key/".
            maildir = f'/{key}/{v}' if v else f'/{key}'
            query = urlencode({'q': f'maildir:"{maildir}"'})
            links.append(('li', ('a', {'href': 'search?' + query},
                                 v or key)))
        entries.append(('li', ('details',
                               ('summary', key),
                               ('ul', links))))

    body = [('div', mu_info()),
            ('div', ('ul', entries)),
            ]
    return render_document(page_base(title='Mail index', body=body))


def _local_redirect_target(target: Optional[str], default: str) -> str:
    """Return ``target`` if it is a path on this site, else ``default``.

    Only paths starting with a single ``/`` are accepted, which keeps the
    login form from being usable as an open redirect.
    """
    if not target:
        return default
    if not target.startswith('/'):
        return default
    # '//host' and '/\host' are treated as absolute URLs by browsers.
    if target.startswith('//') or '\\' in target:
        return default
    return target


app = Flask(__name__)
login_manager.init_app(app)
# NOTE(review): the built-in fallback key is public knowledge and lets
# anybody forge session cookies.  Set MU4WEB_SECRET_KEY to a private
# random value in any real deployment.
app.secret_key = os.environ.get('MU4WEB_SECRET_KEY',
                                'THIS IS A RANDOM STRING')


@login_manager.user_loader
def load_user(user_id):
    """Return the user object for the id stored in the session."""
    # return User.get(user_id)
    return LocalUser(user_id)


@app.route('/')
def index():
    """Show a single mail when ``?id=`` is given, else the start page.

    Visitors that are not logged in are sent to the login page.
    """
    if not current_user.is_authenticated:
        return redirect(url_for('login_page_', returnto=request.path))

    if msg_id := request.args.get('id'):
        # A '+' that was put into the URL unescaped is decoded as a
        # space; turn it back so such links still find their mail.
        return response_for(msg_id.replace(' ', '+'))
    return index_page()


@app.route('/search')
@login_required
def search_page_():
    """Search page; reads ``q`` and ``by`` from the query string."""
    return search_page(request.args.get('q'),
                       request.args.get('by', None))


# TODO this page is really weird if you are already logged in
@app.route('/login', methods=['GET'])
def login_page_():
    """Show the login form."""
    body = login_page(request.args.get('returnto'))
    return render_document(page_base(title='Login', body=body))


@app.route('/login', methods=['POST'])
def login_form():
    """Handle a submitted login form.

    Redirects to the requested return page, or to the start page when
    none is given or the given one is not a path on this site.  A failed
    login flashes an error message before redirecting.
    """
    # The login form sends returnto as a hidden field, so it arrives in
    # the form data; the query string is kept as a fallback.
    target = _local_redirect_target(
        request.form.get('returnto') or request.args.get('returnto'),
        url_for('index'))

    username = request.form['username']
    supplied_password = request.form['password']

    user = PamUser(username)
    if user.validate(supplied_password):
        login_user(user)
    else:
        flash('Invalid username or password')
    return redirect(target)


@app.route('/logout', methods=['POST'])
@login_required
def logout_form():
    """Log the current user out and go back to the start page."""
    logout_user()
    return redirect(url_for('index'))


if __name__ == '__main__':
    app.run(debug=True, port=8090)