"""Lambda request authorizer for an API Gateway HTTP API (payload format 2.0).

Example
-------

    Resources:
      HttpApi:
        Type: AWS::Serverless::HttpApi
        Properties:
          Auth:
            DefaultAuthorizer: LambdaRequestAuthorizer
            Authorizers:
              LambdaRequestAuthorizer:
                FunctionArn: !GetAtt Authorizer.Arn
                AuthorizerPayloadFormatVersion: "2.0"
                EnableFunctionDefaultPermissions: true
                EnableSimpleResponses: true
                Identity:
                  Headers: [Authorization]

      Authorizer:
        Type: AWS::Serverless::Function
        Properties:
          Handler: auth.lambda_handler
"""

from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
    APIGatewayAuthorizerEventV2,
    APIGatewayAuthorizerResponseV2,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dynamodb import DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair
from layercake.funcs import pick

from boto3clients import dynamodb_client, idp_client
from cognito import get_user
from config import USER_TABLE

# Tokens carrying this prefix are treated as API keys; anything else is
# treated as a user token.
APIKEY_PREFIX = 'sk-'

tracer = Tracer()
logger = Logger(__name__)
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
user_collect = DynamoDBCollection(user_layer)


@tracer.capture_lambda_handler
@logger.inject_lambda_context
@event_source(data_class=APIGatewayAuthorizerEventV2)
def lambda_handler(event: APIGatewayAuthorizerEventV2, context: LambdaContext) -> dict:
    """Authenticate a request using a bearer token (user token or API key).

    Only handles authentication; any additional logic (e.g., tenant) is
    performed afterward.

    Parameters
    ----------
    event : APIGatewayAuthorizerEventV2
        The authorizer event; the token is read from its ``authorization``
        header.
    context : LambdaContext
        The Lambda invocation context (consumed by the decorators only).

    Returns
    -------
    dict
        The serialized ``APIGatewayAuthorizerResponseV2``. ``authorize`` is
        ``False`` when the header is missing/malformed or the token cannot be
        resolved.
    """
    bearer = _parse_bearer_token(event.headers.get('authorization', ''))

    if not bearer:
        return APIGatewayAuthorizerResponseV2(authorize=False).asdict()

    attrs = _authorizer(bearer, user_collect).asdict()
    return APIGatewayAuthorizerResponseV2(**attrs).asdict()


class AuthFlowType(str, Enum):
    # How the caller authenticated: with a user token or with an API key.
    USER_AUTH = 'USER_AUTH'
    API_AUTH = 'API_AUTH'


@dataclass
class BearerToken:
    # A parsed credential: which flow it belongs to and the raw token
    # (for API keys, with APIKEY_PREFIX already removed).
    auth_flow_type: AuthFlowType
    token: str


@dataclass
class AuthorizerResponseV2:
    # Internal authorization result; defaults describe a denied request.
    authorize: bool = False
    context: dict[str, Any] | None = None
    auth_flow_type: AuthFlowType = AuthFlowType.USER_AUTH

    def asdict(self) -> dict:
        """Return the keyword arguments for ``APIGatewayAuthorizerResponseV2``.

        ``auth_flow_type`` is not emitted as a top-level key; when the request
        is authorized it is merged into ``context`` instead.
        """
        data = asdict(self)
        auth_flow_type = data.pop('auth_flow_type')

        # If authorization is enabled, add `auth_flow_type` to the context
        if self.authorize:
            # `context` defaults to None; create it rather than failing with
            # AttributeError on `None.update(...)`.
            if data['context'] is None:
                data['context'] = {}
            data['context'].update(auth_flow_type=auth_flow_type)

        return data


def _get_apikey(token: str, /, collect: DynamoDBCollection) -> dict[str, dict | str]:
    """Fetch the record stored under ``KeyPair('apikey', token)``."""
    return collect.get_item(KeyPair('apikey', token))


def _authorizer(
    bearer: BearerToken,
    /,
    collect: DynamoDBCollection,
) -> AuthorizerResponseV2:
    """Build an authorization result based on the bearer token's auth type.

    Parameters
    ----------
    bearer : BearerToken
        The bearer token containing authentication information.
    collect : DynamoDBCollection
        Collection used to look up API keys.

    Returns
    -------
    AuthorizerResponseV2
        An authorized response carrying the resolved context, or the default
        (denied) response when the token cannot be resolved. This function
        never raises: any failure results in a denied response.
    """
    try:
        if bearer.auth_flow_type == AuthFlowType.USER_AUTH:
            user = get_user(bearer.token, idp_client)
            # NOTE(review): whether `get_user` raises or returns an empty
            # value for an unknown token is not visible here; deny either way.
            if not user:
                return AuthorizerResponseV2()
            return AuthorizerResponseV2(True, {'user': user})

        apikey = _get_apikey(bearer.token, collect)
        # NOTE(review): same as above for `get_item` on a missing key; an
        # empty record must never yield an authorized, empty context.
        if not apikey:
            return AuthorizerResponseV2()

        context = pick(('tenant', 'user'), apikey)
        return AuthorizerResponseV2(True, context, AuthFlowType.API_AUTH)
    except Exception:
        # Fail closed, but leave a trace so that outages (IdP, datastore) can
        # be told apart from plain invalid credentials. The token itself is
        # deliberately not logged.
        logger.warning(
            'Authorization failed for %s',
            bearer.auth_flow_type.value,
            exc_info=True,
        )
        return AuthorizerResponseV2()


def _parse_bearer_token(s: str) -> BearerToken | None:
    """Parse an ``Authorization`` header value into a ``BearerToken``.

    The value must be exactly two parts separated by a single space
    (``"<scheme> <token>"``). The scheme itself is not validated. A token
    starting with ``APIKEY_PREFIX`` is an API key (returned without the
    prefix); anything else is a user token.

    Returns ``None`` when the value is malformed or the token is empty.
    """
    parts = s.split(' ')

    if len(parts) != 2:
        return None

    token = parts[1]
    auth_flow_type = AuthFlowType.USER_AUTH

    if token.startswith(APIKEY_PREFIX):
        auth_flow_type = AuthFlowType.API_AUTH
        token = token.removeprefix(APIKEY_PREFIX)

    # An empty credential can never be valid; reject it without a round trip
    # to the identity provider or the datastore.
    if not token:
        return None

    return BearerToken(auth_flow_type, token)