72 lines
1.6 KiB
Python
72 lines
1.6 KiB
Python
from datetime import datetime
|
|
from ipaddress import IPv4Address
|
|
|
|
import pytest
|
|
from botocore.exceptions import ClientError
|
|
|
|
from layercake.dynamodb import (
|
|
DynamoDBPersistenceLayer,
|
|
Key,
|
|
KeyPair,
|
|
TransactItems,
|
|
serialize,
|
|
)
|
|
|
|
|
|
def test_serialize():
    """Check that ``serialize`` wraps str, datetime and IPv4Address values as ``{'S': ...}``."""
    # Plain strings plus two non-string values (datetime, IPv4Address).
    raw_item = {
        'id': '123',
        'sk': 'abc',
        'date': datetime.fromisoformat('2025-03-20T18:29:10.713994'),
        'ip': IPv4Address('127.0.0.1'),
    }

    serialized = serialize(raw_item)

    # Every value is expected under the 'S' type tag, with the datetime
    # rendered as its ISO-8601 string and the address in dotted form.
    expected = {
        'id': {'S': '123'},
        'sk': {'S': 'abc'},
        'date': {'S': '2025-03-20T18:29:10.713994'},
        'ip': {'S': '127.0.0.1'},
    }
    assert serialized == expected
|
|
|
|
|
|
def test_key():
    """Check that a ``Key`` built from a tuple and a prefix equals the '#'-joined string."""
    parts = ('122', 'abc')
    composed = Key(parts, prefix='schedules')

    assert composed == 'schedules#122#abc'
|
|
|
|
|
|
def test_keypair():
    """Check that ``KeyPair`` compares equal to a dict with 'id' and 'sk' entries."""
    pair = KeyPair('123', 'abc')

    assert pair == {'id': '123', 'sk': 'abc'}
|
|
|
|
|
|
def test_transact_write_items(dynamodb_client):
    """Queue four puts in one transaction and expect the write to raise ``ClientError``.

    The first two puts are unconditional; the last two carry an
    ``attribute_not_exists(sk)`` condition.
    NOTE(review): presumably the fixture table already holds an item that
    violates one of the conditions -- confirm against the test fixtures.
    """
    layer = DynamoDBPersistenceLayer('pytest', dynamodb_client)
    txn = TransactItems(layer.table_name)

    # Puts queued without any condition expression.
    unconditional_items = (
        {'id': '5OxmMjL-ujoR5IMGegQz', 'sk': '0'},
        {'id': 'cpf', 'sk': '07879819908'},
    )
    # Puts that must only succeed when the item does not exist yet.
    guarded_items = (
        {'id': 'email', 'sk': 'sergio@somosbeta.com.br'},
        {'id': '5OxmMjL-ujoR5IMGegQz', 'sk': 'email:sergio@somosbeta.com.br'},
    )

    # Order matters: unconditional puts first, guarded puts afterwards.
    for entry in unconditional_items:
        txn.put(item=entry)
    for entry in guarded_items:
        txn.put(item=entry, cond_expr='attribute_not_exists(sk)')

    with pytest.raises(ClientError):
        layer.transact_write_items(txn)
|