add get_items

This commit is contained in:
2025-04-08 13:44:06 -03:00
parent 68ff6f282b
commit 6450e5fa7c
18 changed files with 393 additions and 195 deletions

View File

@@ -14,6 +14,7 @@ from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from botocore.exceptions import ClientError
from .dateutils import now, timestamp
from .funcs import omit
TZ = os.getenv('TZ', 'UTC')
PK = os.getenv('DYNAMODB_PARTITION_KEY', 'pk')
@@ -124,6 +125,44 @@ else:
self.delimiter = delimiter
if TYPE_CHECKING:
@dataclass
class SortKey(str):
sk: str
table_name: str | None = None
else:
class SortKey(str):
def __new__(cls, sk: str, *, table_name: str | None = None) -> str:
return super().__new__(cls, sk)
def __init__(self, sk: str, *, table_name: str | None = None) -> None:
# __init__ is used to store the parameters for later reference.
# For immutable types like str, __init__ cannot change the instance's value.
self.sk = sk
self.table_name = table_name
@dataclass
class TransactKey:
"""
Example
-------
>>> TransactKey('e9bb7dc6-c7b2-4d34-8931-d298353758ec')
... + SortKey('0')
... + SortKey('tenant')
"""
pk: str
sk: tuple[SortKey, ...] = ()
def __add__(self, sk: SortKey) -> 'TransactKey':
if isinstance(sk, SortKey):
return TransactKey(pk=self.pk, sk=self.sk + (sk,))
raise TypeError('Can only add a SortKey to a TransactKey')
class Key(ABC, dict):
@abstractmethod
def expr_attr_name(self) -> dict: ...
@@ -485,7 +524,7 @@ class DynamoDBPersistenceLayer:
else:
return True
def transact_get_items(self, transact_items: TransactItems) -> list[dict]:
def transact_get_items(self, transact_items: TransactItems) -> list[dict[str, Any]]:
try:
response = self.dynamodb_client.transact_get_items(
TransactItems=transact_items.items
@@ -521,36 +560,6 @@ class PaginatedResult(TypedDict):
class DynamoDBCollection:
"""
Example
-------
**Get an item using a composed sort key**
collect = DynamoDBCollection(...)
collect.get_item(
KeyPair(
'b3511b5a-cb32-4833-a373-f8223f2088d4',
ComposeKey('sergio@somosbeta.com.br', prefix='emails'),
),
)
**Get items using a composed partition key**
collect = DynamoDBCollection(...)
collect.get_items(
PartitionKey(
ComposeKey('b3511b5a-cb32-4833-a373-f8223f2088d4', prefix='logs')
),
)
**Get items using a key pair**
collect = DynamoDBCollection(...)
collect.get_items(
KeyPair('b3511b5a-cb32-4833-a373-f8223f2088d4', 'emails'),
)
"""
def __init__(
self,
persistence_layer: DynamoDBPersistenceLayer,
@@ -572,6 +581,45 @@ class DynamoDBCollection:
default: Any = None,
delimiter: str = '#',
) -> Any:
"""Get an item with the given key.
Example
-------
**Get an item using a composed sort key**
collect = DynamoDBCollection(...)
collect.get_item(
KeyPair(
'b3511b5a-cb32-4833-a373-f8223f2088d4',
ComposeKey('username@domain.com', prefix='emails'),
),
)
Parameters
----------
key : Key
Key of the item to be retrieved.
path_spec : str, optional
A path specification for nested data extraction, default is None.
raise_on_error : bool, optional
If True, raises an exception when the item is not found, default is True.
exception_cls : Type[Exception], optional
Exception class to be used if the item is not found, default is MissingError.
default : Any, optional
Default value returned if the item is not found, default is None.
delimiter : str, optional
Delimiter used in key composition, default is '#'.
Returns
-------
Any
Data of the retrieved item or the default value if not found.
Raises
------
Exception
Raises the provided exception if the item is not found and raise_on_error is True.
"""
exc_cls = exception_cls or self.exception_cls
data = self.persistence_layer.get_item(key)
@@ -592,6 +640,22 @@ class DynamoDBCollection:
ttl: int | datetime | None = None,
**kwargs: Any,
) -> bool:
"""Creates a new item, or replaces an old item with a new item.
Parameters
----------
key : Key
Key for the item to be inserted or updated.
ttl : int or datetime, optional
Time-to-live for the item, specified as a timestamp integer or datetime object, default is None.
**kwargs
Additional data to be stored with the item.
Returns
-------
bool
True if the operation is successful, False otherwise.
"""
now_ = now(self.tz)
if isinstance(ttl, int):
@@ -620,6 +684,24 @@ class DynamoDBCollection:
expr_attr_names: dict | None = None,
expr_attr_values: dict | None = None,
) -> bool:
"""Deletes a single item in a table by key.
Parameters
----------
key : Key
Key of the item to be deleted.
cond_expr : str, optional
Conditional expression for deletion, default is None.
expr_attr_names : dict, optional
Mapping of attribute names for the expression, default is None.
expr_attr_values : dict, optional
Mapping of attribute values for the expression, default is None.
Returns
-------
bool
True if the item is successfully deleted, False otherwise.
"""
return self.persistence_layer.delete_item(
key=key,
cond_expr=cond_expr,
@@ -627,7 +709,57 @@ class DynamoDBCollection:
expr_attr_values=expr_attr_values,
)
def get_items(
def get_items(self, key: TransactKey) -> dict[str, Any]:
"""Get multiple items via a transaction based on the provided TransactKey.
Example
-------
**Get items using chained sort keys**
key = TransactKey('b3511b5a-cb32-4833-a373-f8223f2088d4') + SortKey('sk-1') + SortKey('sk-2')
collect = DynamoDBCollection(...)
items = collect.get_items(key)
Parameters
----------
key : TransactKey
A TransactKey instance that contains a partition key and one or more sort keys.
If no sort key is provided, the transaction is skipped.
Returns
-------
dict[str, Any]
A dict of items retrieved from the transaction.
"""
# If no sort key is provided, the query is skipped
if not key.sk:
return {}
table_name = self.persistence_layer.table_name
transact = TransactItems(table_name)
# Add a get operation for each sort key for the transaction
for sk in key.sk:
transact.get(
key=KeyPair(key.pk, sk),
table_name=sk.table_name,
)
data, *rest = self.persistence_layer.transact_get_items(transact)
return data | {
k: omit(
(
PK,
SK,
),
item,
)
for k, item in zip(key.sk[1:], rest)
if item
}
def query(
self,
key: PartitionKey | KeyPair,
*,
@@ -638,6 +770,52 @@ class DynamoDBCollection:
index_forward: bool = False,
limit: int = LIMIT,
) -> PaginatedResult:
"""Query returns all items with that partition key or key pair.
Example
-------
**Query using a composed partition key**
collect = DynamoDBCollection(...)
collect.query(
PartitionKey(
ComposeKey('b3511b5a-cb32-4833-a373-f8223f2088d4', prefix='logs')
),
)
**Query using a key pair**
collect = DynamoDBCollection(...)
collect.query(
KeyPair('b3511b5a-cb32-4833-a373-f8223f2088d4', 'emails'),
)
Parameters
----------
key : PartitionKey or KeyPair
Partition key or Key pair used for the query.
expr_attr_name : dict, optional
Additional mapping for attribute names, default is {}.
expr_attr_values : dict, optional
Additional mapping for attribute values, default is {}.
start_key : str, optional
Starting key for pagination, default is None.
filter_expr : str, optional
Filter expression for the query, default is None.
index_forward : bool, optional
Order of the results; True for ascending order, default is False.
limit : int, optional
Maximum number of items to return, default is LIMIT.
Returns
-------
PaginatedResult
Dict containing the queried items and the key for the next batch.
See Also
--------
DynamoDB.Client.query : https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/query.html
"""
key_cond_expr = (
'#pk = :pk AND begins_with(#sk, :sk)'
if isinstance(key, KeyPair)