59 lines
2.0 KiB
Python
59 lines
2.0 KiB
Python
from decimal import Decimal
|
|
|
|
from aws_lambda_powertools.utilities.data_classes import (
|
|
EventBridgeEvent,
|
|
event_source,
|
|
)
|
|
from aws_lambda_powertools.utilities.typing import LambdaContext
|
|
from layercake.dateutils import now
|
|
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair
|
|
|
|
from boto3clients import dynamodb_client, s3_client
|
|
from config import CHUNK_SIZE, USER_TABLE
|
|
from csv_utils import byte_ranges
|
|
|
|
# Module-level persistence layer over the user table; built once at import
# time and used by the handler below for its transactional writes.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)

# Forwarded to byte_ranges so the CSV is read through the shared S3 client.
transport_params = dict(client=s3_client)
|
|
|
|
|
|
def _chunk_weights(total_chunks: int) -> list:
    """Split 100 (percent) into ``total_chunks`` two-decimal weights.

    The split is done on integer hundredths so the result sums to exactly
    100 and no weight is ever negative. Rounding ``100 / n`` first and
    letting the last element absorb the difference goes negative for large
    ``n`` (for 1500 chunks, 0.07 * 1499 already exceeds 100).

    Args:
        total_chunks: Number of chunks; must be at least 1.

    Returns:
        A list of ``Decimal`` weights, one per chunk. Any remainder is
        spread one hundredth at a time over the trailing chunks.
    """
    base, extra = divmod(10_000, total_chunks)
    hundredths = [base] * (total_chunks - extra) + [base + 1] * extra
    return [Decimal(h) / 100 for h in hundredths]


@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Split the CSV referenced by the event into chunk records.

    Reads ``new_image`` from the event detail, computes the byte ranges of
    the file at ``new_image['s3_uri']`` and, in one transactional write,
    stamps the file record with ``total_chunks``, ``progress`` (0) and
    ``started_at``, then puts one ``CHUNK#START#..#END#..`` item per range.

    Args:
        event: EventBridge event whose detail carries ``new_image``.
        context: Lambda context (unused).

    Returns:
        True once the transactional write block has completed.

    Raises:
        ValueError: If ``byte_ranges`` yields no chunks for the file.
    """
    now_ = now()
    new_image = event.detail['new_image']
    s3_uri = new_image['s3_uri']

    chunks = byte_ranges(s3_uri, CHUNK_SIZE, transport_params=transport_params)
    total_chunks = len(chunks)

    if total_chunks == 0:
        # Fail explicitly instead of with an opaque ZeroDivisionError.
        raise ValueError(f'No chunks produced for {s3_uri!r}')

    weights = _chunk_weights(total_chunks)

    # NOTE(review): DynamoDB caps a transaction at 100 actions; confirm that
    # transact_writer copes with files that produce more than 99 chunks.
    with dyn.transact_writer() as transact:
        transact.update(
            key=KeyPair(new_image['id'], new_image['sk']),
            update_expr=(
                'SET total_chunks = :total_chunks, '
                'progress = :progress, '
                'started_at = :now'
            ),
            expr_attr_values={
                ':total_chunks': total_chunks,
                ':progress': 0,
                ':now': now_,
            },
        )

        for (start, end), weight in zip(chunks, weights):
            transact.put(
                item={
                    'id': new_image['id'],
                    'sk': f'CHUNK#START#{start}#END#{end}',
                    'file_sk': new_image['sk'],
                    's3_uri': s3_uri,
                    'columns': new_image['columns'],
                    'weight': weight,
                    'org': new_image['org'],
                    'created_at': now_,
                }
            )

    return True
|