Files
saladeaula.digital/http-api/app/auth.py

145 lines
4.3 KiB
Python

"""
Example
-------
Resources:
HttpApi:
Type: AWS::Serverless::HttpApi
Properties:
Auth:
DefaultAuthorizer: LambdaRequestAuthorizer
Authorizers:
LambdaRequestAuthorizer:
FunctionArn: !GetAtt Authorizer.Arn
AuthorizerPayloadFormatVersion: "2.0"
EnableFunctionDefaultPermissions: true
EnableSimpleResponses: true
Identity:
Headers: [Authorization]
Authorizer:
Type: AWS::Serverless::Function
Properties:
Handler: auth.lambda_handler
"""
from dataclasses import asdict, dataclass
from typing import Any
import boto3
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
APIGatewayAuthorizerEventV2,
APIGatewayAuthorizerResponseV2,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from botocore.endpoint_provider import Enum
from layercake.dynamodb import DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair
from layercake.funcs import pick
from boto3clients import dynamodb_client
from cognito import get_user
from conf import USER_TABLE
APIKEY_PREFIX = 'sk-'
tracer = Tracer()
logger = Logger(__name__)
idp_client = boto3.client('cognito-idp')
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
user_collect = DynamoDBCollection(user_layer)
@tracer.capture_lambda_handler
@logger.inject_lambda_context
@event_source(data_class=APIGatewayAuthorizerEventV2)
def lambda_handler(event: APIGatewayAuthorizerEventV2, context: LambdaContext) -> dict:
"""Authenticates a user using a bearer token (for user or API).
Only handles authentication; any additional logic (e.g., tenant)
is performed afterward."""
bearer = _parse_bearer_token(event.headers.get('authorization', ''))
if not bearer:
return APIGatewayAuthorizerResponseV2(authorize=False).asdict()
attrs = _authorizer(bearer, user_collect).asdict()
return APIGatewayAuthorizerResponseV2(**attrs).asdict()
class AuthFlowType(str, Enum):
USER_AUTH = 'USER_AUTH'
API_AUTH = 'API_AUTH'
@dataclass
class BearerToken:
auth_flow_type: AuthFlowType
token: str
@dataclass
class AuthorizerResponseV2:
authorize: bool = False
context: dict[str, Any] | None = None
auth_flow_type: AuthFlowType = AuthFlowType.USER_AUTH
def asdict(self) -> dict:
data = asdict(self)
auth_flow_type = data.pop('auth_flow_type')
# If authorization is enabled, add `auth_flow_type` to the context
if self.authorize:
data['context'].update(auth_flow_type=auth_flow_type)
return data
def _get_apikey(token: str, /, collect: DynamoDBCollection) -> dict[str, dict | str]:
return collect.get_item(KeyPair('apikey', token))
def _authorizer(
bearer: BearerToken,
/,
collect: DynamoDBCollection,
) -> AuthorizerResponseV2:
"""Build an Authorizer object based on the bearer token's auth type.
Parameters
----------
bearer : BearerToken
The bearer token containing authentication information.
Returns
-------
Authorizer
An Authorizer object with the appropriate authorization status and context.
"""
try:
if bearer.auth_flow_type == AuthFlowType.USER_AUTH:
user = get_user(bearer.token, idp_client)
return AuthorizerResponseV2(True, {'user': user})
apikey = _get_apikey(bearer.token, collect)
context = pick(('tenant', 'user'), apikey)
return AuthorizerResponseV2(True, context, AuthFlowType.API_AUTH)
except Exception:
return AuthorizerResponseV2()
def _parse_bearer_token(s: str) -> BearerToken | None:
"""Parses and identifies a bearer token as either an API key or a user token."""
try:
_, token = s.split(' ')
if token.startswith(APIKEY_PREFIX):
return BearerToken(
AuthFlowType.API_AUTH,
token.removeprefix(APIKEY_PREFIX),
)
except ValueError:
return None
else:
return BearerToken(AuthFlowType.USER_AUTH, token)