""" Example ------- Resources: HttpApi: Type: AWS::Serverless::HttpApi Properties: Auth: DefaultAuthorizer: LambdaRequestAuthorizer Authorizers: LambdaRequestAuthorizer: FunctionArn: !GetAtt Authorizer.Arn AuthorizerPayloadFormatVersion: "2.0" EnableFunctionDefaultPermissions: true EnableSimpleResponses: true Identity: Headers: [Authorization] Authorizer: Type: AWS::Serverless::Function Properties: Handler: auth.lambda_handler """ from dataclasses import asdict, dataclass import boto3 from aws_lambda_powertools import Logger, Tracer from aws_lambda_powertools.utilities.data_classes import event_source from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( APIGatewayAuthorizerEventV2, APIGatewayAuthorizerResponseV2, ) from aws_lambda_powertools.utilities.typing import LambdaContext from botocore.endpoint_provider import Enum from layercake.dynamodb import DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair from boto3clients import dynamodb_client from cognito import get_user from settings import USER_TABLE APIKEY_PREFIX = 'sk-' tracer = Tracer() logger = Logger(__name__) idp_client = boto3.client('cognito-idp') user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) collect = DynamoDBCollection(user_layer) @tracer.capture_lambda_handler @logger.inject_lambda_context @event_source(data_class=APIGatewayAuthorizerEventV2) def lambda_handler(event: APIGatewayAuthorizerEventV2, context: LambdaContext): bearer = _parse_bearer_token(event.headers.get('authorization', '')) if not bearer: return APIGatewayAuthorizerResponseV2(authorize=False).asdict() kwargs = asdict(_authorizer(bearer)) return APIGatewayAuthorizerResponseV2(**kwargs).asdict() class TokenType(str, Enum): API_KEY = 'API_KEY' USER_TOKEN = 'USER_TOKEN' @dataclass class BearerToken: auth_type: TokenType token: str @dataclass class Authorizer: authorize: bool = False context: dict | None = None def _authorizer(bearer: BearerToken) -> Authorizer: try: match bearer.auth_type: case TokenType.USER_TOKEN: user = get_user(bearer.token, idp_client=idp_client) return Authorizer(True, {'user': user}) case TokenType.API_KEY: apikey = collect.get_item(KeyPair('apikey', bearer.token)) return Authorizer(True, {'tenant': apikey['tenant']}) except Exception: return Authorizer() def _parse_bearer_token(s: str) -> BearerToken | None: """Parses and identifies a bearer token as either an API key or a user token.""" try: _, bearer_token = s.split(' ') if bearer_token.startswith(APIKEY_PREFIX): return BearerToken( TokenType.API_KEY, bearer_token.removeprefix(APIKEY_PREFIX), ) except ValueError: return None else: return BearerToken(TokenType.USER_TOKEN, bearer_token)