From 52e86b9f0f1cbe4e550a6cc56665c7f356fdbfe7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Rafael=20Siqueira?= Date: Mon, 21 Jul 2025 18:41:20 -0300 Subject: [PATCH] enrollment to PF --- enrollments-events/app/enrollment.py | 242 ++++++++++++++++++ enrollments-events/app/events/enroll.py | 97 +++++++ .../stopgap/{enroll.py => patch_enroll.py} | 49 ++-- .../app/events/stopgap/patch_konviva.py | 4 + enrollments-events/app/schemas.py | 55 ++++ enrollments-events/template.yaml | 33 ++- .../tests/events/test_enroll.py | 26 ++ enrollments-events/tests/seeds.jsonl | 7 +- 8 files changed, 488 insertions(+), 25 deletions(-) create mode 100644 enrollments-events/app/enrollment.py create mode 100644 enrollments-events/app/events/enroll.py rename enrollments-events/app/events/stopgap/{enroll.py => patch_enroll.py} (60%) create mode 100644 enrollments-events/app/schemas.py create mode 100644 enrollments-events/tests/events/test_enroll.py diff --git a/enrollments-events/app/enrollment.py b/enrollments-events/app/enrollment.py new file mode 100644 index 0000000..deb2291 --- /dev/null +++ b/enrollments-events/app/enrollment.py @@ -0,0 +1,242 @@ +from dataclasses import asdict, dataclass +from datetime import timedelta +from enum import Enum +from typing import Self, TypedDict + +from layercake.dateutils import now, ttl +from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair +from layercake.strutils import md5_hash + +from schemas import Enrollment + +Tenant = TypedDict('Tenant', {'id': str, 'name': str}) +Author = TypedDict('Author', {'id': str, 'name': str}) +DeduplicationWindow = TypedDict('DeduplicationWindow', {'offset_days': int}) + + +class LinkedEntity(str): + def __new__(cls, id: str, type: str) -> Self: + return super().__new__(cls, '#'.join([type.upper(), id])) + + def __init__(self, id: str, type: str) -> None: + # __init__ is used to store the parameters for later reference. + # For immutable types like str, __init__ cannot change the instance's value. + self.id = id + self.type = type + + +@dataclass(frozen=True) +class Slot: + id: str + sk: str + + @property + def order_id(self) -> LinkedEntity: + idx, _ = self.sk.split('#') + return LinkedEntity(idx, 'order') + + +class LifecycleEvents(str, Enum): + """Lifecycle events related to scheduling actions.""" + + # Reminder if the user does not access within 3 days + # REMINDER_NO_ACCESS_3_DAYS = 'schedules#reminder_no_access_3_days' + DOES_NOT_ACCESS = 'schedules#does_not_access' + + # When there is no activity 7 days after the first access + # NO_ACTIVITY_7_DAYS = 'schedules#no_activity_7_days' + NO_ACTIVITY = 'schedules#no_activity' + + # Reminder 30 days before the access period expires + # ACCESS_PERIOD_REMINDER_30_DAYS = 'schedules#access_period_reminder_30_days' + ACCESS_PERIOD_ENDS = 'schedules#access_period_ends' + + # Reminder for certificate expiration set to 30 days from now + CERT_EXPIRATION_REMINDER_30_DAYS = 'schedules#cert_expiration_reminder_30_days' + + # Archive the course after the certificate expires + # SET_AS_ARCHIVE = 'schedules#set_as_archive' + ARCHIVE_IT = 'schedules#archive_it' + + # When the access period ends for a course without a certificate + # SET_AS_EXPIRE = 'schedules#set_as_expire' + EXPIRATION = 'schedules#expiration' + + +def enroll( + enrollment: Enrollment, + *, + slot: Slot | None = None, + author: Author | None = None, + linked_entities: frozenset[LinkedEntity] = frozenset(), + deduplication_window: DeduplicationWindow | None = None, + persistence_layer: DynamoDBPersistenceLayer, +) -> bool: + """Enrolls a user into a course and schedules lifecycle events.""" + now_ = now() + user = enrollment.user + course = enrollment.course + lock_hash = md5_hash('%s%s' % (user.id, course.id)) + + with persistence_layer.transact_writer() as transact: + if slot: + linked_entities = frozenset({slot.order_id}) | linked_entities + + transact.put( + item={ + 'sk': '0', + 'create_date': now_, + # 'created_at': now_, + **enrollment.model_dump(), + }, + ) + transact.put( + item={ + 'id': enrollment.id, + 'sk': 'metadata#course', + 'created_at': now_, + **course.model_dump(include={'cert', 'access_period'}), + } + ) + transact.put( + item={ + 'id': enrollment.id, + # Post-migration: uncomment the following line + # 'sk': LifecycleEvents.REMINDER_NO_ACCESS_3_DAYS, + 'sk': LifecycleEvents.DOES_NOT_ACCESS, + 'name': user.name, + 'email': user.email, + 'course': course.name, + 'created_at': now_, + 'ttl': ttl(days=3, start_dt=now_), + }, + ) + # Enrollment expires by default when the access period ends. + # When the course is finished, it is automatically removed, + # and the `schedules#course_archived` event is created. + transact.put( + item={ + 'id': enrollment.id, + 'sk': LifecycleEvents.EXPIRATION, + # Post-migration: uncomment the following line + # 'sk': LifecycleEvents.COURSE_EXPIRED, + 'name': user.name, + 'email': user.email, + 'course': course.name, + 'created_at': now_, + 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period)), + }, + ) + transact.put( + item={ + 'id': enrollment.id, + # Post-migration: uncomment the following line + # 'sk': LifecycleEvents.ACCESS_PERIOD_REMINDER_30_DAYS, + 'sk': LifecycleEvents.ACCESS_PERIOD_ENDS, + 'name': user.name, + 'email': user.email, + 'course': course.name, + 'created_at': now_, + 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period - 30)), + }, + ) + + for entity in linked_entities: + type = entity.type.lower() + transact.put( + item={ + 'id': enrollment.id, + 'sk': f'linked_entities#{type}', + 'created_at': now_, + f'{type}_id': entity.id, + } + ) + + if slot: + transact.put( + item={ + 'id': enrollment.id, + # Post-migration: uncomment the following line + # 'sk': 'metadata#parent_slot', + 'sk': 'parent_vacancy', + 'vacancy': asdict(slot), + 'created_at': now_, + } + ) + + class SlotDoesNotExistError(Exception): + def __init__(self, *args): + super().__init__('Slot does not exist') + + transact.delete( + key=KeyPair(slot.id, slot.sk), + cond_expr='attribute_exists(sk)', + exc_cls=SlotDoesNotExistError, + ) + transact.put( + item={ + 'id': enrollment.id, + 'sk': 'cancel_policy', + 'created_at': now_, + } + ) + + if author: + transact.put( + item={ + 'id': enrollment.id, + 'sk': 'author', + 'user_id': author['id'], + 'name': author['name'], + 'created_at': now_, + }, + ) + + class DeduplicationConflictError(Exception): + def __init__(self, *args): + super().__init__('Enrollment already exists') + + # Prevents the user from enrolling in the same course again until + # the deduplication window expires or is removed. + if deduplication_window: + offset_days = deduplication_window['offset_days'] + ttl_expiration = ttl( + start_dt=now_ + timedelta(days=course.access_period - offset_days) + ) + transact.put( + item={ + 'id': 'lock', + 'sk': lock_hash, + 'enrollment_id': enrollment.id, + 'created_at': now_, + 'ttl': ttl_expiration, + }, + cond_expr='attribute_not_exists(sk)', + exc_cls=DeduplicationConflictError, + ) + transact.put( + item={ + 'id': enrollment.id, + 'sk': 'lock', + 'hash': lock_hash, + 'created_at': now_, + 'ttl': ttl_expiration, + }, + ) + # Deduplication window can be recalculated if needed + transact.put( + item={ + 'id': enrollment.id, + 'sk': 'metadata#deduplication_window', + 'offset_days': offset_days, + 'created_at': now_, + }, + ) + else: + transact.condition( + key=KeyPair('lock', lock_hash), + cond_expr='attribute_not_exists(sk)', + exc_cls=DeduplicationConflictError, + ) + + return True diff --git a/enrollments-events/app/events/enroll.py b/enrollments-events/app/events/enroll.py new file mode 100644 index 0000000..7e17324 --- /dev/null +++ b/enrollments-events/app/events/enroll.py @@ -0,0 +1,97 @@ +from uuid import uuid4 + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.data_classes import ( + EventBridgeEvent, + event_source, +) +from aws_lambda_powertools.utilities.typing import LambdaContext +from glom import glom +from layercake.dateutils import now +from layercake.dynamodb import ( + DynamoDBPersistenceLayer, + KeyChain, + KeyPair, + SortKey, + TransactKey, +) + +from boto3clients import dynamodb_client +from config import COURSE_TABLE, ENROLLMENT_TABLE, ORDER_TABLE +from enrollment import DeduplicationWindow, LinkedEntity, enroll +from schemas import Course, Enrollment, User + +logger = Logger(__name__) +order_layer = DynamoDBPersistenceLayer(ORDER_TABLE, dynamodb_client) +course_layer = DynamoDBPersistenceLayer(COURSE_TABLE, dynamodb_client) +enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) + + +@event_source(data_class=EventBridgeEvent) +@logger.inject_lambda_context +def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> list[str]: + new_image = event.detail['new_image'] + now_ = now() + order_id = new_image['id'] + order = order_layer.collection.get_items( + TransactKey(order_id) + SortKey('0') + SortKey('items', path_spec='items'), + ) + items = { + item['id']: int(item['quantity']) + for item in order['items'] + # Ignore items with non-positive unit price; + # negative values are treated as discounts + if item['unit_price'] > 0 + } + + result = order_layer.collection.query(KeyPair(order_id, 'assignees#')) + user_id = glom(result, 'items.0.sk').removeprefix('assignees#') + + courses = _get_courses(set(items.keys())) + user = User( + id=user_id, + name=order['name'], + email=order['email'], + cpf=order['cpf'], + ) + + ids = [] + + for course in courses: + enrollment = Enrollment( + id=uuid4(), + user=user, + course=course, + ) + enroll( + enrollment, + persistence_layer=enrollment_layer, + deduplication_window=DeduplicationWindow(offset_days=90), + linked_entities=frozenset({LinkedEntity(order_id, 'ORDER')}), + ) + ids.append(enrollment.id) + + order_layer.update_item( + key=KeyPair(new_image['id'], new_image['sk']), + update_expr='SET #status = :status, updated_at = :updated_at', + expr_attr_names={ + '#status': 'status', + }, + expr_attr_values={ + ':status': 'SUCCESS', + ':updated_at': now_, + }, + cond_expr='attribute_exists(sk)', + ) + + return ids + + +def _get_courses(ids: set) -> tuple[Course, ...]: + pairs = tuple(KeyPair(idx, '0') for idx in ids) + result = course_layer.collection.get_items( + KeyChain(pairs), + flatten_top=False, + ) + courses = tuple(Course(id=idx, **obj) for idx, obj in result.items()) # type: ignore + return courses diff --git a/enrollments-events/app/events/stopgap/enroll.py b/enrollments-events/app/events/stopgap/patch_enroll.py similarity index 60% rename from enrollments-events/app/events/stopgap/enroll.py rename to enrollments-events/app/events/stopgap/patch_enroll.py index 63dd2bf..5e5c5fd 100644 --- a/enrollments-events/app/events/stopgap/enroll.py +++ b/enrollments-events/app/events/stopgap/patch_enroll.py @@ -29,30 +29,35 @@ enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client) def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: new_image = event.detail['new_image'] now_ = now() - course = _get_course(new_image['course']['id']) - with enrollment_layer.transact_writer() as transact: - transact.put( - item={ - 'id': new_image['id'], - 'sk': 'metadata#deduplication_window', - 'offset_days': 90, - 'created_at': now_, - } - ) - transact.put( - item={ - 'id': new_image['id'], - 'sk': 'metadata#course', - 'created_at': now_, - 'access_period': int(course['access_period']), - 'cert': { - 'exp_interval': int(course['cert']['exp_interval']), - }, - } - ) + try: + course = _get_course(new_image['course']['id']) - return True + with enrollment_layer.transact_writer() as transact: + transact.put( + item={ + 'id': new_image['id'], + 'sk': 'metadata#deduplication_window', + 'offset_days': 90, + 'created_at': now_, + } + ) + transact.put( + item={ + 'id': new_image['id'], + 'sk': 'metadata#course', + 'created_at': now_, + 'access_period': int(course['access_period']), + 'cert': { + 'exp_interval': int(course['cert']['exp_interval']), + }, + } + ) + except Exception as exc: + logger.exception(exc) + return False + else: + return True class CourseNotFoundError(Exception): diff --git a/enrollments-events/app/events/stopgap/patch_konviva.py b/enrollments-events/app/events/stopgap/patch_konviva.py index 27075a7..1376033 100644 --- a/enrollments-events/app/events/stopgap/patch_konviva.py +++ b/enrollments-events/app/events/stopgap/patch_konviva.py @@ -50,6 +50,8 @@ def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: flatten_top=False, ) + # If `class_id` is not found, try to retrieve it from the SQLite + # migration database. if 'class_id' not in data: data['class_id'] = _get_class_id(course_id) @@ -72,6 +74,7 @@ class CourseNotFoundError(Exception): super().__init__('Course not found') +# Post-migration: remove the following function def _get_class_id(course_id: str) -> int: with sqlite3.connect( database=SQLITE_DATABASE, detect_types=sqlite3.PARSE_DECLTYPES @@ -84,4 +87,5 @@ def _get_class_id(course_id: str) -> int: for row in rows: return int(row['json']['metadata__konviva_id']) + logger.error('Course not found', course_id=course_id) raise CourseNotFoundError diff --git a/enrollments-events/app/schemas.py b/enrollments-events/app/schemas.py new file mode 100644 index 0000000..b9c06d5 --- /dev/null +++ b/enrollments-events/app/schemas.py @@ -0,0 +1,55 @@ +from typing import Any, Literal +from uuid import uuid4 + +from layercake.extra_types import CpfStr, NameStr +from pydantic import ( + UUID4, + BaseModel, + ConfigDict, + EmailStr, + Field, +) + + +class User(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + id: UUID4 | str = Field(default_factory=uuid4) + name: NameStr + email: EmailStr + email_verified: bool = False + cpf: CpfStr | None = None + + +class Cert(BaseModel): + exp_interval: int + + +class Course(BaseModel): + id: UUID4 = Field(default_factory=uuid4) + name: str + cert: Cert | None = None + access_period: int = 90 # 3 months + + +class Enrollment(BaseModel): + id: UUID4 | str = Field(default_factory=uuid4) + user: User + course: Course + progress: int = Field(default=0, ge=0, le=100) + status: Literal['PENDING'] = 'PENDING' + + def model_dump( + self, + exclude=None, + *args, + **kwargs, + ) -> dict[str, Any]: + return super().model_dump( + exclude={ + 'user': {'email_verified'}, + 'course': {'cert', 'access_period'}, + }, + *args, + **kwargs, + ) diff --git a/enrollments-events/template.yaml b/enrollments-events/template.yaml index a24b2b7..f01ba46 100644 --- a/enrollments-events/template.yaml +++ b/enrollments-events/template.yaml @@ -42,10 +42,10 @@ Resources: Properties: RetentionInDays: 90 - EventEnrollFunction: + EventPatchEnrollFunction: Type: AWS::Serverless::Function Properties: - Handler: events.stopgap.enroll.lambda_handler + Handler: events.stopgap.patch_enroll.lambda_handler LoggingConfig: LogGroup: !Ref EventLog Policies: @@ -86,6 +86,35 @@ Resources: new_image: sk: ["0"] + EventEnrollFunction: + Type: AWS::Serverless::Function + Properties: + Handler: events.enroll.lambda_handler + LoggingConfig: + LogGroup: !Ref EventLog + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref OrderTable + - DynamoDBCrudPolicy: + TableName: !Ref EnrollmentTable + - DynamoDBReadPolicy: + TableName: !Ref CourseTable + Events: + DynamoDBEvent: + Type: EventBridgeRule + Properties: + Pattern: + resources: [betaeducacao-prod-orders] + detail-type: [INSERT] + detail: + new_image: + # Post-migration: uncomment the following lines + # sk: [slots] + # mode: [STANDALONE] + sk: [generated_items] + scope: [SINGLE_USER] + status: [PENDING] + EventAllocateSlotsFunction: Type: AWS::Serverless::Function Properties: diff --git a/enrollments-events/tests/events/test_enroll.py b/enrollments-events/tests/events/test_enroll.py new file mode 100644 index 0000000..c24aaa7 --- /dev/null +++ b/enrollments-events/tests/events/test_enroll.py @@ -0,0 +1,26 @@ +import app.events.enroll as app +from aws_lambda_powertools.utilities.typing import LambdaContext +from layercake.dynamodb import DynamoDBPersistenceLayer, PartitionKey + + +def test_enroll( + dynamodb_seeds, + dynamodb_client, + dynamodb_persistence_layer: DynamoDBPersistenceLayer, + lambda_context: LambdaContext, +): + event = { + 'detail': { + 'new_image': { + 'id': 'cpYSbBcie2NDbZhDKCxCih', + 'sk': 'generated_items', + } + } + } + ids = app.lambda_handler(event, lambda_context) # type: ignore + print(ids) + + result = dynamodb_persistence_layer.collection.query(PartitionKey(str(ids[0]))) + print(result) + + # assert len(result['items']) == 4 diff --git a/enrollments-events/tests/seeds.jsonl b/enrollments-events/tests/seeds.jsonl index 1b05a8a..7eb8ddc 100644 --- a/enrollments-events/tests/seeds.jsonl +++ b/enrollments-events/tests/seeds.jsonl @@ -5,5 +5,10 @@ {"id": {"S": "JeCybf6oiv6CF3PchhBqdG"}, "sk": {"S": "items"},"items": {"L": [{"M": {"id": {"S": "a955518e-ebcb-4441-b914-ddc9ecef84f0"},"name": {"S": "NR-11 Operador de Munck"},"quantity": {"N": "3"},"unit_price": {"N": "99"}}}, {"M": {"id": {"S": "123"},"name": {"S": "pytest"},"quantity": {"N": "1"},"unit_price": {"N": "99"}}},{"M": {"id": {"S": "23020"},"name": {"S": "Desconto 100%"},"quantity": {"N": "1"},"unit_price": {"N": "-297"}}}]},"updated_at": {"S": "2025-07-16T15:54:27.154404-03:00"}} {"id": {"S": "JeCybf6oiv6CF3PchhBqdG"},"sk": {"S": "generated_items"},"create_date": {"S": "2025-07-16T15:54:30.160729-03:00"},"scope": {"S": "MULTI_USER"},"status": {"S": "SUCCESS"},"update_date": {"S": "2025-07-16T15:54:33.674670-03:00"}} {"id": {"S": "a955518e-ebcb-4441-b914-ddc9ecef84f0"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "NR-11 Operador de Munck"},"tenant_id": {"S": "*"}} +{"id": {"S": "6a403773-aeac-4e6a-ac39-dc958e4be52a"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "Reciclagem em NR-11 - Operador de Empilhadeira"},"tenant_id": {"S": "*"}} {"id": {"S": "123"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "pytest"},"tenant_id": {"S": "*"}} -{"id": {"S": "5OxmMjL-ujoR5IMGegQz"},"sk": {"S": "konviva"},"created_at": {"S": "2025-07-11T13:52:35.521154-03:00"},"konvivaId": {"N": "26943"}} \ No newline at end of file +{"id": {"S": "5OxmMjL-ujoR5IMGegQz"},"sk": {"S": "konviva"},"created_at": {"S": "2025-07-11T13:52:35.521154-03:00"},"konvivaId": {"N": "26943"}} +{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "0"},"cpf": {"S": "02713421535"},"create_date": {"S": "2025-07-21T16:19:43.297712-03:00"},"due_date": {"S": "2025-07-22T16:13:41.056000-03:00"},"email": {"S": "sergio@somosbeta.com.br"},"name": {"S": "Sérgio Rafael Siqueira"},"payment_date": {"S": "2025-07-21T16:21:47.161889-03:00"},"payment_method": {"S": "PIX"},"phone_number": {"S": "+5574998189595"},"status": {"S": "PAID"},"total": {"N": "99"},"update_date": {"S": "2025-07-21T16:21:47.161889-03:00"}} +{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "assignees#5OxmMjL-ujoR5IMGegQz"},"create_date": {"S": "2025-07-21T16:20:43.631344-03:00"},"scope": {"S": "USER"}} +{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "items"},"items": {"L": [{"M": {"id": {"S": "6a403773-aeac-4e6a-ac39-dc958e4be52a"},"name": {"S": "Reciclagem em NR-11 - Operador de Empilhadeira"},"quantity": {"N": "1"},"unit_price": {"N": "99"}}}]}} +{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "generated_items"},"create_date": {"S": "2025-07-21T16:21:50.143551-03:00"},"scope": {"S": "SINGLE_USER"},"status": {"S": "SUCCESS"},"update_date": {"S": "2025-07-21T16:21:53.994941-03:00"}} \ No newline at end of file