enrollment to PF

This commit is contained in:
2025-07-21 18:41:20 -03:00
parent 58a174f432
commit 52e86b9f0f
8 changed files with 488 additions and 25 deletions

View File

@@ -0,0 +1,242 @@
from dataclasses import asdict, dataclass
from datetime import timedelta
from enum import Enum
from typing import Self, TypedDict
from layercake.dateutils import now, ttl
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from layercake.strutils import md5_hash
from schemas import Enrollment
Tenant = TypedDict('Tenant', {'id': str, 'name': str})
Author = TypedDict('Author', {'id': str, 'name': str})
DeduplicationWindow = TypedDict('DeduplicationWindow', {'offset_days': int})
class LinkedEntity(str):
def __new__(cls, id: str, type: str) -> Self:
return super().__new__(cls, '#'.join([type.upper(), id]))
def __init__(self, id: str, type: str) -> None:
# __init__ is used to store the parameters for later reference.
# For immutable types like str, __init__ cannot change the instance's value.
self.id = id
self.type = type
@dataclass(frozen=True)
class Slot:
id: str
sk: str
@property
def order_id(self) -> LinkedEntity:
idx, _ = self.sk.split('#')
return LinkedEntity(idx, 'order')
class LifecycleEvents(str, Enum):
"""Lifecycle events related to scheduling actions."""
# Reminder if the user does not access within 3 days
# REMINDER_NO_ACCESS_3_DAYS = 'schedules#reminder_no_access_3_days'
DOES_NOT_ACCESS = 'schedules#does_not_access'
# When there is no activity 7 days after the first access
# NO_ACTIVITY_7_DAYS = 'schedules#no_activity_7_days'
NO_ACTIVITY = 'schedules#no_activity'
# Reminder 30 days before the access period expires
# ACCESS_PERIOD_REMINDER_30_DAYS = 'schedules#access_period_reminder_30_days'
ACCESS_PERIOD_ENDS = 'schedules#access_period_ends'
# Reminder for certificate expiration set to 30 days from now
CERT_EXPIRATION_REMINDER_30_DAYS = 'schedules#cert_expiration_reminder_30_days'
# Archive the course after the certificate expires
# SET_AS_ARCHIVE = 'schedules#set_as_archive'
ARCHIVE_IT = 'schedules#archive_it'
# When the access period ends for a course without a certificate
# SET_AS_EXPIRE = 'schedules#set_as_expire'
EXPIRATION = 'schedules#expiration'
def enroll(
enrollment: Enrollment,
*,
slot: Slot | None = None,
author: Author | None = None,
linked_entities: frozenset[LinkedEntity] = frozenset(),
deduplication_window: DeduplicationWindow | None = None,
persistence_layer: DynamoDBPersistenceLayer,
) -> bool:
"""Enrolls a user into a course and schedules lifecycle events."""
now_ = now()
user = enrollment.user
course = enrollment.course
lock_hash = md5_hash('%s%s' % (user.id, course.id))
with persistence_layer.transact_writer() as transact:
if slot:
linked_entities = frozenset({slot.order_id}) | linked_entities
transact.put(
item={
'sk': '0',
'create_date': now_,
# 'created_at': now_,
**enrollment.model_dump(),
},
)
transact.put(
item={
'id': enrollment.id,
'sk': 'metadata#course',
'created_at': now_,
**course.model_dump(include={'cert', 'access_period'}),
}
)
transact.put(
item={
'id': enrollment.id,
# Post-migration: uncomment the following line
# 'sk': LifecycleEvents.REMINDER_NO_ACCESS_3_DAYS,
'sk': LifecycleEvents.DOES_NOT_ACCESS,
'name': user.name,
'email': user.email,
'course': course.name,
'created_at': now_,
'ttl': ttl(days=3, start_dt=now_),
},
)
# Enrollment expires by default when the access period ends.
# When the course is finished, it is automatically removed,
# and the `schedules#course_archived` event is created.
transact.put(
item={
'id': enrollment.id,
'sk': LifecycleEvents.EXPIRATION,
# Post-migration: uncomment the following line
# 'sk': LifecycleEvents.COURSE_EXPIRED,
'name': user.name,
'email': user.email,
'course': course.name,
'created_at': now_,
'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period)),
},
)
transact.put(
item={
'id': enrollment.id,
# Post-migration: uncomment the following line
# 'sk': LifecycleEvents.ACCESS_PERIOD_REMINDER_30_DAYS,
'sk': LifecycleEvents.ACCESS_PERIOD_ENDS,
'name': user.name,
'email': user.email,
'course': course.name,
'created_at': now_,
'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period - 30)),
},
)
for entity in linked_entities:
type = entity.type.lower()
transact.put(
item={
'id': enrollment.id,
'sk': f'linked_entities#{type}',
'created_at': now_,
f'{type}_id': entity.id,
}
)
if slot:
transact.put(
item={
'id': enrollment.id,
# Post-migration: uncomment the following line
# 'sk': 'metadata#parent_slot',
'sk': 'parent_vacancy',
'vacancy': asdict(slot),
'created_at': now_,
}
)
class SlotDoesNotExistError(Exception):
def __init__(self, *args):
super().__init__('Slot does not exist')
transact.delete(
key=KeyPair(slot.id, slot.sk),
cond_expr='attribute_exists(sk)',
exc_cls=SlotDoesNotExistError,
)
transact.put(
item={
'id': enrollment.id,
'sk': 'cancel_policy',
'created_at': now_,
}
)
if author:
transact.put(
item={
'id': enrollment.id,
'sk': 'author',
'user_id': author['id'],
'name': author['name'],
'created_at': now_,
},
)
class DeduplicationConflictError(Exception):
def __init__(self, *args):
super().__init__('Enrollment already exists')
# Prevents the user from enrolling in the same course again until
# the deduplication window expires or is removed.
if deduplication_window:
offset_days = deduplication_window['offset_days']
ttl_expiration = ttl(
start_dt=now_ + timedelta(days=course.access_period - offset_days)
)
transact.put(
item={
'id': 'lock',
'sk': lock_hash,
'enrollment_id': enrollment.id,
'created_at': now_,
'ttl': ttl_expiration,
},
cond_expr='attribute_not_exists(sk)',
exc_cls=DeduplicationConflictError,
)
transact.put(
item={
'id': enrollment.id,
'sk': 'lock',
'hash': lock_hash,
'created_at': now_,
'ttl': ttl_expiration,
},
)
# Deduplication window can be recalculated if needed
transact.put(
item={
'id': enrollment.id,
'sk': 'metadata#deduplication_window',
'offset_days': offset_days,
'created_at': now_,
},
)
else:
transact.condition(
key=KeyPair('lock', lock_hash),
cond_expr='attribute_not_exists(sk)',
exc_cls=DeduplicationConflictError,
)
return True

View File

@@ -0,0 +1,97 @@
from uuid import uuid4
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import (
EventBridgeEvent,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from glom import glom
from layercake.dateutils import now
from layercake.dynamodb import (
DynamoDBPersistenceLayer,
KeyChain,
KeyPair,
SortKey,
TransactKey,
)
from boto3clients import dynamodb_client
from config import COURSE_TABLE, ENROLLMENT_TABLE, ORDER_TABLE
from enrollment import DeduplicationWindow, LinkedEntity, enroll
from schemas import Course, Enrollment, User
logger = Logger(__name__)
order_layer = DynamoDBPersistenceLayer(ORDER_TABLE, dynamodb_client)
course_layer = DynamoDBPersistenceLayer(COURSE_TABLE, dynamodb_client)
enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client)
@event_source(data_class=EventBridgeEvent)
@logger.inject_lambda_context
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> list[str]:
new_image = event.detail['new_image']
now_ = now()
order_id = new_image['id']
order = order_layer.collection.get_items(
TransactKey(order_id) + SortKey('0') + SortKey('items', path_spec='items'),
)
items = {
item['id']: int(item['quantity'])
for item in order['items']
# Ignore items with non-positive unit price;
# negative values are treated as discounts
if item['unit_price'] > 0
}
result = order_layer.collection.query(KeyPair(order_id, 'assignees#'))
user_id = glom(result, 'items.0.sk').removeprefix('assignees#')
courses = _get_courses(set(items.keys()))
user = User(
id=user_id,
name=order['name'],
email=order['email'],
cpf=order['cpf'],
)
ids = []
for course in courses:
enrollment = Enrollment(
id=uuid4(),
user=user,
course=course,
)
enroll(
enrollment,
persistence_layer=enrollment_layer,
deduplication_window=DeduplicationWindow(offset_days=90),
linked_entities=frozenset({LinkedEntity(order_id, 'ORDER')}),
)
ids.append(enrollment.id)
order_layer.update_item(
key=KeyPair(new_image['id'], new_image['sk']),
update_expr='SET #status = :status, updated_at = :updated_at',
expr_attr_names={
'#status': 'status',
},
expr_attr_values={
':status': 'SUCCESS',
':updated_at': now_,
},
cond_expr='attribute_exists(sk)',
)
return ids
def _get_courses(ids: set) -> tuple[Course, ...]:
pairs = tuple(KeyPair(idx, '0') for idx in ids)
result = course_layer.collection.get_items(
KeyChain(pairs),
flatten_top=False,
)
courses = tuple(Course(id=idx, **obj) for idx, obj in result.items()) # type: ignore
return courses

View File

@@ -29,6 +29,8 @@ enrollment_layer = DynamoDBPersistenceLayer(ENROLLMENT_TABLE, dynamodb_client)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
new_image = event.detail['new_image'] new_image = event.detail['new_image']
now_ = now() now_ = now()
try:
course = _get_course(new_image['course']['id']) course = _get_course(new_image['course']['id'])
with enrollment_layer.transact_writer() as transact: with enrollment_layer.transact_writer() as transact:
@@ -51,7 +53,10 @@ def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
}, },
} }
) )
except Exception as exc:
logger.exception(exc)
return False
else:
return True return True

View File

@@ -50,6 +50,8 @@ def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
flatten_top=False, flatten_top=False,
) )
# If `class_id` is not found, try to retrieve it from the SQLite
# migration database.
if 'class_id' not in data: if 'class_id' not in data:
data['class_id'] = _get_class_id(course_id) data['class_id'] = _get_class_id(course_id)
@@ -72,6 +74,7 @@ class CourseNotFoundError(Exception):
super().__init__('Course not found') super().__init__('Course not found')
# Post-migration: remove the following function
def _get_class_id(course_id: str) -> int: def _get_class_id(course_id: str) -> int:
with sqlite3.connect( with sqlite3.connect(
database=SQLITE_DATABASE, detect_types=sqlite3.PARSE_DECLTYPES database=SQLITE_DATABASE, detect_types=sqlite3.PARSE_DECLTYPES
@@ -84,4 +87,5 @@ def _get_class_id(course_id: str) -> int:
for row in rows: for row in rows:
return int(row['json']['metadata__konviva_id']) return int(row['json']['metadata__konviva_id'])
logger.error('Course not found', course_id=course_id)
raise CourseNotFoundError raise CourseNotFoundError

View File

@@ -0,0 +1,55 @@
from typing import Any, Literal
from uuid import uuid4
from layercake.extra_types import CpfStr, NameStr
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
EmailStr,
Field,
)
class User(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 | str = Field(default_factory=uuid4)
name: NameStr
email: EmailStr
email_verified: bool = False
cpf: CpfStr | None = None
class Cert(BaseModel):
exp_interval: int
class Course(BaseModel):
id: UUID4 = Field(default_factory=uuid4)
name: str
cert: Cert | None = None
access_period: int = 90 # 3 months
class Enrollment(BaseModel):
id: UUID4 | str = Field(default_factory=uuid4)
user: User
course: Course
progress: int = Field(default=0, ge=0, le=100)
status: Literal['PENDING'] = 'PENDING'
def model_dump(
self,
exclude=None,
*args,
**kwargs,
) -> dict[str, Any]:
return super().model_dump(
exclude={
'user': {'email_verified'},
'course': {'cert', 'access_period'},
},
*args,
**kwargs,
)

View File

@@ -42,10 +42,10 @@ Resources:
Properties: Properties:
RetentionInDays: 90 RetentionInDays: 90
EventEnrollFunction: EventPatchEnrollFunction:
Type: AWS::Serverless::Function Type: AWS::Serverless::Function
Properties: Properties:
Handler: events.stopgap.enroll.lambda_handler Handler: events.stopgap.patch_enroll.lambda_handler
LoggingConfig: LoggingConfig:
LogGroup: !Ref EventLog LogGroup: !Ref EventLog
Policies: Policies:
@@ -86,6 +86,35 @@ Resources:
new_image: new_image:
sk: ["0"] sk: ["0"]
EventEnrollFunction:
Type: AWS::Serverless::Function
Properties:
Handler: events.enroll.lambda_handler
LoggingConfig:
LogGroup: !Ref EventLog
Policies:
- DynamoDBCrudPolicy:
TableName: !Ref OrderTable
- DynamoDBCrudPolicy:
TableName: !Ref EnrollmentTable
- DynamoDBReadPolicy:
TableName: !Ref CourseTable
Events:
DynamoDBEvent:
Type: EventBridgeRule
Properties:
Pattern:
resources: [betaeducacao-prod-orders]
detail-type: [INSERT]
detail:
new_image:
# Post-migration: uncomment the following lines
# sk: [slots]
# mode: [STANDALONE]
sk: [generated_items]
scope: [SINGLE_USER]
status: [PENDING]
EventAllocateSlotsFunction: EventAllocateSlotsFunction:
Type: AWS::Serverless::Function Type: AWS::Serverless::Function
Properties: Properties:

View File

@@ -0,0 +1,26 @@
import app.events.enroll as app
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dynamodb import DynamoDBPersistenceLayer, PartitionKey
def test_enroll(
dynamodb_seeds,
dynamodb_client,
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
lambda_context: LambdaContext,
):
event = {
'detail': {
'new_image': {
'id': 'cpYSbBcie2NDbZhDKCxCih',
'sk': 'generated_items',
}
}
}
ids = app.lambda_handler(event, lambda_context) # type: ignore
print(ids)
result = dynamodb_persistence_layer.collection.query(PartitionKey(str(ids[0])))
print(result)
# assert len(result['items']) == 4

View File

@@ -5,5 +5,10 @@
{"id": {"S": "JeCybf6oiv6CF3PchhBqdG"}, "sk": {"S": "items"},"items": {"L": [{"M": {"id": {"S": "a955518e-ebcb-4441-b914-ddc9ecef84f0"},"name": {"S": "NR-11 Operador de Munck"},"quantity": {"N": "3"},"unit_price": {"N": "99"}}}, {"M": {"id": {"S": "123"},"name": {"S": "pytest"},"quantity": {"N": "1"},"unit_price": {"N": "99"}}},{"M": {"id": {"S": "23020"},"name": {"S": "Desconto 100%"},"quantity": {"N": "1"},"unit_price": {"N": "-297"}}}]},"updated_at": {"S": "2025-07-16T15:54:27.154404-03:00"}} {"id": {"S": "JeCybf6oiv6CF3PchhBqdG"}, "sk": {"S": "items"},"items": {"L": [{"M": {"id": {"S": "a955518e-ebcb-4441-b914-ddc9ecef84f0"},"name": {"S": "NR-11 Operador de Munck"},"quantity": {"N": "3"},"unit_price": {"N": "99"}}}, {"M": {"id": {"S": "123"},"name": {"S": "pytest"},"quantity": {"N": "1"},"unit_price": {"N": "99"}}},{"M": {"id": {"S": "23020"},"name": {"S": "Desconto 100%"},"quantity": {"N": "1"},"unit_price": {"N": "-297"}}}]},"updated_at": {"S": "2025-07-16T15:54:27.154404-03:00"}}
{"id": {"S": "JeCybf6oiv6CF3PchhBqdG"},"sk": {"S": "generated_items"},"create_date": {"S": "2025-07-16T15:54:30.160729-03:00"},"scope": {"S": "MULTI_USER"},"status": {"S": "SUCCESS"},"update_date": {"S": "2025-07-16T15:54:33.674670-03:00"}} {"id": {"S": "JeCybf6oiv6CF3PchhBqdG"},"sk": {"S": "generated_items"},"create_date": {"S": "2025-07-16T15:54:30.160729-03:00"},"scope": {"S": "MULTI_USER"},"status": {"S": "SUCCESS"},"update_date": {"S": "2025-07-16T15:54:33.674670-03:00"}}
{"id": {"S": "a955518e-ebcb-4441-b914-ddc9ecef84f0"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "NR-11 Operador de Munck"},"tenant_id": {"S": "*"}} {"id": {"S": "a955518e-ebcb-4441-b914-ddc9ecef84f0"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "NR-11 Operador de Munck"},"tenant_id": {"S": "*"}}
{"id": {"S": "6a403773-aeac-4e6a-ac39-dc958e4be52a"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "Reciclagem em NR-11 - Operador de Empilhadeira"},"tenant_id": {"S": "*"}}
{"id": {"S": "123"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "pytest"},"tenant_id": {"S": "*"}} {"id": {"S": "123"},"sk": {"S": "0"},"access_period": {"N": "360"},"cert": {"M": {"exp_interval": {"N": "360"}}},"created_at": {"S": "2025-07-14T15:09:18.559528-03:00"},"metadata__konviva_class_id": {"N": "281"},"name": {"S": "pytest"},"tenant_id": {"S": "*"}}
{"id": {"S": "5OxmMjL-ujoR5IMGegQz"},"sk": {"S": "konviva"},"created_at": {"S": "2025-07-11T13:52:35.521154-03:00"},"konvivaId": {"N": "26943"}} {"id": {"S": "5OxmMjL-ujoR5IMGegQz"},"sk": {"S": "konviva"},"created_at": {"S": "2025-07-11T13:52:35.521154-03:00"},"konvivaId": {"N": "26943"}}
{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "0"},"cpf": {"S": "02713421535"},"create_date": {"S": "2025-07-21T16:19:43.297712-03:00"},"due_date": {"S": "2025-07-22T16:13:41.056000-03:00"},"email": {"S": "sergio@somosbeta.com.br"},"name": {"S": "Sérgio Rafael Siqueira"},"payment_date": {"S": "2025-07-21T16:21:47.161889-03:00"},"payment_method": {"S": "PIX"},"phone_number": {"S": "+5574998189595"},"status": {"S": "PAID"},"total": {"N": "99"},"update_date": {"S": "2025-07-21T16:21:47.161889-03:00"}}
{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "assignees#5OxmMjL-ujoR5IMGegQz"},"create_date": {"S": "2025-07-21T16:20:43.631344-03:00"},"scope": {"S": "USER"}}
{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "items"},"items": {"L": [{"M": {"id": {"S": "6a403773-aeac-4e6a-ac39-dc958e4be52a"},"name": {"S": "Reciclagem em NR-11 - Operador de Empilhadeira"},"quantity": {"N": "1"},"unit_price": {"N": "99"}}}]}}
{"id": {"S": "cpYSbBcie2NDbZhDKCxCih"}, "sk": {"S": "generated_items"},"create_date": {"S": "2025-07-21T16:21:50.143551-03:00"},"scope": {"S": "SINGLE_USER"},"status": {"S": "SUCCESS"},"update_date": {"S": "2025-07-21T16:21:53.994941-03:00"}}