Files
saladeaula.digital/layercake/layercake/batch.py
2025-05-30 18:18:42 -03:00

139 lines
3.9 KiB
Python

import inspect
import logging
import os
from contextlib import AbstractContextManager
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Self, Sequence
# Log verbosity is read from the LOG_LEVEL environment variable (default: INFO)
# and upper-cased so it can be looked up as a level constant on `logging`.
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL))
# Module-scoped logger used by the batch processor for per-record diagnostics.
logger = logging.getLogger(__name__)
class Status(Enum):
    """Outcome of processing a single record."""

    # The record was handled without an exception being raised.
    SUCCESS = 'success'
    # Handling the record raised an exception.
    FAIL = 'fail'
@dataclass(frozen=True)
class Result:
    """
    Immutable outcome of processing one record.

    Attributes
    ----------
    status : Status
        Whether the record was processed successfully or not.
    input_record : Any
        The record that was processed.
    output : Any, optional
        Value associated with a successful run; defaults to ``None``.
    cause : Any, optional
        Description of the failure; defaults to ``None``.
    """

    status: Status
    input_record: Any
    output: Any | None = None
    cause: Any | None = None
class BatchProcessor(AbstractContextManager):
    """
    Process a sequence of records one by one, collecting successes and failures.

    Each record is passed to a handler. An exception raised while handling a
    record is captured and reported as a failed ``Result`` instead of aborting
    the rest of the batch.

    Batch processing utility inspired by AWS Lambda Powertools for Python:
    https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/batch/base.py

    Attributes
    ----------
    successes : list
        List of successfully processed records.
    failures : list
        List of records that failed to process.
    exceptions : list
        List of exceptions raised during processing.

    Examples
    --------
    >>> def handler(record, context):
    ...     if record % 2 == 0:
    ...         return record * context['factor']
    ...     raise ValueError('Odd number not allowed')
    ...
    >>> records = [1, 2, 3, 4]
    >>> processor = BatchProcessor()
    >>> with processor(records, handler, {'factor': 2}) as p:
    ...     results = p.process()
    ...
    >>> for result in results:
    ...     print(result.status, result.input_record, result.output, result.cause)
    Status.FAIL 1 None {'type': 'ValueError', 'message': 'Odd number not allowed'}
    Status.SUCCESS 2 4 None
    Status.FAIL 3 None {'type': 'ValueError', 'message': 'Odd number not allowed'}
    Status.SUCCESS 4 8 None
    """

    def __init__(self) -> None:
        self.successes: list[Any] = []
        self.failures: list[Any] = []
        self.exceptions: list[BaseException] = []
        # Defaults so that every attribute exists before ``__call__`` is used;
        # processing an unconfigured processor yields an empty result.
        self.records: Sequence[Any] = ()
        self.handler: Callable[..., Any] | None = None
        self.context: dict[str, Any] | None = None
        self._handler_accepts_context = False
        self._context_by_keyword = False

    def __call__(
        self,
        records: Sequence[Any],
        handler: Callable[..., Any],
        context: dict[str, Any] | None = None,
    ) -> Self:
        """
        Configure the processor for a batch and return it.

        Parameters
        ----------
        records : Sequence
            Records to process.
        handler : Callable
            Called once per record. It receives the record and, when a
            ``context`` is supplied and the handler declares a parameter
            named ``context``, the context as well.
        context : dict, optional
            Extra data handed to the handler. When ``None`` the handler is
            always called with the record only.
        """
        self.records = records
        self.handler = handler
        self.context = context
        self._handler_accepts_context = False
        self._context_by_keyword = False

        if context is not None:
            parameter = self._find_context_parameter(handler)
            if parameter is not None:
                self._handler_accepts_context = True
                # The parameter is located by name, so pass it by name when
                # the signature allows it; otherwise keep passing it as the
                # second positional argument.
                self._context_by_keyword = parameter.kind in (
                    inspect.Parameter.POSITIONAL_OR_KEYWORD,
                    inspect.Parameter.KEYWORD_ONLY,
                )

        return self

    @staticmethod
    def _find_context_parameter(
        handler: Callable[..., Any],
    ) -> inspect.Parameter | None:
        """Return the handler's ``context`` parameter, or ``None`` if absent."""
        try:
            parameters = inspect.signature(handler).parameters
        except (TypeError, ValueError):
            # inspect.signature() cannot introspect every callable; treat
            # those as handlers that do not take a context.
            return None
        return parameters.get('context')

    def __enter__(self) -> Self:
        """Remove results from previous execution."""
        self.successes.clear()
        self.failures.clear()
        self.exceptions.clear()
        return self

    def __exit__(self, *exc_details) -> None:
        """Do nothing on exit; returning ``None`` lets exceptions propagate."""
        pass

    def process(self) -> Sequence[Result]:
        """Process every configured record and return one ``Result`` per record, in order."""
        return tuple(self._process_record(record) for record in self.records)

    def _process_record(self, record: Any) -> Result:
        """
        Processes a single record using the handler function with optional context.

        Returns a Result indicating success or failure. Exceptions derived from
        ``Exception`` are captured; the record is stored in ``failures`` and the
        exception in ``exceptions``.
        """
        try:
            # Only the handler call is guarded: it is the sole user-supplied code.
            if not self._handler_accepts_context:
                output = self.handler(record)
            elif self._context_by_keyword:
                output = self.handler(record, context=self.context)
            else:
                output = self.handler(record, self.context)
        except Exception as exc:
            exc_type = type(exc).__name__
            # Lazy %-style arguments: the message is built only if DEBUG is enabled.
            logger.debug('Record processing exception: %s: %s', exc_type, exc)
            self.exceptions.append(exc)
            self.failures.append(record)
            return Result(
                status=Status.FAIL,
                input_record=record,
                cause={
                    'type': exc_type,
                    'message': str(exc),
                },
            )

        self.successes.append(record)
        return Result(
            status=Status.SUCCESS,
            output=output,
            input_record=record,
        )