telemetry.auth.auth

Docstring for telemetry.auth.auth

This module is responsible for Google OAuth OIDC authentication. This allows users of this module to get the name, unique stable identifier, and role of the authenticating user.

Please note that use of this module requires the relevant environment variables to be set, i.e. OIDC_ISSUER, OIDC_CLIENT_ID, and OIDC_CLIENT_SECRET.

  1"""
  2Docstring for telemetry.auth.auth
  3
  4This module is responsible for Google OAuth OIDC authentication.
  5This allows users of this module to get the name, unique stable 
  6identifier, and role of the authenticating user.
  7
  8Please note that use of this module requires the relevant environment
  9variables to be set, i.e. OIDC_ISSUER, OIDC_CLIENT_ID, and 
 10OIDC_CLIENT_SECRET.
 11"""
 12
 13import os
 14import requests
 15import threading
 16
 17import base64
 18import hashlib
 19import secrets
 20
 21import json
 22from enum import Enum
 23
 24import logging
 25
 26import webbrowser
 27from http.server import HTTPServer, BaseHTTPRequestHandler
 28from urllib.parse import urlencode, urlparse, parse_qs
 29
 30
 31logger = logging.getLogger(__name__)
 32logger.setLevel(logging.INFO)
 33
 34logging_handler = logging.FileHandler("auth/logs.txt", encoding="utf-8")
 35logging_handler.setLevel(logging.INFO)
 36
 37logging_handler.setFormatter(logging.Formatter(
 38    fmt="%(asctime)s\n    %(message)s",
 39    datefmt="%Y-%m-%d %H:%M:%S"
 40))
 41
 42logger.addHandler(logging_handler)
 43
 44
 45ISSUER = "https://accounts.google.com"
 46CLIENT_ID = os.environ.get("OIDC_CLIENT_ID")
 47CLIENT_SECRET = os.environ.get("OIDC_CLIENT_SECRET")
 48
 49# From Google's API documentation: "The client ID and client secret
 50# obtained from the API Console are embedded in the source code of
 51# your application. In this context, the client secret is 
 52# obviously not treated as a secret." 
 53# The above quote is from "https://googleapis.dev/ruby/google-api-client/v0.36.3/file.oauth-installed.html".
 54# This is confirmed by "a client secret, which you embed in the source
 55#  code of your application. (In this context, the client secret is 
 56# obviously not treated as a secret.)". This quote is from "https://developers.google.com/identity/protocols/oauth2".
 57
 58SCOPES = ["profile", "email"]
 59
 60LOGGING_ENABLED: bool = True
 61
 62
 63def validate_env_vars() -> None:
 64    """
 65    Helper function which verifies all necessary environment
 66    variables are set.
 67    """
 68    if not ISSUER:
 69        raise KeyError("OIDC_ISSUER environment variable is not set.")
 70    if not CLIENT_ID:
 71        raise KeyError("OIDC_CLIENT_ID environment variable is not set.")
 72    if not CLIENT_SECRET:
 73        raise KeyError("OIDC_SECRET environment variable is not set.")
 74    return
 75
 76
 77def get_oauth_config():
 78    url = str(ISSUER) + "/.well-known/openid-configuration"
 79    req = requests.get(url=url, timeout=10)
 80    req.raise_for_status()
 81    return req.json()
 82    
 83
 84def open_browser(url: str) -> None:
 85    """
 86    Helper function to open the web browser.
 87    TODO: Compatible with different OSs?
 88    """
 89    webbrowser.open(url)
 90
 91
 92def b64url(data: bytes) -> str:
 93    """Returns base 64 encoded url."""
 94    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")
 95
 96
 97def make_pkce_pair() -> tuple[str, str]:
 98    code_verifier = b64url(secrets.token_bytes(32))
 99    code_challenge = b64url(hashlib.sha256(code_verifier.encode("ascii")).digest())
100    return code_verifier, code_challenge
101
102
103def server_run() -> tuple[HTTPServer, threading.Thread]:
104    """
105    Starts the callback handler server in a separate thread.
106    Returns the server instance and the created thread.
107    """
108    server = HTTPServer(("127.0.0.1", 0), CallbackHandler)
109    thread = threading.Thread(
110        target=server.handle_request,
111        daemon=True
112    )
113    thread.start()
114    return server, thread
115
116
117class CallbackHandler(BaseHTTPRequestHandler):
118    def do_GET(self):
119        parsed_url = urlparse(self.path)
120        if parsed_url.path != "/callback":
121            self.send_response(404)
122            self.end_headers()
123            return
124
125        q = parse_qs(parsed_url.query)
126        self.server.auth_code = q.get("code", [None])[0] # type: ignore
127        self.server.auth_state = q.get("state", [None])[0] # type: ignore
128        self.server.auth_error = q.get("error", [None])[0] # type: ignore
129        self.server.auth_error_desc = q.get("error_description", [None])[0] # type: ignore
130
131        self.send_response(200)
132        self.send_header("Content-Type", "text/html; charset=utf-8")
133        self.end_headers()
134        self.wfile.write(
135            b"<html><body><h3>Login complete. " +
136            b"You can now close this tab.</h3></body></html>" +
137            b"""<style>body {
138                font-family: Arial, sans-serif; 
139                display: flex; 
140                justify-content: center; 
141                align-items: center; 
142                height: 100vh; 
143                background: #2c2f33 
144            } 
145            h3 {
146                color: #9b59ff; 
147                font-size: 36px; 
148                font-weight: bold; 
149                align-items: center; 
150                justify-content: center;
151            }</style>"""
152        )
153        
154
155class Role(str, Enum):
156    PLAYER = "player"
157    DESIGNER = "designer"
158    DEVELOPER = "developer"
159
160
161def google_login() -> tuple[str, str, Role]:
162    """
163    Prompts the user to log in via Google.
164    Opens browser to Google accounts log in page.
165    Returns the name and unique identifier of the user.
166
167    :return: User unique identifier, user name.
168    :rtype: tuple[str, str, Role]
169    :raises HTTPError: If an HTTP error occurs. 
170    """
171    validate_env_vars()
172
173    oauth_config = get_oauth_config()
174    auth_endpoint = oauth_config["authorization_endpoint"]
175    token_endpoint = oauth_config["token_endpoint"]
176    userinfo_endpoint = oauth_config["userinfo_endpoint"]
177
178    server, thread = server_run()
179    redirect_uri = f"http://127.0.0.1:{server.server_port}/callback"
180
181    state = secrets.token_urlsafe(24)
182    code_verifier, code_challenge = make_pkce_pair()
183
184    params = {
185        "response_type": "code",
186        "client_id": CLIENT_ID,
187        "redirect_uri": redirect_uri,
188        "scope": " ".join(SCOPES),
189        "state": state,
190        "code_challenge": code_challenge,
191        "code_challenge_method": "S256",
192    }
193    auth_url = f"{auth_endpoint}?{urlencode(params)}"
194
195    open_browser(auth_url)
196    if LOGGING_ENABLED:
197        logger.info("[SIE ] Sign-in prompted.")
198
199    thread.join() # Callback complete
200    server.server_close()
201
202    if not getattr(server, "auth_code", None):
203        if LOGGING_ENABLED:
204            logger.warning(
205                "[SIE ] Sign-in failed: no authorization code received."
206            )
207        raise RuntimeError("No authorization code received.")
208
209    if server.auth_state != state: # type: ignore
210        if LOGGING_ENABLED:
211            logger.warning("[SIE ] Sign-in failed: state mismatch.")
212        raise RuntimeError("AUTH ERROR - State mismatch.")
213
214    token_resp = requests.post(
215        token_endpoint,
216        data={
217            "grant_type": "authorization_code",
218            "code": server.auth_code, # type: ignore
219            "redirect_uri": redirect_uri,
220            "client_id": CLIENT_ID,
221            "client_secret": CLIENT_SECRET,
222            "code_verifier": code_verifier
223        },
224        timeout=15,
225    )
226    if not token_resp.ok:
227        if LOGGING_ENABLED:
228            logger.error("[SIE ] Sign-in failed: token exchange error" +
229                         "(status = {token_resp.status_code})")
230        print("---- AUTH ERROR OCCURRED ----")
231        print("|  Token status:", token_resp.status_code)
232        print("|  Token body:", token_resp.text)
233        token_resp.raise_for_status()
234
235    tokens = token_resp.json()
236
237    userinfo_resp = requests.get(
238        userinfo_endpoint,
239        headers={"Authorization": f"Bearer {tokens['access_token']}"},
240        timeout=15,
241    )
242    userinfo_resp.raise_for_status()
243    userinfo = userinfo_resp.json()
244
245    if LOGGING_ENABLED:
246        logger.info(f"[SIE ] Sign-in successful: " +
247                    f"user {userinfo.get('name')} authenticated.")
248    
249    sub = userinfo.get("sub")
250    return sub, userinfo.get("name"), get_role(
251        "logins_file.json", 
252        userID=sub
253    )
254
255
256def get_role(filename: str, userID: str) -> Role:
257    """
258    This function returns the role of the given user. It does so by
259    checking the user roles json file at the provided filepath.
260    If a profile does not exist for the given user, this function will
261    create one with the default role of "player".
262
263    :param filename: File path for the user roles json file.
264    :type filename: str
265    :param userID: The user ID of the user.
266    :type userID: int
267    :return: Returns the role of the user.
268    :rtype: Role
269    """
270    try:
271        with open(filename, 'r') as f:
272            player_roles = json.load(f)
273            this_user = player_roles.get(str(userID))
274            if this_user is None:
275                player_roles[str(userID)] = Role.PLAYER.value
276                with open(filename, 'w') as outfile:
277                    json.dump(player_roles, outfile, indent=4)
278                if LOGGING_ENABLED:
279                    logger.info(f"[AE  ] New user authenticated " + 
280                                f"with role {Role.PLAYER.value}.")
281                return Role.PLAYER
282            else:
283                try:
284                    if LOGGING_ENABLED:
285                        logger.info(f"[AE  ] Existing user authenticated " + 
286                                    f"with role {this_user}.")
287                    return Role(this_user) 
288                except ValueError:
289                    raise ValueError(f"Logins file at {filename}" +
290                                     f"contained an unknown role")
291    except FileNotFoundError:
292        raise FileNotFoundError(
293            f"Could not find user roles file at {filename}"
294        )
295    except json.JSONDecodeError:
296        raise RuntimeError(
297            f"Could not parse user roles file at {filename}" + 
298            f"- invalid json."
299        )
300
301
302
303
304
305# TESTING
306def main():
307    print(google_login())
308
309if __name__ == "__main__":
310    main()
logger = <Logger telemetry.auth.auth (INFO)>
logging_handler = <FileHandler /home/atlas/com2020/telemetry/auth/logs.txt (INFO)>
ISSUER = $OIDC_ISSUER
CLIENT_ID = $OIDC_CLIENT_ID
CLIENT_SECRET = None
SCOPES = ['profile', 'email']
LOGGING_ENABLED: bool = True
def validate_env_vars() -> None:
64def validate_env_vars() -> None:
65    """
66    Helper function which verifies all necessary environment
67    variables are set.
68    """
69    if not ISSUER:
70        raise KeyError("OIDC_ISSUER environment variable is not set.")
71    if not CLIENT_ID:
72        raise KeyError("OIDC_CLIENT_ID environment variable is not set.")
73    if not CLIENT_SECRET:
74        raise KeyError("OIDC_SECRET environment variable is not set.")
75    return

Helper function which verifies all necessary environment variables are set.

def get_oauth_config():
78def get_oauth_config():
79    url = str(ISSUER) + "/.well-known/openid-configuration"
80    req = requests.get(url=url, timeout=10)
81    req.raise_for_status()
82    return req.json()
def open_browser(url: str) -> None:
85def open_browser(url: str) -> None:
86    """
87    Helper function to open the web browser.
88    TODO: Compatible with different OSs?
89    """
90    webbrowser.open(url)

Helper function to open the web browser. TODO: Compatible with different OSs?

def b64url(data: bytes) -> str:
93def b64url(data: bytes) -> str:
94    """Returns base 64 encoded url."""
95    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

Returns base 64 encoded url.

def make_pkce_pair() -> tuple[str, str]:
 98def make_pkce_pair() -> tuple[str, str]:
 99    code_verifier = b64url(secrets.token_bytes(32))
100    code_challenge = b64url(hashlib.sha256(code_verifier.encode("ascii")).digest())
101    return code_verifier, code_challenge
def server_run() -> tuple[http.server.HTTPServer, threading.Thread]:
104def server_run() -> tuple[HTTPServer, threading.Thread]:
105    """
106    Starts the callback handler server in a separate thread.
107    Returns the server instance and the created thread.
108    """
109    server = HTTPServer(("127.0.0.1", 0), CallbackHandler)
110    thread = threading.Thread(
111        target=server.handle_request,
112        daemon=True
113    )
114    thread.start()
115    return server, thread

Starts the callback handler server in a separate thread. Returns the server instance and the created thread.

class CallbackHandler(http.server.BaseHTTPRequestHandler):
118class CallbackHandler(BaseHTTPRequestHandler):
119    def do_GET(self):
120        parsed_url = urlparse(self.path)
121        if parsed_url.path != "/callback":
122            self.send_response(404)
123            self.end_headers()
124            return
125
126        q = parse_qs(parsed_url.query)
127        self.server.auth_code = q.get("code", [None])[0] # type: ignore
128        self.server.auth_state = q.get("state", [None])[0] # type: ignore
129        self.server.auth_error = q.get("error", [None])[0] # type: ignore
130        self.server.auth_error_desc = q.get("error_description", [None])[0] # type: ignore
131
132        self.send_response(200)
133        self.send_header("Content-Type", "text/html; charset=utf-8")
134        self.end_headers()
135        self.wfile.write(
136            b"<html><body><h3>Login complete. " +
137            b"You can now close this tab.</h3></body></html>" +
138            b"""<style>body {
139                font-family: Arial, sans-serif; 
140                display: flex; 
141                justify-content: center; 
142                align-items: center; 
143                height: 100vh; 
144                background: #2c2f33 
145            } 
146            h3 {
147                color: #9b59ff; 
148                font-size: 36px; 
149                font-weight: bold; 
150                align-items: center; 
151                justify-content: center;
152            }</style>"""
153        )

HTTP request handler base class.

The following explanation of HTTP serves to guide you through the code as well as to expose any misunderstandings I may have about HTTP (so you don't need to read the code to figure out I'm wrong :-).

HTTP (HyperText Transfer Protocol) is an extensible protocol on top of a reliable stream transport (e.g. TCP/IP). The protocol recognizes three parts to a request:

  1. One line identifying the request type and path
  2. An optional set of RFC-822-style headers
  3. An optional data part

The headers and data are separated by a blank line.

The first line of the request has the form

where is a (case-sensitive) keyword such as GET or POST, is a string containing path information for the request, and should be the string "HTTP/1.0" or "HTTP/1.1". is encoded using the URL encoding scheme (using %xx to signify the ASCII character with hex code xx).

The specification specifies that lines are separated by CRLF but for compatibility with the widest range of clients recommends servers also handle LF. Similarly, whitespace in the request line is treated sensibly (allowing multiple spaces between components and allowing trailing whitespace).

Similarly, for output, lines ought to be separated by CRLF pairs but most clients grok LF characters just fine.

If the first line of the request has the form

(i.e. is left out) then this is assumed to be an HTTP 0.9 request; this form has no optional headers and data part and the reply consists of just the data.

The reply form of the HTTP 1.x protocol again has three parts:

  1. One line giving the response code
  2. An optional set of RFC-822-style headers
  3. The data

Again, the headers and data are separated by a blank line.

The response code line has the form

where is the protocol version ("HTTP/1.0" or "HTTP/1.1"), is a 3-digit response code indicating success or failure of the request, and is an optional human-readable string explaining what the response code means.

This server parses the request and the headers, and then calls a function specific to the request type (). Specifically, a request SPAM will be handled by a method do_SPAM(). If no such method exists the server sends an error response to the client. If it exists, it is called with no arguments:

do_SPAM()

Note that the request name is case sensitive (i.e. SPAM and spam are different requests).

The various request details are stored in instance variables:

  • client_address is the client IP address in the form (host, port);

  • command, path and version are the broken-down request line;

  • headers is an instance of email.message.Message (or a derived class) containing the header information;

  • rfile is a file object open for reading positioned at the start of the optional input data part;

  • wfile is a file object open for writing.

IT IS IMPORTANT TO ADHERE TO THE PROTOCOL FOR WRITING!

The first thing to be written must be the response line. Then follow 0 or more header lines, then a blank line, and then the actual data (if any). The meaning of the header lines depends on the command executed by the server; in most cases, when data is returned, there should be at least one header line of the form

Content-type: /

where and should be registered MIME types, e.g. "text/html" or "text/plain".

def do_GET(self):
119    def do_GET(self):
120        parsed_url = urlparse(self.path)
121        if parsed_url.path != "/callback":
122            self.send_response(404)
123            self.end_headers()
124            return
125
126        q = parse_qs(parsed_url.query)
127        self.server.auth_code = q.get("code", [None])[0] # type: ignore
128        self.server.auth_state = q.get("state", [None])[0] # type: ignore
129        self.server.auth_error = q.get("error", [None])[0] # type: ignore
130        self.server.auth_error_desc = q.get("error_description", [None])[0] # type: ignore
131
132        self.send_response(200)
133        self.send_header("Content-Type", "text/html; charset=utf-8")
134        self.end_headers()
135        self.wfile.write(
136            b"<html><body><h3>Login complete. " +
137            b"You can now close this tab.</h3></body></html>" +
138            b"""<style>body {
139                font-family: Arial, sans-serif; 
140                display: flex; 
141                justify-content: center; 
142                align-items: center; 
143                height: 100vh; 
144                background: #2c2f33 
145            } 
146            h3 {
147                color: #9b59ff; 
148                font-size: 36px; 
149                font-weight: bold; 
150                align-items: center; 
151                justify-content: center;
152            }</style>"""
153        )
class Role(builtins.str, enum.Enum):
156class Role(str, Enum):
157    PLAYER = "player"
158    DESIGNER = "designer"
159    DEVELOPER = "developer"

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

PLAYER = <Role.PLAYER: 'player'>
DESIGNER = <Role.DESIGNER: 'designer'>
DEVELOPER = <Role.DEVELOPER: 'developer'>
def google_login() -> tuple[str, str, Role]:
162def google_login() -> tuple[str, str, Role]:
163    """
164    Prompts the user to log in via Google.
165    Opens browser to Google accounts log in page.
166    Returns the name and unique identifier of the user.
167
168    :return: User unique identifier, user name.
169    :rtype: tuple[str, str, Role]
170    :raises HTTPError: If an HTTP error occurs. 
171    """
172    validate_env_vars()
173
174    oauth_config = get_oauth_config()
175    auth_endpoint = oauth_config["authorization_endpoint"]
176    token_endpoint = oauth_config["token_endpoint"]
177    userinfo_endpoint = oauth_config["userinfo_endpoint"]
178
179    server, thread = server_run()
180    redirect_uri = f"http://127.0.0.1:{server.server_port}/callback"
181
182    state = secrets.token_urlsafe(24)
183    code_verifier, code_challenge = make_pkce_pair()
184
185    params = {
186        "response_type": "code",
187        "client_id": CLIENT_ID,
188        "redirect_uri": redirect_uri,
189        "scope": " ".join(SCOPES),
190        "state": state,
191        "code_challenge": code_challenge,
192        "code_challenge_method": "S256",
193    }
194    auth_url = f"{auth_endpoint}?{urlencode(params)}"
195
196    open_browser(auth_url)
197    if LOGGING_ENABLED:
198        logger.info("[SIE ] Sign-in prompted.")
199
200    thread.join() # Callback complete
201    server.server_close()
202
203    if not getattr(server, "auth_code", None):
204        if LOGGING_ENABLED:
205            logger.warning(
206                "[SIE ] Sign-in failed: no authorization code received."
207            )
208        raise RuntimeError("No authorization code received.")
209
210    if server.auth_state != state: # type: ignore
211        if LOGGING_ENABLED:
212            logger.warning("[SIE ] Sign-in failed: state mismatch.")
213        raise RuntimeError("AUTH ERROR - State mismatch.")
214
215    token_resp = requests.post(
216        token_endpoint,
217        data={
218            "grant_type": "authorization_code",
219            "code": server.auth_code, # type: ignore
220            "redirect_uri": redirect_uri,
221            "client_id": CLIENT_ID,
222            "client_secret": CLIENT_SECRET,
223            "code_verifier": code_verifier
224        },
225        timeout=15,
226    )
227    if not token_resp.ok:
228        if LOGGING_ENABLED:
229            logger.error("[SIE ] Sign-in failed: token exchange error" +
230                         "(status = {token_resp.status_code})")
231        print("---- AUTH ERROR OCCURRED ----")
232        print("|  Token status:", token_resp.status_code)
233        print("|  Token body:", token_resp.text)
234        token_resp.raise_for_status()
235
236    tokens = token_resp.json()
237
238    userinfo_resp = requests.get(
239        userinfo_endpoint,
240        headers={"Authorization": f"Bearer {tokens['access_token']}"},
241        timeout=15,
242    )
243    userinfo_resp.raise_for_status()
244    userinfo = userinfo_resp.json()
245
246    if LOGGING_ENABLED:
247        logger.info(f"[SIE ] Sign-in successful: " +
248                    f"user {userinfo.get('name')} authenticated.")
249    
250    sub = userinfo.get("sub")
251    return sub, userinfo.get("name"), get_role(
252        "logins_file.json", 
253        userID=sub
254    )

Prompts the user to log in via Google. Opens browser to Google accounts log in page. Returns the name and unique identifier of the user.

Returns

User unique identifier, user name.

Raises
  • HTTPError: If an HTTP error occurs.
def get_role(filename: str, userID: str) -> Role:
257def get_role(filename: str, userID: str) -> Role:
258    """
259    This function returns the role of the given user. It does so by
260    checking the user roles json file at the provided filepath.
261    If a profile does not exist for the given user, this function will
262    create one with the default role of "player".
263
264    :param filename: File path for the user roles json file.
265    :type filename: str
266    :param userID: The user ID of the user.
267    :type userID: int
268    :return: Returns the role of the user.
269    :rtype: Role
270    """
271    try:
272        with open(filename, 'r') as f:
273            player_roles = json.load(f)
274            this_user = player_roles.get(str(userID))
275            if this_user is None:
276                player_roles[str(userID)] = Role.PLAYER.value
277                with open(filename, 'w') as outfile:
278                    json.dump(player_roles, outfile, indent=4)
279                if LOGGING_ENABLED:
280                    logger.info(f"[AE  ] New user authenticated " + 
281                                f"with role {Role.PLAYER.value}.")
282                return Role.PLAYER
283            else:
284                try:
285                    if LOGGING_ENABLED:
286                        logger.info(f"[AE  ] Existing user authenticated " + 
287                                    f"with role {this_user}.")
288                    return Role(this_user) 
289                except ValueError:
290                    raise ValueError(f"Logins file at {filename}" +
291                                     f"contained an unknown role")
292    except FileNotFoundError:
293        raise FileNotFoundError(
294            f"Could not find user roles file at {filename}"
295        )
296    except json.JSONDecodeError:
297        raise RuntimeError(
298            f"Could not parse user roles file at {filename}" + 
299            f"- invalid json."
300        )

This function returns the role of the given user. It does so by checking the user roles json file at the provided filepath. If a profile does not exist for the given user, this function will create one with the default role of "player".

Parameters
  • filename: File path for the user roles json file.
  • userID: The user ID of the user.
Returns

Returns the role of the user.

def main():
307def main():
308    print(google_login())