132 lines
3.8 KiB
Python
132 lines
3.8 KiB
Python
"""
|
|
Example
-------

    Resources:
      HttpApi:
        Type: AWS::Serverless::HttpApi
        Properties:
          Auth:
            DefaultAuthorizer: LambdaRequestAuthorizer
            Authorizers:
              LambdaRequestAuthorizer:
                FunctionArn: !GetAtt Authorizer.Arn
                AuthorizerPayloadFormatVersion: "2.0"
                EnableFunctionDefaultPermissions: true
                EnableSimpleResponses: true
                Identity:
                  Headers: [Authorization]
      Authorizer:
        Type: AWS::Serverless::Function
        Properties:
          Handler: auth.lambda_handler
|
|
|
|
"""
|
|
|
|
from dataclasses import asdict, dataclass
|
|
|
|
import boto3
|
|
from aws_lambda_powertools import Logger, Tracer
|
|
from aws_lambda_powertools.utilities.data_classes import event_source
|
|
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
|
|
APIGatewayAuthorizerEventV2,
|
|
APIGatewayAuthorizerResponseV2,
|
|
)
|
|
from aws_lambda_powertools.utilities.typing import LambdaContext
|
|
from botocore.endpoint_provider import Enum
|
|
from layercake.dynamodb import DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair
|
|
from layercake.funcs import pick
|
|
|
|
from boto3clients import dynamodb_client
|
|
from cognito import get_user
|
|
from settings import USER_TABLE
|
|
|
|
# Bearer credentials starting with this prefix are classified as API keys;
# anything else is handled as a user token.
APIKEY_PREFIX = 'sk-'

# Powertools helpers used by the decorators on ``lambda_handler``.
tracer = Tracer()
logger = Logger(__name__)

# Clients are built once at import time so every invocation served by the
# same module instance shares them.
idp_client = boto3.client('cognito-idp')
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
collect = DynamoDBCollection(user_layer)
|
|
|
|
|
|
@tracer.capture_lambda_handler
@logger.inject_lambda_context
@event_source(data_class=APIGatewayAuthorizerEventV2)
def lambda_handler(event: APIGatewayAuthorizerEventV2, context: LambdaContext):
    """Authorize a request from the bearer token in its ``authorization`` header.

    The header value is parsed with ``_parse_bearer_token``. When that yields
    nothing, a deny response is returned straight away; otherwise the decision
    produced by ``_authorizer`` is converted into the response payload.

    Parameters
    ----------
    event : APIGatewayAuthorizerEventV2
        The authorizer event, already wrapped by ``event_source``.
    context : LambdaContext
        The Lambda runtime context (not read by this function).

    Returns
    -------
    dict
        The result of ``APIGatewayAuthorizerResponseV2(...).asdict()``.
    """
    header_value = event.headers.get('authorization', '')
    bearer = _parse_bearer_token(header_value)

    # No usable credential: deny without consulting any backend.
    if bearer is None:
        denied = APIGatewayAuthorizerResponseV2(authorize=False)
        return denied.asdict()

    # The Authorizer dataclass fields are passed through as keyword arguments
    # of the response object.
    decision = _authorizer(bearer)
    response = APIGatewayAuthorizerResponseV2(**asdict(decision))
    return response.asdict()
|
|
|
|
|
|
class TokenType(str, Enum):
    """Kinds of credential that can arrive as a bearer token."""

    # Members mix in ``str``, so each one compares equal to its own value.
    API_KEY = 'API_KEY'
    USER_TOKEN = 'USER_TOKEN'
|
|
|
|
|
|
@dataclass
class BearerToken:
    """A parsed bearer credential together with its classification."""

    # Which kind of credential ``token`` holds.
    auth_type: TokenType
    # The credential string; for API keys the ``APIKEY_PREFIX`` has already
    # been removed by ``_parse_bearer_token``.
    token: str
|
|
|
|
|
|
@dataclass
|
|
class Authorizer:
|
|
authorize: bool = False
|
|
context: dict | None = None
|
|
|
|
|
|
def _authorizer(bearer: BearerToken) -> Authorizer:
|
|
"""
|
|
Build an Authorizer object based on the bearer token's auth type.
|
|
|
|
Parameters
|
|
----------
|
|
bearer : BearerToken
|
|
The bearer token containing authentication information.
|
|
|
|
Returns
|
|
-------
|
|
Authorizer
|
|
An Authorizer object with the appropriate authorization status and context.
|
|
"""
|
|
try:
|
|
match bearer.auth_type:
|
|
case TokenType.USER_TOKEN:
|
|
user = get_user(bearer.token, idp_client=idp_client)
|
|
return Authorizer(True, {'user': user})
|
|
case TokenType.API_KEY:
|
|
apikey = collect.get_item(KeyPair('apikey', bearer.token))
|
|
return Authorizer(
|
|
True,
|
|
pick(
|
|
(
|
|
'user',
|
|
'tenant',
|
|
),
|
|
apikey,
|
|
),
|
|
)
|
|
except Exception:
|
|
return Authorizer()
|
|
|
|
|
|
def _parse_bearer_token(s: str) -> BearerToken | None:
    """Parse an authorization header value into a typed bearer token.

    The value must consist of exactly two parts separated by a single space
    (``"<scheme> <credential>"``). A credential starting with
    ``APIKEY_PREFIX`` is classified as an API key and returned without the
    prefix; any other credential is classified as a user token.

    Parameters
    ----------
    s : str
        The raw header value; may be empty.

    Returns
    -------
    BearerToken | None
        The classified token, or ``None`` when the value does not have exactly
        two parts or the credential (after prefix removal) is empty.
    """
    parts = s.split(' ')
    if len(parts) != 2:
        return None

    # NOTE(review): the scheme is not validated (any word is accepted, not
    # only "Bearer"); kept as-is so existing clients are not rejected.
    _scheme, credential = parts

    if credential.startswith(APIKEY_PREFIX):
        auth_type = TokenType.API_KEY
        token = credential.removeprefix(APIKEY_PREFIX)
    else:
        auth_type = TokenType.USER_TOKEN
        token = credential

    # An empty credential can never authenticate; reject it here so no
    # backend lookup is attempted for it.
    if not token:
        return None

    return BearerToken(auth_type, token)
|