Files
saladeaula.digital/api.saladeaula.digital/app/routes/orgs/users/__init__.py

276 lines
8.1 KiB
Python

from http import HTTPStatus
from typing import Annotated
from uuid import uuid4
from aws_lambda_powertools.event_handler.api_gateway import Router
from aws_lambda_powertools.event_handler.exceptions import (
NotFoundError,
ServiceError,
)
from aws_lambda_powertools.event_handler.openapi.params import Body
from layercake.dateutils import now, ttl
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, SortKey
from layercake.extra_types import CnpjStr, CpfStr, NameStr
from pydantic import BaseModel, EmailStr, Field
from api_gateway import JSONResponse
from boto3clients import dynamodb_client
from config import INTERNAL_EMAIL_DOMAIN, USER_TABLE
# Router that collects the organization/user routes declared in this module.
router = Router()
# Persistence layer bound to the user table; shared by the handlers and
# helpers in this module that read from or write to DynamoDB.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
# Organization payload accepted in the request body of the `add` endpoint.
class Org(BaseModel):
    # Not taken from the body in `add`: the handler overwrites it with the
    # `org_id` path parameter. `exclude=True` keeps it out of model dumps.
    id: str | None = Field(default=None, exclude=True)
    name: str
    cnpj: CnpjStr
# User payload accepted in the request body of the `add` endpoint.
# `cpf` and `email` are also used as sort keys of the uniqueness records
# written by `_create_user` and read back by `_get_user_id`.
class User(BaseModel):
    name: NameStr
    cpf: CpfStr
    email: EmailStr
class ConflictError(ServiceError):
    """Service error that always carries the HTTP 409 (Conflict) status."""

    def __init__(self, msg: str | dict):
        # The status is fixed; callers only provide the message payload.
        status = HTTPStatus.CONFLICT
        super().__init__(status, msg)
class CPFConflictError(ConflictError):
    """Conflict on the CPF uniqueness record.

    Passed as ``exc_cls`` to the conditional put of the ``cpf`` record
    (``attribute_not_exists(sk)``) in ``_create_user``.
    """
class EmailConflictError(ConflictError):
    """Conflict on the e-mail uniqueness record.

    Passed as ``exc_cls`` to the conditional put of the ``email`` record
    (``attribute_not_exists(sk)``) in ``_create_user``.
    """
class UserConflictError(ConflictError):
    """Conflict on the organization membership record.

    Passed as ``exc_cls`` to the conditional put of the ``orgmembers#...``
    record (``attribute_not_exists(sk)``) in ``_add_member``.
    """
class UserNotFoundError(NotFoundError):
    """The user could not be found.

    Raised by ``_get_user_id`` when no id is resolved, and passed as
    ``exc_cls`` to the ``attribute_exists(sk)`` check in ``_add_member``.
    """
class OrgNotFoundError(NotFoundError):
    """The organization record could not be found.

    Passed as ``exc_cls`` to the ``attribute_exists(sk)`` condition on the
    organization key in ``_create_user`` and ``_add_member``.
    """
@router.post('/<org_id>/users')
def add(
    org_id: str,
    user: Annotated[User, Body(embed=True)],
    org: Annotated[Org, Body(embed=True)],
):
    """Add a user to an organization, creating the user when needed.

    The organization id always comes from the URL path; it is written onto
    ``org`` before anything else happens.

    Returns:
        ``201 Created`` when a brand-new user was created (and linked to the
        organization in the same transaction), or ``204 No Content`` when an
        already-registered user was linked instead.

    Errors raised by ``_create_user``, ``_get_user_id`` and ``_add_member``
    (``OrgNotFoundError``, ``UserNotFoundError``, ``UserConflictError``) are
    not handled here and propagate to the caller.
    """
    org.id = org_id

    created = _create_user(user, org)
    if not created:
        # CPF or e-mail already registered: resolve the existing user and
        # attach it to the organization.
        existing_id = _get_user_id(user)
        _add_member(existing_id, org)
        return JSONResponse(HTTPStatus.NO_CONTENT)

    return JSONResponse(HTTPStatus.CREATED)
@router.delete('/<org_id>/users/<user_id>')
def unlink(org_id: str, user_id: str):
    """Detach a user from an organization.

    In a single write transaction this deletes the membership record, the
    admin record and the user's organization record, and removes ``org_id``
    from the ``tenant_id`` set of the user's main record (``sk='0'``).

    Returns:
        ``204 No Content``.
    """
    # Post-migration: pk becomes f'MEMBER#ORG#{org_id}'
    membership_key = KeyPair(pk=f'orgmembers#{org_id}', sk=user_id)
    # Post-migration: sk becomes f'ADMIN#{user_id}'
    admin_key = KeyPair(pk=org_id, sk=f'admins#{user_id}')
    # Post-migration: the original note gives f'ORG#{org_id}' as the
    # replacement (written as `pk=`, although it sits next to the sort key
    # -- TODO confirm which attribute is meant).
    user_org_key = KeyPair(pk=user_id, sk=f'orgs#{org_id}')
    user_key = KeyPair(pk=user_id, sk='0')

    with dyn.transact_writer() as tx:
        tx.delete(key=membership_key)
        tx.delete(key=admin_key)
        tx.delete(key=user_org_key)
        tx.update(
            key=user_key,
            update_expr='DELETE tenant_id :org_id',
            # A one-element set, so only this organization is removed.
            expr_attr_values={':org_id': {org_id}},
        )

    return JSONResponse(HTTPStatus.NO_CONTENT)
def _is_internal_email(email: str) -> bool:
    """Tell whether ``email`` is hosted on ``INTERNAL_EMAIL_DOMAIN``.

    Only the part after the last ``@`` is inspected; it must be the internal
    domain itself or one of its subdomains. A plain substring test would also
    accept addresses such as ``internal.example@evil.com`` or
    ``x@internal.example.evil.com`` and mark them as verified.
    """
    # NOTE(review): assumes INTERNAL_EMAIL_DOMAIN is a bare domain, optionally
    # written with a leading '@' -- confirm against `config`.
    internal = INTERNAL_EMAIL_DOMAIN.lstrip('@').casefold()
    if not internal:
        # No internal domain configured: nothing is trusted implicitly.
        return False

    domain = email.rpartition('@')[2].casefold()
    return domain == internal or domain.endswith(f'.{internal}')


def _create_user(user: User, org: Org) -> bool:
    """Create a user and link it to ``org`` in one write transaction.

    The transaction writes the user's main record, a ``NEVER_LOGGED`` marker,
    the e-mail record, the CPF and e-mail uniqueness records, the user's
    organization record and the organization membership record. When the
    e-mail is not on the internal domain, an ``EMAIL_VERIFICATION#...`` record
    with a 30-day TTL is written as well.

    Args:
        user: Validated user payload.
        org: Organization payload; ``org.id`` must already be set.

    Returns:
        ``True`` when the transaction succeeded, ``False`` when it failed
        because the CPF or the e-mail is already registered.

    Raises:
        OrgNotFoundError: configured as ``exc_cls`` for the
            ``attribute_exists(sk)`` condition on the organization record;
            not caught here.
    """
    now_ = now()
    user_id = uuid4()
    # Internal addresses are trusted and skip the verification step.
    email_verified = _is_internal_email(user.email)

    try:
        with dyn.transact_writer() as transact:
            # Main user record.
            transact.put(
                item=user.model_dump()
                | {
                    'id': user_id,
                    'sk': '0',
                    'email_verified': email_verified,
                    'tenant_id': {org.id},
                    # Post-migration (users): uncomment the following lines
                    # 'org_id': {org.id},
                    # 'created_at': now_,
                    'createDate': now_,
                    # Makes the email searchable
                    'emails': {user.email},
                },
            )
            # Marker record for a user that has not logged in yet.
            transact.put(
                item={
                    'id': user_id,
                    'sk': 'NEVER_LOGGED',
                    'created_at': now_,
                }
            )
            # E-mail record owned by the user.
            transact.put(
                item={
                    'id': user_id,
                    # Post-migration: rename `emails` to `EMAIL`
                    'sk': f'emails#{user.email}',
                    'email_verified': email_verified,
                    'email_primary': True,
                    'created_at': now_,
                    **({'mx_record_exists': True} if email_verified else {}),
                }
            )

            if not email_verified:
                # Pending verification (flagged as a welcome message);
                # the record carries a TTL 30 days from now.
                transact.put(
                    item={
                        'id': user_id,
                        'sk': f'EMAIL_VERIFICATION#{uuid4()}',
                        'welcome': True,
                        'name': user.name,
                        'email': user.email,
                        'org_name': org.name,
                        'ttl': ttl(start_dt=now_, days=30),
                        'created_at': now_,
                    }
                )

            # Uniqueness record for the CPF.
            transact.put(
                item={
                    # Post-migration (users): rename `cpf` to `CPF`
                    'id': 'cpf',
                    'sk': user.cpf,
                    'user_id': user_id,
                    'created_at': now_,
                },
                cond_expr='attribute_not_exists(sk)',
                exc_cls=CPFConflictError,
            )
            # Uniqueness record for the e-mail.
            transact.put(
                item={
                    # Post-migration (users): rename `email` to `EMAIL`
                    'id': 'email',
                    'sk': user.email,
                    'user_id': user_id,
                    'created_at': now_,
                },
                cond_expr='attribute_not_exists(sk)',
                exc_cls=EmailConflictError,
            )
            # Organization record stored under the user.
            transact.put(
                item={
                    'id': user_id,
                    'sk': f'orgs#{org.id}',
                    # Post-migration (users): uncomment the following line
                    # pk=f'ORG#{org.id}',
                    'name': org.name,
                    'cnpj': org.cnpj,
                    'created_at': now_,
                }
            )
            # Membership record stored under the organization.
            transact.put(
                item={
                    'id': f'orgmembers#{org.id}',
                    # Post-migration (users): uncomment the following line
                    # pk=f'MEMBER#ORG#{org.id}',
                    'sk': user_id,
                    'created_at': now_,
                }
            )
            # The organization itself must exist.
            transact.condition(
                key=KeyPair(org.id, '0'),  # type: ignore
                cond_expr='attribute_exists(sk)',
                exc_cls=OrgNotFoundError,
            )
    except (CPFConflictError, EmailConflictError):
        # The person is already registered; the caller links the existing
        # user to the organization instead.
        return False
    else:
        return True
def _add_member(user_id: str, org: Org) -> None:
    """Link an already-registered user to ``org`` in one write transaction.

    The transaction adds ``org.id`` to the ``tenant_id`` set of the user's
    main record, writes the user's organization record and writes the
    organization membership record.

    Args:
        user_id: Id of the existing user.
        org: Organization payload; ``org.id`` must already be set.

    The exception classes below are handed to the transaction as ``exc_cls``
    for the respective conditions:
        UserNotFoundError: ``attribute_exists(sk)`` on the user's main record.
        UserConflictError: ``attribute_not_exists(sk)`` on the membership
            record.
        OrgNotFoundError: ``attribute_exists(sk)`` on the organization record.
    """
    timestamp = now()

    user_key = KeyPair(user_id, '0')
    org_key = KeyPair(org.id, '0')  # type: ignore

    user_org_item = {
        'id': user_id,
        # Post-migration (users): rename `orgs` to `ORG`
        'sk': f'orgs#{org.id}',
        'name': org.name,
        'cnpj': org.cnpj,
        'created_at': timestamp,
    }
    membership_item = {
        # Post-migration (users): pk becomes f'MEMBER#ORG#{org.id}'
        'id': f'orgmembers#{org.id}',
        'sk': user_id,
        'created_at': timestamp,
    }

    with dyn.transact_writer() as tx:
        tx.update(
            key=user_key,
            # NOTE(review): the original post-migration note here repeated
            # the active expression verbatim; presumably the attribute name
            # is meant to change after the migration -- TODO confirm.
            update_expr='ADD tenant_id :org_id',
            expr_attr_values={':org_id': {org.id}},
            cond_expr='attribute_exists(sk)',
            exc_cls=UserNotFoundError,
        )
        tx.put(item=user_org_item)
        tx.put(
            item=membership_item,
            cond_expr='attribute_not_exists(sk)',
            exc_cls=UserConflictError,
        )
        tx.condition(
            key=org_key,
            cond_expr='attribute_exists(sk)',
            exc_cls=OrgNotFoundError,
        )
def _get_user_id(user: User) -> str:
    """Resolve the id of an existing user from its e-mail and CPF.

    Both uniqueness records (``pk='email'`` and ``pk='cpf'``) are requested
    in one ``get_items`` call, each projecting its ``user_id`` attribute
    under the ``id`` key of the result.

    NOTE(review): both lookups are renamed to the same ``id`` key; how the
    result is resolved when they point to different users depends on
    layercake and is not visible here.

    Raises:
        UserNotFoundError: when the lookup yields no (or an empty) ``id``.
    """
    by_email = KeyPair(
        pk='email',
        sk=SortKey(user.email, path_spec='user_id'),
        rename_key='id',
    )
    by_cpf = KeyPair(
        pk='cpf',
        sk=SortKey(user.cpf, path_spec='user_id'),
        rename_key='id',
    )

    found = dyn.collection.get_items(by_email + by_cpf, flatten_top=False)
    resolved_id = found.get('id')

    if resolved_id:
        return resolved_id

    raise UserNotFoundError('User not found')