Files
saladeaula.digital/layercake/layercake/batch.py

124 lines
3.5 KiB
Python

import inspect
import logging
import os
from contextlib import AbstractContextManager
from enum import Enum
from typing import Any, Callable, NamedTuple, Self, Sequence
# Logging verbosity is read from the LOG_LEVEL environment variable
# (case-insensitive); INFO is used when the variable is unset.
LOG_LEVEL = os.environ.get('LOG_LEVEL', 'INFO').upper()
logging.basicConfig(level=getattr(logging, LOG_LEVEL))
logger = logging.getLogger(__name__)
class Status(Enum):
    """Outcome of running the handler on a single record."""

    # The handler returned without raising.
    SUCCESS = 'success'
    # The handler raised an exception.
    FAIL = 'fail'
class Result(NamedTuple):
    """
    Per-record outcome produced by ``BatchProcessor``.

    Attributes
    ----------
    status : Status
        Whether the handler succeeded or failed for this record.
    cause : Any
        The handler's return value on success, or a
        ``'<ExceptionType>: <message>'`` string on failure.
    record : Any
        The record that was processed.
    """

    status: Status
    cause: Any
    record: Any
class BatchProcessor(AbstractContextManager):
    """
    Run a handler over a sequence of records, collecting per-record outcomes.

    Each record is passed to the handler in turn. A handler call that returns
    normally yields a ``Status.SUCCESS`` result carrying the return value; a
    call that raises an ``Exception`` yields a ``Status.FAIL`` result carrying
    ``'<ExceptionType>: <message>'``, and processing continues with the next
    record.

    Entering the context (``with processor(records, handler) as p``) clears
    the results of any previous run. The optional ``context`` is forwarded to
    the handler only when it is not ``None`` and the handler declares a
    parameter named ``context``.

    Batch processing utility inspired by AWS Lambda Powertools for Python:
    https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/batch/base.py

    Attributes
    ----------
    successes : list
        List of successfully processed records.
    failures : list
        List of records that failed to process.
    exceptions : list
        List of exceptions raised during processing.

    Examples
    --------
    >>> def handler(record, context):
    ...     if record % 2 == 0:
    ...         return record * context['factor']
    ...     else:
    ...         raise ValueError("Odd number not allowed")
    ...
    >>> records = [1, 2, 3, 4]
    >>> processor = BatchProcessor()
    >>> with processor(records, handler, context={'factor': 2}) as p:
    ...     results = p.process()
    ...
    >>> for result in results:
    ...     print(result.status, result.cause, result.record)
    Status.FAIL ValueError: Odd number not allowed 1
    Status.SUCCESS 4 2
    Status.FAIL ValueError: Odd number not allowed 3
    Status.SUCCESS 8 4
    """

    def __init__(self) -> None:
        self.successes: list[Any] = []
        self.failures: list[Any] = []
        self.exceptions: list[BaseException] = []

    def __call__(
        self,
        records: Sequence[Any],
        handler: Callable[..., Any],
        context: dict[str, Any] | None = None,
    ) -> Self:
        """
        Bind the records, handler and optional context for the next run.

        Parameters
        ----------
        records : Sequence[Any]
            Records to hand to ``handler`` one at a time.
        handler : Callable[..., Any]
            Called as ``handler(record)``, or with the context as well when
            ``context`` is given and the handler has a ``context`` parameter.
        context : dict[str, Any] | None
            Extra data for the handler. ``None`` (the default) means the
            handler is always called with the record only.

        Returns
        -------
        Self
            This processor, so the call can be used directly in ``with``.
        """
        self.records = records
        self.handler = handler
        self._handler_accepts_context = False
        self._context_is_keyword_only = False

        if context is not None:
            self.context = context
            parameter = self._find_context_parameter(handler)
            if parameter is not None:
                self._handler_accepts_context = True
                # The context is normally passed positionally; a keyword-only
                # parameter can only be satisfied by name.
                self._context_is_keyword_only = (
                    parameter.kind is inspect.Parameter.KEYWORD_ONLY
                )

        return self

    def __enter__(self) -> Self:
        """Remove results from previous execution."""
        self.successes.clear()
        self.failures.clear()
        self.exceptions.clear()
        return self

    def __exit__(self, *exc_details) -> None:
        # Nothing to release; returning None lets any exception propagate.
        pass

    def process(self) -> Sequence[Result]:
        """Process every bound record and return one ``Result`` per record, in order."""
        return tuple(self._process_record(record) for record in self.records)

    @staticmethod
    def _find_context_parameter(
        handler: Callable[..., Any],
    ) -> inspect.Parameter | None:
        """Return the handler's ``context`` parameter, or ``None`` if it has none."""
        try:
            parameters = inspect.signature(handler).parameters
        except (TypeError, ValueError):
            # Some callables (e.g. certain builtins) expose no signature;
            # treat them as not accepting a context instead of failing here.
            return None
        return parameters.get('context')

    def _process_record(self, record: Any) -> Result:
        """
        Processes a single record using the handler function with optional context.

        Returns a Result indicating success or failure. Exceptions raised by
        the handler are recorded in ``exceptions`` rather than propagated.
        """
        try:
            if not self._handler_accepts_context:
                result = self.handler(record)
            elif self._context_is_keyword_only:
                result = self.handler(record, context=self.context)
            else:
                result = self.handler(record, self.context)
        except Exception as exc:
            exc_str = f'{type(exc).__name__}: {exc}'
            logger.debug('Record processing exception: %s', exc_str)
            self.exceptions.append(exc)
            self.failures.append(record)
            return Result(Status.FAIL, exc_str, record)

        self.successes.append(record)
        return Result(Status.SUCCESS, result, record)