add batch_writer

This commit is contained in:
2025-05-25 12:01:15 -03:00
parent d6c4eba78e
commit 38de5d7494
2 changed files with 42 additions and 4 deletions

View File

@@ -10,6 +10,7 @@ from typing import TYPE_CHECKING, Any, Self, Type, TypedDict
from urllib.parse import quote, unquote from urllib.parse import quote, unquote
from uuid import UUID from uuid import UUID
from boto3.dynamodb.table import BatchWriter
from boto3.dynamodb.types import TypeDeserializer, TypeSerializer from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from glom import glom from glom import glom
@@ -744,6 +745,41 @@ class DynamoDBPersistenceLayer:
else: else:
return True return True
def batch_writer(
self,
table_name: str | None = None,
overwrite_by_pkeys: list[str] = [],
) -> BatchWriter:
"""Create a batch writer object.
This method creates a context manager for writing
objects to Amazon DynamoDB in batch.
The batch writer will automatically handle buffering and sending items
in batches. In addition, the batch writer will also automatically
handle any unprocessed items and resend them as needed. All you need
to do is call ``put_item`` for any items you want to add, and
``delete_item`` for any items you want to delete.
Example usage::
with table.batch_writer() as batch:
for _ in range(1000000):
batch.put_item(Item={'HashKey': '...',
'Otherstuff': '...'})
# You can also delete_items in a batch.
batch.delete_item(Key={'HashKey': 'SomeHashKey'})
See Also
--------
DynamoDB.Table.batch_writer https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/table/batch_writer.html#DynamoDB.Table.batch_writer
"""
return BatchWriter(
table_name=table_name or self.table_name,
client=self.dynamodb_client,
overwrite_by_pkeys=overwrite_by_pkeys,
)
class MissingError(Exception): class MissingError(Exception):
pass pass
@@ -990,7 +1026,7 @@ class DynamoDBCollection:
else: else:
head, tail = {}, items head, tail = {}, items
def _getin(pair: KeyPair, obj: dict) -> dict: def _extract_sk_values(pair: KeyPair, obj: dict) -> dict:
obj = omit((PK, SK), obj) obj = omit((PK, SK), obj)
sk = pair[SK] sk = pair[SK]
path_spec = getattr(sk, 'path_spec', None) path_spec = getattr(sk, 'path_spec', None)
@@ -1001,7 +1037,7 @@ class DynamoDBCollection:
return glom(obj, path_spec) return glom(obj, path_spec)
return obj return obj
def _mapkey(pair: KeyPair) -> str: def _map_key(pair: KeyPair) -> str:
pk = pair[PK] pk = pair[PK]
sk = pair[SK] sk = pair[SK]
@@ -1016,7 +1052,9 @@ class DynamoDBCollection:
return key.removeprefix(sk.remove_prefix or '') return key.removeprefix(sk.remove_prefix or '')
return head | { return head | {
_mapkey(pair): _getin(pair, obj) for pair, obj in zip(sortkeys, tail) if obj _map_key(pair): _extract_sk_values(pair, obj)
for pair, obj in zip(sortkeys, tail)
if obj
} }
def query( def query(

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "layercake" name = "layercake"
version = "0.3.5" version = "0.4.0"
description = "Packages shared dependencies to optimize deployment and ensure consistency across functions." description = "Packages shared dependencies to optimize deployment and ensure consistency across functions."
readme = "README.md" readme = "README.md"
authors = [ authors = [