from http import HTTPStatus from typing import Annotated from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.openapi.params import Body from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair from api_gateway import JSONResponse from boto3clients import dynamodb_client from config import USER_TABLE router = Router() dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) @router.get('//admins') def get_admins(org_id: str): return dyn.collection.query( # Post-migration: rename `admins` to `ADMIN` KeyPair(org_id, 'admins#'), limit=100, ) @router.delete('//admins') def revoke(org_id: str, user_id: Annotated[str, Body(embed=True)]): with dyn.transact_writer() as transact: transact.delete( # Post-migration: rename `admins` to `ADMIN` key=KeyPair(org_id, f'admins#{user_id}'), ) return JSONResponse(status_code=HTTPStatus.NO_CONTENT)