aboutsummaryrefslogtreecommitdiff
path: root/mu4web/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'mu4web/main.py')
-rw-r--r--mu4web/main.py227
1 files changed, 193 insertions, 34 deletions
diff --git a/mu4web/main.py b/mu4web/main.py
index 5d2d975..e56a0df 100644
--- a/mu4web/main.py
+++ b/mu4web/main.py
@@ -1,3 +1,5 @@
+"""Web routes for mu4web."""
+
from email.message import EmailMessage
from email.headerregistry import Address
from urllib.parse import urlencode
@@ -13,6 +15,7 @@ from flask_login import (
logout_user,
)
from typing import (
+ Any,
Optional,
cast,
)
@@ -33,7 +36,6 @@ from flask import (
get_flashed_messages
)
-
#
# A few operations depend on the index of attachements. These index
# are all pre-order traversal indexes of the attachement tree, which
@@ -44,7 +46,7 @@ login_manager = LoginManager()
def mailto(addr: str) -> HTML:
- """Constructs a mailto anchor element."""
+ """Construct a mailto anchor element."""
return ('a', {'href': f'mailto:{addr}'}, addr)
@@ -54,7 +56,7 @@ def format_email(addr: Address) -> list[HTML]:
return [addr.display_name, ' <', mailto(mail_addr), '>']
-def header_format(key: str, value) -> HTML:
+def header_format(key: str, value: Any) -> HTML:
"""Format email headers to HTML."""
if key in ['to', 'cc', 'bcc']:
return ('ul', *[('li', *format_email(addr))
@@ -66,10 +68,14 @@ def header_format(key: str, value) -> HTML:
id = str(value).strip("<>")
return ['<', ('a', {'href': '?' + urlencode({'id': id})}, id), '>']
else:
- return value
+ # assert type(value) == str, f"Weird value in header {value!r}"
+ return str(value)
-def attachement_tree(id: str, mail: EmailMessage, idx=0) -> tuple[HTML, int]:
+def attachement_tree(id: str,
+ mail: EmailMessage,
+ idx: int = 0) -> tuple[HTML, int]:
+ """Construct a tree of all attachements for the given mail."""
ct = mail.get_content_type()
fn = mail.get_filename()
@@ -102,6 +108,7 @@ def attachement_tree(id: str, mail: EmailMessage, idx=0) -> tuple[HTML, int]:
def login_page(returnto: Optional[str] = None) -> HTML:
+ """HTML form for the login page."""
return ('form', {'action': '/login', 'method': 'POST', 'class': 'loginform'},
('label', {'for': 'username'}, 'Användarnamn'),
('input', {'id': 'username', 'name': 'username', 'placeholder': 'Användarnamn'}),
@@ -121,21 +128,29 @@ def login_page(returnto: Optional[str] = None) -> HTML:
def user_info(username: str) -> HTML:
+ """
+ Return user info for top bar.
+
+ Includes the users name, and a button for logging out.
+ """
return [('span', username),
('form', {'action': '/logout', 'method': 'POST'},
- ('input', {'type': 'submit', 'value': 'Logga ut'}))]
+ ('input', {'type': 'submit', 'value': 'Logga ut'}))]
def login_prompt() -> HTML:
+ """Return link to the login page."""
return ('a', {'href': '/login'}, 'Logga in')
def flashed_messages() -> HTML:
+ """Return Flasks flashed messages, formatted as a list."""
return ('ul', {'class': 'flashes'},
*[('li', msg) for msg in get_flashed_messages()])
-def include_stylesheet(path):
+def include_stylesheet(path: str) -> HTML:
+ """Return HTML for including a stylesheet inside the <head>."""
return ('link', {'type': 'text/css',
'rel': 'stylesheet',
'href': path})
@@ -143,12 +158,29 @@ def include_stylesheet(path):
def page_base(title: Optional[str] = None,
body: HTML = []) -> HTML:
+ """
+ Build base layout for almost all pages.
+
+ The base contents of our html page, from the <html/> tag and down.
+
+ :param title:
+ Local pagetitle, will be suffixed with site suffix.
+
+ :param body:
+ Contents of the page. Will work without this, but the page
+ would lack any actual contents.
+ """
+ if title:
+ full_title = f'{title} — Mu4Web'
+ else:
+ full_title = 'Mu4Web'
+
return ('html', {'lang': 'sv'},
('head',
('meta', {'charset': 'utf-8'}),
('meta', {'name': 'viewport',
'content': 'width=device-width, initial-scale=0.5'}),
- ('title', title, ' — Mu4Web'),
+ ('title', full_title),
include_stylesheet(url_for('static', filename='style.css')),
),
('body',
@@ -177,11 +209,19 @@ def page_base(title: Optional[str] = None,
))))
-def response_for(id: str) -> str:
-
- mail = cast(EmailMessage, get_mail(id))
+def response_for(id: str, mail: EmailMessage) -> str:
+ """
+ Build response page for an email or a tree.
+ :param id:
+ The message id of the root message
+ :param mail:
+ Either the root component of a mail, or a sub-component of
+ type message/rfc822.
+ """
+ # Setup headers
headers = {}
+ print("mail", mail)
for (key, value) in mail.items():
headers[key.lower()] = value
@@ -200,11 +240,13 @@ def response_for(id: str) -> str:
('summary', 'Alla mailhuvuden'),
('dl', *all_heads))
+ # Setup title
if t := headers.get('subject'):
title = f'Mail — {t}'
else:
title = 'Mail'
+ # Setup body
body: list[HTML] = []
# Manual walk to preserve attachement index
for idx, at in enumerate(mail.walk()):
@@ -229,6 +271,7 @@ def response_for(id: str) -> str:
body.append(('a', {'href': url},
at.get_filename() or at.get_content_type()))
+ # Setup attachements
tree, idx = attachement_tree(id, mail)
main_body: list[HTML] = [('dl', *head),
@@ -247,6 +290,7 @@ def response_for(id: str) -> str:
def search_field(q: str) -> HTML:
+ """Build large search form for search page."""
return ('form', {'id': 'searchform',
'action': '/search',
'method': 'GET'},
@@ -261,6 +305,18 @@ def search_field(q: str) -> HTML:
def search_result(q: str, by: Optional[str], direction: str) -> HTML:
+ """
+ Search database for query, and build resulting HTML body.
+
+ :param q:
+ Mu search query.
+
+ :param by:
+ Parameter to sort by.
+
+ :param direction:
+ Direction to sort results in. One of 'rising' or 'falling'.
+ """
assert direction in ('rising', 'falling')
# keys = ['from', 'to', 'subject', 'date', 'size', 'maildir', 'msgid']
@@ -269,16 +325,16 @@ def search_result(q: str, by: Optional[str], direction: str) -> HTML:
by = app.config['DEFAULT_SORT_COLUMN']
rows = mu.search(q, by, direction == 'falling')
- body: list[tuple] = []
+ body: list[HTML] = []
for row in rows:
- rowdata = []
+ rowdata: list[HTML] = []
for key in keys:
data = row.get(key, None)
if data and key == 'date':
dt = datetime.fromtimestamp(int(data))
data = dt.strftime('%Y-%m-%d %H:%M')
- rowdata.append(('td', ('a', {'href': '/?id=' + row['msgid']}, data)))
- body.append(('tr', rowdata))
+ rowdata.append(['td', ('a', {'href': '/?id=' + row['msgid']}, data)])
+ body.append(['tr', rowdata])
if len(rows) == 0:
return "Inga träffar"
@@ -311,6 +367,7 @@ def search_result(q: str, by: Optional[str], direction: str) -> HTML:
def search_page(q: str, by: Optional[str],
direction: str) -> str:
+ """Return rendered HTML for search page."""
main_body = [search_field(q)]
# TODO pagination
@@ -323,7 +380,8 @@ def search_page(q: str, by: Optional[str],
body=main_body))
-def index_page():
+def index_page() -> str:
+ """Return rendered HTML for index page."""
data = mu.info()
maildirs = find_maildirs(data['maildir'] + '/')
@@ -334,9 +392,9 @@ def index_page():
rows.append(('tr',
('td', key),
('td', value)))
- body = [('div', ('table', ('tbody', rows))),
- ('div', entries),
- ]
+ body: HTML = [('div', ('table', ('tbody', rows))),
+ ('div', entries),
+ ]
return render_document(page_base(title='E-postindex',
body=body))
@@ -361,18 +419,41 @@ app.config.from_pyfile('settings.py')
login_manager.init_app(app)
+def fix_id(id: str) -> str:
+ """Update an ID gotten through a GET parameter into something mu likes."""
+ return ''.join(id).replace(' ', '+')
+
+
@login_manager.user_loader
def load_user(user_id):
+ """
+ Find the user with the given id, and return its session object.
+
+ :param user_id:
+ The string id of the user.
+
+ .. todo::
+
+ Return None on invalid id.
+ """
# return User.get(user_id)
return LocalUser(user_id)
@app.route('/')
def index():
+ """
+ Return index page, mail page, or redirect to login page.
+
+ If the user isn't logged in, then the user is redirected to the login page.
+ Otherwise, if the ``id`` param is set, show the email with that message id,
+ and finally show the index page if nothing else matched.
+ """
if not current_user.is_authenticated:
return redirect(url_for('login_page_', returnto=request.path))
if id := request.args.get('id'):
- response = response_for(''.join(id).replace(' ', '+'))
+ fixed_id = fix_id(id)
+ response = response_for(fixed_id, cast(EmailMessage, get_mail(fixed_id)))
else:
response = index_page()
return response
@@ -381,6 +462,12 @@ def index():
@app.route('/search')
@login_required
def search_page_():
+ """
+ Search page response.
+
+ :param q:
+ :param by:
+ """
direction = request.args.get('direction', app.config['DEFAULT_DIRECTION'])
if direction not in ('rising', 'falling'):
direction = app.config['DEFAULT_DIRECTION']
@@ -413,9 +500,9 @@ def multipart_page(msg_id: str,
body=body))
-def attachement_response(attachement: EmailMessage):
+def attachement_response(attachement: EmailMessage) -> flask.Response:
"""
- Builds a response for a given attachement.
+ Build a response for a given attachement.
Gets content type and encoding from the attachements headers.
"""
@@ -432,6 +519,15 @@ def attachement_response(attachement: EmailMessage):
@app.route('/raw')
@login_required
def raw_message():
+ """
+ Get the "raw" bytes of an email.
+
+ Looks up a message from the mu database, and then returns the data
+ with no formatting, and a content type of message/rfc822.
+
+ :param id:
+ Message id
+ """
msg_id = request.args.get('id', '')
filename = mu.find_file(msg_id)
if not filename:
@@ -440,39 +536,53 @@ def raw_message():
class MutableString:
- def __init__(self):
+ """
+ A mutatable string.
+
+ Strings are immutable by default in python. This works almost
+ exactly like a regular string, but ``+=`` actually changes the
+ object in place.
+ """
+
+ def __init__(self) -> None:
self.str = ''
- def __iadd__(self, other):
+ def __iadd__(self, other: str) -> 'MutableString':
self.str += other
return self
- def __repr__(self):
+ def __repr__(self) -> str:
return f'MutableString("{self.str}")'
- def __str__(self):
+ def __str__(self) -> str:
return self.str
class IMGParser(HTMLParser):
"""
Rewrites HTML image tags to be safer/have more functionality.
+
Should only be fed the image tag. Everything else is assumed to be
directly copied externaly.
- [Parameters]
- result - A mutable string which should be appended with the
- (possibly changed) img tag.
- msg_id - The Email Message ID field, used to construct some links.
+ :param result:
+ A mutable string which should be appended with the (possibly
+ changed) img tag.
+ :param msg_id:
+ The Email Message ID field, used to construct some links.
"""
+
rx = re.compile('cid:(.*)')
- def __init__(self, result, msg_id):
+ def __init__(self, result: MutableString, msg_id: str) -> None:
super().__init__()
self.result = result
self.msg_id = msg_id
- def handle_starttag(self, tag, attrs):
+ def handle_starttag(self, # noqa: D102
+ tag: str,
+ attrs: list[tuple[str, Optional[str]]]
+ ) -> None:
# TODO this will also get called for self closing tags
# (<img/>), which will drop that slash. FIX
@@ -484,6 +594,8 @@ class IMGParser(HTMLParser):
# later added to unblock them on click
self.result += '<img '
for key, value in attrs:
+ if not value:
+ value = key
if key == 'src':
if m := IMGParser.rx.match(value):
params = urlencode({'id': self.msg_id, 'cid': m[1]})
@@ -521,7 +633,7 @@ class IMGParser(HTMLParser):
# in iframe:s (where the content will probably be shown)
# to open in the current (top level) page, instead of
# inside the iframe.
- args = ' '.join(f'{html.escape(key)}={html.escape(value)}'
+ args = ' '.join(f'{html.escape(key)}={html.escape(value or key)}'
for (key, value)
in [*attrs, ('target', '_parent')])
self.result += f'<a {args}>'
@@ -533,6 +645,29 @@ class IMGParser(HTMLParser):
@app.route('/part')
@login_required
def attachement_part_page():
+ """
+ Page for a specific attachment.
+
+ :param id:
+ Message id for the message we want.
+
+ :param idx:
+ Optional numeric index of the attachement of the message,
+ counted as the lines appear when printing the tree, one line
+ at a time.::
+
+ 0. ├── a
+ 1. │   ├── b
+ 2. │   └── c
+ 3. └── d
+
+ Defaults to 0 (the root element).
+
+ :param raw:
+ Optional boolean parameter, which if set, returns the
+ attachement verabtim as stored on disk, instead of rendered
+ into HTML.
+ """
msg_id = request.args.get('id')
raw = request.args.get('raw')
if not msg_id:
@@ -541,7 +676,10 @@ def attachement_part_page():
mail = cast(EmailMessage, get_mail(msg_id))
attachement = list(mail.walk())[attachement_idx]
- if attachement.is_multipart():
+ if attachement.get_content_type() == 'message/rfc822':
+ return response_for(fix_id(msg_id), attachement)
+
+ elif attachement.is_multipart():
return multipart_page(msg_id,
attachement,
attachement_idx)
@@ -586,6 +724,18 @@ def attachement_part_page():
# Possibly change this to actually use that form of URI:s
@app.route('/cid', methods=['GET'])
def cid():
+ """
+ Get contents of a cid.
+
+ :param id:
+ A message id.
+ :param cid:
+ A given content id.
+
+ :return:
+ A response with the given object, with correct content-type
+ headers.
+ """
msg_id = request.args.get('id')
cid = request.args.get('cid')
if not msg_id:
@@ -605,6 +755,7 @@ def cid():
@app.route('/login', methods=['GET'])
def login_page_():
+ """Login page."""
returnto = request.args.get('returnto')
if current_user.is_authenticated:
# Redirect away already logged in users
@@ -620,6 +771,7 @@ def login_page_():
@app.route('/login', methods=['POST'])
def login_form():
+ """Login a user."""
resp = redirect(request.args.get('returnto', url_for('index')))
username = request.form['username']
@@ -636,9 +788,16 @@ def login_form():
@app.route('/logout', methods=['POST'])
@login_required
def logout_form():
+ """Logout the currently logged in user."""
logout_user()
return redirect(url_for('index'))
+@app.errorhandler(500)
+def internal_server_error(e: Any) -> tuple[str, int]:
+ """Fallback error page for 500 errors."""
+ return ("error page", 500)
+
+
if __name__ == '__main__':
app.run(debug=True, port=8090)