from layercake.batch import BatchProcessor, Result, Status

# A single processor instance is shared by every test in this module.
processor = BatchProcessor()


def _success(record, output):
    """Build the Result expected for a record whose handler returned *output*."""
    return Result(
        status=Status.SUCCESS,
        input_record=record,
        output=output,
        cause=None,
    )


def test_batch():
    """Check per-record results and the processor's success/failure lists.

    The handler accepts truthy records and raises ``ValueError`` for falsy
    ones; the test then re-runs the shared processor with a single failing
    record and checks the lists again.
    """

    def record_handler(record: bool):
        # Guard clause: falsy records are rejected with an exception.
        if not record:
            raise ValueError('Invalid record')
        return True

    batch = (True, True, False)

    with processor(records=batch, handler=record_handler) as active:
        results = active.process()

    rejected = Result(
        status=Status.FAIL,
        input_record=False,
        output=None,
        cause={'type': 'ValueError', 'message': 'Invalid record'},
    )
    assert results == (
        _success(True, True),
        _success(True, True),
        rejected,
    )
    assert len(results) == 3
    assert processor.successes == [True, True]
    assert processor.failures == [False]

    # Second run on the same processor, this time with one failing record.
    with processor(records=(False,), handler=record_handler):
        results = processor.process()

    first = results[0]
    assert first.status == Status.FAIL
    assert processor.successes == []
    assert processor.failures == [False]


def test_batch_context():
    """Check that the ``context`` mapping given to the processor reaches the handler."""

    def record_handler(val: int, context: dict):
        factor = context['multiplier']
        return val * factor

    numbers = (2, 3, 4)

    with processor(
        records=numbers,
        handler=record_handler,
        context={'multiplier': 2},
    ):
        results = processor.process()

    # With a multiplier of 2, every output is double its input record.
    expected = tuple(_success(n, n * 2) for n in numbers)
    assert results == expected