add datetable cnfcnpj
This commit is contained in:
207
users-events/app/events/batch/chunks_into_users.py
Normal file
207
users-events/app/events/batch/chunks_into_users.py
Normal file
@@ -0,0 +1,207 @@
|
||||
import csv
|
||||
import secrets
|
||||
from io import StringIO
|
||||
from types import SimpleNamespace
|
||||
from typing import TYPE_CHECKING
|
||||
from uuid import uuid4
|
||||
|
||||
from aws_lambda_powertools.utilities.data_classes import (
|
||||
EventBridgeEvent,
|
||||
event_source,
|
||||
)
|
||||
from aws_lambda_powertools.utilities.typing import LambdaContext
|
||||
from layercake.batch import BatchProcessor
|
||||
from layercake.dateutils import now
|
||||
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
|
||||
from layercake.extra_types import CnpjStr, CpfStr, NameStr
|
||||
from pydantic import BaseModel, EmailStr, Field
|
||||
|
||||
from boto3clients import dynamodb_client, s3_client
|
||||
from config import USER_TABLE
|
||||
|
||||
if TYPE_CHECKING:
    from mypy_boto3_s3.client import S3Client
else:
    # The boto3 type stubs are dev-only; at runtime fall back to `object`
    # so the `S3Client` annotation in `_get_s3_object_range` still resolves.
    S3Client = object


# Persistence layer over the users table; shared by the handler and helpers.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
# NOTE(review): `transport_params` is not referenced anywhere in this file —
# presumably consumed by another module (e.g. smart_open); confirm before removing.
transport_params = {'client': s3_client}
# Batch processor driving per-record handling (`_create_user`) in `lambda_handler`.
processor = BatchProcessor()
|
||||
|
||||
|
||||
class Org(BaseModel):
    """Organization attached to each imported user (from the event's `org` detail).

    `id` carries `exclude=True`, so it never appears in `model_dump()` output;
    it is only used to build DynamoDB keys in `_create_user`.
    """

    # Internal org identifier; optional in the incoming payload.
    id: str | None = Field(default=None, exclude=True)
    name: str
    cnpj: CnpjStr
|
||||
|
||||
|
||||
class User(BaseModel):
    """A single user record parsed from one CSV row of the chunk.

    Field validation (name/CPF/email format) is delegated to the
    project's custom string types and pydantic's `EmailStr`.
    """

    name: NameStr
    cpf: CpfStr
    email: EmailStr
|
||||
|
||||
|
||||
class CPFConflictError(Exception):
    """Raised when a conditional put finds the CPF already registered."""
|
||||
|
||||
|
||||
class EmailConflictError(Exception):
    """Raised when a conditional put finds the email already registered."""
|
||||
|
||||
|
||||
@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Process one CSV chunk of a user-import file.

    Reads a byte range of the source CSV from S3, creates one user per row
    via `_create_user`, then — in a single DynamoDB transaction — writes a
    per-row reporting item, bumps the file's progress counter, and deletes
    the chunk item that triggered this invocation.

    Returns True unconditionally on completion.
    """
    new_image = event.detail['new_image']
    csvfile = new_image['s3_uri']
    # Chunk sort key encodes the byte range; positions 2 and 4 are the
    # start/end offsets. NOTE(review): exact pattern (e.g. CHUNK#x#start#y#end)
    # is not visible here — confirm against the producer of these items.
    _, _, start_byte, _, end_byte = new_image['sk'].split('#')
    # `columns` entries look like 'idx:column_name'; build an attribute-style
    # lookup of column name -> integer position (header.name, header.email, ...).
    header = SimpleNamespace(
        **{
            column_name: int(idx)
            for idx, column_name in (
                column.split(':') for column in new_image['columns']
            )
        }
    )

    # NOTE(review): start_byte/end_byte are still strings here although
    # `_get_s3_object_range` annotates them as int; it works because they are
    # only interpolated into the Range header — consider int() for clarity.
    data = _get_s3_object_range(
        csvfile,
        start_byte=start_byte,
        end_byte=end_byte,
        s3_client=s3_client,
    )
    reader = csv.reader(data)
    users = [
        {
            'name': row[header.name],
            'email': row[header.email],
            'cpf': row[header.cpf],
        }
        for row in reader
    ]
    ctx = {'org': new_image['org']}
    # Key pattern `FILE#{file}`
    sk = new_image['file_sk']

    with (
        dyn.transact_writer() as transact,
        processor(records=users, handler=_create_user, context=ctx) as batch,
    ):
        # Each record is handed to `_create_user`; per-record failures are
        # captured in the result's `cause` rather than aborting the batch.
        result = batch.process()

        # One reporting item per processed row, grouped under the file's sk.
        for r in result:
            transact.put(
                item={
                    'id': new_image['id'],
                    'sk': f'REPORTING#{sk}#ITEM#{secrets.token_urlsafe(16)}',
                    'input': r.input_record,
                    'status': r.status.value.upper(),
                    # Only the exception type name is persisted, not the message.
                    'error': r.cause.get('type') if r.cause else None,
                }
            )

        # Advance the parent file item's progress by this chunk's weight.
        transact.update(
            key=KeyPair(
                pk=new_image['id'],
                sk=sk,
            ),
            update_expr='ADD progress :progress',
            expr_attr_values={
                ':progress': new_image['weight'],
            },
        )
        # Remove the chunk item so it is not reprocessed.
        transact.delete(
            key=KeyPair(
                pk=new_image['id'],
                sk=new_image['sk'],
            )
        )

    return True
|
||||
|
||||
|
||||
def _create_user(rawuser: dict, context: dict) -> None:
    """Create one user plus its lookup/membership items in a single transaction.

    Args:
        rawuser: dict with 'name', 'email', 'cpf' (validated via `User`).
        context: batch context; `context['org']` is validated via `Org`.

    Raises:
        CPFConflictError: the CPF uniqueness item already exists.
        EmailConflictError: the email uniqueness item already exists.
        pydantic.ValidationError: if rawuser/org fail model validation.
    """
    now_ = now()
    user_id = uuid4()
    org = Org(**context['org'])
    user = User(**rawuser)

    with dyn.transact_writer() as transact:
        # Main user item (sk='0' is the user's root record).
        transact.put(
            item={
                **user.model_dump(),
                'id': user_id,
                'sk': '0',
                'email_verified': False,
                # NOTE(review): `{org.id}` is a one-element Python set —
                # presumably intended as a DynamoDB string set; confirm the
                # persistence layer serializes it that way (org.id may be None).
                'tenant_id': {org.id},
                # Post-migration: uncomment the following line
                # 'org_id': {org.id},
                'created_at': now_,
            },
        )
        # Per-email item under the user, flagged as the primary address.
        transact.put(
            item={
                'id': user_id,
                # Post-migration: rename `emails` to `EMAIL`
                'sk': f'emails#{user.email}',
                'email_verified': False,
                'email_primary': True,
                'created_at': now_,
            }
        )
        # Global CPF uniqueness guard: fails the whole transaction if taken.
        transact.put(
            item={
                # Post-migration: rename `cpf` to `CPF`
                'id': 'cpf',
                'sk': user.cpf,
                'created_at': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=CPFConflictError,
        )
        # Global email uniqueness guard, same mechanism as the CPF guard.
        transact.put(
            item={
                # Post-migration: rename `email` to `EMAIL`
                'id': 'email',
                'sk': user.email,
                'created_at': now_,
            },
            cond_expr='attribute_not_exists(sk)',
            exc_cls=EmailConflictError,
        )
        # User -> org edge, denormalizing the org's name and CNPJ.
        transact.put(
            item={
                'id': user_id,
                'sk': f'orgs#{org.id}',
                # Post-migration: uncomment the following line
                # pk=f'ORG#{org.id}',
                'name': org.name,
                'cnpj': org.cnpj,
                'created_at': now_,
            }
        )
        # Org -> member reverse edge for listing an org's users.
        transact.put(
            item={
                'id': f'orgmembers#{org.id}',
                # Post-migration: uncomment the following line
                # pk=f'MEMBER#ORG#{org_id}',
                'sk': user_id,
                'created_at': now_,
            }
        )
|
||||
|
||||
|
||||
def _get_s3_object_range(
|
||||
s3_uri: str,
|
||||
*,
|
||||
start_byte: int,
|
||||
end_byte: int,
|
||||
s3_client: S3Client,
|
||||
) -> StringIO:
|
||||
bucket, key = s3_uri.replace('s3://', '').split('/', 1)
|
||||
|
||||
r = s3_client.get_object(
|
||||
Bucket=bucket,
|
||||
Key=key,
|
||||
Range=f'bytes={start_byte}-{end_byte}',
|
||||
)
|
||||
|
||||
return StringIO(r['Body'].read().decode('utf-8'))
|
||||
Reference in New Issue
Block a user