import csv from io import StringIO from typing import TYPE_CHECKING from aws_lambda_powertools.utilities.data_classes import ( EventBridgeEvent, event_source, ) from aws_lambda_powertools.utilities.typing import LambdaContext from boto3clients import s3_client if TYPE_CHECKING: from mypy_boto3_s3.client import S3Client else: S3Client = object transport_params = {'client': s3_client} @event_source(data_class=EventBridgeEvent) def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool: new_image = event.detail['new_image'] csvfile = new_image['s3_uri'] data = _get_s3_object_range( csvfile, start_byte=new_image['start_byte'], end_byte=new_image['end_byte'], s3_client=s3_client, ) reader = csv.reader(data) for x in reader: print(x) return True def _get_s3_object_range( s3_uri: str, *, start_byte: int, end_byte: int, s3_client: S3Client, ) -> StringIO: bucket, key = s3_uri.replace('s3://', '').split('/', 1) response = s3_client.get_object( Bucket=bucket, Key=key, Range=f'bytes={start_byte}-{end_byte}', ) return StringIO(response['Body'].read().decode('utf-8'))