Files
saladeaula.digital/users-events/tests/events/batch/test_chunks_into_users.py

48 lines
1.3 KiB
Python

import pprint
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
import events.batch.chunks_into_users as app
def test_chunk_csv(
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
lambda_context,
):
pk = 'BATCH_JOB#ORG#1411844c-10d6-456e-959d-e91775145461'
file_sk = 'FILE#2025-11-13T16:04:53.024743'
event = {
'detail': {
'new_image': {
'id': pk,
'sk': 'CHUNK#START#0#END#4885',
'weight': 100,
'created_at': '2025-11-20T19:00:41.896001-03:00',
'file_sk': file_sk,
's3_uri': 's3://saladeaula.digital/samples/users.csv',
'columns': {
'1:name',
'2:email',
'3:cpf',
},
'org': {
'id': '1411844c-10d6-456e-959d-e91775145461',
'name': 'EDUSEG',
'cnpj': '15608435000190',
},
},
},
}
assert app.lambda_handler(event, lambda_context) # type: ignore
r = dynamodb_persistence_layer.collection.query(
KeyPair(
pk=pk,
sk=f'REPORTING#{file_sk}',
),
limit=100,
)
pprint.pp(r['items'])
assert 26 == len(r['items'])