Files
2025-08-17 19:38:04 -03:00

238 lines
6.4 KiB
Python

from datetime import timedelta
from types import SimpleNamespace
from typing import TypedDict
from aws_lambda_powertools.event_handler.exceptions import (
BadRequestError,
)
from layercake.dateutils import now, ttl
from layercake.dynamodb import (
DynamoDBPersistenceLayer,
KeyPair,
SortKey,
)
class User(TypedDict):
    """User fields consumed by `update_user`: identifier, name and CPF."""

    id: str
    name: str
    cpf: str
class CPFConflictError(BadRequestError):
    """Bad-request error carrying the fixed message 'CPF already exists'.

    Passed as `exc_cls` to the conditional put that registers a CPF.
    """

    def __init__(self, *_ignored):
        # Any positional arguments are accepted and discarded; the message
        # is always the same.
        message = 'CPF already exists'
        super().__init__(message)
class RateLimitError(BadRequestError):
    """Bad-request error carrying the fixed message 'Update limit reached'.

    Passed as `exc_cls` to the conditional put of the update rate-limit item.
    """

    def __init__(self, *_ignored):
        # Any positional arguments are accepted and discarded; the message
        # is always the same.
        message = 'Update limit reached'
        super().__init__(message)
class EmailConflictError(BadRequestError):
    """Bad-request error carrying the fixed message 'Email already exists'.

    Passed as `exc_cls` to the conditional put that registers an email.
    """

    def __init__(self, *_ignored):
        # Any positional arguments are accepted and discarded; the message
        # is always the same.
        message = 'Email already exists'
        super().__init__(message)
def update_user(
    obj: User,
    /,
    *,
    persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
    """Write a user's name and CPF, plus the CPF index, in one transaction.

    The CPF currently stored on the user record is read first. Then, on a
    single `transact_writer()` context, the following writes are queued:

    * the user record (`sk` = '0') receives the new name, CPF and
      `updated_at`, conditioned on the record already existing;
    * a 'RATE_LIMIT#USER_UPDATE' item is put, conditioned on no such item
      existing yet, with a `ttl` computed from a start time 24 hours ahead;
    * only when the CPF differs from the stored one, a ('cpf', <new cpf>)
      item is put (conditioned on not existing) and the item of the previous
      CPF, if there was one, is deleted.

    Args:
        obj: Mapping providing the user's `id`, `name` and `cpf`.
        persistence_layer: Layer used for the read and for the transaction.

    Returns:
        True when no exception propagates.

    Raises:
        RateLimitError: Supplied as `exc_cls` of the rate-limit put;
            presumably raised by the layer when that item already exists.
        CPFConflictError: Supplied as `exc_cls` of the CPF put; presumably
            raised by the layer when the CPF item already exists.
    """
    timestamp = now()
    fields = SimpleNamespace(**obj)

    # Fetch the CPF currently stored on the user record, if any.
    cpf_lookup = KeyPair(
        pk=fields.id,
        sk=SortKey('0', path_spec='cpf'),
    )
    stored_cpf = persistence_layer.collection.get_item(cpf_lookup)

    with persistence_layer.transact_writer() as transact:
        transact.update(
            key=KeyPair(fields.id, '0'),
            update_expr='SET #name = :name, cpf = :cpf, updated_at = :updated_at',
            expr_attr_names={'#name': 'name'},
            expr_attr_values={
                ':name': fields.name,
                ':cpf': fields.cpf,
                ':updated_at': timestamp,
            },
            cond_expr='attribute_exists(sk)',
        )

        # Prevent the user from updating more than once every 24 hours
        rate_limit_item = {
            'id': fields.id,
            'sk': 'RATE_LIMIT#USER_UPDATE',
            'created_at': timestamp,
            'ttl': ttl(start_dt=timestamp + timedelta(hours=24)),
        }
        transact.put(
            item=rate_limit_item,
            exc_cls=RateLimitError,
            cond_expr='attribute_not_exists(sk)',
        )

        cpf_changed = fields.cpf != stored_cpf

        if cpf_changed:
            cpf_item = {
                'id': 'cpf',
                'sk': fields.cpf,
                'user_id': fields.id,
                'created_at': timestamp,
            }
            transact.put(
                item=cpf_item,
                cond_expr='attribute_not_exists(sk)',
                exc_cls=CPFConflictError,
            )

        # Ensures that the old CPF is discarded
        if cpf_changed and stored_cpf:
            transact.delete(key=KeyPair('cpf', stored_cpf))

    return True
def add_email(
    id: str,
    email: str,
    /,
    *,
    persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
    """Attach an additional, non-primary and unverified email to a user.

    Three writes are queued on a single `transact_writer()` context:

    * `email` is added to the `emails` set of the user record (`sk` = '0');
    * an `emails#<email>` item is put under the user with
      `email_primary` and `email_verified` both False, conditioned on the
      item not existing yet;
    * an ('email', <email>) item pointing at the user is put, conditioned on
      the item not existing yet.

    Args:
        id: Partition key of the user.
        email: Email address to add.
        persistence_layer: Layer providing the transaction writer.

    Returns:
        True when no exception propagates.

    Raises:
        EmailConflictError: Supplied as `exc_cls` of the ('email', <email>)
            put; presumably raised by the layer when that item already
            exists.
    """
    now_ = now()

    with persistence_layer.transact_writer() as transact:
        # Ensure email is searchable
        transact.update(
            key=KeyPair(id, '0'),
            update_expr='ADD emails :email',
            expr_attr_values={
                ':email': {email},
            },
        )
        transact.put(
            item={
                'id': id,
                'sk': f'emails#{email}',
                'email_primary': False,
                'email_verified': False,
                'create_date': now_,
            },
            cond_expr='attribute_not_exists(sk)',
        )
        # Prevent duplicate emails
        transact.put(
            item={
                'id': 'email',
                'sk': email,
                'user_id': id,
                'create_date': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=EmailConflictError,
        )

    return True
def remove_email(
    id: str,
    email: str,
    /,
    *,
    persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
    """Delete any email of the user except the primary email.

    Three writes are queued on a single `transact_writer()` context:

    * the ('email', <email>) item is deleted;
    * the user's `emails#<email>` item is deleted, conditioned on the item
      existing and not being flagged as primary;
    * `email` is removed from the `emails` set of the user record
      (`sk` = '0').

    Args:
        id: Partition key of the user.
        email: Email address to remove.
        persistence_layer: Layer providing the transaction writer.

    Returns:
        True when no exception propagates.

    Raises:
        BadRequestError: Supplied as `exc_cls` of the conditional delete;
            presumably raised by the layer when the email is the primary one
            or is not registered under this user.
    """
    with persistence_layer.transact_writer() as transact:
        transact.delete(key=KeyPair('email', email))
        transact.delete(
            key=KeyPair(id, f'emails#{email}'),
            # `attribute_exists(sk)` makes ownership explicit: the
            # ('email', <email>) item above is deleted unconditionally in
            # this same transaction, so the transaction must not go through
            # unless the email is actually registered under this user. The
            # `<>` test alone does not state that the item exists.
            cond_expr='attribute_exists(sk) AND email_primary <> :email_primary',
            expr_attr_values={
                ':email_primary': True,
            },
            exc_cls=BadRequestError,
        )
        transact.update(
            key=KeyPair(id, '0'),
            update_expr='DELETE emails :email',
            expr_attr_values={
                ':email': {email},
            },
        )

    return True
def set_email_as_primary(
    id: str,
    new_email: str,
    old_email: str,
    /,
    *,
    email_verified: bool = False,
    persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
    """Switch the user's primary email from `old_email` to `new_email`.

    Three updates are queued on a single `transact_writer()` context:

    * the `emails#<old_email>` item gets `email_primary` = False;
    * the `emails#<new_email>` item gets `email_primary` = True;
    * on the user record (`sk` = '0'), `new_email` is removed from the
      `emails` set and stored in `email`, together with `email_verified`.

    All three also set `updated_at` to the same timestamp.

    Args:
        id: Partition key of the user.
        new_email: Email that becomes the primary one.
        old_email: Email that stops being the primary one.
        email_verified: Value written to `email_verified` on the user record.
        persistence_layer: Layer providing the transaction writer.

    Returns:
        True when no exception propagates.
    """
    now_ = now()
    expr = 'SET email_primary = :email_primary, updated_at = :updated_at'

    with persistence_layer.transact_writer() as transact:
        # Set the old email as non-primary
        transact.update(
            key=KeyPair(id, f'emails#{old_email}'),
            update_expr=expr,
            expr_attr_values={
                ':email_primary': False,
                ':updated_at': now_,
            },
        )
        # Set the new email as primary
        transact.update(
            key=KeyPair(id, f'emails#{new_email}'),
            update_expr=expr,
            expr_attr_values={
                ':email_primary': True,
                ':updated_at': now_,
            },
        )
        transact.update(
            key=KeyPair(id, '0'),
            # Adjacent literals instead of a backslash continuation inside
            # the string, so source indentation never ends up in the
            # expression text.
            update_expr=(
                'DELETE emails :email_set '
                'SET email = :email, '
                'email_verified = :email_verified, '
                'updated_at = :updated_at'
            ),
            expr_attr_values={
                ':email': new_email,
                ':email_set': {new_email},
                ':email_verified': email_verified,
                ':updated_at': now_,
            },
        )

    return True
def remove_org_member(
    id: str,
    *,
    org_id: str,
    persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
    """Detach a user from an organization in a single transaction.

    On one `transact_writer()` context, the user's `acls#<org_id>` and
    `orgs#<org_id>` items are deleted, `org_id` is removed from the
    `tenant_id` set of the user record (`sk` = '0') while `updated_at` is
    refreshed, and the organization-side `admins#<id>` and
    `orgmembers#<org_id>` items for this user are deleted.

    Args:
        id: Partition key of the user.
        org_id: Identifier of the organization the user leaves.
        persistence_layer: Layer providing the transaction writer.

    Returns:
        True when no exception propagates.
    """
    timestamp = now()

    with persistence_layer.transact_writer() as transact:
        # Remove the user's relationship with the organization and their
        # privileges
        for user_sort_key in (f'acls#{org_id}', f'orgs#{org_id}'):
            transact.delete(key=KeyPair(id, user_sort_key))

        user_record = KeyPair(id, '0')
        transact.update(
            key=user_record,
            update_expr='DELETE tenant_id :tenant_id SET updated_at = :updated_at',
            expr_attr_values={
                ':tenant_id': {org_id},
                ':updated_at': timestamp,
            },
        )

        # Remove the user from the organization's admins and members list
        admin_entry = KeyPair(org_id, f'admins#{id}')
        transact.delete(key=admin_entry)
        member_entry = KeyPair(f'orgmembers#{org_id}', id)
        transact.delete(key=member_entry)

    return True