72 lines
2.0 KiB
Python
72 lines
2.0 KiB
Python
from dataclasses import dataclass
|
|
|
|
import boto3
|
|
from aws_lambda_powertools import Logger, Tracer
|
|
from aws_lambda_powertools.utilities.data_classes import event_source
|
|
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
|
|
APIGatewayAuthorizerEventV2,
|
|
APIGatewayAuthorizerResponseV2,
|
|
)
|
|
from aws_lambda_powertools.utilities.typing import LambdaContext
|
|
from botocore.endpoint_provider import Enum
|
|
|
|
from cognito import get_user
|
|
|
|
APIKEY_PREFIX = 'edxg'
|
|
|
|
tracer = Tracer()
|
|
logger = Logger(__name__)
|
|
idp_client = boto3.client('cognito-idp')
|
|
|
|
|
|
@tracer.capture_lambda_handler
|
|
@logger.inject_lambda_context
|
|
@event_source(data_class=APIGatewayAuthorizerEventV2)
|
|
def lambda_handler(event: APIGatewayAuthorizerEventV2, context: LambdaContext):
|
|
bearer = _parse_bearer_token(event.headers.get('authorization', ''))
|
|
|
|
if not bearer:
|
|
return APIGatewayAuthorizerResponseV2(authorize=False).asdict()
|
|
|
|
if bearer.auth_type == TokenType.USER_TOKEN:
|
|
user = get_user(bearer.token, idp_client=idp_client)
|
|
|
|
if user:
|
|
return APIGatewayAuthorizerResponseV2(
|
|
authorize=True,
|
|
context=dict(user=user),
|
|
).asdict()
|
|
|
|
return APIGatewayAuthorizerResponseV2(authorize=False).asdict()
|
|
|
|
|
|
class TokenType(str, Enum):
|
|
API_KEY = 'API_KEY'
|
|
USER_TOKEN = 'USER_TOKEN'
|
|
|
|
|
|
@dataclass
|
|
class BearerToken:
|
|
auth_type: TokenType
|
|
token: str
|
|
|
|
|
|
def _parse_bearer_token(
|
|
s: str,
|
|
*,
|
|
apikey_prefix: str = APIKEY_PREFIX,
|
|
) -> BearerToken | None:
|
|
"""Parses and identifies a bearer token as either an API key or a user token."""
|
|
try:
|
|
_, bearer_token = s.split(' ')
|
|
|
|
if bearer_token.startswith(f'{apikey_prefix}-'):
|
|
return BearerToken(
|
|
TokenType.API_KEY,
|
|
bearer_token.removeprefix(f'{apikey_prefix}-'),
|
|
)
|
|
except ValueError:
|
|
return None
|
|
else:
|
|
return BearerToken(TokenType.USER_TOKEN, bearer_token)
|