This commit is contained in:
2025-03-28 20:37:04 -03:00
parent a1141dcce8
commit dbe7a924e2
10 changed files with 89 additions and 105 deletions

View File

@@ -24,6 +24,7 @@ Example
"""
from dataclasses import asdict, dataclass
from typing import Any
import boto3
from aws_lambda_powertools import Logger, Tracer
@@ -53,31 +54,46 @@ collect = DynamoDBCollection(user_layer)
@tracer.capture_lambda_handler
@logger.inject_lambda_context
@event_source(data_class=APIGatewayAuthorizerEventV2)
def lambda_handler(event: APIGatewayAuthorizerEventV2, context: LambdaContext):
def lambda_handler(event: APIGatewayAuthorizerEventV2, context: LambdaContext) -> dict:
bearer = _parse_bearer_token(event.headers.get('authorization', ''))
if not bearer:
return APIGatewayAuthorizerResponseV2(authorize=False).asdict()
kwargs = asdict(_authorizer(bearer))
return APIGatewayAuthorizerResponseV2(**kwargs).asdict()
attrs = _authorizer(bearer).asdict()
return APIGatewayAuthorizerResponseV2(**attrs).asdict()
class TokenType(str, Enum):
API_KEY = 'API_KEY'
USER_TOKEN = 'USER_TOKEN'
class AuthFlowType(str, Enum):
USER_AUTH = 'USER_AUTH'
API_AUTH = 'API_AUTH'
@dataclass
class BearerToken:
auth_type: TokenType
auth_flow_type: AuthFlowType
token: str
@dataclass
class Authorizer:
authorize: bool = False
context: dict | None = None
context: dict[str, Any] | None = None
auth_flow_type: AuthFlowType = AuthFlowType.USER_AUTH
def asdict(self) -> dict:
data = asdict(self)
auth_flow_type = data.pop('auth_flow_type')
# If authorization is enabled, add `auth_flow_type` to the context
if self.authorize:
data['context'].update(auth_flow_type=auth_flow_type)
return data
def _get_apikey(token: str) -> dict[str, dict | str]:
return collect.get_item(KeyPair('apikey', token))
def _authorizer(bearer: BearerToken) -> Authorizer:
@@ -95,22 +111,13 @@ def _authorizer(bearer: BearerToken) -> Authorizer:
An Authorizer object with the appropriate authorization status and context.
"""
try:
match bearer.auth_type:
case TokenType.USER_TOKEN:
user = get_user(bearer.token, idp_client=idp_client)
return Authorizer(True, {'user': user})
case TokenType.API_KEY:
apikey = collect.get_item(KeyPair('apikey', bearer.token))
return Authorizer(
True,
pick(
(
'user',
'tenant',
),
apikey,
),
)
if bearer.auth_flow_type == AuthFlowType.USER_AUTH:
user = get_user(bearer.token, idp_client)
return Authorizer(True, {'user': user})
apikey = _get_apikey(bearer.token)
context = pick(('tenant', 'user'), apikey)
return Authorizer(True, context, AuthFlowType.API_AUTH)
except Exception:
return Authorizer()
@@ -118,14 +125,14 @@ def _authorizer(bearer: BearerToken) -> Authorizer:
def _parse_bearer_token(s: str) -> BearerToken | None:
"""Parses and identifies a bearer token as either an API key or a user token."""
try:
_, bearer_token = s.split(' ')
_, token = s.split(' ')
if bearer_token.startswith(APIKEY_PREFIX):
if token.startswith(APIKEY_PREFIX):
return BearerToken(
TokenType.API_KEY,
bearer_token.removeprefix(APIKEY_PREFIX),
AuthFlowType.API_AUTH,
token.removeprefix(APIKEY_PREFIX),
)
except ValueError:
return None
else:
return BearerToken(TokenType.USER_TOKEN, bearer_token)
return BearerToken(AuthFlowType.USER_AUTH, token)