Files
saladeaula.digital/http-api/cli/seeds.py
2025-07-11 19:24:30 -03:00

135 lines
3.7 KiB
Python

from typing import Any, Generator
import boto3
import jsonlines
from aws_lambda_powertools.shared.json_encoder import Encoder
from elasticsearch import Elasticsearch
from layercake.dynamodb import deserialize
from meilisearch import Client as Meilisearch
from tqdm import tqdm
# Module-level service clients, each pointed at a loopback (127.0.0.1) endpoint.
elastic_client = Elasticsearch('http://127.0.0.1:9200')
dynamodb_client = boto3.client('dynamodb', endpoint_url='http://127.0.0.1:8000')
meili_client = Meilisearch('http://127.0.0.1:7700')
class JSONEncoder(Encoder):
    """JSON encoder that additionally serializes ``set`` values as lists."""

    def default(self, obj):
        """Return a JSON-compatible stand-in for *obj*.

        Sets are converted to lists; every other type is delegated to the
        parent class's ``default``.
        """
        if not isinstance(obj, set):
            return super().default(obj)
        return list(obj)
# Seed files processed by the script's main section. Each one is opened from
# `seeds/`, and its name with the `.jsonl` suffix removed is used as both the
# DynamoDB table name and the Meilisearch index name.
jsonl_files = (
'test-orders.jsonl',
'test-users.jsonl',
'test-enrollments.jsonl',
'test-courses.jsonl',
)
def put_item(item: dict, table_name: str, /, dynamodb_client) -> bool:
    """Write one item to a DynamoDB table and report whether it succeeded.

    Args:
        item: Passed through unchanged as the ``Item`` argument.
        table_name: Passed through as the ``TableName`` argument.
        dynamodb_client: Object exposing ``put_item(TableName=..., Item=...)``.

    Returns:
        ``True`` when the call completes, ``False`` when it raises any
        ``Exception``.
    """
    request = {'TableName': table_name, 'Item': item}
    try:
        dynamodb_client.put_item(**request)
    except Exception:
        # Best effort: the failure is reported through the return value only.
        return False
    return True
def scan_table(table_name: str, /, dynamodb_client, **kwargs) -> Generator:
    """Yield every deserialized item of a DynamoDB table, page by page.

    Args:
        table_name: Passed as ``TableName`` on every scan request.
        dynamodb_client: Object exposing ``scan(TableName=..., **kwargs)``.
        **kwargs: Extra scan arguments (for example ``FilterExpression`` and
            ``ExpressionAttributeValues``). They are sent with *every* page
            request so that filters keep applying after the first page.

    Yields:
        Each entry of the response's ``Items`` list, passed through
        ``deserialize``.

    Notes:
        A request that raises ends the generator silently; items already
        yielded from earlier pages are not affected.
    """
    # Copy so the continuation key can be added without touching the
    # caller's keyword arguments.
    params = dict(kwargs)

    # Iterate instead of recursing: one stack frame regardless of page count.
    while True:
        try:
            page = dynamodb_client.scan(TableName=table_name, **params)
        except Exception:
            # Best effort: a failed request ends the scan instead of raising.
            return

        for item in page['Items']:
            yield deserialize(item)

        if 'LastEvaluatedKey' not in page:
            return

        # Resume where this page stopped, keeping the original filters.
        params['ExclusiveStartKey'] = page['LastEvaluatedKey']
class Elastic:
    """Thin convenience wrapper around an ``Elasticsearch`` client."""

    def __init__(self, client: Elasticsearch) -> None:
        """Keep a reference to *client* for later calls."""
        self.client = client

    def index_item(
        self,
        /,
        id: str,
        index: str,
        doc: dict,
    ):
        """Upsert *doc* under *id* in *index* and return the client's response.

        The document is passed through ``_serialize_to_basic_types`` before
        being sent, and the update is issued with ``doc_as_upsert=True``.
        """
        upsert = self.client.update
        payload = _serialize_to_basic_types(doc)
        return upsert(index=index, id=id, doc=payload, doc_as_upsert=True)

    def delete_index(self, index: str) -> bool:
        """Delete *index*; return ``True`` on success, ``False`` on any ``Exception``."""
        try:
            self.client.indices.delete(index=index)
            return True
        except Exception:
            return False
def _serialize_to_basic_types(value: Any) -> Any:
    """Return *value* with sets replaced by lists, descending into dicts and lists.

    Dict values and list elements are converted recursively. A set is turned
    into a list of its elements as they are. Any other value is returned
    unchanged.
    """
    if isinstance(value, dict):
        converted = {}
        for key, inner in value.items():
            converted[key] = _serialize_to_basic_types(inner)
        return converted

    if isinstance(value, set):
        return list(value)

    if isinstance(value, list):
        return list(map(_serialize_to_basic_types, value))

    return value
if __name__ == '__main__':
    # NOTE(review): `elastic` is not referenced again in the visible code.
    elastic = Elastic(elastic_client)

    # Populate DynamoDB tables with data from JSONL files
    for file in tqdm(jsonl_files, desc='Processing files'):
        table_name = file.removesuffix('.jsonl')
        failed = 0

        # Decode explicitly as UTF-8 rather than the platform default.
        with open(f'seeds/{file}', encoding='utf-8') as fp:
            # skip_invalid=True: lines that cannot be parsed are skipped.
            reader = jsonlines.Reader(fp).iter(skip_invalid=True)

            for line in tqdm(reader, desc=f'Processing lines in {file}'):
                if not put_item(line, table_name, dynamodb_client):  # type: ignore
                    failed += 1

        # put_item swallows errors and returns False; surface the count so
        # rejected writes do not go unnoticed.
        if failed:
            tqdm.write(
                f'{failed} item(s) from {file} could not be written to {table_name}'
            )

    # Scan DynamoDB tables and index the data into Meilisearch
    # Every document is also added to a shared 'pytest' index.
    pytest_index = meili_client.index('pytest')

    for file in tqdm(jsonl_files, desc='Scanning tables'):
        table_name = file.removesuffix('.jsonl')
        # Build the index handle once per table instead of once per document.
        index = meili_client.index(table_name)

        # Only items whose `sk` attribute equals the string '0' are indexed.
        records = scan_table(
            table_name,
            dynamodb_client,
            FilterExpression='sk = :sk',
            ExpressionAttributeValues={':sk': {'S': '0'}},
        )

        for doc in tqdm(records, desc=f'Indexing {table_name}'):
            index.add_documents([doc], serializer=JSONEncoder)
            pytest_index.add_documents([doc], serializer=JSONEncoder)

        index.update_settings(
            {
                'sortableAttributes': ['create_date', 'createDate'],
                'filterableAttributes': ['tenant_id', 'status'],
            }
        )