from aws_lambda_powertools.event_handler.api_gateway import ( APIGatewayHttpResolver, Response, ) from aws_lambda_powertools.event_handler.middlewares import ( BaseMiddlewareHandler, NextMiddleware, ) from aws_lambda_powertools.shared.functions import ( extract_event_from_common_models, ) from layercake.dateutils import now, ttl from layercake.dynamodb import ComposeKey, DynamoDBCollection, KeyPair from layercake.funcs import pick from pydantic import UUID4, BaseModel, EmailStr, Field from auth import AuthFlowType LOG_RETENTION_DAYS = 365 * 2 # 2 years class User(BaseModel): id: str name: str email: EmailStr class CognitoUser(User): id: str = Field(alias='custom:user_id') email_verified: bool sub: UUID4 class AuthorizerMiddleware(BaseMiddlewareHandler): def handler( self, app: APIGatewayHttpResolver, next_middleware: NextMiddleware, ) -> Response: # Gets the Lambda authorizer associated with the current API Gateway event. # You can check the file `auth.py` for more details. context = app.current_event.request_context.authorizer.get_lambda auth_flow_type = context.get('auth_flow_type') if not auth_flow_type: return next_middleware(app) cls = { AuthFlowType.USER_AUTH: CognitoUser, AuthFlowType.API_AUTH: User, }.get(auth_flow_type) if cls: app.append_context(user=cls(**context['user'])) return next_middleware(app) class TenantMiddleware(BaseMiddlewareHandler): def handler( self, app: APIGatewayHttpResolver, next_middleware: NextMiddleware, ) -> Response: context = app.current_event.request_context.authorizer.get_lambda tenant = app.current_event.headers.get('x-tenant') auth_flow_type = context.get('auth_flow_type') match auth_flow_type, tenant: case AuthFlowType.API_AUTH, None: app.append_context(tenant=context['tenant']) case AuthFlowType.USER_AUTH, str(): print(tenant) return next_middleware(app) class AuditLogMiddleware(BaseMiddlewareHandler): """This middleware logs audit details for successful requests, storing user, action, and IP info with a specified retention period. Parameters ---------- action : str The identifier for the audit log action. collect : DynamoDBCollection The collection instance used to persist the audit log data. audit_attrs : tuple of str, optional A tuple of attribute names to extract from the response body for logging. These represent the specific fields to include in the audit log. retention_days : int or None, optional The number of days the log is retained on the server. If None, no time-to-live (TTL) will be applied. """ def __init__( self, action: str, /, collect: DynamoDBCollection, audit_attrs: tuple[str, ...] = (), retention_days: int | None = LOG_RETENTION_DAYS, ) -> None: self.action = action self.collect = collect self.audit_attrs = audit_attrs self.retention_days = retention_days def handler( self, app: APIGatewayHttpResolver, next_middleware: NextMiddleware, ) -> Response: user = app.context.get('user') req_context = app.current_event.request_context ip_addr = req_context.http.source_ip response = next_middleware(app) # Successful response # https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#successful_responses if 200 <= response.status_code < 300 and user: now_ = now() data = ( pick(self.audit_attrs, extract_event_from_common_models(response.body)) if response.is_json() else None ) retention_days = ( ttl(start_dt=now_, days=self.retention_days) if self.retention_days else None ) self.collect.put_item( key=KeyPair( # Post-migration: remove `delimiter` from ComposeKey. pk=ComposeKey(user.id, prefix='logs', delimiter=':'), sk=now_.isoformat(), ), action=self.action, data=data, ip=ip_addr, ttl=retention_days, ) return response