From 4587e5e124afcb13c81dea68e373420670d076b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20H=C3=B6rnquist?= Date: Tue, 29 Nov 2022 02:51:29 +0100 Subject: Properly specify version requirements, and add directory layout. --- html_render.py | 71 ---------- main.py | 350 -------------------------------------------------- mu.py | 128 ------------------ mu4web/__init__.py | 1 + mu4web/html_render.py | 71 ++++++++++ mu4web/main.py | 350 ++++++++++++++++++++++++++++++++++++++++++++++++++ mu4web/mu.py | 128 ++++++++++++++++++ mu4web/password.py | 120 +++++++++++++++++ password.py | 120 ----------------- setup.cfg | 22 ++++ setup.py | 2 + 11 files changed, 694 insertions(+), 669 deletions(-) delete mode 100644 html_render.py delete mode 100644 main.py delete mode 100644 mu.py create mode 100644 mu4web/__init__.py create mode 100644 mu4web/html_render.py create mode 100644 mu4web/main.py create mode 100644 mu4web/mu.py create mode 100755 mu4web/password.py delete mode 100755 password.py create mode 100644 setup.py diff --git a/html_render.py b/html_render.py deleted file mode 100644 index 99140ad..0000000 --- a/html_render.py +++ /dev/null @@ -1,71 +0,0 @@ -import html -from typing import ( - Callable, - TypeAlias, - Union, -) - -HTML: TypeAlias = Union[tuple, - list['HTML'], - Callable[[], str], - None, str, int, float] - - -standalones = ['hr', 'br', 'meta'] -"""Tags which can't have a closing tag.""" - - -def _render_document(document: HTML) -> str: - if isinstance(document, tuple): - tag, *body = document - if body and isinstance(body[0], dict): - print(body[0]) - attributes = ' '.join(f'{a}="{html.escape(b)}"' - for a, b in body[0].items()) - body = body[1:] - start = f'<{tag} {attributes}>' - else: - start = f'<{tag}>' - - if tag in standalones: - return start - else: - if body: - items = ''.join(_render_document(b) for b in body) - else: - items = '' - return start + f'{items}' - elif callable(document): - return str(document()) - elif isinstance(document, list): - return ''.join(_render_document(e) for e in document) - elif document is None: - return '' - else: - # strings, and everything else - return html.escape(str(document)) - - -def render_document(document: HTML) -> str: - """ - Render an HTML structure to an Html string. - - The following Python types are converted as follows: - - Tuples - - The first value becomes the tags name - - The second value, if a dictionary, becomes the tags attributes - - All following values (including the second if not a dictionary) - gets individually passed to render_document. - - Lists - Each element gets passed to render_document - - Callable[[], str] - Gets called, and its output is included verbatim. Useful for - including strings which shouldn't be escaped. - - str - Gets escaped, and included - - int, float - Gets included as their default string representation. - - None - Becomes an empty string - """ - return '\n' + _render_document(document) diff --git a/main.py b/main.py deleted file mode 100644 index 7c3d7bd..0000000 --- a/main.py +++ /dev/null @@ -1,350 +0,0 @@ -from email.message import EmailMessage -from email.headerregistry import Address -from urllib.parse import urlencode -import password -from password import Passwords -import os -from datetime import datetime -from flask_login import ( - LoginManager, - login_required, - login_user, - current_user, - logout_user, -) -from typing import ( - Optional, - cast, -) -from mu import mu_search, get_mail -from html_render import HTML, render_document - - -from flask import ( - Flask, - session, - request, - redirect, - url_for, - flash, - get_flashed_messages -) - -login_manager = LoginManager() - - -def mailto(addr: str) -> HTML: - return ('a', {'href': f'mailto:{addr}'}, addr) - - -def format_email(addr: Address) -> list[HTML]: - mail_addr = f'{addr.username}@{addr.domain}' - return [addr.display_name, ' <', mailto(mail_addr), '>'] - - -def header_format(key: str, value) -> HTML: - if key in ['to', 'cc', 'bcc']: - return ('ul', *[('li', *format_email(addr)) - for addr in value.addresses]) - elif key == 'from': - return format_email(value.addresses[0]) - elif key == 'in-reply-to': - # type(value) == email.headerregistry._UnstructuredHeader - id = str(value).strip("<>") - return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>'] - else: - return value - - -style: HTML = lambda: """ - nav { - display: block; - width: 100%; - height: 4em; - color: white; - background-color: darkgrey; - } - - dl { - display: grid; - grid-template-columns: 10ch auto; - } - dt { - font-weight: bold; - } - dd { - font-family: mono; - font-size: 80%; - } - dd > * { - margin: 0; - } - - tr:nth-child(2n) { - background-color: lightblue; - } - - """ - - -def attachement_tree(mail: EmailMessage) -> HTML: - ct = mail.get_content_type() - fn = mail.get_filename() - - children = [] - for child in mail.iter_parts(): - children.append(attachement_tree(cast(EmailMessage, child))) - - content: HTML - if children: - content = ('ul', *children) - else: - content = [] - - if fn: - body = f'{ct} {fn}' - else: - body = str(ct) - return ('li', body, content) - -# -------------------------------------------------- - - -def login_page(returnto: Optional[str] = None) -> HTML: - return ('form', {'action': '/login', 'method': 'POST'}, - ('input', {'name': 'username', 'placeholder': 'Username'}), - ('input', {'type': 'password', - 'placeholder': 'Password', - 'name': 'password'}), - ('input', {'type': 'hidden', - 'name': 'returnto', - 'value': returnto}) - if returnto else [], - ('input', {'type': 'submit'}), - ) - - -def user_info(username: str) -> HTML: - return [('span', username), - ('form', {'action': '/logout', 'method': 'POST'}, - ('input', {'type': 'submit', 'value': 'Logga ut'}))] - - -def login_prompt() -> HTML: - return ('a', {'href': '/login'}, 'Logga in') - - -def flashed_messages() -> HTML: - return ('ul', {'class': 'flashes'}, - *[('li', msg) for msg in get_flashed_messages()]) - - -def page_base(title: Optional[str] = None, - body: HTML = []) -> HTML: - return ('html', - ('head', - ('meta', {'charset': 'utf-8'}), - ('title', title), - ('style', style), - ), - ('body', - ('nav', - user_info(current_user.get_id()) if current_user.is_authenticated else login_prompt() - ), - flashed_messages(), - body)) - - -def response_for(id: str, username: Optional[str] = None) -> str: - - mail = cast(EmailMessage, get_mail(id)) - - headers = {} - for (key, value) in mail.items(): - headers[key.lower()] = value - - head = [] - for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject', 'x-original-to', - 'in-reply-to']: - if x := headers.get(h.lower()): - head += [('dt', h.title()), - ('dd', header_format(h.lower(), x))] - - body_part = mail.get_body(preferencelist=('html', 'plain')) - if not body_part: - raise ValueError("No suitable body in email") - ct = body_part.get_content_type() - body: HTML - if ct == 'text/html': - body = lambda: cast(EmailMessage, body_part).get_content() - else: - body = ('pre', cast(EmailMessage, body_part).get_content()) - - if t := headers.get('subject'): - title = f'Mail — {t}' - else: - title = 'Mail' - - main_body = [('dl', *head), - ('hr',), - ('main', body), - ('hr',), - ('ul', attachement_tree(mail)), - ] - html_str = render_document(page_base(title=title, - body=main_body)) - - return html_str - - -def search_field(q: str) -> HTML: - return ('form', {'action': '/search', 'method': 'GET'}, - ('label', {'for': 'search'}, - 'Mu Search Query'), - ('textarea', {'id': 'search', 'name': 'q'}, - q), - ('input', {'type': 'Submit'})) - - -def search_result(q, by, reverse) -> HTML: - - # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid'] - keys = ['from', 'to', 'subject', 'date'] - - rows = mu_search(q, by, reverse) - body: list[tuple] = [] - for row in rows: - rowdata = [] - for key in keys: - data = row.get(key, None) - if data and key == 'date': - dt = datetime.fromtimestamp(int(data)) - data = dt.strftime('%Y-%m-%d %H:%M') - rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data))) - body.append(('tr', rowdata)) - - - # TODO print number of results - - return ('table', - ('thead', - ('tr', - [('th', m.title()) for m in keys])), - ('tbody', - body - )) - - -def search_page(q, by): - main_body = [search_field(q)] - - if q: - main_body.append(search_result(q, by, False)) - - return render_document(page_base(title='Search', - body=main_body)) - - -def index_page(): - ids = [ - 'CAEzixGsw-4zJ8_ejK_vDgmcQ9s-MbBc-ho+HL4arV4a+ghOOPg@mail.gmail.com', - 'CA+pcBt-gLb0GtbFOjJ5_7Q_WXtqApVPQ9w-3O7GH=VqCEQat6g@mail.gmail.com', - ] - - body = [('h1', "Sample ID's"), - ('ul', - [('li', ('a', {'href': '?' + urlencode({'id': id})}, id)) - for id in ids] - ), - ] - - return render_document(page_base(title='Mail index', - body=body)) - - -passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json')) - - -app = Flask(__name__) -login_manager.init_app(app) -app.secret_key = 'THIS IS A RANDOM STRING' - - -class User: - def __init__(self, username: str): - self._username = username - self._authenticated = False - - # @property - def is_authenticated(self): - return self._authenticated - - # @property - def is_active(self): - return True - - # @property - def is_anonymous(self): - return False - - # @property - def get_id(self): - return self._username - - - - -@login_manager.user_loader -def load_user(user_id): - # return User.get(user_id) - return User(user_id) - - -@app.route('/') -def index(): - if not current_user.is_authenticated: - return redirect(url_for('login_page_', returnto=request.path)) - if id := request.args.get('id'): - print("id =", id) - response = response_for(''.join(id).replace(' ', '+')) - else: - response = index_page() - return response - - -@app.route('/search') -@login_required -def search_page_(): - return search_page(request.args.get('q'), - request.args.get('by', None)) - - -@app.route('/login', methods=['GET']) -def login_page_(): - body = login_page(request.args.get('returnto')) - return render_document(page_base(title='Login', body=body)) - - -@app.route('/login', methods=['POST']) -def login_form(): - resp = redirect(request.args.get('returnto', url_for('index'))) - - username = request.form['username'] - password = request.form['password'] - user = User(username) - if passwords.validate(username, password): - login_user(user) - else: - flash('Invalid username or password') - return resp - - -@app.route('/logout', methods=['POST']) -@login_required -def logout_form(): - logout_user() - return redirect(url_for('index')) - - -if __name__ == '__main__': - app.run(debug=True, port=8090) diff --git a/mu.py b/mu.py deleted file mode 100644 index d52a362..0000000 --- a/mu.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -Wrapper for the `mu` command line. -""" - -import email.message -import email.policy -from email.parser import BytesParser -import subprocess -from subprocess import PIPE -import xml.dom.minidom -import xml.dom - -from typing import ( - Literal, - Optional, - Union, -) - -parser = BytesParser(policy=email.policy.default) - - -def get_mail(id: str) -> email.message.Message: - """ - Lookup email by Message-ID. - - [Raises] - MuError - """ - cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}', - '--fields', 'l'], - stdout=PIPE) - filename = cmd.stdout.decode('UTF-8').strip() - - if cmd.returncode != 0: - raise MuError(cmd.returncode) - - with open(filename, "rb") as f: - mail = parser.parse(f) - return mail - - -class MuError(Exception): - codes = { - 1: 'General Error', - 2: 'No Matches', - 4: 'Database is corrupted' - } - - def __init__(self, returncode: int): - self.returncode: int = returncode - self.msg: str = MuError.codes.get(returncode, 'Unknown Error') - - def __repr__(self): - return f'MuError({self.returncode}, "{self.msg}")' - - def __str__(self): - return repr(self) - - -Sortfield = Union[Literal['cc'], - Literal['c'], - Literal['bcc'], - Literal['h'], - Literal['date'], - Literal['d'], - Literal['from'], - Literal['f'], - Literal['maildir'], - Literal['m'], - Literal['msgid'], - Literal['i'], - Literal['prio'], - Literal['p'], - Literal['subject'], - Literal['s'], - Literal['to'], - Literal['t'], - Literal['list'], - Literal['v']] - - - -def mu_search(query, sortfield: Optional[Sortfield] = 'subject', reverse: bool = False) -> list[dict[str, str]]: - """ - [Parameters] - query - Search query as per mu-find(1). - sortfield - Field to sort the values by - reverse - If the sort should be reversed - - [Returns] - >>> {'from': 'Hugo Hörnquist ', - 'date': '1585678375', - 'size': '377', - 'msgid': 'SAMPLE-ID@localhost', - 'path': '/home/hugo/.local/var/mail/INBOX/cur/filename', - 'maildir': '/INBOX' - } - """ - - if not query: - raise ValueError('Query required for mu_search') - cmdline = ['mu', 'find', - '--format=xml', - query] - if sortfield: - cmdline.extend(['--sortfield', sortfield]) - if reverse: - cmdline.append('--reverse') - print(cmdline) - cmd = subprocess.run(cmdline, capture_output=True) - if cmd.returncode != 0: - raise MuError(cmd.returncode) - dom = xml.dom.minidom.parseString(cmd.stdout.decode('UTF-8')) - - message_list = [] - messages = dom.childNodes[0] - assert messages.localName == 'messages' - for message in messages.childNodes: - msg_dict = {} - if message.nodeType != xml.dom.Node.ELEMENT_NODE: - continue - for kv in message.childNodes: - if kv.nodeType != xml.dom.Node.ELEMENT_NODE: - continue - msg_dict[kv.localName] = kv.childNodes[0].data - message_list.append(msg_dict) - - return message_list diff --git a/mu4web/__init__.py b/mu4web/__init__.py new file mode 100644 index 0000000..72bdd01 --- /dev/null +++ b/mu4web/__init__.py @@ -0,0 +1 @@ +VERSION = "0.1" diff --git a/mu4web/html_render.py b/mu4web/html_render.py new file mode 100644 index 0000000..99140ad --- /dev/null +++ b/mu4web/html_render.py @@ -0,0 +1,71 @@ +import html +from typing import ( + Callable, + TypeAlias, + Union, +) + +HTML: TypeAlias = Union[tuple, + list['HTML'], + Callable[[], str], + None, str, int, float] + + +standalones = ['hr', 'br', 'meta'] +"""Tags which can't have a closing tag.""" + + +def _render_document(document: HTML) -> str: + if isinstance(document, tuple): + tag, *body = document + if body and isinstance(body[0], dict): + print(body[0]) + attributes = ' '.join(f'{a}="{html.escape(b)}"' + for a, b in body[0].items()) + body = body[1:] + start = f'<{tag} {attributes}>' + else: + start = f'<{tag}>' + + if tag in standalones: + return start + else: + if body: + items = ''.join(_render_document(b) for b in body) + else: + items = '' + return start + f'{items}' + elif callable(document): + return str(document()) + elif isinstance(document, list): + return ''.join(_render_document(e) for e in document) + elif document is None: + return '' + else: + # strings, and everything else + return html.escape(str(document)) + + +def render_document(document: HTML) -> str: + """ + Render an HTML structure to an Html string. + + The following Python types are converted as follows: + - Tuples + - The first value becomes the tags name + - The second value, if a dictionary, becomes the tags attributes + - All following values (including the second if not a dictionary) + gets individually passed to render_document. + - Lists + Each element gets passed to render_document + - Callable[[], str] + Gets called, and its output is included verbatim. Useful for + including strings which shouldn't be escaped. + - str + Gets escaped, and included + - int, float + Gets included as their default string representation. + - None + Becomes an empty string + """ + return '\n' + _render_document(document) diff --git a/mu4web/main.py b/mu4web/main.py new file mode 100644 index 0000000..7c3d7bd --- /dev/null +++ b/mu4web/main.py @@ -0,0 +1,350 @@ +from email.message import EmailMessage +from email.headerregistry import Address +from urllib.parse import urlencode +import password +from password import Passwords +import os +from datetime import datetime +from flask_login import ( + LoginManager, + login_required, + login_user, + current_user, + logout_user, +) +from typing import ( + Optional, + cast, +) +from mu import mu_search, get_mail +from html_render import HTML, render_document + + +from flask import ( + Flask, + session, + request, + redirect, + url_for, + flash, + get_flashed_messages +) + +login_manager = LoginManager() + + +def mailto(addr: str) -> HTML: + return ('a', {'href': f'mailto:{addr}'}, addr) + + +def format_email(addr: Address) -> list[HTML]: + mail_addr = f'{addr.username}@{addr.domain}' + return [addr.display_name, ' <', mailto(mail_addr), '>'] + + +def header_format(key: str, value) -> HTML: + if key in ['to', 'cc', 'bcc']: + return ('ul', *[('li', *format_email(addr)) + for addr in value.addresses]) + elif key == 'from': + return format_email(value.addresses[0]) + elif key == 'in-reply-to': + # type(value) == email.headerregistry._UnstructuredHeader + id = str(value).strip("<>") + return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>'] + else: + return value + + +style: HTML = lambda: """ + nav { + display: block; + width: 100%; + height: 4em; + color: white; + background-color: darkgrey; + } + + dl { + display: grid; + grid-template-columns: 10ch auto; + } + dt { + font-weight: bold; + } + dd { + font-family: mono; + font-size: 80%; + } + dd > * { + margin: 0; + } + + tr:nth-child(2n) { + background-color: lightblue; + } + + """ + + +def attachement_tree(mail: EmailMessage) -> HTML: + ct = mail.get_content_type() + fn = mail.get_filename() + + children = [] + for child in mail.iter_parts(): + children.append(attachement_tree(cast(EmailMessage, child))) + + content: HTML + if children: + content = ('ul', *children) + else: + content = [] + + if fn: + body = f'{ct} {fn}' + else: + body = str(ct) + return ('li', body, content) + +# -------------------------------------------------- + + +def login_page(returnto: Optional[str] = None) -> HTML: + return ('form', {'action': '/login', 'method': 'POST'}, + ('input', {'name': 'username', 'placeholder': 'Username'}), + ('input', {'type': 'password', + 'placeholder': 'Password', + 'name': 'password'}), + ('input', {'type': 'hidden', + 'name': 'returnto', + 'value': returnto}) + if returnto else [], + ('input', {'type': 'submit'}), + ) + + +def user_info(username: str) -> HTML: + return [('span', username), + ('form', {'action': '/logout', 'method': 'POST'}, + ('input', {'type': 'submit', 'value': 'Logga ut'}))] + + +def login_prompt() -> HTML: + return ('a', {'href': '/login'}, 'Logga in') + + +def flashed_messages() -> HTML: + return ('ul', {'class': 'flashes'}, + *[('li', msg) for msg in get_flashed_messages()]) + + +def page_base(title: Optional[str] = None, + body: HTML = []) -> HTML: + return ('html', + ('head', + ('meta', {'charset': 'utf-8'}), + ('title', title), + ('style', style), + ), + ('body', + ('nav', + user_info(current_user.get_id()) if current_user.is_authenticated else login_prompt() + ), + flashed_messages(), + body)) + + +def response_for(id: str, username: Optional[str] = None) -> str: + + mail = cast(EmailMessage, get_mail(id)) + + headers = {} + for (key, value) in mail.items(): + headers[key.lower()] = value + + head = [] + for h in ['date', 'from', 'to', 'cc', 'bcc', 'subject', 'x-original-to', + 'in-reply-to']: + if x := headers.get(h.lower()): + head += [('dt', h.title()), + ('dd', header_format(h.lower(), x))] + + body_part = mail.get_body(preferencelist=('html', 'plain')) + if not body_part: + raise ValueError("No suitable body in email") + ct = body_part.get_content_type() + body: HTML + if ct == 'text/html': + body = lambda: cast(EmailMessage, body_part).get_content() + else: + body = ('pre', cast(EmailMessage, body_part).get_content()) + + if t := headers.get('subject'): + title = f'Mail — {t}' + else: + title = 'Mail' + + main_body = [('dl', *head), + ('hr',), + ('main', body), + ('hr',), + ('ul', attachement_tree(mail)), + ] + html_str = render_document(page_base(title=title, + body=main_body)) + + return html_str + + +def search_field(q: str) -> HTML: + return ('form', {'action': '/search', 'method': 'GET'}, + ('label', {'for': 'search'}, + 'Mu Search Query'), + ('textarea', {'id': 'search', 'name': 'q'}, + q), + ('input', {'type': 'Submit'})) + + +def search_result(q, by, reverse) -> HTML: + + # keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid'] + keys = ['from', 'to', 'subject', 'date'] + + rows = mu_search(q, by, reverse) + body: list[tuple] = [] + for row in rows: + rowdata = [] + for key in keys: + data = row.get(key, None) + if data and key == 'date': + dt = datetime.fromtimestamp(int(data)) + data = dt.strftime('%Y-%m-%d %H:%M') + rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data))) + body.append(('tr', rowdata)) + + + # TODO print number of results + + return ('table', + ('thead', + ('tr', + [('th', m.title()) for m in keys])), + ('tbody', + body + )) + + +def search_page(q, by): + main_body = [search_field(q)] + + if q: + main_body.append(search_result(q, by, False)) + + return render_document(page_base(title='Search', + body=main_body)) + + +def index_page(): + ids = [ + 'CAEzixGsw-4zJ8_ejK_vDgmcQ9s-MbBc-ho+HL4arV4a+ghOOPg@mail.gmail.com', + 'CA+pcBt-gLb0GtbFOjJ5_7Q_WXtqApVPQ9w-3O7GH=VqCEQat6g@mail.gmail.com', + ] + + body = [('h1', "Sample ID's"), + ('ul', + [('li', ('a', {'href': '?' + urlencode({'id': id})}, id)) + for id in ids] + ), + ] + + return render_document(page_base(title='Mail index', + body=body)) + + +passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json')) + + +app = Flask(__name__) +login_manager.init_app(app) +app.secret_key = 'THIS IS A RANDOM STRING' + + +class User: + def __init__(self, username: str): + self._username = username + self._authenticated = False + + # @property + def is_authenticated(self): + return self._authenticated + + # @property + def is_active(self): + return True + + # @property + def is_anonymous(self): + return False + + # @property + def get_id(self): + return self._username + + + + +@login_manager.user_loader +def load_user(user_id): + # return User.get(user_id) + return User(user_id) + + +@app.route('/') +def index(): + if not current_user.is_authenticated: + return redirect(url_for('login_page_', returnto=request.path)) + if id := request.args.get('id'): + print("id =", id) + response = response_for(''.join(id).replace(' ', '+')) + else: + response = index_page() + return response + + +@app.route('/search') +@login_required +def search_page_(): + return search_page(request.args.get('q'), + request.args.get('by', None)) + + +@app.route('/login', methods=['GET']) +def login_page_(): + body = login_page(request.args.get('returnto')) + return render_document(page_base(title='Login', body=body)) + + +@app.route('/login', methods=['POST']) +def login_form(): + resp = redirect(request.args.get('returnto', url_for('index'))) + + username = request.form['username'] + password = request.form['password'] + user = User(username) + if passwords.validate(username, password): + login_user(user) + else: + flash('Invalid username or password') + return resp + + +@app.route('/logout', methods=['POST']) +@login_required +def logout_form(): + logout_user() + return redirect(url_for('index')) + + +if __name__ == '__main__': + app.run(debug=True, port=8090) diff --git a/mu4web/mu.py b/mu4web/mu.py new file mode 100644 index 0000000..d52a362 --- /dev/null +++ b/mu4web/mu.py @@ -0,0 +1,128 @@ +""" +Wrapper for the `mu` command line. +""" + +import email.message +import email.policy +from email.parser import BytesParser +import subprocess +from subprocess import PIPE +import xml.dom.minidom +import xml.dom + +from typing import ( + Literal, + Optional, + Union, +) + +parser = BytesParser(policy=email.policy.default) + + +def get_mail(id: str) -> email.message.Message: + """ + Lookup email by Message-ID. + + [Raises] + MuError + """ + cmd = subprocess.run(['mu', 'find', '-u', f'i:{id}', + '--fields', 'l'], + stdout=PIPE) + filename = cmd.stdout.decode('UTF-8').strip() + + if cmd.returncode != 0: + raise MuError(cmd.returncode) + + with open(filename, "rb") as f: + mail = parser.parse(f) + return mail + + +class MuError(Exception): + codes = { + 1: 'General Error', + 2: 'No Matches', + 4: 'Database is corrupted' + } + + def __init__(self, returncode: int): + self.returncode: int = returncode + self.msg: str = MuError.codes.get(returncode, 'Unknown Error') + + def __repr__(self): + return f'MuError({self.returncode}, "{self.msg}")' + + def __str__(self): + return repr(self) + + +Sortfield = Union[Literal['cc'], + Literal['c'], + Literal['bcc'], + Literal['h'], + Literal['date'], + Literal['d'], + Literal['from'], + Literal['f'], + Literal['maildir'], + Literal['m'], + Literal['msgid'], + Literal['i'], + Literal['prio'], + Literal['p'], + Literal['subject'], + Literal['s'], + Literal['to'], + Literal['t'], + Literal['list'], + Literal['v']] + + + +def mu_search(query, sortfield: Optional[Sortfield] = 'subject', reverse: bool = False) -> list[dict[str, str]]: + """ + [Parameters] + query - Search query as per mu-find(1). + sortfield - Field to sort the values by + reverse - If the sort should be reversed + + [Returns] + >>> {'from': 'Hugo Hörnquist ', + 'date': '1585678375', + 'size': '377', + 'msgid': 'SAMPLE-ID@localhost', + 'path': '/home/hugo/.local/var/mail/INBOX/cur/filename', + 'maildir': '/INBOX' + } + """ + + if not query: + raise ValueError('Query required for mu_search') + cmdline = ['mu', 'find', + '--format=xml', + query] + if sortfield: + cmdline.extend(['--sortfield', sortfield]) + if reverse: + cmdline.append('--reverse') + print(cmdline) + cmd = subprocess.run(cmdline, capture_output=True) + if cmd.returncode != 0: + raise MuError(cmd.returncode) + dom = xml.dom.minidom.parseString(cmd.stdout.decode('UTF-8')) + + message_list = [] + messages = dom.childNodes[0] + assert messages.localName == 'messages' + for message in messages.childNodes: + msg_dict = {} + if message.nodeType != xml.dom.Node.ELEMENT_NODE: + continue + for kv in message.childNodes: + if kv.nodeType != xml.dom.Node.ELEMENT_NODE: + continue + msg_dict[kv.localName] = kv.childNodes[0].data + message_list.append(msg_dict) + + return message_list diff --git a/mu4web/password.py b/mu4web/password.py new file mode 100755 index 0000000..ff0df21 --- /dev/null +++ b/mu4web/password.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 + +""" +Simple password store, backed by a JSON file. + +Also contains an entry point for managing the store. +""" + +import hashlib +import json +import os +import random +from typing import ( + TypedDict, +) + + +def gen_salt(length: int = 10) -> str: + # TODO is this a sufficient source of randomness + return bytearray(random.randint(0, 256) for _ in range(length)).hex() + + +# Manual list of entries, to stop someone from executing arbitrary +# code by modyfying password database +hash_methods = { + 'sha256': hashlib.sha256 +} + + +class PasswordEntry(TypedDict): + hash: str + salt: str + # One of the keys of hash_methods + method: str + + +class Passwords: + """ + Simple password store. + + [Parameters] + fname - Path of json file to load and store from. + """ + def __init__(self, fname: os.PathLike): + self.fname = fname + self.db: dict[str, PasswordEntry] + try: + with open(fname) as f: + self.db = json.load(f) + except Exception: + self.db = {} + + def save(self) -> None: + """Dump current data to disk.""" + try: + with open(os.fspath(self.fname) + '.tmp', 'w') as f: + json.dump(self.db, f) + f.write('\n') + os.rename(os.fspath(self.fname) + '.tmp', self.fname) + except Exception as e: + print(f'Saving password failed {e}') + + def add(self, username: str, password: str) -> None: + """Add (or modify) entry in store.""" + if cur := self.db.get(username): + salt = cur['salt'] + hashed = hashlib.sha256((salt + password).encode('UTF-8')) + self.db[username] = { + 'hash': hashed.hexdigest(), + 'salt': salt, + 'method': 'sha256', + } + else: + salt = gen_salt() + hashed = hashlib.sha256((salt + password).encode('UTF-8')) + self.db[username] = { + 'hash': hashed.hexdigest(), + 'salt': salt, + 'method': 'sha256' + } + + def validate(self, username: str, password: str) -> bool: + """Check if user exists, and if it has a correct password.""" + # These shall fail when key is missing + data = self.db[username] + proc = hash_methods[data['method']] + digest = proc((data['salt'] + password).encode('UTF-8')).hexdigest() + return data['hash'] == digest + + +def main(): + import argparse + parser = argparse.ArgumentParser() + + parser.add_argument('--file', default='passwords.json') + + subparsers = parser.add_subparsers(dest='cmd') + + add_parser = subparsers.add_parser('add') + add_parser.add_argument('username') + add_parser.add_argument('password') + + val_parser = subparsers.add_parser('validate') + val_parser.add_argument('username') + val_parser.add_argument('password') + + args = parser.parse_args() + + passwords = Passwords(args.file) + if args.cmd == 'add': + passwords.add(args.username, args.password) + passwords.save() + elif args.cmd == 'validate': + print(passwords.validate(args.username, args.password)) + else: + parser.print_help() + + +if __name__ == '__main__': + main() diff --git a/password.py b/password.py deleted file mode 100755 index ff0df21..0000000 --- a/password.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/env python3 - -""" -Simple password store, backed by a JSON file. - -Also contains an entry point for managing the store. -""" - -import hashlib -import json -import os -import random -from typing import ( - TypedDict, -) - - -def gen_salt(length: int = 10) -> str: - # TODO is this a sufficient source of randomness - return bytearray(random.randint(0, 256) for _ in range(length)).hex() - - -# Manual list of entries, to stop someone from executing arbitrary -# code by modyfying password database -hash_methods = { - 'sha256': hashlib.sha256 -} - - -class PasswordEntry(TypedDict): - hash: str - salt: str - # One of the keys of hash_methods - method: str - - -class Passwords: - """ - Simple password store. - - [Parameters] - fname - Path of json file to load and store from. - """ - def __init__(self, fname: os.PathLike): - self.fname = fname - self.db: dict[str, PasswordEntry] - try: - with open(fname) as f: - self.db = json.load(f) - except Exception: - self.db = {} - - def save(self) -> None: - """Dump current data to disk.""" - try: - with open(os.fspath(self.fname) + '.tmp', 'w') as f: - json.dump(self.db, f) - f.write('\n') - os.rename(os.fspath(self.fname) + '.tmp', self.fname) - except Exception as e: - print(f'Saving password failed {e}') - - def add(self, username: str, password: str) -> None: - """Add (or modify) entry in store.""" - if cur := self.db.get(username): - salt = cur['salt'] - hashed = hashlib.sha256((salt + password).encode('UTF-8')) - self.db[username] = { - 'hash': hashed.hexdigest(), - 'salt': salt, - 'method': 'sha256', - } - else: - salt = gen_salt() - hashed = hashlib.sha256((salt + password).encode('UTF-8')) - self.db[username] = { - 'hash': hashed.hexdigest(), - 'salt': salt, - 'method': 'sha256' - } - - def validate(self, username: str, password: str) -> bool: - """Check if user exists, and if it has a correct password.""" - # These shall fail when key is missing - data = self.db[username] - proc = hash_methods[data['method']] - digest = proc((data['salt'] + password).encode('UTF-8')).hexdigest() - return data['hash'] == digest - - -def main(): - import argparse - parser = argparse.ArgumentParser() - - parser.add_argument('--file', default='passwords.json') - - subparsers = parser.add_subparsers(dest='cmd') - - add_parser = subparsers.add_parser('add') - add_parser.add_argument('username') - add_parser.add_argument('password') - - val_parser = subparsers.add_parser('validate') - val_parser.add_argument('username') - val_parser.add_argument('password') - - args = parser.parse_args() - - passwords = Passwords(args.file) - if args.cmd == 'add': - passwords.add(args.username, args.password) - passwords.save() - elif args.cmd == 'validate': - print(passwords.validate(args.username, args.password)) - else: - parser.print_help() - - -if __name__ == '__main__': - main() diff --git a/setup.cfg b/setup.cfg index 1b76850..bf73722 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,24 @@ +[metadata] +name = mu4web +version = attr: mu4web.VERSION +description = Web interface for the mu Maildir indexer +url = https://git.hornquist.se/mu4web +author = Hugo Hörnquist +author_email = hugo@lysator.liu.se +classifiers = + Programming Language :: Python :: 3 + Environment :: Web Environment + +[options] +python_requires = >= 3.9 +install_requires = + flask >= 2.2.2 + flask-login >= 0.6 + urllib3 >= 1.26 +setup_requires = + setuptools +py_modules = mu4web +packages = mu4web + [flake8] ignore = E731 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8bf1ba9 --- /dev/null +++ b/setup.py @@ -0,0 +1,2 @@ +from setuptools import setup +setup() -- cgit v1.2.3