166 lines
4.5 KiB
Python
166 lines
4.5 KiB
Python
from datetime import datetime
|
|
from ipaddress import IPv4Address
|
|
|
|
import pytest
|
|
from botocore.exceptions import ClientError
|
|
|
|
from layercake.dateutils import ttl
|
|
from layercake.dynamodb import (
|
|
ComposeKey,
|
|
DynamoDBCollection,
|
|
DynamoDBPersistenceLayer,
|
|
KeyPair,
|
|
MissingError,
|
|
PartitionKey,
|
|
TransactItems,
|
|
serialize,
|
|
)
|
|
|
|
|
|
def test_serialize():
|
|
assert serialize(
|
|
{
|
|
'id': '123',
|
|
'sk': 'abc',
|
|
'date': datetime.fromisoformat('2025-03-20T18:29:10.713994'),
|
|
'ip': IPv4Address('127.0.0.1'),
|
|
}
|
|
) == {
|
|
'id': {'S': '123'},
|
|
'sk': {'S': 'abc'},
|
|
'date': {'S': '2025-03-20T18:29:10.713994'},
|
|
'ip': {'S': '127.0.0.1'},
|
|
}
|
|
|
|
|
|
def test_composekey():
|
|
assert ComposeKey(('122', 'abc'), prefix='schedules') == 'schedules#122#abc'
|
|
assert ComposeKey(('122', 'abc')) == '122#abc'
|
|
assert ComposeKey('122') == '122'
|
|
|
|
|
|
def test_partitionkey():
|
|
assert PartitionKey('123') == {'id': '123'}
|
|
assert PartitionKey('123').expr_attr_name() == {'#pk': 'id'}
|
|
assert PartitionKey('123').expr_attr_values() == {':pk': '123'}
|
|
|
|
|
|
def test_keypair():
|
|
assert KeyPair('123', 'abc') == {'id': '123', 'sk': 'abc'}
|
|
assert KeyPair('123', 'abc').expr_attr_name() == {'#pk': 'id', '#sk': 'sk'}
|
|
assert KeyPair('123', 'abc').expr_attr_values() == {':pk': '123', ':sk': 'abc'}
|
|
|
|
|
|
def test_transact_write_items(
|
|
dynamodb_seeds,
|
|
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
|
|
):
|
|
transact = TransactItems(dynamodb_persistence_layer.table_name)
|
|
transact.put(item=KeyPair('5OxmMjL-ujoR5IMGegQz', '0'))
|
|
transact.put(item=KeyPair('cpf', '07879819908'))
|
|
transact.put(
|
|
item=KeyPair('email', 'sergio@somosbeta.com.br'),
|
|
cond_expr='attribute_not_exists(sk)',
|
|
)
|
|
transact.put(
|
|
item=KeyPair('5OxmMjL-ujoR5IMGegQz', 'emails#sergio@somosbeta.com.br'),
|
|
cond_expr='attribute_not_exists(sk)',
|
|
)
|
|
|
|
with pytest.raises(ClientError):
|
|
dynamodb_persistence_layer.transact_write_items(transact)
|
|
|
|
|
|
def test_collection_get_item(
|
|
dynamodb_seeds,
|
|
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
|
|
):
|
|
collect = DynamoDBCollection(dynamodb_persistence_layer)
|
|
data_notfound = collect.get_item(
|
|
KeyPair(
|
|
pk='5OxmMjL-ujoR5IMGegQz',
|
|
sk='tenant',
|
|
),
|
|
raise_if_missing=False,
|
|
default={},
|
|
)
|
|
assert data_notfound == {}
|
|
|
|
# This data was added from seeds
|
|
data = collect.get_item(
|
|
KeyPair(
|
|
pk='5OxmMjL-ujoR5IMGegQz',
|
|
sk=ComposeKey('sergio@somosbeta.com.br', prefix='emails'),
|
|
),
|
|
default={},
|
|
)
|
|
assert data == {
|
|
'email_verified': True,
|
|
'mx_record_exists': True,
|
|
'sk': 'emails#sergio@somosbeta.com.br',
|
|
'email_primary': True,
|
|
'id': '5OxmMjL-ujoR5IMGegQz',
|
|
'create_date': '2019-03-25T00:00:00-03:00',
|
|
'update_date': '2023-11-09T12:13:04.308986-03:00',
|
|
}
|
|
|
|
with pytest.raises(MissingError):
|
|
collect.get_item(key=KeyPair('5OxmMjL-ujoR5IMGegQz', 'notfound'))
|
|
|
|
|
|
def test_collection_put_item(
|
|
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
|
|
):
|
|
collect = DynamoDBCollection(dynamodb_persistence_layer)
|
|
|
|
assert collect.put_item(
|
|
KeyPair(
|
|
'5OxmMjL-ujoR5IMGegQz',
|
|
ComposeKey('6d1044d5-18c5-437c-9219-fc2ace7e5ebc', prefix='orgs'),
|
|
),
|
|
name='Beta Educação',
|
|
ttl=ttl(days=3),
|
|
)
|
|
|
|
data = collect.get_item(
|
|
KeyPair(
|
|
pk='5OxmMjL-ujoR5IMGegQz',
|
|
sk=ComposeKey('6d1044d5-18c5-437c-9219-fc2ace7e5ebc', prefix='orgs'),
|
|
),
|
|
)
|
|
|
|
assert data['sk'] == 'orgs#6d1044d5-18c5-437c-9219-fc2ace7e5ebc'
|
|
assert 'name' in data
|
|
assert 'ttl' in data
|
|
assert 'ttl_date' in data
|
|
|
|
|
|
def test_collection_delete_item(
|
|
dynamodb_seeds,
|
|
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
|
|
):
|
|
collect = DynamoDBCollection(dynamodb_persistence_layer)
|
|
|
|
# This data was added from seeds
|
|
assert collect.delete_item(
|
|
KeyPair(
|
|
'5OxmMjL-ujoR5IMGegQz',
|
|
ComposeKey('sergio@somsbeta.com.br', prefix='emails'),
|
|
)
|
|
)
|
|
|
|
|
|
def test_collection_get_items(
|
|
dynamodb_seeds,
|
|
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
|
|
):
|
|
collect = DynamoDBCollection(dynamodb_persistence_layer)
|
|
|
|
# This data was added from seeds
|
|
data = collect.get_items(
|
|
PartitionKey(
|
|
ComposeKey('5OxmMjL-ujoR5IMGegQz', prefix='logs'),
|
|
),
|
|
)
|
|
assert len(data['items']) == 2
|