from dataclasses import asdict, dataclass from datetime import timedelta from enum import Enum from typing import Self, TypedDict from layercake.dateutils import now, ttl from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair from layercake.strutils import md5_hash from schemas import Enrollment Tenant = TypedDict('Tenant', {'id': str, 'name': str}) Author = TypedDict('Author', {'id': str, 'name': str}) DeduplicationWindow = TypedDict('DeduplicationWindow', {'offset_days': int}) class LinkedEntity(str): def __new__(cls, id: str, type: str) -> Self: return super().__new__(cls, '#'.join([type.lower(), id])) def __init__(self, id: str, type: str) -> None: # __init__ is used to store the parameters for later reference. # For immutable types like str, __init__ cannot change the instance's value. self.id = id self.type = type @dataclass(frozen=True) class Slot: id: str sk: str @property def order_id(self) -> LinkedEntity: idx, _ = self.sk.split('#') return LinkedEntity(idx, 'ORDER') class LifecycleEvents(str, Enum): """Lifecycle events related to scheduling actions.""" # Reminder if the user does not access within 3 days # REMINDER_NO_ACCESS_AFTER_3_DAYS = 'SCHEDULE#REMINDER_NO_ACCESS_AFTER_3_DAYS' DOES_NOT_ACCESS = 'schedules#does_not_access' # When there is no activity 7 days after the first access # REMINDER_NO_ACTIVITY_AFTER_7_DAYS = 'SCHEDULE#REMINDER_NO_ACTIVITY_AFTER_7_DAYS' NO_ACTIVITY = 'schedules#no_activity' # Reminder 30 days before the access period expires # REMINDER_ACCESS_PERIOD_BEFORE_30_DAYS = 'SCHEDULE#REMINDER_ACCESS_PERIOD_BEFORE_30_DAYS' ACCESS_PERIOD_ENDS = 'schedules#access_period_ends' # Reminder for certificate expiration set to 30 days from now REMINDER_CERT_EXPIRATION_BEFORE_30_DAYS = ( 'SCHEDULE#REMINDER_CERT_EXPIRATION_BEFORE_30_DAYS' ) # Archive the course after the certificate expires # SET_AS_ARCHIVE = 'SCHEDULE#SET_AS_ARCHIVE' ARCHIVE_IT = 'schedules#archive_it' # When the access period ends for a course without a certificate # SET_AS_EXPIRE = 'SCHEDULE#SET_AS_EXPIRE' EXPIRATION = 'schedules#expiration' class DeduplicationConflictError(Exception): def __init__(self, *args): super().__init__('Enrollment already exists') class SlotDoesNotExistError(Exception): def __init__(self, *args): super().__init__('Slot does not exist') def enroll( enrollment: Enrollment, *, slot: Slot | None = None, author: Author | None = None, linked_entities: frozenset[LinkedEntity] = frozenset(), deduplication_window: DeduplicationWindow | None = None, persistence_layer: DynamoDBPersistenceLayer, ) -> bool: """Enrolls a user into a course and schedules lifecycle events.""" now_ = now() user = enrollment.user course = enrollment.course lock_hash = md5_hash('%s%s' % (user.id, course.id)) with persistence_layer.transact_writer() as transact: if slot: linked_entities = frozenset({slot.order_id}) | linked_entities transact.put( item={ 'sk': '0', 'create_date': now_, # 'created_at': now_, **enrollment.model_dump(), }, ) transact.put( item={ 'id': enrollment.id, 'sk': 'metadata#course', 'created_at': now_, **course.model_dump(include={'cert', 'access_period'}), } ) transact.put( item={ 'id': enrollment.id, # Post-migration: uncomment the following line # 'sk': LifecycleEvents.REMINDER_NO_ACCESS_3_DAYS, 'sk': LifecycleEvents.DOES_NOT_ACCESS, 'name': user.name, 'email': user.email, 'course': course.name, 'created_at': now_, 'ttl': ttl(days=3, start_dt=now_), }, ) # Enrollment expires by default when the access period ends. # When the course is completed, it is automatically removed, # and the `SCHEDULE#SET_AS_ARCHIVE` event is created. transact.put( item={ 'id': enrollment.id, 'sk': LifecycleEvents.EXPIRATION, # Post-migration: uncomment the following line # 'sk': LifecycleEvents.COURSE_EXPIRED, 'name': user.name, 'email': user.email, 'course': course.name, 'created_at': now_, 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period)), }, ) transact.put( item={ 'id': enrollment.id, # Post-migration: uncomment the following line # 'sk': LifecycleEvents.ACCESS_PERIOD_REMINDER_30_DAYS, 'sk': LifecycleEvents.ACCESS_PERIOD_ENDS, 'name': user.name, 'email': user.email, 'course': course.name, 'created_at': now_, 'ttl': ttl(start_dt=now_ + timedelta(days=course.access_period - 30)), }, ) for entity in linked_entities: keyprefix = entity.type.lower() transact.put( item={ 'id': enrollment.id, 'sk': f'LINKED_ENTITIES#{entity.type}', 'created_at': now_, f'{keyprefix}_id': entity.id, } ) if slot: transact.put( item={ 'id': enrollment.id, # Post-migration: uncomment the following line # 'sk': 'METADATA#SOURCE_SLOT', 'sk': 'parent_vacancy', 'vacancy': asdict(slot), 'created_at': now_, } ) transact.delete( key=KeyPair(slot.id, slot.sk), cond_expr='attribute_exists(sk)', exc_cls=SlotDoesNotExistError, ) transact.put( item={ 'id': enrollment.id, 'sk': 'cancel_policy', 'created_at': now_, } ) if author: transact.put( item={ 'id': enrollment.id, 'sk': 'author', 'user_id': author['id'], 'name': author['name'], 'created_at': now_, }, ) # Prevents the user from enrolling in the same course again until # the deduplication window expires or is removed. if deduplication_window: offset_days = deduplication_window['offset_days'] ttl_expiration = ttl( start_dt=now_ + timedelta(days=course.access_period - offset_days) ) transact.put( item={ 'id': 'lock', 'sk': lock_hash, 'enrollment_id': enrollment.id, 'created_at': now_, 'ttl': ttl_expiration, }, cond_expr='attribute_not_exists(sk)', exc_cls=DeduplicationConflictError, ) transact.put( item={ 'id': enrollment.id, 'sk': 'lock', 'hash': lock_hash, 'created_at': now_, 'ttl': ttl_expiration, }, ) # Deduplication window can be recalculated if needed transact.put( item={ 'id': enrollment.id, 'sk': 'metadata#deduplication_window', 'offset_days': offset_days, 'created_at': now_, }, ) else: transact.condition( key=KeyPair('lock', lock_hash), cond_expr='attribute_not_exists(sk)', exc_cls=DeduplicationConflictError, ) return True