"""Tests that POST progress payloads to the Lambda handler and inspect DynamoDB."""

from http import HTTPMethod, HTTPStatus

from layercake.dateutils import now
from layercake.dynamodb import DynamoDBPersistenceLayer, KeyPair, PartitionKey

from .conftest import HttpApiProxy, LambdaContext


def _post_progress(
    app,
    http_api_proxy: HttpApiProxy,
    lambda_context: LambdaContext,
    payload: dict,
):
    """Send `payload` as a POST to `/` and return the handler's response."""
    event = http_api_proxy(
        raw_path='/',
        method=HTTPMethod.POST,
        body=payload,
    )
    return app.lambda_handler(event, lambda_context)


def _query_items(layer: DynamoDBPersistenceLayer, partition: str):
    """Return the `items` of a collection query on the given partition key."""
    result = layer.collection.query(PartitionKey(partition))
    return result['items']


def _refresh_created_at(layer: DynamoDBPersistenceLayer, partition: str) -> None:
    """Set `created_at` to the current time on the `'0'` record of `partition`."""
    layer.update_item(
        key=KeyPair(partition, '0'),
        update_expr='SET created_at = :created_at',
        expr_attr_values={':created_at': now()},
    )


def test_start_progress(
    app,
    seeds,
    dynamodb_persistence_layer: DynamoDBPersistenceLayer,
    http_api_proxy: HttpApiProxy,
    lambda_context: LambdaContext,
):
    """An IN_PROGRESS payload yields 204 plus `STARTED` and reminder records."""
    # The record targeted by this payload was added from seeds
    response = _post_progress(
        app,
        http_api_proxy,
        lambda_context,
        {
            'ID_MATRICULA': '123',
            'APROVEITAMENTO': '23.152173913043477',
            'ANDAMENTO': '38.888888888888886',
            'status': 'IN_PROGRESS',
        },
    )
    assert r_status(response) == HTTPStatus.NO_CONTENT

    # See `seeds.jsonl` for the sample data behind this partition
    items = _query_items(
        dynamodb_persistence_layer,
        'd9da85f2-e09f-472d-9515-3d91d70f1e8a',
    )
    sort_keys = [item.get('sk') for item in items]

    assert 'STARTED' in sort_keys
    assert 'SCHEDULE#REMINDER_NO_ACTIVITY_AFTER_7_DAYS' in sort_keys
    assert len(items) == 3


def test_update_progress(
    app,
    seeds,
    dynamodb_persistence_layer: DynamoDBPersistenceLayer,
    http_api_proxy: HttpApiProxy,
    lambda_context: LambdaContext,
):
    """An IN_PROGRESS update with an `event` yields 204 and a reminder record."""
    # The record targeted by this payload was added from seeds
    response = _post_progress(
        app,
        http_api_proxy,
        lambda_context,
        {
            'ID_MATRICULA': '456',
            'APROVEITAMENTO': '23.152173913043477',
            'ANDAMENTO': '12.888888888888886',
            'status': 'IN_PROGRESS',
            'event': 'Matrícula - atualização de conteúdo',
        },
    )
    assert r_status(response) == HTTPStatus.NO_CONTENT

    # See `seeds.jsonl` for the sample data behind this partition
    items = _query_items(
        dynamodb_persistence_layer,
        '197991aa-52e2-4e0c-b2a4-a7c53bcfee02',
    )
    sort_keys = [item.get('sk') for item in items]

    assert len(items) == 2
    assert 'SCHEDULE#REMINDER_NO_ACTIVITY_AFTER_7_DAYS' in sort_keys


def test_set_as_completed(
    app,
    seeds,
    dynamodb_persistence_layer: DynamoDBPersistenceLayer,
    http_api_proxy: HttpApiProxy,
    lambda_context: LambdaContext,
):
    """A COMPLETED payload yields 204 plus completion, lock and schedule records."""
    partition = '6c7e3d9b-f5d1-4da4-9e55-0825bb6ff2b8'

    # Bump `created_at` to the current time to avoid future test failures
    _refresh_created_at(dynamodb_persistence_layer, partition)

    # The record targeted by this payload was added from seeds
    response = _post_progress(
        app,
        http_api_proxy,
        lambda_context,
        {
            'ID_MATRICULA': '567',
            'APROVEITAMENTO': '89.152173913043477',
            'ANDAMENTO': '100',
            'status': 'COMPLETED',
        },
    )
    assert r_status(response) == HTTPStatus.NO_CONTENT

    # See `seeds.jsonl` for the sample data behind this partition
    items = _query_items(dynamodb_persistence_layer, partition)
    sort_keys = [item.get('sk') for item in items]

    assert len(items) == 7
    assert 'COMPLETED' in sort_keys
    assert 'LOCK' in sort_keys
    assert 'SCHEDULE#REMINDER_CERT_EXPIRATION_BEFORE_30_DAYS' in sort_keys
    assert 'SCHEDULE#SET_AS_ARCHIVED' in sort_keys

    # The `LOCK` partition is queried separately and must hold one record
    lock_items = _query_items(dynamodb_persistence_layer, 'LOCK')
    assert len(lock_items) == 1


def test_set_as_failed(
    app,
    seeds,
    dynamodb_persistence_layer: DynamoDBPersistenceLayer,
    http_api_proxy: HttpApiProxy,
    lambda_context: LambdaContext,
):
    """A COMPLETED payload with a low APROVEITAMENTO yields a `FAILED` record."""
    partition = '6c7e3d9b-f5d1-4da4-9e55-0825bb6ff2b8'

    # Bump `created_at` to the current time to avoid future test failures
    _refresh_created_at(dynamodb_persistence_layer, partition)

    # The record targeted by this payload was added from seeds
    response = _post_progress(
        app,
        http_api_proxy,
        lambda_context,
        {
            'ID_MATRICULA': '567',
            'APROVEITAMENTO': '12.152173913043477',
            'ANDAMENTO': '100',
            'status': 'COMPLETED',
        },
    )
    assert r_status(response) == HTTPStatus.NO_CONTENT

    # See `seeds.jsonl` for the sample data behind this partition
    items = _query_items(dynamodb_persistence_layer, partition)
    sort_keys = [item.get('sk') for item in items]

    assert 'FAILED' in sort_keys


def test_set_as_archived(
    app,
    seeds,
    dynamodb_persistence_layer: DynamoDBPersistenceLayer,
    http_api_proxy: HttpApiProxy,
    lambda_context: LambdaContext,
):
    """The COMPLETED payload for enrolment `899` yields an `ARCHIVED` record."""
    # The record targeted by this payload was added from seeds
    response = _post_progress(
        app,
        http_api_proxy,
        lambda_context,
        {
            'ID_MATRICULA': '899',
            'APROVEITAMENTO': '70',
            'ANDAMENTO': '100',
            'status': 'COMPLETED',
        },
    )
    assert r_status(response) == HTTPStatus.NO_CONTENT

    # See `seeds.jsonl` for the sample data behind this partition
    items = _query_items(
        dynamodb_persistence_layer,
        'cc2c3bce-c34a-4e82-aa6c-1a19e70ec5ae',
    )
    sort_keys = [item.get('sk') for item in items]

    assert 'ARCHIVED' in sort_keys
    assert len(items) == 4


def test_set_as_expired(
    app,
    seeds,
    dynamodb_persistence_layer: DynamoDBPersistenceLayer,
    http_api_proxy: HttpApiProxy,
    lambda_context: LambdaContext,
):
    """The COMPLETED payload for enrolment `221` yields an `EXPIRED` record."""
    # The record targeted by this payload was added from seeds
    response = _post_progress(
        app,
        http_api_proxy,
        lambda_context,
        {
            'ID_MATRICULA': '221',
            'APROVEITAMENTO': '69',
            'ANDAMENTO': '100',
            'status': 'COMPLETED',
        },
    )
    assert r_status(response) == HTTPStatus.NO_CONTENT

    # See `seeds.jsonl` for the sample data behind this partition
    items = _query_items(
        dynamodb_persistence_layer,
        '5db53b35-0bae-4907-afda-a213cb5bf651',
    )
    sort_keys = [item.get('sk') for item in items]

    assert 'EXPIRED' in sort_keys
    assert len(items) == 3


def r_status(response):
    """Return the `statusCode` entry of a Lambda handler response."""
    return response['statusCode']