"""Bulk user-import worker: shared clients, data models, and error types."""

import csv
import secrets
from io import StringIO
from types import SimpleNamespace
from typing import TYPE_CHECKING
from uuid import uuid4

from aws_lambda_powertools.utilities.data_classes import (
    EventBridgeEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.batch import BatchProcessor
from layercake.dateutils import now, ttl
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from layercake.extra_types import CnpjStr, CpfStr, NameStr
from pydantic import BaseModel, EmailStr, Field

from boto3clients import dynamodb_client, s3_client
from config import USER_TABLE

if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client
else:
    # At runtime the annotation only needs to resolve to *some* name;
    # the real client type exists only for the type checker.
    S3Client = object

# Persistence layer over the single-table user store.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
# smart_open-style transport params; presumably consumed by another module —
# NOTE(review): not referenced in this file, confirm before removing.
transport_params = {'client': s3_client}
processor = BatchProcessor()


class Org(BaseModel):
    """Organization the imported users are attached to."""

    # `id` is excluded from dumps; it is written explicitly where needed.
    id: str | None = Field(default=None, exclude=True)
    name: str
    cnpj: CnpjStr


class User(BaseModel):
    """One validated row of the import CSV."""

    name: NameStr
    cpf: CpfStr
    email: EmailStr


class CPFConflictError(Exception):
    """Raised when a CPF uniqueness record already exists."""


class EmailConflictError(Exception):
    """Raised when an e-mail uniqueness record already exists."""
@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Process one byte-range chunk of an import CSV.

    Reads the chunk from S3, creates one user per row via the batch
    processor, then — in a single transaction — writes a per-row
    reporting item, bumps the file's progress counter, and deletes the
    chunk-tracking item that triggered this invocation.

    Returns:
        True on completion (per-row failures are captured in the
        reporting items, not raised).
    """
    new_image = event.detail['new_image']
    csvfile = new_image['s3_uri']
    # The chunk sk encodes the byte range as '#'-separated segments:
    # positions 2 and 4 hold the start and end offsets.
    _, _, start_byte, _, end_byte = new_image['sk'].split('#')
    # `columns` entries look like 'idx:name'; build name -> column index.
    header = SimpleNamespace(
        **{
            column_name: int(idx)
            for idx, column_name in (
                column.split(':') for column in new_image['columns']
            )
        }
    )
    data = _get_s3_object_range(
        csvfile,
        # split() yields strings; cast to honor the helper's int contract.
        start_byte=int(start_byte),
        end_byte=int(end_byte),
        s3_client=s3_client,
    )
    reader = csv.reader(data)
    users = [
        {
            'name': row[header.name],
            'email': row[header.email],
            'cpf': row[header.cpf],
        }
        for row in reader
    ]
    ctx = {'org': new_image['org']}
    # Key pattern `FILE#{file}`
    sk = new_image['file_sk']
    with (
        dyn.transact_writer() as transact,
        processor(records=users, handler=_create_user, context=ctx) as batch,
    ):
        result = batch.process()
        # One reporting item per processed row, success or failure.
        for r in result:
            transact.put(
                item={
                    'id': new_image['id'],
                    'sk': f'REPORTING#{sk}#ITEM#{secrets.token_urlsafe(16)}',
                    'input': r.input_record,
                    'status': r.status.value.upper(),
                    'error': r.cause.get('type') if r.cause else None,
                }
            )
        # Advance the file-level progress by this chunk's weight.
        transact.update(
            key=KeyPair(
                pk=new_image['id'],
                sk=sk,
            ),
            update_expr='ADD progress :progress',
            expr_attr_values={
                ':progress': new_image['weight'],
            },
        )
        # Remove the chunk item so it is not reprocessed.
        transact.delete(
            key=KeyPair(
                pk=new_image['id'],
                sk=new_image['sk'],
            )
        )
    return True


def _create_user(rawuser: dict, context: dict) -> None:
    """Create one user and all of their satellite items in one transaction.

    Writes: the user profile, a NEVER_LOGGED marker, the e-mail record,
    an e-mail verification token (30-day TTL), CPF and e-mail uniqueness
    guards (conditional puts), and the user<->org membership items.

    Raises:
        CPFConflictError: the CPF is already registered.
        EmailConflictError: the e-mail is already registered.
    """
    now_ = now()
    user_id = uuid4()
    org = Org(**context['org'])
    user = User(**rawuser)
    with dyn.transact_writer() as transact:
        transact.put(
            item=user.model_dump()
            | {
                'id': user_id,
                'sk': '0',
                'email_verified': False,
                # NOTE(review): `{org.id}` is a set literal, not an f-string —
                # presumably a DynamoDB string set of tenants; confirm intent.
                'tenant_id': {org.id},
                # Post-migration (users): uncomment the following line
                # 'org_id': {org.id},
                # 'created_at': now_,
                'createDate': now_,
            },
        )
        transact.put(
            item={
                'id': user_id,
                'sk': 'NEVER_LOGGED',
                'created_at': now_,
            }
        )
        transact.put(
            item={
                'id': user_id,
                # Post-migration (users): rename `emails` to `EMAIL`
                'sk': f'emails#{user.email}',
                'email_verified': False,
                'email_primary': True,
                'mx_record_exists': False,
                'created_at': now_,
            }
        )
        transact.put(
            item={
                'id': user_id,
                'sk': f'EMAIL_VERIFICATION#{uuid4()}',
                'fresh_user': True,
                'name': user.name,
                'email': user.email,
                'org_name': org.name,
                'ttl': ttl(start_dt=now_, days=30),
                'created_at': now_,
            }
        )
        # Uniqueness guard: fails the whole transaction on duplicate CPF.
        transact.put(
            item={
                # Post-migration (users): rename `cpf` to `CPF`
                'id': 'cpf',
                'sk': user.cpf,
                'user_id': user_id,
                'created_at': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=CPFConflictError,
        )
        # Uniqueness guard: fails the whole transaction on duplicate e-mail.
        transact.put(
            item={
                # Post-migration (users): rename `email` to `EMAIL`
                'id': 'email',
                'sk': user.email,
                'user_id': user_id,
                'created_at': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=EmailConflictError,
        )
        transact.put(
            item={
                'id': user_id,
                'sk': f'orgs#{org.id}',
                # Post-migration (users): uncomment the following line
                # pk=f'ORG#{org.id}',
                'name': org.name,
                'cnpj': org.cnpj,
                'created_at': now_,
            }
        )
        transact.put(
            item={
                'id': f'orgmembers#{org.id}',
                # Post-migration (users): uncomment the following line
                # pk=f'MEMBER#ORG#{org_id}',
                'sk': user_id,
                'created_at': now_,
            }
        )


def _get_s3_object_range(
    s3_uri: str,
    *,
    start_byte: int,
    end_byte: int,
    s3_client: S3Client,
) -> StringIO:
    """Fetch bytes [start_byte, end_byte] of an S3 object as decoded text.

    Args:
        s3_uri: full 's3://bucket/key' URI of the object.
        start_byte: first byte of the range (inclusive).
        end_byte: last byte of the range (inclusive).
        s3_client: boto3 S3 client to issue the ranged GET with.

    Returns:
        A StringIO over the UTF-8-decoded range, ready for csv.reader.
    """
    bucket, key = s3_uri.replace('s3://', '').split('/', 1)
    r = s3_client.get_object(
        Bucket=bucket,
        Key=key,
        Range=f'bytes={start_byte}-{end_byte}',
    )
    return StringIO(r['Body'].read().decode('utf-8'))