from decimal import Decimal

from aws_lambda_powertools.utilities.data_classes import (
    EventBridgeEvent,
    event_source,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair

from boto3clients import dynamodb_client, s3_client
from config import CHUNK_SIZE, USER_TABLE
from csv_utils import byte_ranges

# Module-level singletons: created once per Lambda container and reused
# across warm invocations.
dyn = DynamoDBPersistenceLayer(USER_TABLE, dynamodb_client)
transport_params = {'client': s3_client}


def _chunk_weights(total_chunks: int) -> list:
    """Split a 100-unit progress budget evenly across ``total_chunks``.

    Each chunk gets ``round(100 / total_chunks, 2)``; the last weight is
    then adjusted so the rounded values sum to exactly 100.

    NOTE(review): assumes ``total_chunks >= 1`` — ``byte_ranges`` appears
    to always return at least one chunk (TODO confirm); 0 would raise
    ZeroDivisionError here.
    """
    per_chunk = round(100 / total_chunks, 2)
    weights = [per_chunk] * total_chunks
    # Fix the last value so per-chunk rounding error cannot push the
    # total off 100.
    weights[-1] = round(100 - sum(weights[:-1]), 2)
    return weights


@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Fan a newly registered CSV file out into per-chunk work items.

    Reads the file record from ``event.detail['new_image']``, splits the
    S3 object into byte-range chunks of ``CHUNK_SIZE``, then in a single
    DynamoDB transaction:

    * updates the parent record with the chunk count, zeroed progress and
      a start timestamp, and
    * puts one ``CHUNK#START#...#END#...`` item per chunk carrying its
      byte range and progress weight.

    Returns ``True`` on success; any exception propagates so Lambda's
    normal retry/error handling applies.
    """
    now_ = now()
    new_image = event.detail['new_image']
    csvfile = new_image['s3_uri']

    chunks = byte_ranges(csvfile, CHUNK_SIZE, transport_params=transport_params)
    total_chunks = len(chunks)
    weights = _chunk_weights(total_chunks)

    with dyn.transact_writer() as transact:
        transact.update(
            key=KeyPair(new_image['id'], new_image['sk']),
            # Built via implicit concatenation: the original used
            # backslash continuations *inside* the literal, which
            # embedded long runs of spaces in the request payload.
            update_expr=(
                'SET total_chunks = :total_chunks, '
                'progress = :progress, '
                'started_at = :now'
            ),
            expr_attr_values={
                ':total_chunks': total_chunks,
                ':progress': 0,
                ':now': now_,
            },
        )
        for (start, end), weight in zip(chunks, weights):
            transact.put(
                item={
                    'id': new_image['id'],
                    'sk': f'CHUNK#START#{start}#END#{end}',
                    'file_sk': new_image['sk'],
                    's3_uri': new_image['s3_uri'],
                    'columns': new_image['columns'],
                    # DynamoDB numbers must be Decimal; go through str()
                    # to avoid float-repr noise in the stored value.
                    'weight': Decimal(str(weight)),
                    'org': new_image['org'],
                    'created_at': now_,
                }
            )
    return True