Files
saladeaula.digital/konviva-events/tests/test_app.py

224 lines
6.7 KiB
Python

from http import HTTPMethod, HTTPStatus
from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, PartitionKey
from .conftest import HttpApiProxy, LambdaContext
def test_start_progress(
app,
seeds,
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
http_api_proxy: HttpApiProxy,
lambda_context: LambdaContext,
):
# This data was added from seeds
r = app.lambda_handler(
http_api_proxy(
raw_path='/',
method=HTTPMethod.POST,
body={
'ID_MATRICULA': '123',
'APROVEITAMENTO': '23.152173913043477',
'ANDAMENTO': '38.888888888888886',
'status': 'IN_PROGRESS',
},
),
lambda_context,
)
assert r['statusCode'] == HTTPStatus.NO_CONTENT
# Check `seeds.jsonl` for sample data related to this query
r = dynamodb_persistence_layer.collection.query(
PartitionKey('d9da85f2-e09f-472d-9515-3d91d70f1e8a')
)
assert any(item.get('sk') == 'STARTED' for item in r['items'])
assert any(
item.get('sk') == 'SCHEDULE#REMINDER_NO_ACTIVITY_AFTER_7_DAYS'
for item in r['items']
)
assert len(r['items']) == 3
def test_update_progress(
app,
seeds,
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
http_api_proxy: HttpApiProxy,
lambda_context: LambdaContext,
):
# This data was added from seeds
r = app.lambda_handler(
http_api_proxy(
raw_path='/',
method=HTTPMethod.POST,
body={
'ID_MATRICULA': '456',
'APROVEITAMENTO': '23.152173913043477',
'ANDAMENTO': '12.888888888888886',
'status': 'IN_PROGRESS',
'event': 'Matrícula - atualização de conteúdo',
},
),
lambda_context,
)
assert r['statusCode'] == HTTPStatus.NO_CONTENT
# Check `seeds.jsonl` for sample data related to this query
r = dynamodb_persistence_layer.collection.query(
PartitionKey('197991aa-52e2-4e0c-b2a4-a7c53bcfee02')
)
assert len(r['items']) == 2
assert any(
item.get('sk') == 'SCHEDULE#REMINDER_NO_ACTIVITY_AFTER_7_DAYS'
for item in r['items']
)
def test_set_as_completed(
app,
seeds,
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
http_api_proxy: HttpApiProxy,
lambda_context: LambdaContext,
):
# Update `created_at` to avoid future test failures
dynamodb_persistence_layer.update_item(
key=KeyPair('6c7e3d9b-f5d1-4da4-9e55-0825bb6ff2b8', '0'),
update_expr='SET created_at = :created_at',
expr_attr_values={':created_at': now()},
)
# This data was added from seeds
r = app.lambda_handler(
http_api_proxy(
raw_path='/',
method=HTTPMethod.POST,
body={
'ID_MATRICULA': '567',
'APROVEITAMENTO': '89.152173913043477',
'ANDAMENTO': '100',
'status': 'COMPLETED',
},
),
lambda_context,
)
assert r['statusCode'] == HTTPStatus.NO_CONTENT
# Check `seeds.jsonl` for sample data related to this query
r = dynamodb_persistence_layer.collection.query(
PartitionKey('6c7e3d9b-f5d1-4da4-9e55-0825bb6ff2b8')
)
assert len(r['items']) == 7
assert any(item.get('sk') == 'COMPLETED' for item in r['items'])
assert any(item.get('sk') == 'LOCK' for item in r['items'])
assert any(
item.get('sk') == 'SCHEDULE#REMINDER_CERT_EXPIRATION_BEFORE_30_DAYS'
for item in r['items']
)
assert any(item.get('sk') == 'SCHEDULE#SET_AS_ARCHIVED' for item in r['items'])
r = dynamodb_persistence_layer.collection.query(PartitionKey('LOCK'))
assert len(r['items']) == 1
def test_set_as_failed(
app,
seeds,
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
http_api_proxy: HttpApiProxy,
lambda_context: LambdaContext,
):
# Update `created_at` to avoid future test failures
dynamodb_persistence_layer.update_item(
key=KeyPair('6c7e3d9b-f5d1-4da4-9e55-0825bb6ff2b8', '0'),
update_expr='SET created_at = :created_at',
expr_attr_values={':created_at': now()},
)
# This data was added from seeds
r = app.lambda_handler(
http_api_proxy(
raw_path='/',
method=HTTPMethod.POST,
body={
'ID_MATRICULA': '567',
'APROVEITAMENTO': '12.152173913043477',
'ANDAMENTO': '100',
'status': 'COMPLETED',
},
),
lambda_context,
)
assert r['statusCode'] == HTTPStatus.NO_CONTENT
# Check `seeds.jsonl` for sample data related to this query
r = dynamodb_persistence_layer.collection.query(
PartitionKey('6c7e3d9b-f5d1-4da4-9e55-0825bb6ff2b8')
)
assert any(item.get('sk') == 'FAILED' for item in r['items'])
def test_set_as_archived(
app,
seeds,
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
http_api_proxy: HttpApiProxy,
lambda_context: LambdaContext,
):
# This data was added from seeds
r = app.lambda_handler(
http_api_proxy(
raw_path='/',
method=HTTPMethod.POST,
body={
'ID_MATRICULA': '899',
'APROVEITAMENTO': '70',
'ANDAMENTO': '100',
'status': 'COMPLETED',
},
),
lambda_context,
)
assert r['statusCode'] == HTTPStatus.NO_CONTENT
# Check `seeds.jsonl` for sample data related to this query
r = dynamodb_persistence_layer.collection.query(
PartitionKey('cc2c3bce-c34a-4e82-aa6c-1a19e70ec5ae')
)
assert any(item.get('sk') == 'ARCHIVED' for item in r['items'])
assert len(r['items']) == 4
def test_set_as_expired(
app,
seeds,
dynamodb_persistence_layer: DynamoDBPersistenceLayer,
http_api_proxy: HttpApiProxy,
lambda_context: LambdaContext,
):
# This data was added from seeds
r = app.lambda_handler(
http_api_proxy(
raw_path='/',
method=HTTPMethod.POST,
body={
'ID_MATRICULA': '221',
'APROVEITAMENTO': '69',
'ANDAMENTO': '100',
'status': 'COMPLETED',
},
),
lambda_context,
)
assert r['statusCode'] == HTTPStatus.NO_CONTENT
# Check `seeds.jsonl` for sample data related to this query
r = dynamodb_persistence_layer.collection.query(
PartitionKey('5db53b35-0bae-4907-afda-a213cb5bf651')
)
assert any(item.get('sk') == 'EXPIRED' for item in r['items'])
assert len(r['items']) == 3