Files
saladeaula.digital/users-events/app/events/batch/chunks_into_users.py

222 lines
5.9 KiB
Python

import csv
import secrets
from io import StringIO
from types import SimpleNamespace
from typing import TYPE_CHECKING
from uuid import uuid4
from aws_lambda_powertools.utilities.data_classes import (
EventBridgeEvent,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.batch import BatchProcessor
from layercake.dateutils import now, ttl
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from layercake.extra_types import CnpjStr, CpfStr, NameStr
from pydantic import BaseModel, EmailStr, Field
from boto3clients import dynamodb_client, s3_client
from config import USER_TABLE
# Import the real S3 client type only for static analysis; the boto3 stub
# package is not installed at runtime, so fall back to a plain `object`
# placeholder so the `S3Client` annotation below still resolves.
if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client
else:
    S3Client = object

# Single-table persistence layer for the user store.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
# NOTE(review): `transport_params` looks like smart_open-style config but is
# not referenced anywhere in this module — confirm it is consumed elsewhere
# before removing.
transport_params = {'client': s3_client}
processor = BatchProcessor()
class Org(BaseModel):
    """Organization that a batch of imported users belongs to."""

    # Excluded from model_dump() output — used only for key construction.
    id: str | None = Field(default=None, exclude=True)
    name: str
    # CnpjStr: project-validated Brazilian company registration number.
    cnpj: CnpjStr
class User(BaseModel):
    """One user row parsed from the import CSV, validated on construction."""

    # NameStr / CpfStr are project-validated string types; EmailStr is
    # pydantic's validated email type.
    name: NameStr
    cpf: CpfStr
    email: EmailStr
class CPFConflictError(Exception):
    """Raised when the CPF uniqueness marker already exists in the table."""
class EmailConflictError(Exception):
    """Raised when the email uniqueness marker already exists in the table."""
@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Process one byte-range chunk of a user-import CSV.

    The EventBridge event's ``detail.new_image`` describes a chunk item:
    the source CSV's S3 URI, the byte range this chunk covers (encoded in
    its sort key), the column layout, and the target org.  Each CSV row is
    handed to ``_create_user`` via the batch processor; afterwards, in one
    DynamoDB transaction, a REPORTING item is written per row, the parent
    file item's progress counter is incremented by this chunk's weight,
    and the chunk item itself is deleted.

    Returns:
        True always (success marker for the Lambda runtime).
    """
    new_image = event.detail['new_image']
    csvfile = new_image['s3_uri']
    # Sort key segments are '#'-separated; positions 2 and 4 hold the
    # start/end byte offsets.  They stay strings here — fine for the HTTP
    # Range header _get_s3_object_range builds from them.
    _, _, start_byte, _, end_byte = new_image['sk'].split('#')
    # `columns` entries are ':'-separated '<idx>:<column_name>' pairs;
    # build an attribute-style map of column name -> CSV column index.
    header = SimpleNamespace(
        **{
            column_name: int(idx)
            for idx, column_name in (
                column.split(':') for column in new_image['columns']
            )
        }
    )
    data = _get_s3_object_range(
        csvfile,
        start_byte=start_byte,
        end_byte=end_byte,
        s3_client=s3_client,
    )
    reader = csv.reader(data)
    # Project each row onto the three fields the User model needs.
    users = [
        {
            'name': row[header.name],
            'email': row[header.email],
            'cpf': row[header.cpf],
        }
        for row in reader
    ]
    # Shared handler context: _create_user reads the org payload from it.
    ctx = {'org': new_image['org']}
    # Key pattern `FILE#{file}`
    sk = new_image['file_sk']
    with (
        dyn.transact_writer() as transact,
        processor(records=users, handler=_create_user, context=ctx) as batch,
    ):
        result = batch.process()
        # One REPORTING item per processed row: raw input, final status,
        # and the exception type name when the row failed.
        for r in result:
            transact.put(
                item={
                    'id': new_image['id'],
                    'sk': f'REPORTING#{sk}#ITEM#{secrets.token_urlsafe(16)}',
                    'input': r.input_record,
                    'status': r.status.value.upper(),
                    'error': r.cause.get('type') if r.cause else None,
                }
            )
        # Advance the parent file item's progress by this chunk's weight.
        transact.update(
            key=KeyPair(
                pk=new_image['id'],
                sk=sk,
            ),
            update_expr='ADD progress :progress',
            expr_attr_values={
                ':progress': new_image['weight'],
            },
        )
        # Chunk fully processed — remove its work item.
        transact.delete(
            key=KeyPair(
                pk=new_image['id'],
                sk=new_image['sk'],
            )
        )
    return True
def _create_user(rawuser: dict, context: dict) -> None:
    """Create one user and all supporting items in a single DynamoDB transaction.

    Writes: the user profile, an email item, an email-verification item
    with a 30-day TTL, global CPF and email uniqueness markers (conditional
    puts), a user->org edge with denormalized org data, and an org->member
    reverse edge.  The whole transaction commits or fails atomically.

    Args:
        rawuser: Row dict with 'name', 'email' and 'cpf' keys.
        context: Batch context; ``context['org']`` is the raw Org payload.

    Raises:
        CPFConflictError: the CPF is already registered.
        EmailConflictError: the email is already registered.
    """
    now_ = now()
    user_id = uuid4()
    org = Org(**context['org'])
    # Pydantic validation of name/cpf/email happens here.
    user = User(**rawuser)
    with dyn.transact_writer() as transact:
        # Main profile item.
        transact.put(
            item={
                **user.model_dump(),
                'id': user_id,
                'sk': '0',
                'email_verified': False,
                # Set literal — presumably persisted as a DynamoDB string
                # set; NOTE(review): confirm org.id is never None here.
                'tenant_id': {org.id},
                # Post-migration: uncomment the following line
                # 'org_id': {org.id},
                'created_at': now_,
            },
        )
        # Per-address email item.
        transact.put(
            item={
                'id': user_id,
                # Post-migration: rename `emails` to `EMAIL`
                'sk': f'emails#{user.email}',
                'email_verified': False,
                'email_primary': True,
                'mx_record_exists': False,
                'created_at': now_,
            }
        )
        # Pending verification token, expiring after 30 days.
        transact.put(
            item={
                'id': user_id,
                'sk': f'EMAIL_VERIFICATION#{uuid4()}',
                'fresh_user': True,
                'name': user.name,
                'email': user.email,
                'email_primary': True,
                'org_name': org.name,
                'ttl': ttl(start_dt=now_, days=30),
                'created_at': now_,
            }
        )
        # Global CPF uniqueness marker; the condition aborts the whole
        # transaction with CPFConflictError if the CPF already exists.
        transact.put(
            item={
                # Post-migration: rename `cpf` to `CPF`
                'id': 'cpf',
                'sk': user.cpf,
                'created_at': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=CPFConflictError,
        )
        # Global email uniqueness marker — same pattern as the CPF above.
        transact.put(
            item={
                # Post-migration: rename `email` to `EMAIL`
                'id': 'email',
                'sk': user.email,
                'created_at': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=EmailConflictError,
        )
        # User -> org edge with denormalized org data for fast reads.
        transact.put(
            item={
                'id': user_id,
                'sk': f'orgs#{org.id}',
                # Post-migration: uncomment the following line
                # pk=f'ORG#{org.id}',
                'name': org.name,
                'cnpj': org.cnpj,
                'created_at': now_,
            }
        )
        # Org -> member reverse edge for listing an org's users.
        transact.put(
            item={
                'id': f'orgmembers#{org.id}',
                # Post-migration: uncomment the following line
                # pk=f'MEMBER#ORG#{org_id}',
                'sk': user_id,
                'created_at': now_,
            }
        )
def _get_s3_object_range(
s3_uri: str,
*,
start_byte: int,
end_byte: int,
s3_client: S3Client,
) -> StringIO:
bucket, key = s3_uri.replace('s3://', '').split('/', 1)
r = s3_client.get_object(
Bucket=bucket,
Key=key,
Range=f'bytes={start_byte}-{end_byte}',
)
return StringIO(r['Body'].read().decode('utf-8'))