From 797a325cb001f20447af920e16aaca7a9c76451a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Rafael=20Siqueira?= Date: Wed, 28 May 2025 17:52:15 -0300 Subject: [PATCH] update layercake version --- .../app/events/set_status_as_archived.py | 46 +-- .../app/events/stopgap/del_vacancies.py | 6 +- enrollment-management/template.yaml | 2 +- .../events/stopgap/test_del_vacancies.py | 2 +- enrollment-management/uv.lock | 2 +- http-api/app/config.py | 36 +++ .../app/middlewares/audit_log_middleware.py | 6 +- http-api/app/middlewares/tenant_middelware.py | 14 +- http-api/app/routes/courses/__init__.py | 12 +- http-api/app/routes/enrollments/cancel.py | 55 ++++ http-api/app/routes/enrollments/enroll.py | 64 ++++ http-api/app/routes/settings/__init__.py | 6 +- http-api/app/routes/users/orgs.py | 8 +- http-api/app/rules/course.py | 75 ++--- http-api/app/rules/enrollment.py | 269 ++++++++--------- http-api/app/rules/org.py | 50 ++-- http-api/app/rules/user.py | 275 +++++++++--------- http-api/tests/routes/test_courses.py | 9 +- http-api/uv.lock | 2 +- layercake/layercake/dynamodb.py | 168 +++++------ layercake/pyproject.toml | 2 +- layercake/tests/test_dynamodb.py | 47 ++- layercake/uv.lock | 2 +- .../app/events/assign_tenant_cnpj.py | 52 ++-- .../app/events/stopgap/set_as_paid.py | 42 +-- order-management/template.yaml | 2 +- .../tests/events/test_assign_tenant.py | 2 +- order-management/uv.lock | 2 +- 28 files changed, 692 insertions(+), 566 deletions(-) create mode 100644 http-api/app/config.py create mode 100644 http-api/app/routes/enrollments/cancel.py create mode 100644 http-api/app/routes/enrollments/enroll.py diff --git a/enrollment-management/app/events/set_status_as_archived.py b/enrollment-management/app/events/set_status_as_archived.py index 33dd1ba..fd130af 100644 --- a/enrollment-management/app/events/set_status_as_archived.py +++ b/enrollment-management/app/events/set_status_as_archived.py @@ -5,7 +5,7 @@ from aws_lambda_powertools.utilities.data_classes import ( ) from aws_lambda_powertools.utilities.typing import LambdaContext from layercake.dateutils import now -from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, TransactItems +from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair from boto3clients import dynamodb_client from config import ENROLLMENT_TABLE @@ -19,28 +19,28 @@ enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: new_image = event.detail['new_image'] now_ = now() - transact = TransactItems(enrollment_layer.table_name) - transact.update( - key=KeyPair(new_image['id'], '0'), - update_expr='SET #status = :archived, update_date = :update_date', - cond_expr='#status = :completed', - expr_attr_names={ - '#status': 'status', - }, - expr_attr_values={ - ':archived': 'ARCHIVED', - ':completed': 'COMPLETED', - ':update_date': now_, - }, - ) - transact.put( - item={ - 'id': new_image['id'], - 'sk': 'archived_date', - 'create_date': now_, - }, - ) - enrollment_layer.transact_write_items(transact) + with enrollment_layer.transact_items() as transact: + transact.update( + key=KeyPair(new_image['id'], '0'), + update_expr='SET #status = :archived, update_date = :update_date', + cond_expr='#status = :completed', + expr_attr_names={ + '#status': 'status', + }, + expr_attr_values={ + ':archived': 'ARCHIVED', + ':completed': 'COMPLETED', + ':update_date': now_, + }, + ) + transact.put( + item={ + 'id': new_image['id'], + 'sk': 'archived_date', + 'create_date': now_, + }, + ) + transact.write_items() return True diff --git a/enrollment-management/app/events/stopgap/del_vacancies.py b/enrollment-management/app/events/stopgap/del_vacancies.py index ef41bf2..fb9ac7e 100644 --- a/enrollment-management/app/events/stopgap/del_vacancies.py +++ b/enrollment-management/app/events/stopgap/del_vacancies.py @@ -26,7 +26,7 @@ enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: new_image = event.detail['new_image'] order_id = new_image['id'] - data = order_layer.collect.get_items( + data = order_layer.collection.get_items( TransactKey(order_id) + SortKey('0') + KeyPair( @@ -42,7 +42,7 @@ def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: total = data['total'] tenant_id = data['tenant_id'].removeprefix('ORG#') - policy = user_layer.collect.get_item( + policy = user_layer.collection.get_item( KeyPair(pk=tenant_id, sk='metadata#billing_policy'), raise_on_error=False, default=False, @@ -51,7 +51,7 @@ def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: if not policy or total <= 0: return False - result = enrollment_layer.collect.query( + result = enrollment_layer.collection.query( KeyPair( ComposeKey(tenant_id, prefix='vacancies'), order_id, diff --git a/enrollment-management/template.yaml b/enrollment-management/template.yaml index e2f8a55..0e338c4 100644 --- a/enrollment-management/template.yaml +++ b/enrollment-management/template.yaml @@ -20,7 +20,7 @@ Globals: Architectures: - x86_64 Layers: - - !Sub arn:aws:lambda:sa-east-1:336641857101:layer:layercake:68 + - !Sub arn:aws:lambda:sa-east-1:336641857101:layer:layercake:72 Environment: Variables: TZ: America/Sao_Paulo diff --git a/enrollment-management/tests/events/stopgap/test_del_vacancies.py b/enrollment-management/tests/events/stopgap/test_del_vacancies.py index f73c51d..a485467 100644 --- a/enrollment-management/tests/events/stopgap/test_del_vacancies.py +++ b/enrollment-management/tests/events/stopgap/test_del_vacancies.py @@ -23,7 +23,7 @@ def test_del_vacancies( } assert app.lambda_handler(event, lambda_context) # type: ignore - result = dynamodb_persistence_layer.collect.query( + result = dynamodb_persistence_layer.collection.query( PartitionKey('vacancies#cJtK9SsnJhKPyxESe7g3DG') ) diff --git a/enrollment-management/uv.lock b/enrollment-management/uv.lock index 65176fc..7526fe8 100644 --- a/enrollment-management/uv.lock +++ b/enrollment-management/uv.lock @@ -522,7 +522,7 @@ wheels = [ [[package]] name = "layercake" -version = "0.4.0" +version = "0.6.2" source = { directory = "../layercake" } dependencies = [ { name = "arnparse" }, diff --git a/http-api/app/config.py b/http-api/app/config.py new file mode 100644 index 0000000..282c1b9 --- /dev/null +++ b/http-api/app/config.py @@ -0,0 +1,36 @@ +import os + +USER_TABLE: str = os.getenv('USER_TABLE') # type: ignore +ORDER_TABLE: str = os.getenv('ORDER_TABLE') # type: ignore +ENROLLMENT_TABLE: str = os.getenv('ENROLLMENT_TABLE') # type: ignore +COURSE_TABLE: str = os.getenv('COURSE_TABLE') # type: ignore + +KONVIVA_API_URL: str = os.getenv('KONVIVA_API_URL') # type: ignore +KONVIVA_SECRET_KEY: str = os.getenv('KONVIVA_SECRET_KEY') # type: ignore + +MEILISEARCH_HOST: str = os.getenv('MEILISEARCH_HOST') # type: ignore +MEILISEARCH_API_KEY: str = os.getenv('MEILISEARCH_API_KEY') # type: ignore + + +match os.getenv('AWS_SAM_LOCAL'), os.getenv('PYTEST_VERSION'): + case str() as SAM_LOCAL, _ if SAM_LOCAL: # Only when running `sam local start-api` + MEILISEARCH_HOST = 'http://host.docker.internal:7700' + ELASTIC_CONN = { + 'hosts': 'http://host.docker.internal:9200', + } + case _, str() as PYTEST if PYTEST: # Only when running `pytest` + MEILISEARCH_HOST = 'http://127.0.0.1:7700' + ELASTIC_CONN = { + 'hosts': 'http://127.0.0.1:9200', + } + case _: + MEILISEARCH_HOST: str = os.getenv('MEILISEARCH_HOST') # type: ignore + + ELASTIC_CLOUD_ID = os.getenv('ELASTIC_CLOUD_ID') + ELASTIC_AUTH_PASS = os.getenv('ELASTIC_AUTH_PASS') + ELASTIC_CONN = { + 'cloud_id': ELASTIC_CLOUD_ID, + 'basic_auth': ('elastic', ELASTIC_AUTH_PASS), + } + +USER_POOOL_ID = 'sa-east-1_s6YmVSfXj' diff --git a/http-api/app/middlewares/audit_log_middleware.py b/http-api/app/middlewares/audit_log_middleware.py index 0d55efa..60ddc0e 100644 --- a/http-api/app/middlewares/audit_log_middleware.py +++ b/http-api/app/middlewares/audit_log_middleware.py @@ -45,12 +45,12 @@ class AuditLogMiddleware(BaseMiddlewareHandler): self, action: str, /, - collect: DynamoDBCollection, + collection: DynamoDBCollection, audit_attrs: tuple[str, ...] = (), retention_days: int | None = LOG_RETENTION_DAYS, ) -> None: self.action = action - self.collect = collect + self.collection = collection self.audit_attrs = audit_attrs self.retention_days = retention_days @@ -80,7 +80,7 @@ class AuditLogMiddleware(BaseMiddlewareHandler): else None ) - self.collect.put_item( + self.collection.put_item( key=KeyPair( # Post-migration: remove `delimiter` and update prefix # from `log` to `logs` in ComposeKey. diff --git a/http-api/app/middlewares/tenant_middelware.py b/http-api/app/middlewares/tenant_middelware.py index af2ce2e..d87bafc 100644 --- a/http-api/app/middlewares/tenant_middelware.py +++ b/http-api/app/middlewares/tenant_middelware.py @@ -46,11 +46,11 @@ class TenantMiddleware(BaseMiddlewareHandler): def __init__( self, - collect: DynamoDBCollection, + collection: DynamoDBCollection, /, header: str = 'X-Tenant', ) -> None: - self.collect = collect + self.collection = collection self.header = header def handler( @@ -69,7 +69,7 @@ class TenantMiddleware(BaseMiddlewareHandler): tenant=_tenant( app.current_event.headers.get(self.header), app.context.get('user'), # type: ignore - collect=self.collect, + collection=self.collection, ) ) @@ -85,7 +85,7 @@ def _tenant( tenant_id: str | None, user: User, /, - collect: DynamoDBCollection, + collection: DynamoDBCollection, ) -> Tenant: """Get a Tenant instance based on the provided tenant_id and user's access permissions. @@ -96,7 +96,7 @@ def _tenant( The identifier of the tenant. Must not be None or empty. user : User The user attempting to access the tenant. - collect : DynamoDBCollection + collection : DynamoDBCollection The DynamoDB collection used to retrieve tenant information. Returns @@ -117,7 +117,7 @@ def _tenant( raise BadRequestError('Missing tenant') # Ensure user has ACL - collect.get_item( + collection.get_item( KeyPair(user.id, ComposeKey(tenant_id, prefix='acls')), exc_cls=ForbiddenError, ) @@ -126,5 +126,5 @@ def _tenant( if tenant_id == '*': return Tenant(id=tenant_id, name='default') - obj = collect.get_item(KeyPair(tenant_id, '0'), exc_cls=NotFoundError) + obj = collection.get_item(KeyPair(tenant_id, '0'), exc_cls=NotFoundError) return Tenant.model_validate(obj) diff --git a/http-api/app/routes/courses/__init__.py b/http-api/app/routes/courses/__init__.py index 2739405..d197e5e 100644 --- a/http-api/app/routes/courses/__init__.py +++ b/http-api/app/routes/courses/__init__.py @@ -22,8 +22,6 @@ router = Router() meili_client = Meilisearch(MEILISEARCH_HOST, MEILISEARCH_API_KEY) course_layer = DynamoDBPersistenceLayer(COURSE_TABLE, dynamodb_client) user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) -user_collect = DynamoDBCollection(user_layer) -course_collect = DynamoDBCollection(course_layer) @router.get( @@ -55,8 +53,8 @@ def get_courses(): compress=True, tags=['Course'], middlewares=[ - TenantMiddleware(user_collect), - AuditLogMiddleware('COURSE_ADD', user_collect, ('id', 'name')), + TenantMiddleware(user_layer.collection), + AuditLogMiddleware('COURSE_ADD', user_layer.collection, ('id', 'name')), ], ) def post_course(payload: Course): @@ -74,7 +72,7 @@ def post_course(payload: Course): @router.get('/', compress=True, tags=['Course']) def get_course(id: str): - return course_collect.get_item( + return course_layer.collection.get_item( KeyPair(id, '0'), exc_cls=NotFoundError, ) @@ -85,8 +83,8 @@ def get_course(id: str): compress=True, tags=['Course'], middlewares=[ - TenantMiddleware(user_collect), - AuditLogMiddleware('COURSE_UPDATE', user_collect, ('id', 'name')), + TenantMiddleware(user_layer.collection), + AuditLogMiddleware('COURSE_UPDATE', user_layer.collection, ('id', 'name')), ], ) def put_course(id: str, payload: Course): diff --git a/http-api/app/routes/enrollments/cancel.py b/http-api/app/routes/enrollments/cancel.py new file mode 100644 index 0000000..681bed1 --- /dev/null +++ b/http-api/app/routes/enrollments/cancel.py @@ -0,0 +1,55 @@ +from aws_lambda_powertools.event_handler.api_gateway import Router +from elasticsearch import Elasticsearch +from layercake.dynamodb import ( + DynamoDBCollection, + DynamoDBPersistenceLayer, + KeyPair, +) +from pydantic import UUID4, BaseModel + +from boto3clients import dynamodb_client +from config import ELASTIC_CONN, ENROLLMENT_TABLE, USER_TABLE +from middlewares.audit_log_middleware import AuditLogMiddleware +from middlewares.authentication_middleware import User +from rules.enrollment import set_status_as_canceled + +from .vacancies import router as vacancies + +__all__ = ['vacancies'] + + +router = Router() +elastic_client = Elasticsearch(**ELASTIC_CONN) +enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) +user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) +user_collect = DynamoDBCollection(user_layer) + + +class Cancel(BaseModel): + id: UUID4 | str + lock_hash: str + course: dict = {} + vacancy: dict = {} + + +@router.patch( + '//cancel', + compress=True, + tags=['Enrollment'], + middlewares=[ + AuditLogMiddleware('ENROLLMENT_CANCEL', user_collect, ('id', 'course')) + ], +) +def cancel(id: str, payload: Cancel): + user: User = router.context['user'] + + set_status_as_canceled( + id, + lock_hash=payload.lock_hash, + author=user.model_dump(), # type: ignore + course=payload.course, # type: ignore + vacancy_key=KeyPair.parse_obj(payload.vacancy), + persistence_layer=enrollment_layer, + ) + + return payload diff --git a/http-api/app/routes/enrollments/enroll.py b/http-api/app/routes/enrollments/enroll.py new file mode 100644 index 0000000..00d9f1a --- /dev/null +++ b/http-api/app/routes/enrollments/enroll.py @@ -0,0 +1,64 @@ +from datetime import datetime + +from aws_lambda_powertools.event_handler.api_gateway import Router +from layercake.batch import BatchProcessor +from layercake.dynamodb import ( + DynamoDBCollection, + DynamoDBPersistenceLayer, +) +from pydantic import BaseModel + +from boto3clients import dynamodb_client +from config import ( + ENROLLMENT_TABLE, + USER_TABLE, +) +from middlewares import Tenant, TenantMiddleware +from models import Course, User + +router = Router() + +enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) +user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) +user_collect = DynamoDBCollection(user_layer) +enrollment_collect = DynamoDBCollection(enrollment_layer) +processor = BatchProcessor() + + +class Item(BaseModel): + user: User + course: Course + schedule_date: datetime | None = None + + +class Payload(BaseModel): + items: tuple[Item, ...] + + +@router.post( + '/', + compress=True, + tags=['Enrollment'], + middlewares=[ + TenantMiddleware(user_collect), + ], +) +def enroll_(payload: Payload): + context = {'tenant': router.context['tenant']} + + with processor(payload.items, handler, context): + processor.process() + + return {} + + +def handler(record: Item, context: dict): + tenant: Tenant = context['tenant'] + # enroll( + # enrollment=Enrollment(user=[]) + # tenant={ + # 'id': str(tenant.id), + # 'name': tenant.name, + # }, + # persistence_layer=enrollment_layer, + # ) diff --git a/http-api/app/routes/settings/__init__.py b/http-api/app/routes/settings/__init__.py index 69539bb..a048b65 100644 --- a/http-api/app/routes/settings/__init__.py +++ b/http-api/app/routes/settings/__init__.py @@ -1,6 +1,5 @@ from aws_lambda_powertools.event_handler.api_gateway import Router from layercake.dynamodb import ( - DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair, PrefixKey, @@ -13,7 +12,6 @@ from middlewares import User router = Router() user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) -user_collect = DynamoDBCollection(user_layer) LIMIT = 25 @@ -22,11 +20,11 @@ LIMIT = 25 @router.get('/', include_in_schema=False) def settings(): user: User = router.context['user'] - acls = user_collect.query( + acls = user_layer.collection.query( KeyPair(user.id, PrefixKey('acls')), limit=LIMIT, ) - tenants = user_collect.query( + tenants = user_layer.collection.query( KeyPair(user.id, PrefixKey('orgs')), limit=LIMIT, ) diff --git a/http-api/app/routes/users/orgs.py b/http-api/app/routes/users/orgs.py index 7f072a4..b5dac5f 100644 --- a/http-api/app/routes/users/orgs.py +++ b/http-api/app/routes/users/orgs.py @@ -5,7 +5,6 @@ from aws_lambda_powertools.event_handler.exceptions import ( BadRequestError as PowertoolsBadRequestError, ) from layercake.dynamodb import ( - DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair, MissingError, @@ -26,7 +25,6 @@ class BadRequestError(MissingError, PowertoolsBadRequestError): ... router = Router() user_layer = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client) -user_collect = DynamoDBCollection(user_layer, exc_cls=BadRequestError) @router.get( @@ -36,7 +34,7 @@ user_collect = DynamoDBCollection(user_layer, exc_cls=BadRequestError) summary='Get user orgs', ) def get_orgs(id: str): - return user_collect.query( + return user_layer.collection.query( KeyPair(id, PrefixKey('orgs')), start_key=router.current_event.get_query_string_value('start_key', None), ) @@ -54,7 +52,9 @@ class Unassign(BaseModel): tags=['User'], summary='Delete user org', middlewares=[ - AuditLogMiddleware('UNASSIGN_ORG', user_collect, ('id', 'name', 'cnpj')) + AuditLogMiddleware( + 'UNASSIGN_ORG', user_layer.collection, ('id', 'name', 'cnpj') + ) ], ) def delete_org(id: str, payload: Unassign): diff --git a/http-api/app/rules/course.py b/http-api/app/rules/course.py index e35b50d..6d965e7 100644 --- a/http-api/app/rules/course.py +++ b/http-api/app/rules/course.py @@ -1,5 +1,5 @@ from layercake.dateutils import now -from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, TransactItems +from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair from models import Course, Org @@ -11,25 +11,26 @@ def create_course( persistence_layer: DynamoDBPersistenceLayer, ): now_ = now() - transact = TransactItems(persistence_layer.table_name) - transact.put( - item={ - 'sk': '0', - 'metadata__tenant_id': org.id, - 'create_date': now_, - **course.model_dump(), - } - ) - transact.put( - item={ - 'id': course.id, - 'sk': 'metadata#tenant', - 'tenant_id': f'ORG#{org.id}', - 'name': org.name, - 'create_date': now_, - } - ) - return persistence_layer.transact_write_items(transact) + with persistence_layer.transact_writer() as transact: + transact.put( + item={ + 'sk': '0', + 'metadata__tenant_id': org.id, + 'create_date': now_, + **course.model_dump(), + } + ) + transact.put( + item={ + 'id': course.id, + 'sk': 'metadata#tenant', + 'tenant_id': f'ORG#{org.id}', + 'name': org.name, + 'create_date': now_, + } + ) + + return True def update_course( @@ -39,20 +40,20 @@ def update_course( persistence_layer: DynamoDBPersistenceLayer, ): now_ = now() - transact = TransactItems(persistence_layer.table_name) - transact.update( - key=KeyPair(id, '0'), - update_expr='SET #name = :name, access_period = :access_period, \ - cert = :cert, update_date = :update_date', - expr_attr_names={ - '#name': 'name', - }, - expr_attr_values={ - ':name': course.name, - ':cert': course.cert.model_dump() if course.cert else None, - ':access_period': course.access_period, - ':update_date': now_, - }, - cond_expr='attribute_exists(sk)', - ) - return persistence_layer.transact_write_items(transact) + with persistence_layer.transact_writer() as transact: + transact.update( + key=KeyPair(id, '0'), + update_expr='SET #name = :name, access_period = :access_period, \ + cert = :cert, update_date = :update_date', + expr_attr_names={ + '#name': 'name', + }, + expr_attr_values={ + ':name': course.name, + ':cert': course.cert.model_dump() if course.cert else None, + ':access_period': course.access_period, + ':update_date': now_, + }, + cond_expr='attribute_exists(sk)', + ) + return True diff --git a/http-api/app/rules/enrollment.py b/http-api/app/rules/enrollment.py index 8cecec3..69a886b 100644 --- a/http-api/app/rules/enrollment.py +++ b/http-api/app/rules/enrollment.py @@ -4,7 +4,7 @@ from typing import TypedDict from uuid import uuid4 from layercake.dateutils import now, ttl -from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, TransactItems +from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair from layercake.strutils import md5_hash from config import ORDER_TABLE @@ -64,97 +64,97 @@ def enroll( course = enrollment.course tenant_id = tenant['id'] - transact = TransactItems(persistence_layer.table_name) - transact.put( - item={ - 'sk': '0', - 'create_date': now_, - 'metadata__tenant_id': tenant_id, - 'metadata__related_ids': {tenant_id, user.id}, - **enrollment.model_dump(), - }, - ) - transact.put( - item={ - 'id': enrollment.id, - 'sk': 'metadata#tenant', - 'tenant_id': f'ORG#{tenant_id}', - 'name': tenant['name'], - 'create_date': now_, - }, - ) - transact.put( - item={ - 'id': enrollment.id, - 'sk': LifecycleEvents.REMINDER_NO_ACCESS_3_DAYS, - 'name': user.name, - 'email': user.email, - 'course': course.name, - 'create_date': now_, - 'ttl': ttl(days=3, start_dt=now_), - }, - ) - transact.put( - item={ - 'id': enrollment.id, - 'sk': LifecycleEvents.ACCESS_PERIOD_REMINDER_30_DAYS, - 'name': user.name, - 'email': user.email, - 'course': course.name, - 'create_date': now_, - 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period - 30)), - }, - ) - transact.put( - item={ - 'id': enrollment.id, - 'sk': LifecycleEvents.COURSE_EXPIRED, - 'name': user.name, - 'email': user.email, - 'course': course.name, - 'create_date': now_, - 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period)), - }, - ) - - # Prevents the user from enrolling in the same course again until - # the deduplication window expires or is removed - if deduplication_window: - lock_hash = md5_hash('%s%s' % (user.id, course.id)) - offset_days = deduplication_window['offset_days'] - ttl_expiration = ttl( - start_dt=now_ + timedelta(days=course.access_period - offset_days) - ) + with persistence_layer.transact_writer() as transact: transact.put( item={ - 'id': 'lock', - 'sk': lock_hash, - 'enrollment_id': enrollment.id, + 'sk': '0', 'create_date': now_, - 'ttl': ttl_expiration, + 'metadata__tenant_id': tenant_id, + 'metadata__related_ids': {tenant_id, user.id}, + **enrollment.model_dump(), }, - cond_expr='attribute_not_exists(sk)', ) transact.put( item={ 'id': enrollment.id, - 'sk': 'metadata#lock', - 'hash': lock_hash, + 'sk': 'metadata#tenant', + 'tenant_id': f'ORG#{tenant_id}', + 'name': tenant['name'], 'create_date': now_, - 'ttl': ttl_expiration, }, ) - # Deduplication window can be recalculated if needed transact.put( item={ 'id': enrollment.id, - 'sk': 'metadata#deduplication_window', - 'offset_days': offset_days, + 'sk': LifecycleEvents.REMINDER_NO_ACCESS_3_DAYS, + 'name': user.name, + 'email': user.email, + 'course': course.name, 'create_date': now_, + 'ttl': ttl(days=3, start_dt=now_), + }, + ) + transact.put( + item={ + 'id': enrollment.id, + 'sk': LifecycleEvents.ACCESS_PERIOD_REMINDER_30_DAYS, + 'name': user.name, + 'email': user.email, + 'course': course.name, + 'create_date': now_, + 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period - 30)), + }, + ) + transact.put( + item={ + 'id': enrollment.id, + 'sk': LifecycleEvents.COURSE_EXPIRED, + 'name': user.name, + 'email': user.email, + 'course': course.name, + 'create_date': now_, + 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period)), }, ) - return persistence_layer.transact_write_items(transact) + # Prevents the user from enrolling in the same course again until + # the deduplication window expires or is removed + if deduplication_window: + lock_hash = md5_hash('%s%s' % (user.id, course.id)) + offset_days = deduplication_window['offset_days'] + ttl_expiration = ttl( + start_dt=now_ + timedelta(days=course.access_period - offset_days) + ) + transact.put( + item={ + 'id': 'lock', + 'sk': lock_hash, + 'enrollment_id': enrollment.id, + 'create_date': now_, + 'ttl': ttl_expiration, + }, + cond_expr='attribute_not_exists(sk)', + ) + transact.put( + item={ + 'id': enrollment.id, + 'sk': 'metadata#lock', + 'hash': lock_hash, + 'create_date': now_, + 'ttl': ttl_expiration, + }, + ) + # Deduplication window can be recalculated if needed + transact.put( + item={ + 'id': enrollment.id, + 'sk': 'metadata#deduplication_window', + 'offset_days': offset_days, + 'create_date': now_, + }, + ) + + return True def set_status_as_canceled( @@ -169,73 +169,74 @@ def set_status_as_canceled( """Cancel the enrollment if there's a `cancel_policy` and put its vacancy back if `vacancy_key` is provided.""" now_ = now() - transact = TransactItems(persistence_layer.table_name) - transact.update( - key=KeyPair(id, '0'), - update_expr='SET #status = :canceled, update_date = :update', - expr_attr_names={ - '#status': 'status', - }, - expr_attr_values={ - ':canceled': 'CANCELED', - ':update': now_, - }, - ) - transact.put( - item={ - 'id': id, - 'sk': 'canceled_date', - 'author': author, - 'create_date': now_, - }, - ) - transact.delete( - key=KeyPair(id, 'cancel_policy'), - cond_expr='attribute_exists(sk)', - ) - # Remove schedules lifecycle events, referencies and locks - transact.delete(key=KeyPair(id, 'schedules#archive_it')) - transact.delete(key=KeyPair(id, 'schedules#no_activity')) - transact.delete(key=KeyPair(id, 'schedules#access_period_ends')) - transact.delete(key=KeyPair(id, 'schedules#does_not_access')) - transact.delete(key=KeyPair(id, 'parent_vacancy')) - transact.delete(key=KeyPair(id, 'lock')) - transact.delete(key=KeyPair('lock', lock_hash)) - if vacancy_key and course: - vacancy_pk, vacancy_sk = vacancy_key.values() - org_id = vacancy_pk.removeprefix('vacancies#') - order_id, enrollment_id = vacancy_sk.split('#') - - transact.condition( - key=KeyPair(order_id, '0'), - cond_expr='attribute_exists(id)', - table_name=ORDER_TABLE, - ) - # Put the vacancy back and assign a new ID - transact.put( - item={ - 'id': f'vacancies#{org_id}', - 'sk': f'{order_id}#{uuid4()}', - 'course': course, - 'create_date': now_, - }, - cond_expr='attribute_not_exists(sk)', - ) - # Set the status of `generated_items` to `ROLLBACK` to know - # which vacancy is available for reuse + with persistence_layer.transact_writer() as transact: transact.update( - key=KeyPair(order_id, f'generated_items#{enrollment_id}'), - update_expr='SET #status = :status, update_date = :update', + key=KeyPair(id, '0'), + update_expr='SET #status = :canceled, update_date = :update', expr_attr_names={ '#status': 'status', }, expr_attr_values={ - ':status': 'ROLLBACK', + ':canceled': 'CANCELED', ':update': now_, }, - cond_expr='attribute_exists(sk)', - table_name=ORDER_TABLE, ) + transact.put( + item={ + 'id': id, + 'sk': 'canceled_date', + 'author': author, + 'create_date': now_, + }, + ) + transact.delete( + key=KeyPair(id, 'cancel_policy'), + cond_expr='attribute_exists(sk)', + ) + # Remove schedules lifecycle events, referencies and locks + transact.delete(key=KeyPair(id, 'schedules#archive_it')) + transact.delete(key=KeyPair(id, 'schedules#no_activity')) + transact.delete(key=KeyPair(id, 'schedules#access_period_ends')) + transact.delete(key=KeyPair(id, 'schedules#does_not_access')) + transact.delete(key=KeyPair(id, 'parent_vacancy')) + transact.delete(key=KeyPair(id, 'lock')) + transact.delete(key=KeyPair('lock', lock_hash)) - return persistence_layer.transact_write_items(transact) + if vacancy_key and course: + vacancy_pk, vacancy_sk = vacancy_key.values() + org_id = vacancy_pk.removeprefix('vacancies#') + order_id, enrollment_id = vacancy_sk.split('#') + + transact.condition( + key=KeyPair(order_id, '0'), + cond_expr='attribute_exists(id)', + table_name=ORDER_TABLE, + ) + # Put the vacancy back and assign a new ID + transact.put( + item={ + 'id': f'vacancies#{org_id}', + 'sk': f'{order_id}#{uuid4()}', + 'course': course, + 'create_date': now_, + }, + cond_expr='attribute_not_exists(sk)', + ) + # Set the status of `generated_items` to `ROLLBACK` to know + # which vacancy is available for reuse + transact.update( + key=KeyPair(order_id, f'generated_items#{enrollment_id}'), + update_expr='SET #status = :status, update_date = :update', + expr_attr_names={ + '#status': 'status', + }, + expr_attr_values={ + ':status': 'ROLLBACK', + ':update': now_, + }, + cond_expr='attribute_exists(sk)', + table_name=ORDER_TABLE, + ) + + return True diff --git a/http-api/app/rules/org.py b/http-api/app/rules/org.py index d8e1cfe..38dff5b 100644 --- a/http-api/app/rules/org.py +++ b/http-api/app/rules/org.py @@ -1,5 +1,5 @@ from layercake.dateutils import now -from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, TransactItems +from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair def update_policies( @@ -11,30 +11,30 @@ def update_policies( persistence_layer: DynamoDBPersistenceLayer, ): now_ = now() - transact = TransactItems(persistence_layer.table_name) - if payment_policy: - transact.put( - item={ - 'id': id, - 'sk': 'metadata#payment_policy', - 'create_date': now_, - } - | payment_policy - ) - else: - transact.delete(key=KeyPair(id, 'metadata#payment_policy')) + with persistence_layer.transact_writer() as transact: + if payment_policy: + transact.put( + item={ + 'id': id, + 'sk': 'metadata#payment_policy', + 'create_date': now_, + } + | payment_policy + ) + else: + transact.delete(key=KeyPair(id, 'metadata#payment_policy')) - if billing_policy: - transact.put( - item={ - 'id': id, - 'sk': 'metadata#billing_policy', - 'create_date': now_, - } - | billing_policy - ) - else: - transact.delete(key=KeyPair(id, 'metadata#billing_policy')) + if billing_policy: + transact.put( + item={ + 'id': id, + 'sk': 'metadata#billing_policy', + 'create_date': now_, + } + | billing_policy + ) + else: + transact.delete(key=KeyPair(id, 'metadata#billing_policy')) - return persistence_layer.transact_write_items(transact) + return True diff --git a/http-api/app/rules/user.py b/http-api/app/rules/user.py index 65a7535..7c96b86 100644 --- a/http-api/app/rules/user.py +++ b/http-api/app/rules/user.py @@ -10,71 +10,68 @@ from layercake.dynamodb import ( ComposeKey, DynamoDBPersistenceLayer, KeyPair, - TransactItems, ) User = TypedDict('User', {'id': str, 'name': str, 'cpf': str}) def update_user( - userdata: User, + data: User, /, *, persistence_layer: DynamoDBPersistenceLayer, ) -> bool: now_ = now() - ttl_ = now_ + timedelta(hours=24) - user = SimpleNamespace(**userdata) + user = SimpleNamespace(**data) # Get the user's CPF, if it exists. old_cpf = persistence_layer.get_item(KeyPair(user.id, '0')).get('cpf', None) - transact = TransactItems(persistence_layer.table_name) - transact.update( - key=KeyPair(user.id, '0'), - update_expr='SET #name = :name, cpf = :cpf, update_date = :update_date', - expr_attr_names={ - '#name': 'name', - }, - expr_attr_values={ - ':name': user.name, - ':cpf': user.cpf, - ':update_date': now_, - }, - cond_expr='attribute_exists(sk)', - ) - # Prevent the user from updating more than once every 24 hours - transact.put( - item={ - 'id': user.id, - 'sk': 'last_profile_edit', - 'create_date': now_, - 'ttl': ttl(start_dt=ttl_), - 'ttl_date': ttl_, - }, - cond_expr='attribute_not_exists(sk)', - ) - - class CPFConflictError(BadRequestError): - def __init__(self, msg: str): - super().__init__('Cpf already exists') - - if user.cpf != old_cpf: + with persistence_layer.transact_writer() as transact: + transact.update( + key=KeyPair(user.id, '0'), + update_expr='SET #name = :name, cpf = :cpf, update_date = :update_date', + expr_attr_names={ + '#name': 'name', + }, + expr_attr_values={ + ':name': user.name, + ':cpf': user.cpf, + ':update_date': now_, + }, + cond_expr='attribute_exists(sk)', + ) + # Prevent the user from updating more than once every 24 hours transact.put( item={ - 'id': 'cpf', - 'sk': user.cpf, - 'user_id': user.id, + 'id': user.id, + 'sk': 'last_profile_edit', 'create_date': now_, + 'ttl': ttl(start_dt=now_ + timedelta(hours=24)), }, cond_expr='attribute_not_exists(sk)', - exc_cls=CPFConflictError, ) - # Ensures that the old CPF is discarded - if old_cpf: - transact.delete(key=KeyPair('cpf', old_cpf)) + class CPFConflictError(BadRequestError): + def __init__(self, msg: str): + super().__init__('Cpf already exists') - return persistence_layer.transact_write_items(transact) + if user.cpf != old_cpf: + transact.put( + item={ + 'id': 'cpf', + 'sk': user.cpf, + 'user_id': user.id, + 'create_date': now_, + }, + cond_expr='attribute_not_exists(sk)', + exc_cls=CPFConflictError, + ) + + # Ensures that the old CPF is discarded + if old_cpf: + transact.delete(key=KeyPair('cpf', old_cpf)) + + return True def add_email( @@ -85,41 +82,42 @@ def add_email( persistence_layer: DynamoDBPersistenceLayer, ): now_ = now() - transact = TransactItems(persistence_layer.table_name) - transact.update( - key=KeyPair(id, '0'), - update_expr='ADD emails :email', - expr_attr_values={ - ':email': {email}, - }, - ) - transact.put( - item={ - 'id': id, - 'sk': f'emails#{email}', - 'email_primary': False, - 'email_verified': False, - 'create_date': now_, - }, - cond_expr='attribute_not_exists(sk)', - ) - class EmailConflictError(BadRequestError): - def __init__(self, msg: str): - super().__init__('Email already exists') + with persistence_layer.transact_writer() as transact: + transact.update( + key=KeyPair(id, '0'), + update_expr='ADD emails :email', + expr_attr_values={ + ':email': {email}, + }, + ) + transact.put( + item={ + 'id': id, + 'sk': f'emails#{email}', + 'email_primary': False, + 'email_verified': False, + 'create_date': now_, + }, + cond_expr='attribute_not_exists(sk)', + ) - transact.put( - item={ - 'id': 'email', - 'sk': email, - 'user_id': id, - 'create_date': now_, - }, - cond_expr='attribute_not_exists(sk)', - exc_cls=EmailConflictError, - ) + class EmailConflictError(BadRequestError): + def __init__(self, msg: str): + super().__init__('Email already exists') - return persistence_layer.transact_write_items(transact) + transact.put( + item={ + 'id': 'email', + 'sk': email, + 'user_id': id, + 'create_date': now_, + }, + cond_expr='attribute_not_exists(sk)', + exc_cls=EmailConflictError, + ) + + return True def del_email( @@ -130,25 +128,24 @@ def del_email( persistence_layer: DynamoDBPersistenceLayer, ) -> bool: """Delete any email except the primary email.""" - transact = TransactItems(persistence_layer.table_name) - transact.delete( - key=KeyPair('email', email), - ) - transact.delete( - key=KeyPair(id, ComposeKey(email, prefix='emails')), - cond_expr='email_primary <> :primary', - expr_attr_values={':primary': True}, - exc_cls=BadRequestError, - ) - transact.update( - key=KeyPair(id, '0'), - update_expr='DELETE emails :email', - expr_attr_values={ - ':email': {email}, - }, - ) - - return persistence_layer.transact_write_items(transact) + with persistence_layer.transact_writer() as transact: + transact.delete( + key=KeyPair('email', email), + ) + transact.delete( + key=KeyPair(id, ComposeKey(email, prefix='emails')), + cond_expr='email_primary <> :primary', + expr_attr_values={':primary': True}, + exc_cls=BadRequestError, + ) + transact.update( + key=KeyPair(id, '0'), + update_expr='DELETE emails :email', + expr_attr_values={ + ':email': {email}, + }, + ) + return True def set_email_as_primary( @@ -162,37 +159,38 @@ def set_email_as_primary( ): now_ = now() expr = 'SET email_primary = :email_primary, update_date = :update_date' - transact = TransactItems(persistence_layer.table_name) - # Set the old email as non-primary - transact.update( - key=KeyPair(id, ComposeKey(old_email, 'emails')), - update_expr=expr, - expr_attr_values={ - ':email_primary': False, - ':update_date': now_, - }, - ) - # Set the new email as primary - transact.update( - key=KeyPair(id, ComposeKey(new_email, 'emails')), - update_expr=expr, - expr_attr_values={ - ':email_primary': True, - ':update_date': now_, - }, - ) - transact.update( - key=KeyPair(id, '0'), - update_expr='SET email = :email, email_verified = :email_verified, \ - update_date = :update_date', - expr_attr_values={ - ':email': new_email, - ':email_verified': email_verified, - ':update_date': now_, - }, - ) - return persistence_layer.transact_write_items(transact) + with persistence_layer.transact_writer() as transact: + # Set the old email as non-primary + transact.update( + key=KeyPair(id, ComposeKey(old_email, 'emails')), + update_expr=expr, + expr_attr_values={ + ':email_primary': False, + ':update_date': now_, + }, + ) + # Set the new email as primary + transact.update( + key=KeyPair(id, ComposeKey(new_email, 'emails')), + update_expr=expr, + expr_attr_values={ + ':email_primary': True, + ':update_date': now_, + }, + ) + transact.update( + key=KeyPair(id, '0'), + update_expr='SET email = :email, email_verified = :email_verified, \ + update_date = :update_date', + expr_attr_values={ + ':email': new_email, + ':email_verified': email_verified, + ':update_date': now_, + }, + ) + + return True def del_org_member( @@ -201,20 +199,19 @@ def del_org_member( org_id: str, persistence_layer: DynamoDBPersistenceLayer, ) -> bool: - transact = TransactItems(persistence_layer.table_name) + with persistence_layer.transact_writer() as transact: + # Remove the user's relationship with the organization and their privileges + transact.delete(key=KeyPair(id, f'acls#{org_id}')) + transact.delete(key=KeyPair(id, f'orgs#{org_id}')) + transact.update( + key=KeyPair(id, '0'), + update_expr='DELETE #tenant :org_id', + expr_attr_names={'#tenant': 'tenant__org_id'}, + expr_attr_values={':org_id': {org_id}}, + ) - # Remove the user's relationship with the organization and their privileges - transact.delete(key=KeyPair(id, f'acls#{org_id}')) - transact.delete(key=KeyPair(id, f'orgs#{org_id}')) - transact.update( - key=KeyPair(id, '0'), - update_expr='DELETE #tenant :org_id', - expr_attr_names={'#tenant': 'tenant__org_id'}, - expr_attr_values={':org_id': {org_id}}, - ) + # Remove the user from the organization's admins and members list + transact.delete(key=KeyPair(org_id, f'admins#{id}')) + transact.delete(key=KeyPair(f'orgmembers#{org_id}', id)) - # Remove the user from the organization's admins and members list - transact.delete(key=KeyPair(org_id, f'admins#{id}')) - transact.delete(key=KeyPair(f'orgmembers#{org_id}', id)) - - return persistence_layer.transact_write_items(transact) + return True diff --git a/http-api/tests/routes/test_courses.py b/http-api/tests/routes/test_courses.py index 34a531e..87d253e 100644 --- a/http-api/tests/routes/test_courses.py +++ b/http-api/tests/routes/test_courses.py @@ -3,7 +3,6 @@ from http import HTTPMethod, HTTPStatus from layercake.dynamodb import ( ComposeKey, - DynamoDBCollection, DynamoDBPersistenceLayer, KeyPair, PartitionKey, @@ -58,8 +57,8 @@ def test_post_course( assert 'id' in json.loads(r['body']) assert r['statusCode'] == HTTPStatus.CREATED - collect = DynamoDBCollection(dynamodb_persistence_layer) - logs = collect.query( + collection = dynamodb_persistence_layer.collection + logs = collection.query( PartitionKey( ComposeKey('5OxmMjL-ujoR5IMGegQz', prefix='log', delimiter=':'), ) @@ -92,6 +91,6 @@ def test_put_course( ) assert r['statusCode'] == HTTPStatus.OK - collect = DynamoDBCollection(dynamodb_persistence_layer) - course = collect.get_item(KeyPair('90d7f0d2-d9a4-4467-a31c-f9a7955964cf', '0')) + collection = dynamodb_persistence_layer.collection + course = collection.get_item(KeyPair('90d7f0d2-d9a4-4467-a31c-f9a7955964cf', '0')) assert course['name'] == 'pytest' diff --git a/http-api/uv.lock b/http-api/uv.lock index ea1cb0c..4319431 100644 --- a/http-api/uv.lock +++ b/http-api/uv.lock @@ -522,7 +522,7 @@ wheels = [ [[package]] name = "layercake" -version = "0.4.0" +version = "0.6.2" source = { directory = "../layercake" } dependencies = [ { name = "arnparse" }, diff --git a/layercake/layercake/dynamodb.py b/layercake/layercake/dynamodb.py index e75029e..1850d17 100644 --- a/layercake/layercake/dynamodb.py +++ b/layercake/layercake/dynamodb.py @@ -339,10 +339,10 @@ class TransactionCanceledException(Exception): class TransactOperation: def __init__( self, - op: dict, + operation: dict, exc_cls: type[Exception] | None = None, ) -> None: - self.op = op + self.operation = operation self.exc_cls = exc_cls @@ -352,23 +352,27 @@ else: DynamoDBClient = object -class TransactItems: +class TransactWriter: def __init__( self, table_name: str, + *, + flush_amount: int, client: DynamoDBClient, ) -> None: self._table_name = table_name - self._operations: list[TransactOperation] = [] + self._items_buffer: list[TransactOperation] = [] + self._flush_amount = flush_amount self._client = client def __enter__(self) -> Self: - """Remove operations from previous execution.""" - self._operations.clear() return self - def __exit__(self, exc_type, exc_val, exc_tb) -> bool: - return False + def __exit__(self, *exc_details) -> None: + # When we exit, we need to keep flushing whatever's left + # until there's nothing left in our items buffer. + while self._items_buffer: + self._flush() def put( self, @@ -386,7 +390,7 @@ class TransactItems: if not table_name: table_name = self._table_name - self._operations.append( + self._add_op_and_process( TransactOperation( { 'Put': dict( @@ -406,8 +410,8 @@ class TransactItems: update_expr: str, cond_expr: str | None = None, table_name: str | None = None, - expr_attr_names: dict = {}, - expr_attr_values: dict = {}, + expr_attr_names: dict | None = None, + expr_attr_values: dict | None = None, exc_cls: Type[Exception] | None = None, ) -> None: attrs: dict = {} @@ -424,7 +428,7 @@ class TransactItems: if not table_name: table_name = self._table_name - self._operations.append( + self._add_op_and_process( TransactOperation( { 'Update': dict( @@ -438,41 +442,14 @@ class TransactItems: ) ) - def get( - self, - *, - table_name: str | None = None, - key: dict, - expr_attr_names: str | None = None, - ) -> None: - attrs: dict = {} - - if expr_attr_names: - attrs['ExpressionAttributeNames'] = expr_attr_names - - if not table_name: - table_name = self._table_name - - self._operations.append( - TransactOperation( - { - 'Get': dict( - TableName=table_name, - Key=serialize(key), - **attrs, - ) - } - ), - ) - def delete( self, *, key: dict, table_name: str | None = None, cond_expr: str | None = None, - expr_attr_names: dict = {}, - expr_attr_values: dict = {}, + expr_attr_names: dict | None = None, + expr_attr_values: dict | None = None, exc_cls: Type[Exception] | None = None, ) -> None: attrs: dict = {} @@ -489,7 +466,7 @@ class TransactItems: if not table_name: table_name = self._table_name - self._operations.append( + self._add_op_and_process( TransactOperation( { 'Delete': dict( @@ -508,8 +485,8 @@ class TransactItems: key: dict, cond_expr: str, table_name: str | None = None, - expr_attr_names: dict = {}, - expr_attr_values: dict = {}, + expr_attr_names: dict | None = None, + expr_attr_values: dict | None = None, exc_cls: Type[Exception] | None = None, ) -> None: attrs: dict = {'ConditionExpression': cond_expr} @@ -523,7 +500,7 @@ class TransactItems: if not table_name: table_name = self._table_name - self._operations.append( + self._add_op_and_process( TransactOperation( { 'ConditionCheck': dict( @@ -536,13 +513,21 @@ class TransactItems: ) ) - def write_items(self) -> bool: - operations = self._operations.copy() - self._operations.clear() + def _add_op_and_process(self, op: TransactOperation) -> None: + self._items_buffer.append(op) + self._flush_if_needed() + + def _flush_if_needed(self) -> None: + if len(self._items_buffer) >= self._flush_amount: + self._flush() + + def _flush(self) -> bool: + items_to_send = self._items_buffer[: self._flush_amount] + self._items_buffer = self._items_buffer[self._flush_amount :] try: self._client.transact_write_items( - TransactItems=[item.op for item in operations] # type: ignore + TransactItems=[item.operation for item in items_to_send] # type: ignore ) except ClientError as err: error_msg = glom(err, 'response.Error.Message', default='') @@ -553,7 +538,7 @@ class TransactItems: if 'Message' not in reason: continue - item = operations[idx] + item = items_to_send[idx] if item.exc_cls: raise item.exc_cls(error_msg) @@ -562,7 +547,7 @@ class TransactItems: { 'code': reason.get('Code'), 'message': reason.get('Message'), - 'operation': item.op, + 'operation': item.operation, } ) @@ -570,32 +555,11 @@ class TransactItems: else: return True - def get_items(self) -> list[dict[str, Any]]: - operations = self._operations.copy() - self._operations.clear() - - try: - response = self._client.transact_get_items( - TransactItems=[item.op for item in operations] # type: ignore - ) - except ClientError as err: - logger.exception(err) - raise - else: - return [ - deserialize(response.get('Item', {})) - for response in response.get('Responses', []) - ] - class DynamoDBPersistenceLayer: def __init__(self, table_name: str, client: DynamoDBClient) -> None: - self._table_name = table_name - self._client = client - - @property - def collect(self) -> 'DynamoDBCollection': - return DynamoDBCollection(self) + self.table_name = table_name + self.client = client def query( self, @@ -625,7 +589,7 @@ class DynamoDBPersistenceLayer: - https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/query.html """ attrs: dict = { - 'TableName': self._table_name, + 'TableName': self.table_name, 'KeyConditionExpression': key_cond_expr, 'ScanIndexForward': index_forward, } @@ -646,7 +610,7 @@ class DynamoDBPersistenceLayer: attrs['Limit'] = limit try: - response = self._client.query(**attrs) + response = self.client.query(**attrs) except ClientError as err: logger.info(attrs) logger.exception(err) @@ -665,12 +629,12 @@ class DynamoDBPersistenceLayer: there will be no Item element in the response. """ attrs = { - 'TableName': self._table_name, + 'TableName': self.table_name, 'Key': serialize(key), } try: - response = self._client.get_item(**attrs) + response = self.client.get_item(**attrs) except ClientError as err: logger.info(attrs) logger.exception(err) @@ -680,7 +644,7 @@ class DynamoDBPersistenceLayer: def put_item(self, item: dict, *, cond_expr: str | None = None) -> bool: attrs = { - 'TableName': self._table_name, + 'TableName': self.table_name, 'Item': serialize(item), } @@ -688,7 +652,7 @@ class DynamoDBPersistenceLayer: attrs['ConditionExpression'] = cond_expr try: - self._client.put_item(**attrs) + self.client.put_item(**attrs) except ClientError as err: logger.info(attrs) logger.exception(err) @@ -706,7 +670,7 @@ class DynamoDBPersistenceLayer: expr_attr_values: dict | None = None, ) -> bool: attrs: dict = { - 'TableName': self._table_name, + 'TableName': self.table_name, 'Key': serialize(key), 'UpdateExpression': update_expr, } @@ -721,7 +685,7 @@ class DynamoDBPersistenceLayer: attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) try: - self._client.update_item(**attrs) + self.client.update_item(**attrs) except ClientError as err: logger.info(attrs) logger.exception(err) @@ -742,7 +706,7 @@ class DynamoDBPersistenceLayer: or if it has an expected attribute value. """ attrs: dict = { - 'TableName': self._table_name, + 'TableName': self.table_name, 'Key': serialize(key), } @@ -756,7 +720,7 @@ class DynamoDBPersistenceLayer: attrs['ExpressionAttributeValues'] = serialize(expr_attr_values) try: - self._client.delete_item(**attrs) + self.client.delete_item(**attrs) except ClientError as err: logger.info(attrs) logger.exception(err) @@ -764,8 +728,16 @@ class DynamoDBPersistenceLayer: else: return True - def transact_items(self) -> TransactItems: - return TransactItems(table_name=self._table_name, client=self._client) + @property + def collection(self) -> 'DynamoDBCollection': + return DynamoDBCollection(self) + + def transact_writer(self, flush_amount: int = 50) -> TransactWriter: + return TransactWriter( + table_name=self.table_name, + client=self.client, + flush_amount=flush_amount, + ) def batch_writer( self, @@ -797,8 +769,8 @@ class DynamoDBPersistenceLayer: DynamoDB.Table.batch_writer https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html#DynamoDB.Table.batch_writer """ return BatchWriter( - table_name=table_name or self._table_name, - client=self._client, + table_name=table_name or self.table_name, + client=self.client, overwrite_by_pkeys=overwrite_by_pkeys, ) @@ -1033,15 +1005,23 @@ class DynamoDBCollection: if not key.pairs: return {} - items = [] sortkeys = key.pairs[1:] if flatten_top else key.pairs + client = self.persistence_layer.client + table_name = self.persistence_layer.table_name - with self.persistence_layer.transact_items() as transact: - # Add a get operation for each key for the transaction - for pair in key.pairs: - transact.get(key=pair) + # Add a get operation for each key for the transaction + transact_items = [ + { + 'Get': { + 'TableName': getattr(pair, 'table_name', table_name), + 'Key': serialize(pair), + } + } + for pair in key.pairs + ] - items = transact.get_items() + response = client.transact_get_items(TransactItems=transact_items) # type: ignore + items = [deserialize(r.get('Item', {})) for r in response.get('Responses', [])] if flatten_top: head, *tail = items diff --git a/layercake/pyproject.toml b/layercake/pyproject.toml index 3d53546..c244d0c 100644 --- a/layercake/pyproject.toml +++ b/layercake/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "layercake" -version = "0.5.0" +version = "0.6.2" description = "Packages shared dependencies to optimize deployment and ensure consistency across functions." readme = "README.md" authors = [ diff --git a/layercake/tests/test_dynamodb.py b/layercake/tests/test_dynamodb.py index 82308c9..026b0ab 100644 --- a/layercake/tests/test_dynamodb.py +++ b/layercake/tests/test_dynamodb.py @@ -93,33 +93,30 @@ def test_transact_write_items( ): class EmailConflictError(Exception): ... - with dynamodb_persistence_layer.transact_items() as transact: - # transact = TransactItems(dynamodb_persistence_layer.table_name) - transact.put(item=KeyPair('5OxmMjL-ujoR5IMGegQz', '0')) - transact.put(item=KeyPair('cpf', '07879819908')) - transact.put( - item=KeyPair('email', 'sergio@somosbeta.com.br'), - cond_expr='attribute_not_exists(sk)', - ) - transact.put( - item=KeyPair( - '5OxmMjL-ujoR5IMGegQz', - ComposeKey('sergio@somosbeta.com.br', 'emails'), - ), - cond_expr='attribute_not_exists(sk)', - exc_cls=EmailConflictError, - ) - - with pytest.raises(EmailConflictError): - transact.write_items() + with pytest.raises(EmailConflictError): + with dynamodb_persistence_layer.transact_writer(flush_amount=2) as transact: + transact.put(item=KeyPair('5OxmMjL-ujoR5IMGegQz', '0')) + transact.put(item=KeyPair('cpf', '07879819908')) + transact.put( + item=KeyPair('email', 'sergio@somosbeta.com.br'), + cond_expr='attribute_not_exists(sk)', + ) + transact.put( + item=KeyPair( + '5OxmMjL-ujoR5IMGegQz', + ComposeKey('sergio@somosbeta.com.br', 'emails'), + ), + cond_expr='attribute_not_exists(sk)', + exc_cls=EmailConflictError, + ) def test_collection_get_item( dynamodb_seeds, dynamodb_persistence_layer: DynamoDBPersistenceLayer, ): - collect = dynamodb_persistence_layer.collect - data_notfound = collect.get_item( + collection = dynamodb_persistence_layer.collection + data_notfound = collection.get_item( KeyPair( pk='5OxmMjL-ujoR5IMGegQz', sk='tenant', @@ -130,7 +127,7 @@ def test_collection_get_item( assert data_notfound == {} # This data was added from seeds - data = collect.get_item( + data = collection.get_item( KeyPair( pk='5OxmMjL-ujoR5IMGegQz', sk=ComposeKey('sergio@somosbeta.com.br', prefix='emails'), @@ -150,7 +147,7 @@ def test_collection_get_item( class NotFoundError(Exception): ... with pytest.raises(NotFoundError): - collect.get_item( + collection.get_item( KeyPair('5OxmMjL-ujoR5IMGegQz', 'notfound'), exc_cls=NotFoundError, ) @@ -160,10 +157,10 @@ def test_collection_get_item_path_spec( dynamodb_seeds, dynamodb_persistence_layer: DynamoDBPersistenceLayer, ): - collect = dynamodb_persistence_layer.collect + collection = dynamodb_persistence_layer.collection # This data was added from seeds - data = collect.get_item( + data = collection.get_item( KeyPair( pk='5OxmMjL-ujoR5IMGegQz', sk=SortKey( diff --git a/layercake/uv.lock b/layercake/uv.lock index 6cfa0ea..880922c 100644 --- a/layercake/uv.lock +++ b/layercake/uv.lock @@ -589,7 +589,7 @@ wheels = [ [[package]] name = "layercake" -version = "0.4.0" +version = "0.6.2" source = { editable = "." } dependencies = [ { name = "arnparse" }, diff --git a/order-management/app/events/assign_tenant_cnpj.py b/order-management/app/events/assign_tenant_cnpj.py index 7f1656d..9cd7ca0 100644 --- a/order-management/app/events/assign_tenant_cnpj.py +++ b/order-management/app/events/assign_tenant_cnpj.py @@ -9,7 +9,6 @@ from layercake.dynamodb import ( DynamoDBPersistenceLayer, KeyPair, SortKey, - TransactItems, ) from boto3clients import dynamodb_client @@ -25,7 +24,7 @@ order_layer = DynamoDBPersistenceLayer(ORDER_TABLE, dynamodb_client) def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: new_image = event.detail['new_image'] now_ = now() - ids = user_layer.collect.get_items( + ids = user_layer.collection.get_items( KeyPair( pk='cnpj', sk=SortKey(new_image['cnpj'], path_spec='user_id'), @@ -44,35 +43,36 @@ def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: if len(ids) < 2: raise ValueError('IDs not found.') - transact = TransactItems(order_layer.table_name) - transact.update( - key=KeyPair(new_image['id'], '0'), - update_expr='SET metadata__tenant_id = :tenant_id, \ - metadata__related_ids = :related_ids, update_date = :update_date', - expr_attr_values={ - ':tenant_id': ids['org_id'], - ':related_ids': set(ids.values()), - ':update_date': now_, - }, - ) - transact.put( - item={ - 'id': new_image['id'], - 'sk': 'metadata#tenant', - 'tenant_id': f'ORG#{ids["org_id"]}', - 'create_date': now_, - } - ) - - for k, v in ids.items(): + with order_layer.transact_writer() as transact: + transact.update( + key=KeyPair(new_image['id'], '0'), + update_expr='SET metadata__tenant_id = :tenant_id, \ + metadata__related_ids = :related_ids, \ + update_date = :update_date', + expr_attr_values={ + ':tenant_id': ids['org_id'], + ':related_ids': set(ids.values()), + ':update_date': now_, + }, + ) transact.put( item={ 'id': new_image['id'], - 'sk': 'related_ids#%s' % k.removesuffix('_id'), # e.g. related_ids#user + 'sk': 'metadata#tenant', + 'tenant_id': f'ORG#{ids["org_id"]}', 'create_date': now_, - k: v, } ) - order_layer.transact_write_items(transact) + for k, v in ids.items(): + transact.put( + item={ + 'id': new_image['id'], + 'sk': 'related_ids#%s' + % k.removesuffix('_id'), # e.g. related_ids#user + 'create_date': now_, + k: v, + } + ) + return True diff --git a/order-management/app/events/stopgap/set_as_paid.py b/order-management/app/events/stopgap/set_as_paid.py index 806ea08..1edb1c2 100644 --- a/order-management/app/events/stopgap/set_as_paid.py +++ b/order-management/app/events/stopgap/set_as_paid.py @@ -8,7 +8,6 @@ from layercake.dateutils import now from layercake.dynamodb import ( DynamoDBPersistenceLayer, KeyPair, - TransactItems, ) from boto3clients import dynamodb_client @@ -23,24 +22,25 @@ order_layer = DynamoDBPersistenceLayer(ORDER_TABLE, dynamodb_client) def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: new_image = event.detail['new_image'] now_ = now() - transact = TransactItems(order_layer.table_name) - transact.update( - key=KeyPair(new_image['id'], '0'), - update_expr='SET #status = :status, update_date = :update_date', - expr_attr_names={ - '#status': 'status', - }, - expr_attr_values={ - ':status': 'PAID', - ':update_date': now_, - }, - ) - transact.put( - item={ - 'id': new_image['id'], - 'sk': 'paid_date', - 'create_date': now_, - } - ) - order_layer.transact_write_items(transact) + + with order_layer.transact_writer() as transact: + transact.update( + key=KeyPair(new_image['id'], '0'), + update_expr='SET #status = :status, update_date = :update_date', + expr_attr_names={ + '#status': 'status', + }, + expr_attr_values={ + ':status': 'PAID', + ':update_date': now_, + }, + ) + transact.put( + item={ + 'id': new_image['id'], + 'sk': 'paid_date', + 'create_date': now_, + } + ) + return True diff --git a/order-management/template.yaml b/order-management/template.yaml index 80c0a2d..4d0c9c4 100644 --- a/order-management/template.yaml +++ b/order-management/template.yaml @@ -20,7 +20,7 @@ Globals: Architectures: - x86_64 Layers: - - !Sub arn:aws:lambda:sa-east-1:336641857101:layer:layercake:68 + - !Sub arn:aws:lambda:sa-east-1:336641857101:layer:layercake:72 Environment: Variables: TZ: America/Sao_Paulo diff --git a/order-management/tests/events/test_assign_tenant.py b/order-management/tests/events/test_assign_tenant.py index a9beff4..ccb36cb 100644 --- a/order-management/tests/events/test_assign_tenant.py +++ b/order-management/tests/events/test_assign_tenant.py @@ -21,7 +21,7 @@ def test_assign_tenant_cnpj( assert app.lambda_handler(event, lambda_context) # type: ignore - result = dynamodb_persistence_layer.collect.query( + result = dynamodb_persistence_layer.collection.query( PartitionKey('9omWNKymwU5U4aeun6mWzZ') ) diff --git a/order-management/uv.lock b/order-management/uv.lock index cb5116c..759c70d 100644 --- a/order-management/uv.lock +++ b/order-management/uv.lock @@ -495,7 +495,7 @@ wheels = [ [[package]] name = "layercake" -version = "0.4.0" +version = "0.6.2" source = { directory = "../layercake" } dependencies = [ { name = "arnparse" },