from decimal import Decimal
from aws_lambda_powertools.utilities.data_classes import (
EventBridgeEvent,
event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
from boto3clients import dynamodb_client, s3_client
from config import CHUNK_SIZE, USER_TABLE
from csv_utils import byte_ranges
# Shared persistence layer over the user table, reusing the module-level boto3
# DynamoDB client (created once per Lambda container, reused across invocations).
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
# Passed through to byte_ranges() so it reads the CSV with the shared S3 client
# (presumably a smart_open-style `transport_params` dict — confirm in csv_utils).
transport_params = {'client': s3_client}
def _chunk_weights(total_chunks: int) -> list:
    """Split 100% into `total_chunks` near-equal progress weights.

    Each share is rounded to 2 decimal places; the last share absorbs the
    rounding remainder so the list sums to exactly 100. Returns an empty
    list when `total_chunks` is 0 (empty input file) instead of raising
    ZeroDivisionError / IndexError.
    """
    if total_chunks <= 0:
        return []
    share = round(100 / total_chunks, 2)
    weights = [share] * total_chunks
    # Fix last value to balance the total: n * round(100/n, 2) can drift
    # slightly off 100, and downstream progress tracking expects exactly 100.
    weights[-1] = round(100 - sum(weights[:-1]), 2)
    return weights


@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Fan a newly registered CSV file out into per-chunk DynamoDB items.

    Triggered by an EventBridge event whose ``detail.new_image`` describes a
    CSV file in S3. The file is split into byte ranges of ``CHUNK_SIZE``,
    then a single DynamoDB transaction:

    * updates the parent file item with ``total_chunks``, ``progress = 0``
      and a ``started_at`` timestamp, and
    * puts one ``CHUNK#START#...#END#...`` item per byte range, carrying its
      progress weight (as ``Decimal`` — DynamoDB rejects Python floats).

    Returns True once the transaction is committed (on context-manager exit).
    """
    now_ = now()
    new_image = event.detail['new_image']
    csvfile = new_image['s3_uri']
    chunks = byte_ranges(csvfile, CHUNK_SIZE, transport_params=transport_params)
    total_chunks = len(chunks)
    # Safe for total_chunks == 0: returns [] and the loop below is a no-op.
    weights = _chunk_weights(total_chunks)
    with dyn.transact_writer() as transact:
        transact.update(
            key=KeyPair(new_image['id'], new_image['sk']),
            # Single-line expression: DynamoDB ignores whitespace, and the
            # original backslash continuations embedded stray indentation.
            update_expr='SET total_chunks = :total_chunks, '
                        'progress = :progress, '
                        'started_at = :now',
            expr_attr_values={
                ':total_chunks': total_chunks,
                ':progress': 0,
                ':now': now_,
            },
        )
        for (start, end), weight in zip(chunks, weights):
            transact.put(
                item={
                    'id': new_image['id'],
                    'sk': f'CHUNK#START#{start}#END#{end}',
                    'file_sk': new_image['sk'],
                    's3_uri': new_image['s3_uri'],
                    'columns': new_image['columns'],
                    # str() round-trip keeps the 2-decimal value exact in Decimal.
                    'weight': Decimal(str(weight)),
                    'org': new_image['org'],
                    'created_at': now_,
                }
            )
    return True