rename key

This commit is contained in:
2025-05-23 13:46:20 -03:00
parent a7ee787378
commit 693ced6fdd
5 changed files with 135 additions and 70 deletions

View File

@@ -3,7 +3,6 @@ from datetime import datetime, timedelta
import pytz
TZ = os.getenv('TZ', 'UTC')

View File

@@ -135,8 +135,8 @@ if TYPE_CHECKING:
@dataclass
class SortKey(str):
"""
SortKey encapsulates the sort key value and optionally stores additional attributes
for nested data extraction.
SortKey encapsulates the sort key value and optionally stores additional
attributes for nested data extraction.
Parameters
----------
@@ -146,21 +146,18 @@ if TYPE_CHECKING:
Optional specification for nested data extraction.
remove_prefix: str, optional
Optional prefix to remove from the key when forming the result dict.
retain_key: bool, optional
Use the key itself as value if True; otherwise, use the extracted value.
"""
sk: str
path_spec: str | None = None
remove_prefix: str | None = None
retain_key: bool = False
else:
class SortKey(str):
"""
SortKey encapsulates the sort key value and optionally stores additional attributes
for nested data extraction.
SortKey encapsulates the sort key value and optionally stores additional
attributes for nested data extraction.
Parameters
----------
@@ -170,8 +167,6 @@ else:
Optional specification for nested data extraction.
remove_prefix: str, optional
Optional prefix to remove from the key when forming the result dict.
retain_key: bool, optional
Use the key itself as value if True; otherwise, use the extracted value.
"""
def __new__(
@@ -180,7 +175,6 @@ else:
*,
path_spec: str | None = None,
remove_prefix: str | None = None,
retain_key: bool = False,
) -> str:
return super().__new__(cls, sk)
@@ -190,14 +184,12 @@ else:
*,
path_spec: str | None = None,
remove_prefix: str | None = None,
retain_key: bool = False,
) -> None:
# __init__ is used to store the parameters for later reference.
# For immutable types like str, __init__ cannot change the instance's value.
self.sk = sk
self.path_spec = path_spec
self.remove_prefix = remove_prefix
self.retain_key = retain_key
class Key(ABC, dict):
@@ -228,11 +220,43 @@ class PartitionKey(Key):
class KeyPair(Key):
"""Represents a composite key (partition key and sort key) for DynamoDB queries"""
def __init__(self, pk: str, sk: str) -> None:
def __init__(
self,
pk: str,
sk: str,
*,
rename_key: str | None = None,
retain_key: bool = False,
) -> None:
"""
Initializes a composite key using partition and sort key.
Parameters
----------
pk : str
The partition key.
sk : str
The sort key.
rename_key : str, optional
If provided, renames the sort key in the output.
retain_key : bool, optional
Use the key itself as value if True; otherwise, use the extracted value.
"""
super().__init__(**{PK: pk, SK: sk})
self._rename_key = rename_key
self._retain_key = retain_key
@property
def rename_key(self) -> str | None:
return self._rename_key
@property
def retain_key(self) -> bool:
return self._retain_key
def __repr__(self) -> str:
pk, sk = self.values()
pk, sk, *_ = self.values()
return f'KeyPair({pk!r}, {sk!r})'
def expr_attr_name(self) -> dict:
@@ -517,15 +541,19 @@ class DynamoDBPersistenceLayer:
limit: int | None = None,
index_forward: bool = True,
) -> dict[str, Any]:
"""You must provide the name of the partition key attribute and a single value for that attribute.
"""You must provide the name of the partition key attribute
and a single value for that attribute.
Query returns all items with that partition key value.
Optionally, you can provide a sort key attribute and use a comparison operator to refine the search results.
Optionally, you can provide a sort key attribute and use a comparison operator
to refine the search results.
...
A `Query` operation always returns a result set. If no matching items are found, the result set will be empty.
Queries that do not return results consume the minimum number of read capacity units for that type of read operation.
A `Query` operation always returns a result set. If no matching items are found,
the result set will be empty.
Queries that do not return results consume the minimum number
of read capacity units for that type of read operation.
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/query.html
"""
@@ -563,8 +591,11 @@ class DynamoDBPersistenceLayer:
)
def get_item(self, key: dict) -> dict:
"""The GetItem operation returns a set of attributes for the item with the given primary key.
If there is no matching item, GetItem does not return any data and there will be no Item element in the response.
"""The GetItem operation returns a set of attributes for the item
with the given primary key.
If there is no matching item, GetItem does not return any data and
there will be no Item element in the response.
"""
attrs = {
'TableName': self.table_name,
@@ -639,8 +670,9 @@ class DynamoDBPersistenceLayer:
expr_attr_names: dict | None = None,
expr_attr_values: dict | None = None,
) -> bool:
"""Deletes a single item in a table by primary key. You can perform a conditional delete operation that deletes
the item if it exists, or if it has an expected attribute value.
"""Deletes a single item in a table by primary key. You can perform
a conditional delete operation that deletes the item if it exists,
or if it has an expected attribute value.
"""
attrs: dict = {
'TableName': self.table_name,
@@ -722,15 +754,18 @@ class PaginatedResult(TypedDict):
class DynamoDBCollection:
"""
DynamoDBCollection provides a high-level abstraction for performing common CRUD operations
and queries on a DynamoDB table. It leverages an underlying persistence layer to handle
serialization and deserialization of data, key composition, transaction operations, and TTL management.
DynamoDBCollection provides a high-level abstraction for performing common
CRUD operations and queries on a DynamoDB table.
It leverages an underlying persistence layer to handle
serialization and deserialization of data, key composition, transaction operations,
and TTL management.
This collection class simplifies interaction with DynamoDB items, allowing users to:
- Retrieve a single item or multiple items via transactions.
- Insert (put) items with optional TTL (time-to-live) settings.
- Delete items based on keys and conditions.
- Query items using partition keys or composite key pairs with optional filtering and pagination.
- Query items using partition keys or composite key pairs with
optional filtering and pagination.
Parameters
----------
@@ -756,7 +791,6 @@ class DynamoDBCollection:
def get_item(
self,
key: Key,
path_spec: str | None = None,
/,
raise_on_error: bool = True,
exc_cls: Type[Exception] | None = None,
@@ -780,8 +814,6 @@ class DynamoDBCollection:
----------
key: Key
Key of the item to be retrieved.
path_spec: str, optional
A path specification for nested data extraction.
raise_on_error: bool, optional
If True, raises an exception when the item is not found.
exc_cls: Type[Exception], optional
@@ -797,10 +829,12 @@ class DynamoDBCollection:
Raises
------
Exception
Raises the provided exception if the item is not found and raise_on_error is True.
Raises the provided exception if the item is not found
and raise_on_error is True.
"""
exc_cls = exc_cls or self.exc_cls
data = self.persistence_layer.get_item(key)
path_spec = getattr(key[SK], 'path_spec', None)
if raise_on_error and not data:
raise exc_cls(f'Item with {key} not found.')
@@ -826,7 +860,8 @@ class DynamoDBCollection:
key: Key
Key for the item to be inserted or updated.
ttl: int or datetime, optional
Time-to-live for the item, specified as a timestamp integer or datetime object.
Time-to-live for the item, specified as a timestamp integer
or datetime object.
**kwargs
Additional data to be stored with the item.
@@ -836,18 +871,10 @@ class DynamoDBCollection:
True if the operation is successful, False otherwise.
"""
if isinstance(ttl, int):
kwargs.update(
{
'ttl': ttl,
}
)
kwargs.update({'ttl': ttl})
if isinstance(ttl, datetime):
kwargs.update(
{
'ttl': timestamp(ttl),
}
)
kwargs.update({'ttl': timestamp(ttl)})
return self.persistence_layer.put_item(item=key | kwargs)
@@ -897,20 +924,42 @@ class DynamoDBCollection:
key = (
TransactKey('b3511b5a-cb32-4833-a373-f8223f2088d4')
+ SortKey('sk-1') + SortKey('sk-2')
+ SortKey('sk-1')
+ SortKey('sk-2')
)
collect = DynamoDBCollection(...)
items = collect.get_items(key)
**Get items using chained key pairs**
key = (
KeyPair('b3511b5a-cb32-4833-a373-f8223f2088d4', '0')
+ KeyPair('cpf', '07879819908')
+ KeyPair('email', 'user@example.com')
)
collect = DynamoDBCollection(...)
items = collect.get_items(key)
Parameters
----------
key: TransactKey
A TransactKey instance that contains a partition key and one or more sort keys.
If no sort key is provided, the transaction is skipped.
key: TransactKey or KeyChain
A `TransactKey` is used when you want to define a partition key (`pk`)
and append multiple `SortKey` instances using the `+` operator. Each
`SortKey` is internally paired with the `pk` to form a `KeyPair`.
Alternatively, a `KeyChain` can be created by chaining two or more `KeyPair`
instances using the `+` operator. For example:
`KeyPair(pk1, sk1) + KeyPair(pk2, sk2)` creates a `KeyChain` with two pairs.
Further additions to the chain extend it.
If no sort keys (in `TransactKey`) or no key pairs (in `KeyChain`)
are provided, the operation is skipped.
flatten_top: bool, optional
Determines whether the first nested item in the transaction result should be flattened,
i.e., extracted to serve as the primary item at the top level of the returned dict.
Determines whether the first nested item in the transaction result
should be flattened,
i.e., extracted to serve as the primary item at the top level of
the returned dict.
If True, the nested item is promoted to the top level.
Returns
@@ -949,21 +998,22 @@ class DynamoDBCollection:
return glom(obj, path_spec)
return obj
def _removeprefix(pair: KeyPair) -> str:
def _mapkey(pair: KeyPair) -> str:
pk = pair[PK]
sk = pair[SK]
if pair.rename_key:
return pair.rename_key
if not isinstance(sk, SortKey):
return pk
key = pk if sk.retain_key else sk
key = pk if pair.retain_key else sk
return key.removeprefix(sk.remove_prefix or '')
return head | {
_removeprefix(pair): _getin(pair, obj)
for pair, obj in zip(sortkeys, tail)
if obj
_mapkey(pair): _getin(pair, obj) for pair, obj in zip(sortkeys, tail) if obj
}
def query(
@@ -1044,6 +1094,11 @@ class DynamoDBCollection:
_startkey_b64encode(response['last_key']) if response['last_key'] else None
)
def _removeprefix(
items: list[dict[str, Any]], /, key: str, prefix: str
) -> list[dict[str, Any]]:
return [x | {key: x[key].removeprefix(prefix)} for x in items]
match key.get(PK), key.get(SK):
case ComposeKey(), _: # Remove prefix from Partition Key
items = _removeprefix(items, PK, key[PK].prefix + key[PK].delimiter)
@@ -1056,16 +1111,6 @@ class DynamoDBCollection:
}
def _removeprefix(
items: list[dict[str, Any]],
/,
key: str,
prefix: str,
) -> list[dict[str, Any]]:
"""Remove the given prefix from the value associated with key in each item."""
return [x | {key: x[key].removeprefix(prefix)} for x in items]
def _startkey_b64encode(obj: dict) -> str:
s = json.dumps(obj)
b = urlsafe_b64encode(s.encode('utf-8')).decode('utf-8')