telemetry.auth.auth
Docstring for telemetry.auth.auth
This module is responsible for Google OAuth OIDC authentication. This allows users of this module to get the name, unique stable identifier, and role of the authenticating user.
Please note that use of this module requires the relevant environment variables to be set, i.e. OIDC_ISSUER, OIDC_CLIENT_ID, and OIDC_CLIENT_SECRET.
"""
Docstring for telemetry.auth.auth

This module is responsible for Google OAuth OIDC authentication.
This allows users of this module to get the name, unique stable
identifier, and role of the authenticating user.

Please note that use of this module requires the relevant environment
variables to be set, i.e. OIDC_CLIENT_ID and OIDC_CLIENT_SECRET.
OIDC_ISSUER may also be set; when it is absent Google's issuer is used.
"""

import os
import requests
import threading
import time

import base64
import hashlib
import secrets

import json
from enum import Enum

import logging

import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlencode, urlparse, parse_qs


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# NOTE: the "auth" directory must already exist relative to the working
# directory, otherwise creating this handler fails at import time.
logging_handler = logging.FileHandler("auth/logs.txt", encoding="utf-8")
logging_handler.setLevel(logging.INFO)

logging_handler.setFormatter(logging.Formatter(
    fmt="%(asctime)s\n %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
))

logger.addHandler(logging_handler)


# The module documentation names OIDC_ISSUER as a setting, so honour it;
# the previous hard-coded Google issuer remains the default.
ISSUER = os.environ.get("OIDC_ISSUER", "https://accounts.google.com")
CLIENT_ID = os.environ.get("OIDC_CLIENT_ID")
CLIENT_SECRET = os.environ.get("OIDC_CLIENT_SECRET")

# From Google's API documentation: "The client ID and client secret
# obtained from the API Console are embedded in the source code of
# your application. In this context, the client secret is
# obviously not treated as a secret."
# The above quote is from "https://googleapis.dev/ruby/google-api-client/v0.36.3/file.oauth-installed.html".
# This is confirmed by "a client secret, which you embed in the source
# code of your application. (In this context, the client secret is
# obviously not treated as a secret.)". This quote is from "https://developers.google.com/identity/protocols/oauth2".

SCOPES = ["profile", "email"]

LOGGING_ENABLED: bool = True

# Markup shared by the success and failure callback pages.
_PAGE_HEAD = (
    "<!DOCTYPE html><html><head><meta charset=\"utf-8\">"
    "<style>body {"
    "font-family: Arial, sans-serif; display: flex; "
    "justify-content: center; align-items: center; "
    "height: 100vh; background: #2c2f33} "
    "h3 {color: #9b59ff; font-size: 36px; font-weight: bold; "
    "align-items: center; justify-content: center}"
    "</style></head><body><h3>"
)
_PAGE_TAIL = "</h3></body></html>"


def validate_env_vars() -> None:
    """
    Helper function which verifies all necessary environment
    variables are set.

    :raises KeyError: If ISSUER, CLIENT_ID or CLIENT_SECRET is missing
        or empty. The message names the corresponding environment
        variable.
    """
    required = (
        ("OIDC_ISSUER", ISSUER),
        ("OIDC_CLIENT_ID", CLIENT_ID),
        ("OIDC_CLIENT_SECRET", CLIENT_SECRET),
    )
    for name, value in required:
        if not value:
            raise KeyError(f"{name} environment variable is not set.")


def get_oauth_config() -> dict:
    """
    Fetches the OpenID Connect discovery document of the issuer.

    :return: The parsed json discovery document.
    :rtype: dict
    :raises HTTPError: If the discovery request is unsuccessful.
    """
    # Tolerate a trailing slash on a configured issuer.
    url = str(ISSUER).rstrip("/") + "/.well-known/openid-configuration"
    resp = requests.get(url=url, timeout=10)
    resp.raise_for_status()
    return resp.json()


def open_browser(url: str) -> None:
    """
    Helper function to open the given URL in the user's web browser.

    If no browser could be launched the URL is printed, so the user
    can open it manually instead of the sign-in silently stalling.

    :param url: The URL to open.
    :type url: str
    """
    # webbrowser.open reports failure through its return value.
    if not webbrowser.open(url):
        print("Could not open a web browser automatically.")
        print("Please open the following URL to sign in:")
        print(url)


def b64url(data: bytes) -> str:
    """
    Returns the URL-safe base 64 encoding of the given bytes as an
    ASCII string, with trailing "=" padding removed.
    """
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_pkce_pair() -> tuple[str, str]:
    """
    Creates a PKCE code verifier and its S256 code challenge.

    :return: The code verifier, then the code challenge.
    :rtype: tuple[str, str]
    """
    code_verifier = b64url(secrets.token_bytes(32))
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return code_verifier, b64url(digest)


def server_run(timeout: float = 300.0) -> tuple[HTTPServer, threading.Thread]:
    """
    Starts the callback handler server in a separate thread.
    Returns the server instance and the created thread.

    The thread keeps serving requests until the "/callback" path has
    been requested or until ``timeout`` seconds have passed, so an
    unrelated request does not end the wait and an abandoned sign-in
    does not block forever.

    :param timeout: Maximum number of seconds to wait for the callback.
    :type timeout: float
    :return: The server instance and the started thread.
    :rtype: tuple[HTTPServer, threading.Thread]
    """
    server = HTTPServer(("127.0.0.1", 0), CallbackHandler)
    # handle_request() gives up after this many idle seconds, letting
    # the loop below re-check its deadline.
    server.timeout = 1.0

    def _serve_until_callback() -> None:
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            server.handle_request()
            # CallbackHandler sets auth_code (possibly to None) only
            # when the "/callback" path is requested.
            if hasattr(server, "auth_code"):
                return

    thread = threading.Thread(target=_serve_until_callback, daemon=True)
    thread.start()
    return server, thread


class CallbackHandler(BaseHTTPRequestHandler):
    """
    Request handler for the local OAuth redirect endpoint.

    A GET request for "/callback" stores the "code", "state", "error"
    and "error_description" query parameters on the server object as
    auth_code, auth_state, auth_error and auth_error_desc (None when a
    parameter is absent). Any other path is answered with 404.
    """

    def log_message(self, format, *args):
        """
        Discards the default per-request log line.

        The default implementation writes the request line to stderr,
        and for the callback that line contains the authorization code.
        """
        return

    def do_GET(self):
        """Handles the redirect from the authorization server."""
        parsed_url = urlparse(self.path)
        if parsed_url.path != "/callback":
            self.send_response(404)
            self.end_headers()
            return

        q = parse_qs(parsed_url.query)
        self.server.auth_code = q.get("code", [None])[0]  # type: ignore
        self.server.auth_state = q.get("state", [None])[0]  # type: ignore
        self.server.auth_error = q.get("error", [None])[0]  # type: ignore
        self.server.auth_error_desc = q.get("error_description", [None])[0]  # type: ignore

        # Only claim success when a code arrived without an error.
        if q.get("code") and not q.get("error"):
            message = "Login complete. You can now close this tab."
        else:
            message = "Login failed. Please return to the application."
        body = (_PAGE_HEAD + message + _PAGE_TAIL).encode("utf-8")

        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


class Role(str, Enum):
    """
    The role assigned to an authenticated user.

    Members are also str instances, so they compare equal to their
    plain string values as stored in the user roles json file.
    """
    PLAYER = "player"
    DESIGNER = "designer"
    DEVELOPER = "developer"


def google_login() -> tuple[str, str, Role]:
    """
    Prompts the user to log in via Google.
    Opens browser to Google accounts log in page.
    Returns the unique identifier, name, and role of the user.

    :return: User unique identifier, user name, user role.
    :rtype: tuple[str, str, Role]
    :raises KeyError: If a required environment variable is not set.
    :raises RuntimeError: If no authorization code was received, the
        returned state does not match, or no user identifier was
        returned.
    :raises HTTPError: If an HTTP error occurs.
    """
    validate_env_vars()

    oauth_config = get_oauth_config()
    auth_endpoint = oauth_config["authorization_endpoint"]
    token_endpoint = oauth_config["token_endpoint"]
    userinfo_endpoint = oauth_config["userinfo_endpoint"]

    server, thread = server_run()
    redirect_uri = f"http://127.0.0.1:{server.server_port}/callback"

    state = secrets.token_urlsafe(24)
    code_verifier, code_challenge = make_pkce_pair()

    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": redirect_uri,
        "scope": " ".join(SCOPES),
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    auth_url = f"{auth_endpoint}?{urlencode(params)}"

    open_browser(auth_url)
    if LOGGING_ENABLED:
        logger.info("[SIE ] Sign-in prompted.")

    thread.join()  # Callback complete
    server.server_close()

    auth_code = getattr(server, "auth_code", None)
    if not auth_code:
        # Surface the provider's error (e.g. access_denied) if it sent one.
        detail = ""
        auth_error = getattr(server, "auth_error", None)
        if auth_error:
            detail = f" (error = {auth_error!r})"
        if LOGGING_ENABLED:
            logger.warning(
                "[SIE ] Sign-in failed: no authorization code received."
                + detail
            )
        raise RuntimeError("No authorization code received." + detail)

    if getattr(server, "auth_state", None) != state:
        if LOGGING_ENABLED:
            logger.warning("[SIE ] Sign-in failed: state mismatch.")
        raise RuntimeError("AUTH ERROR - State mismatch.")

    token_resp = requests.post(
        token_endpoint,
        data={
            "grant_type": "authorization_code",
            "code": auth_code,
            "redirect_uri": redirect_uri,
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "code_verifier": code_verifier
        },
        timeout=15,
    )
    if not token_resp.ok:
        if LOGGING_ENABLED:
            logger.error(
                "[SIE ] Sign-in failed: token exchange error "
                "(status = %s)", token_resp.status_code
            )
        print("---- AUTH ERROR OCCURRED ----")
        print("| Token status:", token_resp.status_code)
        print("| Token body:", token_resp.text)
        token_resp.raise_for_status()

    tokens = token_resp.json()

    userinfo_resp = requests.get(
        userinfo_endpoint,
        headers={"Authorization": f"Bearer {tokens['access_token']}"},
        timeout=15,
    )
    userinfo_resp.raise_for_status()
    userinfo = userinfo_resp.json()

    sub = userinfo.get("sub")
    if not sub:
        # Without an identifier every such user would share one entry
        # in the roles file.
        if LOGGING_ENABLED:
            logger.error("[SIE ] Sign-in failed: no user identifier.")
        raise RuntimeError("No user identifier received.")

    if LOGGING_ENABLED:
        logger.info(f"[SIE ] Sign-in successful: " +
                    f"user {userinfo.get('name')} authenticated.")

    return sub, userinfo.get("name"), get_role(
        "logins_file.json",
        userID=sub
    )


def get_role(filename: str, userID: str) -> Role:
    """
    This function returns the role of the given user. It does so by
    checking the user roles json file at the provided filepath.
    If a profile does not exist for the given user, this function will
    create one with the default role of "player".

    :param filename: File path for the user roles json file.
    :type filename: str
    :param userID: The user ID of the user.
    :type userID: str
    :return: Returns the role of the user.
    :rtype: Role
    :raises FileNotFoundError: If the roles file does not exist.
    :raises RuntimeError: If the roles file is not valid json or does
        not contain a json object.
    :raises ValueError: If the stored role is not a known role.
    """
    user_key = str(userID)
    try:
        with open(filename, "r", encoding="utf-8") as f:
            player_roles = json.load(f)
    except FileNotFoundError as err:
        raise FileNotFoundError(
            f"Could not find user roles file at {filename}"
        ) from err
    except json.JSONDecodeError as err:
        raise RuntimeError(
            f"Could not parse user roles file at {filename} "
            "- invalid json."
        ) from err

    if not isinstance(player_roles, dict):
        raise RuntimeError(
            f"Could not parse user roles file at {filename} "
            "- expected a json object."
        )

    this_user = player_roles.get(user_key)
    if this_user is None:
        # First sign-in: persist the default role.
        player_roles[user_key] = Role.PLAYER.value
        with open(filename, "w", encoding="utf-8") as outfile:
            json.dump(player_roles, outfile, indent=4)
        if LOGGING_ENABLED:
            logger.info(f"[AE ] New user authenticated " +
                        f"with role {Role.PLAYER.value}.")
        return Role.PLAYER

    try:
        role = Role(this_user)
    except ValueError as err:
        raise ValueError(
            f"Logins file at {filename} contained an unknown role"
        ) from err

    # Log only once the stored role is known to be valid.
    if LOGGING_ENABLED:
        logger.info(f"[AE ] Existing user authenticated " +
                    f"with role {this_user}.")
    return role


# TESTING
def main():
    print(google_login())


if __name__ == "__main__":
    main()
def validate_env_vars() -> None:
    """
    Helper function which verifies all necessary environment
    variables are set.

    Checks the module-level ISSUER, CLIENT_ID and CLIENT_SECRET values
    in that order and stops at the first one that is missing or empty.

    :raises KeyError: If one of the values is missing or empty. The
        message names the corresponding environment variable.
    """
    required = (
        ("OIDC_ISSUER", ISSUER),
        ("OIDC_CLIENT_ID", CLIENT_ID),
        # The variable read by this module is OIDC_CLIENT_SECRET.
        ("OIDC_CLIENT_SECRET", CLIENT_SECRET),
    )
    for name, value in required:
        if not value:
            raise KeyError(f"{name} environment variable is not set.")
Helper function which verifies all necessary environment variables are set.
def open_browser(url: str) -> None:
    """
    Helper function to open the given URL in the user's web browser.

    Delegates to the standard library ``webbrowser`` module. If no
    browser could be launched the URL is printed, so the user can open
    it manually instead of the sign-in silently stalling.

    :param url: The URL to open.
    :type url: str
    """
    # webbrowser.open reports failure through its return value.
    if not webbrowser.open(url):
        print("Could not open a web browser automatically.")
        print("Please open the following URL to sign in:")
        print(url)
Helper function to open the web browser. TODO: Compatible with different OSs?
def b64url(data: bytes) -> str:
    """
    Returns the URL-safe base 64 encoding of the given bytes as an
    ASCII string, with trailing "=" padding removed.
    """
    encoded = base64.urlsafe_b64encode(data).decode("ascii")
    return encoded.rstrip("=")
Returns the URL-safe Base64 encoding of the given bytes as an ASCII string, with trailing "=" padding removed.
def server_run(timeout: float = 300.0) -> tuple[HTTPServer, threading.Thread]:
    """
    Starts the callback handler server in a separate thread.
    Returns the server instance and the created thread.

    The thread keeps serving requests until the "/callback" path has
    been requested or until ``timeout`` seconds have passed, so an
    unrelated request does not end the wait and an abandoned sign-in
    does not block forever.

    :param timeout: Maximum number of seconds to wait for the callback.
    :type timeout: float
    :return: The server instance and the started thread.
    :rtype: tuple[HTTPServer, threading.Thread]
    """
    import time  # local import keeps this block self-contained

    server = HTTPServer(("127.0.0.1", 0), CallbackHandler)
    # handle_request() gives up after this many idle seconds, letting
    # the loop below re-check its deadline.
    server.timeout = 1.0

    def _serve_until_callback() -> None:
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            server.handle_request()
            # CallbackHandler sets auth_code (possibly to None) only
            # when the "/callback" path is requested.
            if hasattr(server, "auth_code"):
                return

    thread = threading.Thread(target=_serve_until_callback, daemon=True)
    thread.start()
    return server, thread
Starts the callback handler server in a separate thread. Returns the server instance and the created thread.
class CallbackHandler(BaseHTTPRequestHandler):
    """
    Request handler for the local OAuth redirect endpoint.

    A GET request for "/callback" stores the "code", "state", "error"
    and "error_description" query parameters on the server object as
    auth_code, auth_state, auth_error and auth_error_desc (None when a
    parameter is absent). Any other path is answered with 404.
    """

    # Markup shared by the success and failure pages.
    _PAGE_HEAD = (
        "<!DOCTYPE html><html><head><meta charset=\"utf-8\">"
        "<style>body {"
        "font-family: Arial, sans-serif; display: flex; "
        "justify-content: center; align-items: center; "
        "height: 100vh; background: #2c2f33} "
        "h3 {color: #9b59ff; font-size: 36px; font-weight: bold; "
        "align-items: center; justify-content: center}"
        "</style></head><body><h3>"
    )
    _PAGE_TAIL = "</h3></body></html>"

    def log_message(self, format, *args):
        """
        Discards the default per-request log line.

        The default implementation writes the request line to stderr,
        and for the callback that line contains the authorization code.
        """
        return

    def do_GET(self):
        """Handles the redirect from the authorization server."""
        parsed_url = urlparse(self.path)
        if parsed_url.path != "/callback":
            self.send_response(404)
            self.end_headers()
            return

        q = parse_qs(parsed_url.query)
        self.server.auth_code = q.get("code", [None])[0]  # type: ignore
        self.server.auth_state = q.get("state", [None])[0]  # type: ignore
        self.server.auth_error = q.get("error", [None])[0]  # type: ignore
        self.server.auth_error_desc = q.get("error_description", [None])[0]  # type: ignore

        # Only claim success when a code arrived without an error.
        if q.get("code") and not q.get("error"):
            message = "Login complete. You can now close this tab."
        else:
            message = "Login failed. Please return to the application."
        body = (self._PAGE_HEAD + message + self._PAGE_TAIL).encode("utf-8")

        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
HTTP request handler base class.
The following explanation of HTTP serves to guide you through the code as well as to expose any misunderstandings I may have about HTTP (so you don't need to read the code to figure out I'm wrong :-).
HTTP (HyperText Transfer Protocol) is an extensible protocol on top of a reliable stream transport (e.g. TCP/IP). The protocol recognizes three parts to a request:
- One line identifying the request type and path
- An optional set of RFC-822-style headers
- An optional data part
The headers and data are separated by a blank line.
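The first line of the request has the form
`<command> <path> <version>`
where `<command>` is a (case-sensitive) keyword such as GET or POST, `<path>` is a string containing path information for the request, and `<version>` should be the string "HTTP/1.0" or "HTTP/1.1". `<path>` is encoded using the URL encoding scheme (using %xx to signify the ASCII character with hex code xx).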
The specification specifies that lines are separated by CRLF but for compatibility with the widest range of clients recommends servers also handle LF. Similarly, whitespace in the request line is treated sensibly (allowing multiple spaces between components and allowing trailing whitespace).
Similarly, for output, lines ought to be separated by CRLF pairs but most clients grok LF characters just fine.
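If the first line of the request has the form
`<command> <path>`
(i.e. `<version>` is left out) then this is assumed to be an HTTP 0.9 request; this form has no optional headers and data part and the reply consists of just the data.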
The reply form of the HTTP 1.x protocol again has three parts:
- One line giving the response code
- An optional set of RFC-822-style headers
- The data
Again, the headers and data are separated by a blank line.
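The response code line has the form
`<version> <responsecode> <responsestring>`
where `<version>` is the protocol version ("HTTP/1.0" or "HTTP/1.1"), `<responsecode>` is a 3-digit response code indicating success or failure of the request, and `<responsestring>` is an optional human-readable string explaining what the response code means.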
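This server parses the request and the headers, and then calls a function specific to the request type (`<command>`). Specifically, a request SPAM will be handled by a method do_SPAM(). If no such method exists the server sends an error response to the client. If it exists, it is called with no arguments:
do_SPAM()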
Note that the request name is case sensitive (i.e. SPAM and spam are different requests).
The various request details are stored in instance variables:
client_address is the client IP address in the form (host, port);
command, path and version are the broken-down request line;
headers is an instance of email.message.Message (or a derived class) containing the header information;
rfile is a file object open for reading positioned at the start of the optional input data part;
wfile is a file object open for writing.
IT IS IMPORTANT TO ADHERE TO THE PROTOCOL FOR WRITING!
The first thing to be written must be the response line. Then follow 0 or more header lines, then a blank line, and then the actual data (if any). The meaning of the header lines depends on the command executed by the server; in most cases, when data is returned, there should be at least one header line of the form
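Content-type: `<type>/<subtype>`
where `<type>` and `<subtype>` should be registered MIME types, e.g. "text/html" or "text/plain".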
def do_GET(self):
    """
    Handles the redirect from the authorization server.

    A request for "/callback" stores the "code", "state", "error" and
    "error_description" query parameters on the server object as
    auth_code, auth_state, auth_error and auth_error_desc (None when a
    parameter is absent) and answers with a short HTML page. Any other
    path is answered with 404 and stores nothing.
    """
    parsed_url = urlparse(self.path)
    if parsed_url.path != "/callback":
        self.send_response(404)
        self.end_headers()
        return

    q = parse_qs(parsed_url.query)
    self.server.auth_code = q.get("code", [None])[0]  # type: ignore
    self.server.auth_state = q.get("state", [None])[0]  # type: ignore
    self.server.auth_error = q.get("error", [None])[0]  # type: ignore
    self.server.auth_error_desc = q.get("error_description", [None])[0]  # type: ignore

    # Only claim success when a code arrived without an error.
    if q.get("code") and not q.get("error"):
        message = b"Login complete. You can now close this tab."
    else:
        message = b"Login failed. Please return to the application."

    # The style sheet belongs inside <head>, before the document closes.
    body = (
        b"<!DOCTYPE html><html><head><meta charset=\"utf-8\">"
        b"<style>body {"
        b"font-family: Arial, sans-serif; display: flex; "
        b"justify-content: center; align-items: center; "
        b"height: 100vh; background: #2c2f33} "
        b"h3 {color: #9b59ff; font-size: 36px; font-weight: bold; "
        b"align-items: center; justify-content: center}"
        b"</style></head><body><h3>"
        + message +
        b"</h3></body></html>"
    )

    self.send_response(200)
    self.send_header("Content-Type", "text/html; charset=utf-8")
    self.send_header("Content-Length", str(len(body)))
    self.end_headers()
    self.wfile.write(body)
class Role(str, Enum):
    """
    The role assigned to an authenticated user.

    Members are also str instances, so each member compares equal to
    its plain string value.
    """
    PLAYER = "player"
    DESIGNER = "designer"
    DEVELOPER = "developer"
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
def google_login() -> tuple[str, str, Role]:
    """
    Prompts the user to log in via Google.
    Opens browser to Google accounts log in page.
    Returns the unique identifier, name, and role of the user.

    :return: User unique identifier, user name, user role.
    :rtype: tuple[str, str, Role]
    :raises KeyError: If a required environment variable is not set.
    :raises RuntimeError: If no authorization code was received, the
        returned state does not match, or no user identifier was
        returned.
    :raises HTTPError: If an HTTP error occurs.
    """
    validate_env_vars()

    oauth_config = get_oauth_config()
    auth_endpoint = oauth_config["authorization_endpoint"]
    token_endpoint = oauth_config["token_endpoint"]
    userinfo_endpoint = oauth_config["userinfo_endpoint"]

    server, thread = server_run()
    redirect_uri = f"http://127.0.0.1:{server.server_port}/callback"

    state = secrets.token_urlsafe(24)
    code_verifier, code_challenge = make_pkce_pair()

    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": redirect_uri,
        "scope": " ".join(SCOPES),
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
    auth_url = f"{auth_endpoint}?{urlencode(params)}"

    open_browser(auth_url)
    if LOGGING_ENABLED:
        logger.info("[SIE ] Sign-in prompted.")

    thread.join()  # Callback complete
    server.server_close()

    auth_code = getattr(server, "auth_code", None)
    if not auth_code:
        # Surface the provider's error (e.g. access_denied) if it sent one.
        detail = ""
        auth_error = getattr(server, "auth_error", None)
        if auth_error:
            detail = f" (error = {auth_error!r})"
        if LOGGING_ENABLED:
            logger.warning(
                "[SIE ] Sign-in failed: no authorization code received."
                + detail
            )
        raise RuntimeError("No authorization code received." + detail)

    if getattr(server, "auth_state", None) != state:
        if LOGGING_ENABLED:
            logger.warning("[SIE ] Sign-in failed: state mismatch.")
        raise RuntimeError("AUTH ERROR - State mismatch.")

    token_resp = requests.post(
        token_endpoint,
        data={
            "grant_type": "authorization_code",
            "code": auth_code,
            "redirect_uri": redirect_uri,
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "code_verifier": code_verifier
        },
        timeout=15,
    )
    if not token_resp.ok:
        if LOGGING_ENABLED:
            # Pass the status as a logging argument so it is actually
            # interpolated into the message.
            logger.error(
                "[SIE ] Sign-in failed: token exchange error "
                "(status = %s)", token_resp.status_code
            )
        print("---- AUTH ERROR OCCURRED ----")
        print("| Token status:", token_resp.status_code)
        print("| Token body:", token_resp.text)
        token_resp.raise_for_status()

    tokens = token_resp.json()

    userinfo_resp = requests.get(
        userinfo_endpoint,
        headers={"Authorization": f"Bearer {tokens['access_token']}"},
        timeout=15,
    )
    userinfo_resp.raise_for_status()
    userinfo = userinfo_resp.json()

    sub = userinfo.get("sub")
    if not sub:
        # Without an identifier every such user would share one entry
        # in the roles file.
        if LOGGING_ENABLED:
            logger.error("[SIE ] Sign-in failed: no user identifier.")
        raise RuntimeError("No user identifier received.")

    if LOGGING_ENABLED:
        logger.info(f"[SIE ] Sign-in successful: " +
                    f"user {userinfo.get('name')} authenticated.")

    return sub, userinfo.get("name"), get_role(
        "logins_file.json",
        userID=sub
    )
Prompts the user to log in via Google. Opens browser to Google accounts log in page. Returns the unique identifier, name, and role of the user.
Returns
User unique identifier, user name, user role.
Raises
- HTTPError: If an HTTP error occurs.
257def get_role(filename: str, userID: str) -> Role: 258 """ 259 This function returns the role of the given user. It does so by 260 checking the user roles json file at the provided filepath. 261 If a profile does not exist for the given user, this function will 262 create one with the default role of "player". 263 264 :param filename: File path for the user roles json file. 265 :type filename: str 266 :param userID: The user ID of the user. 267 :type userID: int 268 :return: Returns the role of the user. 269 :rtype: Role 270 """ 271 try: 272 with open(filename, 'r') as f: 273 player_roles = json.load(f) 274 this_user = player_roles.get(str(userID)) 275 if this_user is None: 276 player_roles[str(userID)] = Role.PLAYER.value 277 with open(filename, 'w') as outfile: 278 json.dump(player_roles, outfile, indent=4) 279 if LOGGING_ENABLED: 280 logger.info(f"[AE ] New user authenticated " + 281 f"with role {Role.PLAYER.value}.") 282 return Role.PLAYER 283 else: 284 try: 285 if LOGGING_ENABLED: 286 logger.info(f"[AE ] Existing user authenticated " + 287 f"with role {this_user}.") 288 return Role(this_user) 289 except ValueError: 290 raise ValueError(f"Logins file at {filename}" + 291 f"contained an unknown role") 292 except FileNotFoundError: 293 raise FileNotFoundError( 294 f"Could not find user roles file at {filename}" 295 ) 296 except json.JSONDecodeError: 297 raise RuntimeError( 298 f"Could not parse user roles file at {filename}" + 299 f"- invalid json." 300 )
This function returns the role of the given user. It does so by checking the user roles json file at the provided filepath. If a profile does not exist for the given user, this function will create one with the default role of "player".
Parameters
- filename: File path for the user roles json file.
- userID: The user ID of the user.
Returns
Returns the role of the user.