import json from http import HTTPStatus from typing import Annotated from aws_lambda_powertools.event_handler.api_gateway import ( Response, Router, ) from aws_lambda_powertools.event_handler.exceptions import ( BadRequestError as PowertoolsBadRequestError, ) from elasticsearch import Elasticsearch from layercake.dynamodb import ( ComposeKey, DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair, MissingError, PartitionKey, PrefixKey, ) from pydantic import UUID4, BaseModel, StringConstraints import elastic from boto3clients import dynamodb_client from models import User from settings import ELASTIC_CONN, USER_TABLE class BadRequestError(MissingError, PowertoolsBadRequestError): ... router = Router() user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) collect = DynamoDBCollection(user_layer, exception_cls=BadRequestError) elastic_client = Elasticsearch(**ELASTIC_CONN) @router.get( '/', compress=True, tags=['User'], summary='Get users', ) def get_users(): event = router.current_event query = event.get_query_string_value('query', '{}') page_size = event.get_query_string_value('page_size', '25') return elastic.search( index=USER_TABLE, page_size=int(page_size), query=json.loads(query), elastic_client=elastic_client, ) @router.post( '/', compress=True, tags=['User'], summary='Create user', ) def post_user(payload: User): return Response(status_code=HTTPStatus.CREATED) class NewPasswordPayload(BaseModel): cognito_sub: UUID4 new_password: Annotated[str, StringConstraints(min_length=6)] @router.patch('/', compress=True, tags=['User']) def patch_reset(id: str, payload: NewPasswordPayload): return Response(status_code=HTTPStatus.OK) @router.get( '/', compress=True, tags=['User'], summary='Get user', ) def get_user(id: str): return collect.get_item(KeyPair(id, '0')) @router.get('//idp', compress=True, include_in_schema=False) def get_idp(id: str): return [] @router.get( '//emails', compress=True, tags=['User'], summary='Get user emails', ) def get_emails(id: str): return collect.get_items( KeyPair(id, PrefixKey('emails#')), start_key=router.current_event.get_query_string_value('start_key', None), ) @router.get( '//logs', compress=True, tags=['User'], summary='Get user logs', ) def get_logs(id: str): return collect.get_items( # Post-migration: uncomment to enable PartitionKey with a composite key (id with `logs` prefix). # PartitionKey(ComposeKey(id, prefix='logs')), PartitionKey(ComposeKey(id, prefix='log', delimiter=':')), start_key=router.current_event.get_query_string_value('start_key', None), ) @router.get( '//orgs', compress=True, tags=['User'], summary='Get user orgs', ) def get_orgs(id: str): return collect.get_items( KeyPair(id, PrefixKey('orgs#')), start_key=router.current_event.get_query_string_value('start_key', None), )