"""HTTP routes for the ``User`` resource."""

import json
from http import HTTPStatus
from typing import Annotated

import boto3
from aws_lambda_powertools.event_handler.api_gateway import (
    Response,
    Router,
)
from aws_lambda_powertools.event_handler.exceptions import BadRequestError
from elasticsearch import Elasticsearch
from layercake.dynamodb import (
    ComposeKey,
    DynamoDBCollection,
    DynamoDBPersistenceLayer,
    KeyPair,
    PartitionKey,
)
from pydantic import UUID4, BaseModel, StringConstraints

import elastic
from models import User
from settings import ELASTIC_CONN, USER_TABLE

router = Router()

# Clients are created once at import time and shared by every handler below.
dynamodb_client = boto3.client('dynamodb')
user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
collect = DynamoDBCollection(user_layer)
elastic_client = Elasticsearch(**ELASTIC_CONN)


@router.get('/', compress=True, tags=['User'], summary='Get users')
def get_users():
    """Search users through ``elastic.search``.

    Query-string parameters:
        query: JSON-encoded search query. Defaults to ``'{}'``.
        page_size: Page size as an integer string. Defaults to ``'25'``.

    Returns:
        Whatever ``elastic.search`` returns for the ``USER_TABLE`` index.

    Raises:
        BadRequestError: If ``query`` is not valid JSON or ``page_size`` is
            not an integer.
    """
    event = router.current_event
    raw_query = event.get_query_string_value('query', '{}')
    raw_page_size = event.get_query_string_value('page_size', '25')

    # Both values come straight from the client; reject malformed input with
    # a 400 instead of letting the parsing error surface as a server error.
    try:
        query = json.loads(raw_query)
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        raise BadRequestError('`query` must be valid JSON') from exc

    try:
        page_size = int(raw_page_size)
    except ValueError as exc:
        raise BadRequestError('`page_size` must be an integer') from exc

    return elastic.search(
        index=USER_TABLE,
        page_size=page_size,
        query=query,
        elastic_client=elastic_client,
    )


@router.post('/', compress=True, tags=['User'], summary='Create user')
def post_user(payload: User):
    """Accept a ``User`` payload and answer ``201 Created``.

    NOTE(review): the visible body does not use ``payload``; it only returns
    the response.
    """
    return Response(status_code=HTTPStatus.CREATED)


class ResetPasswordPayload(BaseModel):
    """Request body for the password-reset route."""

    # Must be a version-4 UUID.
    cognito_sub: UUID4
    # Must be at least 6 characters long.
    new_password: Annotated[str, StringConstraints(min_length=6)]


# NOTE(review): this handler takes ``id`` but the route is registered at '/'
# with no ``<id>`` segment. Confirm the intended path (e.g. '/<id>') before
# changing it; the route string is left exactly as it was.
@router.patch('/', compress=True, tags=['User'])
def patch_reset(id: str, payload: ResetPasswordPayload):
    """Accept a ``ResetPasswordPayload`` and answer ``200 OK``.

    NOTE(review): the visible body uses neither ``id`` nor ``payload``; it
    only returns the response.
    """
    return Response(status_code=HTTPStatus.OK)


@router.get('/<id>/emails', compress=True, tags=['User'], summary='Get user emails')
def get_emails(id: str):
    """Return the items fetched with ``KeyPair(id, 'emails')``.

    Args:
        id: Value of the ``<id>`` path segment.

    The optional ``start_key`` query-string parameter is forwarded to
    ``collect.get_items`` unchanged (``None`` when absent); presumably a
    pagination cursor, confirm against ``DynamoDBCollection``.
    """
    start_key = router.current_event.get_query_string_value('start_key', None)

    return collect.get_items(
        key=KeyPair(id, 'emails'),
        start_key=start_key,
    )


@router.get('/<id>/logs', compress=True, tags=['User'], summary='Get user logs')
def get_logs(id: str):
    """Return the items fetched with a ``log``-prefixed partition key.

    The key is ``PartitionKey(ComposeKey(id, prefix='log', delimiter=':'))``.

    Args:
        id: Value of the ``<id>`` path segment.

    The optional ``start_key`` query-string parameter is forwarded to
    ``collect.get_items`` unchanged (``None`` when absent); presumably a
    pagination cursor, confirm against ``DynamoDBCollection``.
    """
    start_key = router.current_event.get_query_string_value('start_key', None)

    return collect.get_items(
        key=PartitionKey(ComposeKey(id, prefix='log', delimiter=':')),
        start_key=start_key,
    )
@router.get('/<id>/orgs', compress=True, tags=['User'], summary='Get user orgs')
def get_orgs(id: str):
    """Return the items fetched with ``KeyPair(id, 'orgs')``.

    Args:
        id: Value of the ``<id>`` path segment.

    The optional ``start_key`` query-string parameter is forwarded to
    ``collect.get_items`` unchanged (``None`` when absent); presumably a
    pagination cursor, confirm against ``DynamoDBCollection``.
    """
    start_key = router.current_event.get_query_string_value('start_key', None)

    return collect.get_items(
        key=KeyPair(id, 'orgs'),
        start_key=start_key,
    )