diff options
author | Hugo Hörnquist <hugo@lysator.liu.se> | 2022-10-31 09:08:31 +0100 |
---|---|---|
committer | Hugo Hörnquist <hugo@lysator.liu.se> | 2022-10-31 09:08:31 +0100 |
commit | 448350fdd44b745a1f9fbfec7a5ce4b7f4866c4d (patch) | |
tree | 87b7926b8be8f231e5cc34f5206cbe9af812b57b /main.py | |
parent | Move stuff to sub-files. (diff) | |
download | mu4web-448350fdd44b745a1f9fbfec7a5ce4b7f4866c4d.tar.gz mu4web-448350fdd44b745a1f9fbfec7a5ce4b7f4866c4d.tar.xz |
Replace wed server with flask.
Diffstat (limited to '')
-rw-r--r-- | main.py | 224 |
1 files changed, 89 insertions, 135 deletions
@@ -1,7 +1,6 @@ from email.message import EmailMessage from email.headerregistry import Address -from urllib.parse import urlparse, urlencode, parse_qs -from http.cookies import BaseCookie +from urllib.parse import urlencode import http.cookies import password from password import Passwords @@ -14,7 +13,15 @@ from typing import ( from mu import mu_search, get_mail from html_render import HTML, render_document -from http.server import HTTPServer, BaseHTTPRequestHandler + +from flask import ( + Flask, + request, + redirect, + url_for, + flash, + get_flashed_messages +) def mailto(addr: str) -> HTML: @@ -114,6 +121,11 @@ def login_prompt() -> HTML: return ('a', {'href': '/login'}, 'Logga in') +def flashed_messages() -> HTML: + return ('ul', {'class': 'flashes'}, + *[('li', msg) for msg in get_flashed_messages()]) + + def page_base(title: Optional[str] = None, body: HTML = [], username: Optional[str] = None) -> HTML: @@ -127,6 +139,7 @@ def page_base(title: Optional[str] = None, ('nav', user_info(username) if username else login_prompt() ), + flashed_messages(), body)) @@ -235,147 +248,88 @@ def index_page(username): valid_session_cookies: dict[str, str] = {} +passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json')) -def validate_session_cookie(cookie: http.cookies.Morsel) -> Optional[str]: - return valid_session_cookies.get(cookie.value) +def is_logged_in(): + c = request.cookies.get('session') + if c and valid_session_cookies.get(c): + return valid_session_cookies[c] + return False -def remove_session_cookie(cookie: http.cookies.Morsel) -> http.cookies.Morsel: - if valid_session_cookies.get(cookie.value): - del valid_session_cookies[cookie.value] - cookie.set(cookie.key, '', '') - # TODO how to expire cookie - # cookie.expires = 0 - return cookie +app = Flask(__name__) -passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json')) +@app.route('/') +def index(): + login = is_logged_in() + if not login: + return redirect(url_for('login_page_', returnto=request.path)) + if id := request.args.get('id'): + print("id =", id) + response = response_for(''.join(id).replace(' ', '+'), + login) + else: + response = index_page(login) + return response -def new_session_cookie(username: str) -> http.cookies.Morsel: - global valid_session_cookies - m: http.cookies.Morsel = http.cookies.Morsel() - unique = str(uuid4()) - valid_session_cookies[unique] = username - m.set('session', unique, unique) - return m - - -class Handler(BaseHTTPRequestHandler): - def do_GET(self): - url = urlparse(self.path) - query = parse_qs(url.query) - # print(type(self.headers)) - - cookies = BaseCookie(self.headers.get('Cookie')) - logged_in = None - if c := cookies.get('session'): - logged_in = validate_session_cookie(c) - - if url.path == '/': - if not logged_in: - self.send_response(307) - q = urlencode({'returnto': self.path}) - self.send_header('location', '/login?' + q) - self.end_headers() - else: - if id := query.get('id'): - print("id =", id) - response = response_for(''.join(id).replace(' ', '+'), - logged_in) - self.send_response(200) - else: - response = index_page(logged_in) - self.send_response(200) - - response = response.encode('UTF-8') - self.send_header('Content-Type', 'text/html; charset=UTF-8') - self.send_header('Content-Length', len(response)) - self.end_headers() - self.wfile.write(response) - - elif url.path == '/search': - if not logged_in: - self.send_response(307) - q = urlencode({'returnto': self.path}) - self.send_header('location', '/login?' + q) - self.end_headers() - else: - response = search_page(query.get('q'), - query.get('by'), - logged_in) - self.send_response(200) - response = response.encode('UTF-8') - self.send_header('Content-Type', 'text/html; charset=UTF-8') - self.send_header('Content-Length', len(response)) - self.end_headers() - self.wfile.write(response) - - elif url.path == '/login': - if not logged_in: - body = login_page(''.join(query.get('returnto'))) - self.send_response(200) - content = render_document(page_base(title='Login', body=body)) - content = content.encode('UTF-8') - self.send_header('Content-Type', 'text/html; charset=UTF-8') - self.send_header('Content-Length', len(content)) - self.end_headers() - self.wfile.write(content) - else: - # TODO do something sensible here - pass - - def do_POST(self): - url = urlparse(self.path) - # query = parse_qs(url.query) - cookies = BaseCookie(self.headers.get('Cookie')) - logged_in = None - if c := cookies.get('session'): - logged_in = validate_session_cookie(c) - print(cookies) - print(valid_session_cookies) - - if url.path == '/login': - # cl = content_length = self.headers.get('content-length') - cl = self.headers.get('content-length') - data = parse_qs(self.rfile.read(int(cl))) - username = b''.join(data[b'username']).decode('UTF-8') - password = b''.join(data[b'password']).decode('UTF-8') - if passwords.validate(username, password): - cookie = new_session_cookie(username) - self.send_response(302) - self.send_header('set-cookie', cookie.OutputString()) - if ret := data.get(b'returnto'): - returnto = b''.join(ret).decode('UTF-8') - self.send_header('location', returnto) - else: - self.send_header('location', '/') - else: - self.send_response(302) - self.send_header('location', '/') - - self.end_headers() - - if url.path == '/logout': - if not logged_in: - self.send_response(302) - self.send_header('Location', '/') - self.end_headers() - return - cookie = remove_session_cookie(cookies.get('session')) - self.send_response(302) - self.send_header('set-cookie', cookie) - # TODO use the referer header? - self.send_header('Location', '/') - self.end_headers() +@app.route('/search') +def search_page_(): + login = is_logged_in() + if not login: + return redirect(url_for('login_page_', returnto=request.path)) + return search_page(request.args.get('q'), + request.args.get('by'), + login) -if __name__ == '__main__': - server = HTTPServer(('0', 8090), Handler) - try: - server.serve_forever() - except KeyboardInterrupt: +@app.route('/login', methods=['GET']) +def login_page_(): + if not is_logged_in(): + body = login_page(request.args.get('returnto')) + return render_document(page_base(title='Login', body=body)) + else: + # TODO do something sensible here pass - server.server_close() + +@app.route('/login', methods=['POST']) +def login_form(): + global valid_session_cookies + logged_in = is_logged_in() + + resp = redirect(request.args.get('returnto', url_for('index'))) + if logged_in: + flash('Already loged in') + return resp + + username = request.form['username'] + password = request.form['password'] + if passwords.validate(username, password): + unique = str(uuid4()) + valid_session_cookies[unique] = username + resp.set_cookie('session', unique) + else: + flash('Invalid username or password') + return resp + + +@app.route('/logout', methods=['POST']) +def logout_form(): + global valid_session_cookies + logged_in = is_logged_in() + if not logged_in: + flash('Not logged in') + return redirect(url_for('index')) + c = request.cookies.get('session') + if valid_session_cookies.get(c): + del valid_session_cookies[c] + resp = redirect(url_for('index')) + resp.set_cookie('session', '') + return resp + + +if __name__ == '__main__': + app.run(debug=True, port=8090) |