"""OAuth2 ``/authorize`` route and its cookie / redirect helpers."""

from http import HTTPStatus
from http.cookies import CookieError, SimpleCookie
from urllib.parse import ParseResult, quote, urlencode, urlunparse

from authlib.oauth2 import OAuth2Error
from authlib.oauth2.rfc6749 import errors
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler import Response
from aws_lambda_powertools.event_handler.api_gateway import Router
from jose.exceptions import JWTError

from jose_ import verify_jwt
from oauth2 import server

router = Router()
logger = Logger(__name__)


@router.get('/authorize')
def authorize():
    """Handle ``GET /authorize``.

    Reads the ``id_token`` cookie from the current event and verifies it with
    ``verify_jwt``. When the cookie is missing or verification raises
    ``ValueError``/``JWTError``, responds with a 302 redirect to
    ``/login?continue=<percent-encoded current path and query>``.

    Otherwise asks ``server`` for a consent grant and then for the
    authorization response, using the token's ``sub`` claim as the user id.

    Returns:
        The value of ``server.create_authorization_response`` on success;
        a ``Response`` redirect when the user is not authenticated;
        ``dict(err.get_body())`` when obtaining the consent grant raises
        ``OAuth2Error``; an empty dict when creating the authorization
        response raises ``OAuth2Error``.
    """
    current_event = router.current_event
    cookies = _parse_cookies(current_event.get('cookies', []))
    id_token = cookies.get('id_token')
    continue_url = _build_continue_url(
        current_event.path,
        current_event.query_string_parameters,
    )
    login_url = f'/login?continue={continue_url}'

    try:
        if not id_token:
            raise ValueError('Missing id_token')
        user = verify_jwt(id_token)
    except (ValueError, JWTError):
        # Not (validly) signed in: send the user to log in and come back.
        return Response(
            status_code=HTTPStatus.FOUND,
            headers={'Location': login_url},
        )

    try:
        grant = server.get_consent_grant(
            request=current_event,
            end_user={'id': user['sub']},
        )
    except OAuth2Error as err:
        logger.exception(err)
        # NOTE(review): returned as a plain dict, so the HTTP status is
        # whatever the framework uses for dict results -- confirm intended.
        return dict(err.get_body())

    try:
        return server.create_authorization_response(
            request=current_event,
            grant_user={'id': user['sub']},
            grant=grant,
        )
    except errors.OAuth2Error as err:
        logger.exception(err)
        return {}


def _parse_cookies(cookies: list[str] | None) -> dict[str, str]:
    """Parse a list of cookie strings into a ``{name: value}`` mapping.

    Each string is loaded with ``SimpleCookie``; later entries overwrite
    earlier ones with the same name. Strings that ``SimpleCookie`` rejects
    with ``CookieError`` (e.g. an illegal cookie name) are skipped, because
    cookies are client-controlled and a malformed one should be treated as
    absent rather than abort the request.

    Args:
        cookies: Raw cookie strings, or ``None``/empty for no cookies.

    Returns:
        Mapping of cookie name to decoded value; empty when there is no input.
    """
    parsed_cookies: dict[str, str] = {}
    if not cookies:
        return parsed_cookies

    for raw in cookies:
        jar = SimpleCookie()
        try:
            jar.load(raw)
        except CookieError:
            # Malformed client-supplied cookie: ignore it.
            continue
        parsed_cookies.update({name: morsel.value for name, morsel in jar.items()})

    return parsed_cookies


def _build_continue_url(
    path: str,
    query_string_parameters: dict | None,
) -> str:
    """Build the fully percent-encoded ``path?query`` to return to after login.

    Args:
        path: Request path.
        query_string_parameters: Query parameters to re-encode. ``None`` is
            treated as "no parameters" (``urlencode(None)`` would raise
            ``TypeError``); presumably the event yields ``None`` when the
            request has no query string -- verify against the event class.

    Returns:
        The relative URL with every reserved character quoted (``safe=''``),
        suitable for embedding as a single query-parameter value.
    """
    query = urlencode(query_string_parameters or {})
    return quote(urlunparse(ParseResult('', '', path, '', query, '')), safe='')