wip
This commit is contained in:
5
batch-jobs/Makefile
Normal file
5
batch-jobs/Makefile
Normal file
@@ -0,0 +1,5 @@
|
||||
build:
|
||||
sam build --use-container
|
||||
|
||||
deploy: build
|
||||
sam deploy --debug
|
||||
83
batch-jobs/app/csv_utils.py
Normal file
83
batch-jobs/app/csv_utils.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import csv
|
||||
from typing import TextIO
|
||||
|
||||
from smart_open import open
|
||||
|
||||
|
||||
def byte_ranges(
    csvfile: str,
    chunk_size: int = 100,
    **kwargs,
) -> list[tuple[int, int]]:
    """Compute inclusive byte ranges for reading a CSV file in line chunks.

    The file is scanned once in binary mode, recording the byte offset at
    which each line starts. Consecutive groups of `chunk_size` lines are then
    turned into ``(start_byte, end_byte)`` pairs. Both ends are inclusive, and
    the line terminator of the last line in a chunk belongs to that chunk.

    Lines are delimited by raw newlines, so a quoted CSV field that contains
    an embedded newline may end up split across two chunks.

    Parameters
    ----------
    csvfile : str
        Location of the CSV file, in whatever form `open()` accepts. It is
        always opened in binary mode.
    chunk_size : int, optional
        Number of lines per chunk. Must be at least 1. Default is 100.
    **kwargs :
        Extra options passed to `open()`, e.g., buffering.

    Returns
    -------
    list of tuple[int, int]
        Inclusive byte ranges covering each chunk of lines, in file order.
        An empty file yields an empty list.

    Raises
    ------
    ValueError
        If `chunk_size` is less than 1.

    Example
    -------
    >>> byte_ranges("users.csv", chunk_size=500)
    [(0, 3125), (3126, 6150), (6151, 9124)]
    """
    if chunk_size < 1:
        # A negative step would silently produce no ranges at all.
        raise ValueError('chunk_size must be a positive integer')

    # line_offsets[i] is the byte offset where line i starts; the final entry
    # is the offset just past the last line (i.e. the file size).
    line_offsets = [0]

    with open(csvfile, 'rb', **kwargs) as fp:
        while fp.readline():
            line_offsets.append(fp.tell())

    total_lines = len(line_offsets) - 1

    return [
        (
            line_offsets[first_line],
            # Start of the next chunk (or end of file) minus one: inclusive end.
            line_offsets[min(first_line + chunk_size, total_lines)] - 1,
        )
        for first_line in range(0, total_lines, chunk_size)
    ]
|
||||
|
||||
|
||||
def detect_delimiter(sample: TextIO) -> str:
    """Detect the delimiter character used in a CSV file.

    The whole remaining content of `sample` is read and handed to
    `csv.Sniffer`. The stream is rewound to position 0 afterwards, whether or
    not detection succeeds, so the caller can reuse it.

    Parameters
    ----------
    sample : TextIO
        A file-like object opened in text mode (e.g., from `open('file.csv')`).
        Must be readable, seekable and at position 0.

    Returns
    -------
    str
        The detected delimiter character (e.g., ',', ';', '\\t').

    Raises
    ------
    csv.Error
        If the sniffer cannot determine a delimiter from the content.
    """
    try:
        dialect = csv.Sniffer().sniff(sample.read())
    finally:
        # Rewind even when sniffing fails; otherwise the stream is left at EOF.
        sample.seek(0)

    return dialect.delimiter
|
||||
0
batch-jobs/app/events/__init__.py
Normal file
0
batch-jobs/app/events/__init__.py
Normal file
20
batch-jobs/app/events/csv_chunks.py
Normal file
20
batch-jobs/app/events/csv_chunks.py
Normal file
@@ -0,0 +1,20 @@
|
||||
import boto3
|
||||
from aws_lambda_powertools.utilities.data_classes import (
|
||||
EventBridgeEvent,
|
||||
event_source,
|
||||
)
|
||||
from aws_lambda_powertools.utilities.typing import LambdaContext
|
||||
|
||||
from csv_utils import byte_ranges
|
||||
|
||||
CHUNK_SIZE = 50
|
||||
s3_client = boto3.client('s3')
|
||||
|
||||
|
||||
@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Compute the chunk byte ranges for the CSV referenced by the event.

    The CSV location is read from ``detail['new_image']['csv_s3uri']`` and
    split into groups of ``CHUNK_SIZE`` lines. The resulting ranges are not
    consumed yet; the handler always returns ``True``.
    """
    csv_s3uri = event.detail['new_image']['csv_s3uri']

    # Result intentionally discarded for now: nothing downstream uses it yet.
    byte_ranges(csv_s3uri, CHUNK_SIZE)

    return True
|
||||
47
batch-jobs/app/events/read_chunk.py
Normal file
47
batch-jobs/app/events/read_chunk.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import csv
|
||||
from io import StringIO
|
||||
|
||||
import boto3
|
||||
from aws_lambda_powertools.utilities.data_classes import (
|
||||
EventBridgeEvent,
|
||||
event_source,
|
||||
)
|
||||
from aws_lambda_powertools.utilities.typing import LambdaContext
|
||||
|
||||
from csv_utils import byte_ranges
|
||||
|
||||
CHUNK_SIZE = 50
|
||||
s3_client = boto3.client('s3')
|
||||
|
||||
|
||||
@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context: LambdaContext) -> bool:
    """Fetch the last chunk of the CSV referenced by the event and print its rows.

    The CSV location is read from ``detail['new_image']['csv_s3uri']``. Only
    the final byte range produced by `byte_ranges` is downloaded and parsed.
    Always returns ``True``.
    """
    s3_uri = event.detail['new_image']['csv_s3uri']

    # Star-unpacking keeps only the final range and fails if there are none.
    *_, (first_byte, last_byte) = byte_ranges(s3_uri, CHUNK_SIZE)

    chunk = get_object_range(s3_uri, first_byte, last_byte, s3_client=s3_client)

    for row in csv.reader(chunk):
        print(row)

    return True
|
||||
|
||||
|
||||
def get_object_range(
    s3_uri: str,
    start_byte: int,
    end_byte: int = -1,
    *,
    s3_client,
) -> StringIO:
    """Download a byte range of an S3 object and return it as decoded text.

    Parameters
    ----------
    s3_uri : str
        Object location in the form ``s3://bucket/key``.
    start_byte : int
        Offset of the first byte to fetch.
    end_byte : int, optional
        Offset of the last byte to fetch (inclusive). A negative value (the
        default) or ``None`` means "until the end of the object".
    s3_client :
        Client object exposing ``get_object(Bucket=..., Key=..., Range=...)``.

    Returns
    -------
    StringIO
        The requested bytes decoded as UTF-8.

    Raises
    ------
    ValueError
        If `s3_uri` has no ``/`` separating the bucket from the key.
    UnicodeDecodeError
        If the fetched bytes are not valid UTF-8.
    """
    # Strip only the leading scheme; the key itself may legitimately contain it.
    bucket, key = s3_uri.removeprefix('s3://').split('/', 1)

    # Compare explicitly instead of relying on truthiness: 0 is a valid
    # inclusive end offset, while the -1 sentinel is truthy.
    if end_byte is not None and end_byte >= 0:
        range_ = f'bytes={start_byte}-{end_byte}'
    else:
        range_ = f'bytes={start_byte}-'

    response = s3_client.get_object(
        Bucket=bucket,
        Key=key,
        Range=range_,
    )

    return StringIO(response['Body'].read().decode('utf-8'))
|
||||
32
batch-jobs/pyproject.toml
Normal file
32
batch-jobs/pyproject.toml
Normal file
@@ -0,0 +1,32 @@
|
||||
[project]
|
||||
name = "batch-jobs"
|
||||
version = "0.1.0"
|
||||
description = ""
|
||||
readme = ""
|
||||
requires-python = ">=3.13"
|
||||
dependencies = ["layercake"]
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"duckdb>=1.2.2",
|
||||
"pytest>=8.3.4",
|
||||
"pytest-cov>=6.0.0",
|
||||
"ruff>=0.9.1",
|
||||
]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
pythonpath = ["app/"]
|
||||
addopts = "--cov --cov-report html -v"
|
||||
|
||||
[tool.ruff]
|
||||
target-version = "py311"
|
||||
src = ["app"]
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = "single"
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["E", "F", "I"]
|
||||
|
||||
[tool.uv.sources]
|
||||
layercake = { path = "../layercake" }
|
||||
3
batch-jobs/pyrightconfig.json
Normal file
3
batch-jobs/pyrightconfig.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"extraPaths": ["app/"]
|
||||
}
|
||||
9
batch-jobs/samconfig.toml
Normal file
9
batch-jobs/samconfig.toml
Normal file
@@ -0,0 +1,9 @@
|
||||
version = 0.1
|
||||
[default.deploy.parameters]
|
||||
stack_name = "saladeaula-batch-jobs"
|
||||
resolve_s3 = true
|
||||
s3_prefix = "batchjobs"
|
||||
region = "sa-east-1"
|
||||
confirm_changeset = false
|
||||
capabilities = "CAPABILITY_IAM"
|
||||
image_repositories = []
|
||||
44
batch-jobs/template.yaml
Normal file
44
batch-jobs/template.yaml
Normal file
@@ -0,0 +1,44 @@
|
||||
AWSTemplateFormatVersion: 2010-09-09
|
||||
Transform: AWS::Serverless-2016-10-31
|
||||
|
||||
Globals:
|
||||
Function:
|
||||
CodeUri: app/
|
||||
Runtime: python3.13
|
||||
Tracing: Active
|
||||
Architectures:
|
||||
- x86_64
|
||||
Layers:
|
||||
- !Sub arn:aws:lambda:sa-east-1:336641857101:layer:layercake:53
|
||||
Environment:
|
||||
Variables:
|
||||
TZ: America/Sao_Paulo
|
||||
LOG_LEVEL: DEBUG
|
||||
POWERTOOLS_LOGGER_SAMPLE_RATE: 0.1
|
||||
POWERTOOLS_LOGGER_LOG_EVENT: true
|
||||
|
||||
Resources:
|
||||
EventLog:
|
||||
Type: AWS::Logs::LogGroup
|
||||
Properties:
|
||||
RetentionInDays: 90
|
||||
|
||||
EventCsvChunksFunction:
|
||||
Type: AWS::Serverless::Function
|
||||
Properties:
|
||||
Handler: events.csv_chunks.lambda_handler
|
||||
LoggingConfig:
|
||||
LogGroup: !Ref EventLog
|
||||
Policies:
|
||||
- S3CrudPolicy:
|
||||
BucketName: saladeaula.digital
|
||||
Events:
|
||||
DynamoDBEvent:
|
||||
Type: EventBridgeRule
|
||||
Properties:
|
||||
Pattern:
|
||||
resources: [betaeducacao-prod-users_d2o3r5gmm4it7j]
|
||||
detail:
|
||||
new_image:
|
||||
sk:
|
||||
- prefix: batch_jobs#
|
||||
0
batch-jobs/tests/__init__.py
Normal file
0
batch-jobs/tests/__init__.py
Normal file
16
batch-jobs/tests/conftest.py
Normal file
16
batch-jobs/tests/conftest.py
Normal file
@@ -0,0 +1,16 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@dataclass
class LambdaContext:
    """Lightweight stand-in for the Lambda context object used by the tests."""

    # Fixed placeholder values; every field can be overridden per test.
    function_name: str = 'test'
    memory_limit_in_mb: int = 128
    invoked_function_arn: str = 'arn:aws:lambda:eu-west-1:809313241:function:test'
    aws_request_id: str = '52fdfc07-2182-154f-163f-5f0f9a621d72'
|
||||
|
||||
|
||||
@pytest.fixture
def lambda_context() -> LambdaContext:
    """Provide a fresh `LambdaContext` with default values to each test."""
    return LambdaContext()
|
||||
0
batch-jobs/tests/events/__init__.py
Normal file
0
batch-jobs/tests/events/__init__.py
Normal file
13
batch-jobs/tests/events/test_csv_chunks.py
Normal file
13
batch-jobs/tests/events/test_csv_chunks.py
Normal file
@@ -0,0 +1,13 @@
|
||||
import events.csv_chunks as app
|
||||
|
||||
|
||||
def test_csv_chunks(lambda_context):
    """Smoke-test the chunking handler with an event pointing at an S3 CSV."""
    new_image = {
        'csv_s3uri': 's3://saladeaula.digital/samples/large_users.csv',
    }
    event = {'detail': {'new_image': new_image}}

    app.lambda_handler(event, lambda_context)
|
||||
3286
batch-jobs/tests/samples/large_users.csv
Normal file
3286
batch-jobs/tests/samples/large_users.csv
Normal file
File diff suppressed because it is too large
Load Diff
28
batch-jobs/tests/samples/users.csv
Normal file
28
batch-jobs/tests/samples/users.csv
Normal file
@@ -0,0 +1,28 @@
|
||||
CADASTRO DE COLABORADOR,,,,
|
||||
,NOME COMPLETO,EMAIL (letra minúscula),CPF,TREINAMENTO
|
||||
,ANDRE HENRIQUE LOPES ZAFALON,henrique.zafalon@fanucamerica.com,261.955.138-22,NR-35 (RECICLAGEM)
|
||||
,SERGIO DA SILVA CUPERTINO,sergio.cupertino@fanucamerica.com,066.945.708-64,NR-10 (RECICLAGEM)
|
||||
,SERGIO DA SILVA CUPERTINO,sergio.cupertino@fanucamerica.com,066.945.708-64,NR-35 (RECICLAGEM)
|
||||
,ROVANE CAMPOS,rovane.campos@fanucamerica.com,095.958.578-82,NR-10 (RECICLAGEM)
|
||||
,ROVANE CAMPOS,rovane.campos@fanucamerica.com,095.958.578-82,NR-35 (RECICLAGEM)
|
||||
,MARCIO ATSUSHI KANEKO MASUDA,marcio.masuda@fanucamerica.com,293.042.798-10,NR-10 (RECICLAGEM)
|
||||
,FABIO AKIRA HARAGUCHI,fabio.haraguchi@fanucamerica.com,287.018.428-03,NR-10 (RECICLAGEM)
|
||||
,EMIDIO YOITI MOCHIZUKI,emidio.mochizuki@fanucamerica.com,268.579.208-26,NR-10 (RECICLAGEM)
|
||||
,EMIDIO YOITI MOCHIZUKI,emidio.mochizuki@fanucamerica.com,268.579.208-26,NR-35 (RECICLAGEM)
|
||||
,ERIC HIDEKI MORIKIO,eric.morikio@fanucamerica.com,417.359.838-61,NR-10 (RECICLAGEM)
|
||||
,HENRIQUE DE FIGUEIREDO BASTOS FERRAZ,henrique.ferraz@fanucamerica.com,417.059.788-51,NR-10 (RECICLAGEM)
|
||||
,LAYS MORETTI DA SILVA,lays.silva@fanucamerica.com,013.107.662-07,NR-10 (RECICLAGEM)
|
||||
,LAYS MORETTI DA SILVA,lays.silva@fanucamerica.com,013.107.662-07,NR-12
|
||||
,ANDRE DE SOUZA,andre.souza@fanucamerica.com,290.688.648-31,NR-10 (RECICLAGEM)
|
||||
,ANDRE DE SOUZA,andre.souza@fanucamerica.com,290.688.648-31,NR-12
|
||||
,RAFAEL TOSHIO BURATO MAEDA,rafael.maeda@fanucamerica.com,394.153.268-59,NR-10 (RECICLAGEM)
|
||||
,RAFAEL TOSHIO BURATO MAEDA,rafael.maeda@fanucamerica.com,394.153.268-59,NR-12
|
||||
,RAFAEL TOSHIO BURATO MAEDA,rafael.maeda@fanucamerica.com,394.153.268-59,NR-35 (RECICLAGEM)
|
||||
,RICARDO GALLES BONET,ricardo.bonet@fanucamerica.com,424.430.528-93,NR-10 (RECICLAGEM)
|
||||
,RULIO SIEFERT SERA,rulio.sera@fanucamerica.com,063.916.859-08,NR-10 (RECICLAGEM)
|
||||
,MACIEL FERREIRA BOMFIM,maciel.bomfim@fanucamerica.com,334.547.088-85,NR-10 (RECICLAGEM)
|
||||
,JAIME EDUARDO GALVEZ AVILES,jaime.galvez@fanucamerica.com,280.238.818-50,NR-12
|
||||
,JAIME EDUARDO GALVEZ AVILES,jaime.galvez@fanucamerica.com,280.238.818-50,NR-35 (RECICLAGEM)
|
||||
,HIGOR MACHADO SILVA,higor.silva@fanucamerica.com,419.879.878-88,NR-12
|
||||
,LÁZARO SOUZA DIAS,lazaro.dias@fanucamerica.com,067.179.825-19,NR-12
|
||||
,JOÃO PEDRO AGUIAR GALASSO,joao.pedro@fanucamerica.com,570.403.588-40,NR-12
|
||||
|
27
batch-jobs/tests/test_csv_utils.py
Normal file
27
batch-jobs/tests/test_csv_utils.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from csv_utils import byte_ranges, detect_delimiter
|
||||
|
||||
|
||||
def test_detect_delimiter():
    """The sample users file is detected as comma-separated."""
    with open('tests/samples/users.csv') as sample:
        delimiter = detect_delimiter(sample)

    assert delimiter == ','
|
||||
|
||||
|
||||
def test_byte_ranges():
    """Every byte range must map back to exactly its group of fixture lines.

    The expected chunks are derived from the fixture itself rather than from
    a hand-copied duplicate of its rows, and all ranges are checked instead of
    only the last one.
    """
    csvpath = 'tests/samples/users.csv'
    chunk_size = 10

    with open(csvpath, 'rb') as f:
        # Binary readlines() splits on b'\n' only, like the scan in byte_ranges.
        lines = f.readlines()
    raw = b''.join(lines)

    ranges = byte_ranges(csvpath, chunk_size)

    # Ranges are inclusive on both ends, hence the +1 when slicing.
    chunks = [raw[start_byte : end_byte + 1] for start_byte, end_byte in ranges]
    expected = [
        b''.join(lines[first : first + chunk_size])
        for first in range(0, len(lines), chunk_size)
    ]

    assert chunks == expected
    # Together the chunks must cover the file with no gaps or overlaps.
    assert b''.join(chunks) == raw
|
||||
1057
batch-jobs/uv.lock
generated
Normal file
1057
batch-jobs/uv.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user