from aws_lambda_powertools.event_handler.api_gateway import (
    APIGatewayHttpResolver,
    Response,
)
from aws_lambda_powertools.event_handler.middlewares import (
    BaseMiddlewareHandler,
    NextMiddleware,
)
from aws_lambda_powertools.shared.functions import (
    extract_event_from_common_models,
)
from layercake.dateutils import now, ttl
from layercake.dynamodb import ComposeKey, DynamoDBCollection, KeyPair
from pydantic import UUID4, BaseModel, Field

# Default number of days an audit log entry is retained (2 years).
LOG_RETENTION_DAYS = 365 * 2


class AuthenticatedUser(BaseModel):
    """User attributes taken from the Lambda authorizer's `user` payload."""

    # Populated from the `custom:user_id` key of the input payload.
    id: str = Field(alias='custom:user_id')
    name: str
    email: str
    email_verified: bool
    sub: UUID4


class AuthorizerMiddleware(BaseMiddlewareHandler):
    """Expose the authorizer-provided user to downstream handlers.

    When the Lambda authorizer context contains a `user` entry, it is
    validated into an `AuthenticatedUser` and stored in the resolver context
    under the `authenticated_user` key. The request always proceeds to the
    next middleware, whether or not a user was found.
    """

    def handler(
        self,
        app: APIGatewayHttpResolver,
        next_middleware: NextMiddleware,
    ) -> Response:
        """Attach the authenticated user (if any) and continue the chain."""
        # Lambda authorizer context associated with the current API Gateway
        # event. You can check the file `auth.py` for more details.
        lambda_ctx = app.current_event.request_context.authorizer.get_lambda

        if 'user' in lambda_ctx:
            authenticated = AuthenticatedUser(**lambda_ctx['user'])
            app.append_context(authenticated_user=authenticated)

        return next_middleware(app)


class AuditLogMiddleware(BaseMiddlewareHandler):
    """Record an audit entry for successful requests made by a known user.

    After the wrapped handler has run, a 2xx response combined with an
    `authenticated_user` in the resolver context results in one item being
    written to the given collection. The item holds the action name, the
    response data (JSON responses only), the caller's source IP and an
    optional TTL derived from `retention_days`.
    """

    def __init__(
        self,
        action: str,
        /,
        collect: DynamoDBCollection,
        retention_days: int | None = LOG_RETENTION_DAYS,
    ) -> None:
        """Configure the middleware.

        Args:
            action: Name of the audited action (positional-only).
            collect: Collection the audit entries are written to.
            retention_days: Days to keep an entry; a falsy value (`None` or
                `0`) stores the entry with `ttl=None`.
        """
        self.action = action
        self.collect = collect
        self.retention_days = retention_days

    def handler(
        self,
        app: APIGatewayHttpResolver,
        next_middleware: NextMiddleware,
    ) -> Response:
        """Run the rest of the chain, then log the request if it succeeded.

        The downstream response is returned unchanged in every case.
        """
        store = self.collect
        response = next_middleware(app)
        user = app.context.get('authenticated_user')
        source_ip = app.current_event.request_context.http.source_ip

        # Only successful (2xx) requests from an authenticated user are logged.
        succeeded = 200 <= response.status_code < 300
        if not (succeeded and user):
            return response

        # A single timestamp is shared by the sort key and the TTL start.
        logged_at = now()

        # Non-JSON responses are logged without a payload.
        if response.is_json():
            payload = extract_event_from_common_models(response.body)
        else:
            payload = None

        # NOTE(review): `ttl` presumably returns the expiry timestamp for the
        # item -- confirm against `layercake.dateutils`.
        if self.retention_days:
            expires_at = ttl(start_dt=logged_at, days=self.retention_days)
        else:
            expires_at = None

        store.put_item(
            key=KeyPair(
                pk=ComposeKey(user.id, prefix='logs'),
                sk=logged_at.isoformat(),
            ),
            action=self.action,
            data=payload,
            ip=source_ip,
            ttl=expires_at,
        )

        return response