main.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import http.cookies
  2. import http.server
  3. import socketserver
  4. import random
  5. import time
  6. import os
  7. from urllib.parse import parse_qs
  8. from cgi import parse_header, parse_multipart
  9. import pyotp
  10. PORT = 8000
  11. TOKEN_LIFETIME = 60 * 60 * 2
  12. LAST_LOGIN_ATTEMPT = 0
  13. #SECRET = open('.totp_secret').read().strip()
  14. SECRET = os.getenv('SECRET')
  15. SECURE_COOKIE = False
  16. FORM = """
  17. <html>
  18. <head>
  19. <title>Please Log In</title>
  20. </head>
  21. <body>
  22. <form action="/auth/login" method="POST">
  23. <input type="text" name="token">
  24. <input type="submit" value="Submit">
  25. </form>
  26. </body>
  27. </html>
  28. """
  29. class TokenManager(object):
  30. """Who needs a database when you can just store everything in memory?"""
  31. def __init__(self):
  32. self.tokens = {}
  33. self.random = random.SystemRandom()
  34. def generate(self):
  35. t = '%064x' % self.random.getrandbits(8*32)
  36. self.tokens[t] = time.time()
  37. return t
  38. def is_valid(self, t):
  39. try:
  40. return time.time() - self.tokens.get(t, 0) < TOKEN_LIFETIME
  41. except Exception:
  42. return False
  43. def invalidate(self, t):
  44. if t in self.tokens:
  45. del self.tokens[t]
  46. TOKEN_MANAGER = TokenManager()
  47. class AuthHandler(http.server.BaseHTTPRequestHandler):
  48. def do_GET(self):
  49. if self.path == '/auth/check':
  50. # Check if they have a valid token
  51. cookie = http.cookies.SimpleCookie(self.headers.get('Cookie'))
  52. if 'token' in cookie and TOKEN_MANAGER.is_valid(cookie['token'].value):
  53. print('cookie ok')
  54. self.send_response(200)
  55. self.end_headers()
  56. return
  57. # Otherwise return 401, which will be redirected to '/auth/login' upstream
  58. print('no valid cookie found')
  59. self.send_response(401)
  60. self.end_headers()
  61. return
  62. if self.path == '/auth/login':
  63. # Render out the login form
  64. self.send_response(200)
  65. self.send_header('Content-type', 'text/html')
  66. self.end_headers()
  67. self.wfile.write(bytes(FORM, 'UTF-8'))
  68. return
  69. if self.path == '/auth/logout':
  70. # Invalidate any tokens
  71. cookie = http.cookies.SimpleCookie(self.headers.get('Cookie'))
  72. if 'token' in cookie:
  73. TOKEN_MANAGER.invalidate(cookie['token'].value)
  74. # This just replaces the token with garbage
  75. self.send_response(302)
  76. cookie = http.cookies.SimpleCookie()
  77. cookie["token"] = '***'
  78. cookie["token"]["path"] = '/'
  79. cookie["token"]["secure"] = SECURE_COOKIE
  80. self.send_header('Set-Cookie', cookie.output(header=''))
  81. self.send_header('Location', '/')
  82. self.end_headers()
  83. return
  84. # Otherwise return 404
  85. self.send_response(404)
  86. self.end_headers()
  87. def do_POST(self):
  88. if self.path == '/auth/login':
  89. # Rate limit login attempts to once per second
  90. global LAST_LOGIN_ATTEMPT
  91. if time.time() - LAST_LOGIN_ATTEMPT < 1.0:
  92. self.send_response(429)
  93. self.end_headers()
  94. self.wfile.write(bytes('Slow down. Hold your horses', 'UTF-8'))
  95. return
  96. LAST_LOGIN_ATTEMPT = time.time()
  97. # Check the TOTP Secret
  98. params = self.parse_POST()
  99. if (params.get(b'token') or [None])[0] == bytes(pyotp.TOTP(SECRET).now(), 'UTF-8'):
  100. print('otp ok')
  101. cookie = http.cookies.SimpleCookie()
  102. cookie["token"] = TOKEN_MANAGER.generate()
  103. cookie["token"]["path"] = "/"
  104. cookie["token"]["secure"] = SECURE_COOKIE
  105. self.send_response(302)
  106. self.send_header('Set-Cookie', cookie.output(header=''))
  107. self.send_header('Location', '/')
  108. self.end_headers()
  109. return
  110. # Otherwise redirect back to the login page
  111. else:
  112. print('otp not ok')
  113. self.send_response(302)
  114. self.send_header('Location', '/auth/login')
  115. self.end_headers()
  116. return
  117. # Otherwise return 404
  118. self.send_response(404)
  119. self.end_headers()
  120. def parse_POST(self):
  121. """Lifted from https://stackoverflow.com/questions/4233218/"""
  122. ctype, pdict = parse_header(self.headers['content-type'])
  123. if ctype == 'multipart/form-data':
  124. postvars = parse_multipart(self.rfile, pdict)
  125. elif ctype == 'application/x-www-form-urlencoded':
  126. length = int(self.headers['Content-Length'])
  127. postvars = parse_qs( self.rfile.read(length), keep_blank_values=1)
  128. else:
  129. postvars = {}
  130. return postvars
  131. socketserver.TCPServer.allow_reuse_address = True
  132. httpd = socketserver.TCPServer(("", PORT), AuthHandler)
  133. try:
  134. print("serving at port", PORT)
  135. httpd.serve_forever()
  136. finally:
  137. httpd.server_close()