import http.cookies
import http.server
import socketserver
import random
import time
import os
from urllib.parse import parse_qs
from cgi import parse_header, parse_multipart
import pyotp
PORT = 8000
TOKEN_LIFETIME = 60 * 60 * 2
LAST_LOGIN_ATTEMPT = 0
#SECRET = open('.totp_secret').read().strip()
SECRET = os.getenv('SECRET')
SECURE_COOKIE = False
FORM = """
Please Log In
"""
class TokenManager(object):
"""Who needs a database when you can just store everything in memory?"""
def __init__(self):
self.tokens = {}
self.random = random.SystemRandom()
def generate(self):
t = '%064x' % self.random.getrandbits(8*32)
self.tokens[t] = time.time()
return t
def is_valid(self, t):
try:
return time.time() - self.tokens.get(t, 0) < TOKEN_LIFETIME
except Exception:
return False
def invalidate(self, t):
if t in self.tokens:
del self.tokens[t]
TOKEN_MANAGER = TokenManager()
class AuthHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/auth/check':
# Check if they have a valid token
cookie = http.cookies.SimpleCookie(self.headers.get('Cookie'))
if 'token' in cookie and TOKEN_MANAGER.is_valid(cookie['token'].value):
print('cookie ok')
self.send_response(200)
self.end_headers()
return
# Otherwise return 401, which will be redirected to '/auth/login' upstream
print('no valid cookie found')
self.send_response(401)
self.end_headers()
return
if self.path == '/auth/login':
# Render out the login form
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(bytes(FORM, 'UTF-8'))
return
if self.path == '/auth/logout':
# Invalidate any tokens
cookie = http.cookies.SimpleCookie(self.headers.get('Cookie'))
if 'token' in cookie:
TOKEN_MANAGER.invalidate(cookie['token'].value)
# This just replaces the token with garbage
self.send_response(302)
cookie = http.cookies.SimpleCookie()
cookie["token"] = '***'
cookie["token"]["path"] = '/'
cookie["token"]["secure"] = SECURE_COOKIE
self.send_header('Set-Cookie', cookie.output(header=''))
self.send_header('Location', '/')
self.end_headers()
return
# Otherwise return 404
self.send_response(404)
self.end_headers()
def do_POST(self):
if self.path == '/auth/login':
# Rate limit login attempts to once per second
global LAST_LOGIN_ATTEMPT
if time.time() - LAST_LOGIN_ATTEMPT < 1.0:
self.send_response(429)
self.end_headers()
self.wfile.write(bytes('Slow down. Hold your horses', 'UTF-8'))
return
LAST_LOGIN_ATTEMPT = time.time()
# Check the TOTP Secret
params = self.parse_POST()
if (params.get(b'token') or [None])[0] == bytes(pyotp.TOTP(SECRET).now(), 'UTF-8'):
print('otp ok')
cookie = http.cookies.SimpleCookie()
cookie["token"] = TOKEN_MANAGER.generate()
cookie["token"]["path"] = "/"
cookie["token"]["secure"] = SECURE_COOKIE
self.send_response(302)
self.send_header('Set-Cookie', cookie.output(header=''))
self.send_header('Location', '/')
self.end_headers()
return
# Otherwise redirect back to the login page
else:
print('otp not ok')
self.send_response(302)
self.send_header('Location', '/auth/login')
self.end_headers()
return
# Otherwise return 404
self.send_response(404)
self.end_headers()
def parse_POST(self):
"""Lifted from https://stackoverflow.com/questions/4233218/"""
ctype, pdict = parse_header(self.headers['content-type'])
if ctype == 'multipart/form-data':
postvars = parse_multipart(self.rfile, pdict)
elif ctype == 'application/x-www-form-urlencoded':
length = int(self.headers['Content-Length'])
postvars = parse_qs( self.rfile.read(length), keep_blank_values=1)
else:
postvars = {}
return postvars
socketserver.TCPServer.allow_reuse_address = True
httpd = socketserver.TCPServer(("", PORT), AuthHandler)
try:
print("serving at port", PORT)
httpd.serve_forever()
finally:
httpd.server_close()