import re from http import HTTPStatus from typing import Annotated from aws_lambda_powertools.event_handler.api_gateway import Router from aws_lambda_powertools.event_handler.openapi.params import Body from layercake.dateutils import now from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair from pydantic import BaseModel, ConfigDict, field_validator from api_gateway import JSONResponse from boto3clients import dynamodb_client from config import USER_TABLE router = Router() dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) @router.get('//address') def address(org_id: str): return dyn.collection.get_item( KeyPair(org_id, 'METADATA#ADDRESS'), raise_on_error=False, default={}, ) class Address(BaseModel): model_config = ConfigDict(str_strip_whitespace=True) postcode: str address1: str address2: str | None = None neighborhood: str city: str state: str @field_validator('postcode') @classmethod def _only_numbers(cls, value: str) -> str: return re.sub(r'\D', '', value) @router.post('//address') def update(org_id: str, address: Annotated[Address, Body(embed=True)]): dyn.collection.put_item( key=KeyPair(org_id, 'METADATA#ADDRESS'), updated_at=now(), **address.model_dump(exclude_none=True), ) return JSONResponse( body=address, status_code=HTTPStatus.OK, )