aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHugo Hörnquist <hugo@lysator.liu.se>2022-10-31 09:08:31 +0100
committerHugo Hörnquist <hugo@lysator.liu.se>2022-10-31 09:08:31 +0100
commit448350fdd44b745a1f9fbfec7a5ce4b7f4866c4d (patch)
tree87b7926b8be8f231e5cc34f5206cbe9af812b57b
parentMove stuff to sub-files. (diff)
downloadmu4web-448350fdd44b745a1f9fbfec7a5ce4b7f4866c4d.tar.gz
mu4web-448350fdd44b745a1f9fbfec7a5ce4b7f4866c4d.tar.xz
Replace wed server with flask.
-rw-r--r--main.py224
1 files changed, 89 insertions, 135 deletions
diff --git a/main.py b/main.py
index a8b5205..6cd3355 100644
--- a/main.py
+++ b/main.py
@@ -1,7 +1,6 @@
from email.message import EmailMessage
from email.headerregistry import Address
-from urllib.parse import urlparse, urlencode, parse_qs
-from http.cookies import BaseCookie
+from urllib.parse import urlencode
import http.cookies
import password
from password import Passwords
@@ -14,7 +13,15 @@ from typing import (
from mu import mu_search, get_mail
from html_render import HTML, render_document
-from http.server import HTTPServer, BaseHTTPRequestHandler
+
+from flask import (
+ Flask,
+ request,
+ redirect,
+ url_for,
+ flash,
+ get_flashed_messages
+)
def mailto(addr: str) -> HTML:
@@ -114,6 +121,11 @@ def login_prompt() -> HTML:
return ('a', {'href': '/login'}, 'Logga in')
+def flashed_messages() -> HTML:
+ return ('ul', {'class': 'flashes'},
+ *[('li', msg) for msg in get_flashed_messages()])
+
+
def page_base(title: Optional[str] = None,
body: HTML = [],
username: Optional[str] = None) -> HTML:
@@ -127,6 +139,7 @@ def page_base(title: Optional[str] = None,
('nav',
user_info(username) if username else login_prompt()
),
+ flashed_messages(),
body))
@@ -235,147 +248,88 @@ def index_page(username):
valid_session_cookies: dict[str, str] = {}
+passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json'))
-def validate_session_cookie(cookie: http.cookies.Morsel) -> Optional[str]:
- return valid_session_cookies.get(cookie.value)
+def is_logged_in():
+ c = request.cookies.get('session')
+ if c and valid_session_cookies.get(c):
+ return valid_session_cookies[c]
+ return False
-def remove_session_cookie(cookie: http.cookies.Morsel) -> http.cookies.Morsel:
- if valid_session_cookies.get(cookie.value):
- del valid_session_cookies[cookie.value]
- cookie.set(cookie.key, '', '')
- # TODO how to expire cookie
- # cookie.expires = 0
- return cookie
+app = Flask(__name__)
-passwords: Passwords = password.Passwords(cast(os.PathLike, 'passwords.json'))
+@app.route('/')
+def index():
+ login = is_logged_in()
+ if not login:
+ return redirect(url_for('login_page_', returnto=request.path))
+ if id := request.args.get('id'):
+ print("id =", id)
+ response = response_for(''.join(id).replace(' ', '+'),
+ login)
+ else:
+ response = index_page(login)
+ return response
-def new_session_cookie(username: str) -> http.cookies.Morsel:
- global valid_session_cookies
- m: http.cookies.Morsel = http.cookies.Morsel()
- unique = str(uuid4())
- valid_session_cookies[unique] = username
- m.set('session', unique, unique)
- return m
-
-
-class Handler(BaseHTTPRequestHandler):
- def do_GET(self):
- url = urlparse(self.path)
- query = parse_qs(url.query)
- # print(type(self.headers))
-
- cookies = BaseCookie(self.headers.get('Cookie'))
- logged_in = None
- if c := cookies.get('session'):
- logged_in = validate_session_cookie(c)
-
- if url.path == '/':
- if not logged_in:
- self.send_response(307)
- q = urlencode({'returnto': self.path})
- self.send_header('location', '/login?' + q)
- self.end_headers()
- else:
- if id := query.get('id'):
- print("id =", id)
- response = response_for(''.join(id).replace(' ', '+'),
- logged_in)
- self.send_response(200)
- else:
- response = index_page(logged_in)
- self.send_response(200)
-
- response = response.encode('UTF-8')
- self.send_header('Content-Type', 'text/html; charset=UTF-8')
- self.send_header('Content-Length', len(response))
- self.end_headers()
- self.wfile.write(response)
-
- elif url.path == '/search':
- if not logged_in:
- self.send_response(307)
- q = urlencode({'returnto': self.path})
- self.send_header('location', '/login?' + q)
- self.end_headers()
- else:
- response = search_page(query.get('q'),
- query.get('by'),
- logged_in)
- self.send_response(200)
- response = response.encode('UTF-8')
- self.send_header('Content-Type', 'text/html; charset=UTF-8')
- self.send_header('Content-Length', len(response))
- self.end_headers()
- self.wfile.write(response)
-
- elif url.path == '/login':
- if not logged_in:
- body = login_page(''.join(query.get('returnto')))
- self.send_response(200)
- content = render_document(page_base(title='Login', body=body))
- content = content.encode('UTF-8')
- self.send_header('Content-Type', 'text/html; charset=UTF-8')
- self.send_header('Content-Length', len(content))
- self.end_headers()
- self.wfile.write(content)
- else:
- # TODO do something sensible here
- pass
-
- def do_POST(self):
- url = urlparse(self.path)
- # query = parse_qs(url.query)
- cookies = BaseCookie(self.headers.get('Cookie'))
- logged_in = None
- if c := cookies.get('session'):
- logged_in = validate_session_cookie(c)
- print(cookies)
- print(valid_session_cookies)
-
- if url.path == '/login':
- # cl = content_length = self.headers.get('content-length')
- cl = self.headers.get('content-length')
- data = parse_qs(self.rfile.read(int(cl)))
- username = b''.join(data[b'username']).decode('UTF-8')
- password = b''.join(data[b'password']).decode('UTF-8')
- if passwords.validate(username, password):
- cookie = new_session_cookie(username)
- self.send_response(302)
- self.send_header('set-cookie', cookie.OutputString())
- if ret := data.get(b'returnto'):
- returnto = b''.join(ret).decode('UTF-8')
- self.send_header('location', returnto)
- else:
- self.send_header('location', '/')
- else:
- self.send_response(302)
- self.send_header('location', '/')
-
- self.end_headers()
-
- if url.path == '/logout':
- if not logged_in:
- self.send_response(302)
- self.send_header('Location', '/')
- self.end_headers()
- return
- cookie = remove_session_cookie(cookies.get('session'))
- self.send_response(302)
- self.send_header('set-cookie', cookie)
- # TODO use the referer header?
- self.send_header('Location', '/')
- self.end_headers()
+@app.route('/search')
+def search_page_():
+ login = is_logged_in()
+ if not login:
+ return redirect(url_for('login_page_', returnto=request.path))
+ return search_page(request.args.get('q'),
+ request.args.get('by'),
+ login)
-if __name__ == '__main__':
- server = HTTPServer(('0', 8090), Handler)
- try:
- server.serve_forever()
- except KeyboardInterrupt:
+@app.route('/login', methods=['GET'])
+def login_page_():
+ if not is_logged_in():
+ body = login_page(request.args.get('returnto'))
+ return render_document(page_base(title='Login', body=body))
+ else:
+ # TODO do something sensible here
pass
- server.server_close()
+
+@app.route('/login', methods=['POST'])
+def login_form():
+ global valid_session_cookies
+ logged_in = is_logged_in()
+
+ resp = redirect(request.args.get('returnto', url_for('index')))
+ if logged_in:
+ flash('Already loged in')
+ return resp
+
+ username = request.form['username']
+ password = request.form['password']
+ if passwords.validate(username, password):
+ unique = str(uuid4())
+ valid_session_cookies[unique] = username
+ resp.set_cookie('session', unique)
+ else:
+ flash('Invalid username or password')
+ return resp
+
+
+@app.route('/logout', methods=['POST'])
+def logout_form():
+ global valid_session_cookies
+ logged_in = is_logged_in()
+ if not logged_in:
+ flash('Not logged in')
+ return redirect(url_for('index'))
+ c = request.cookies.get('session')
+ if valid_session_cookies.get(c):
+ del valid_session_cookies[c]
+ resp = redirect(url_for('index'))
+ resp.set_cookie('session', '')
+ return resp
+
+
+if __name__ == '__main__':
+ app.run(debug=True, port=8090)