from aws_lambda_powertools.event_handler.api_gateway import (
    APIGatewayHttpResolver,
    Response,
)
from aws_lambda_powertools.event_handler.middlewares import (
    BaseMiddlewareHandler,
    NextMiddleware,
)
from aws_lambda_powertools.shared.functions import (
    extract_event_from_common_models,
)
from layercake.dateutils import now, ttl
from layercake.dynamodb import (
    ComposeKey,
    DynamoDBCollection,
    KeyPair,
)
from layercake.funcs import pick

from .authentication_middleware import User

YEAR_DAYS = 365
# Default retention window for audit entries: two years' worth of days.
LOG_RETENTION_DAYS = YEAR_DAYS * 2


class AuditLogMiddleware(BaseMiddlewareHandler):
    """Persist an audit entry for every successful, authenticated request.

    After the downstream handler returns, an entry is written to the
    collection only when the response status is 2xx and a user is present
    in the resolver context under the ``'user'`` key. The entry records
    the action, the caller's source IP, the author's ``id`` and ``name``,
    and (for JSON responses) the selected attributes of the response body.

    Parameters
    ----------
    action: str
        The identifier for the audit log action (positional-only).
    collection: DynamoDBCollection
        The collection instance used to persist the audit log data.
    audit_attrs: tuple of str, optional
        Attribute names to extract from the response body for logging.
        Only applied when the response is JSON; otherwise no body data
        is stored.
    retention_days: int or None, optional
        The number of days the log is retained. If falsy (``None`` or
        ``0``), no time-to-live (TTL) is applied to the entry.
    """

    def __init__(
        self,
        action: str,
        /,
        collection: DynamoDBCollection,
        audit_attrs: tuple[str, ...] = (),
        retention_days: int | None = LOG_RETENTION_DAYS,
    ) -> None:
        """Store the audit configuration; performs no I/O."""
        self.action = action
        self.collection = collection
        self.audit_attrs = audit_attrs
        self.retention_days = retention_days

    def handler(
        self,
        app: APIGatewayHttpResolver,
        next_middleware: NextMiddleware,
    ) -> Response:
        """Run the next middleware, then record an audit entry on success.

        Parameters
        ----------
        app: APIGatewayHttpResolver
            The resolver handling the current event; its context is read
            for the ``'user'`` entry and its event for the source IP.
        next_middleware: NextMiddleware
            The next callable in the middleware chain.

        Returns
        -------
        Response
            The downstream response, returned unchanged.
        """
        # The user and source IP are captured before the downstream call,
        # so whatever populates ``app.context['user']`` must run earlier
        # in the chain.
        current_user: User | None = app.context.get('user')
        request_context = app.current_event.request_context
        source_ip = request_context.http.source_ip

        response = next_middleware(app)

        # Only successful responses from an identified user are audited.
        # https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#successful_responses
        if not (200 <= response.status_code < 300) or not current_user:
            return response

        logged_at = now()
        actor = pick(('id', 'name'), dict(current_user))

        if response.is_json():
            body = extract_event_from_common_models(response.body)
            payload = pick(self.audit_attrs, body)
        else:
            payload = None

        # A falsy retention (None or 0) means the entry gets no TTL.
        if self.retention_days:
            expires_at = ttl(start_dt=logged_at, days=self.retention_days)
        else:
            expires_at = None

        # Post-migration: remove `delimiter` and update prefix
        # from `log` to `logs#user` in ComposeKey.
        partition_key = ComposeKey(current_user.id, prefix='log', delimiter=':')
        sort_key = logged_at.isoformat()

        self.collection.put_item(
            key=KeyPair(pk=partition_key, sk=sort_key),
            action=self.action,
            data=payload,
            ip=source_ip,
            author=actor,
            ttl=expires_at,
        )

        return response