92 lines
2.8 KiB
Python
92 lines
2.8 KiB
Python
from typing import Generator
|
|
|
|
import boto3
|
|
import jsonlines
|
|
from aws_lambda_powertools.shared.json_encoder import Encoder
|
|
from layercake.dynamodb import deserialize, serialize
|
|
from meilisearch import Client as Meilisearch
|
|
from tqdm import tqdm
|
|
|
|
# Service clients. Both endpoints are hard-coded loopback addresses
# (presumably DynamoDB Local and a local Meilisearch instance -- confirm
# against the development setup).
dynamodb_client = boto3.client('dynamodb', endpoint_url='http://127.0.0.1:8000')
meili_client = Meilisearch('http://127.0.0.1:7700')

# Seed files processed by the script entry point. Each file is read from the
# ``seeds/`` directory, and its name minus the ``.jsonl`` suffix is used both
# as the DynamoDB table name and as the Meilisearch index name.
# Commented-out entries are currently not seeded.
JSONL_FILES = (
    # 'test-orders.jsonl',
    'test-users.jsonl',
    # 'test-enrollments.jsonl',
    'test-courses.jsonl',
)
|
|
|
|
|
|
class JSONEncoder(Encoder):
    """JSON encoder that also accepts ``set`` values.

    Sets are emitted as JSON arrays; every other object is handed to the
    parent ``Encoder``.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*."""
        if not isinstance(obj, set):
            # Not our concern: let the parent encoder handle (or reject) it.
            return super().default(obj)

        # Element order follows the set's own iteration order.
        return [*obj]
|
|
|
|
|
|
def put_item(item: dict, table_name: str, /, dynamodb_client) -> bool:
    """Write *item* to the DynamoDB table *table_name*.

    The item is converted with ``serialize`` before being sent through
    ``dynamodb_client.put_item``.

    Returns:
        ``True`` when the write went through, ``False`` when building or
        sending the request raised any ``Exception`` (the error itself is
        not propagated).
    """
    try:
        request = {'TableName': table_name, 'Item': serialize(item)}
        dynamodb_client.put_item(**request)
    except Exception:
        stored = False
    else:
        stored = True

    return stored
|
|
|
|
|
|
def scan_table(table_name: str, /, dynamodb_client, **kwargs) -> Generator:
    """Yield every item returned by scanning *table_name*, across all pages.

    Args:
        table_name: Name of the table, passed to ``scan`` as ``TableName``.
        dynamodb_client: Object exposing a ``scan(**params)`` method whose
            response is a mapping with an ``Items`` list and, while more
            pages remain, a ``LastEvaluatedKey`` entry.
        **kwargs: Extra parameters forwarded to *every* ``scan`` request
            (for example ``FilterExpression``), so that follow-up pages are
            scanned with the same options as the first one.

    Yields:
        The raw items of each page, in the order the client returned them.

    A request that raises any ``Exception`` ends the scan silently; items
    from pages fetched before the failure have already been yielded.
    """
    # Work on a copy so the pagination cursor can be updated between requests
    # while the caller-supplied options stay attached to each of them.
    params = dict(kwargs)

    while True:
        try:
            page = dynamodb_client.scan(TableName=table_name, **params)
        except Exception:
            # Best-effort: stop iterating instead of raising.
            return

        yield from page['Items']

        if 'LastEvaluatedKey' not in page:
            return

        # Resume the next request where this page stopped.
        params['ExclusiveStartKey'] = page['LastEvaluatedKey']
|
|
|
|
|
|
if __name__ == '__main__':
    # Populate DynamoDB tables with data from JSONL files
    for file in tqdm(JSONL_FILES, desc='Processing files'):
        table_name = file.removesuffix('.jsonl')
        failed = 0

        # Decode explicitly as UTF-8 instead of relying on the locale default.
        with open(f'seeds/{file}', encoding='utf-8') as fp:
            # skip_invalid=True: lines that fail to parse are skipped.
            reader = jsonlines.Reader(fp).iter(skip_invalid=True)

            for line in tqdm(reader, desc=f'Processing lines in {file}'):
                if not put_item(line, table_name, dynamodb_client):  # type: ignore
                    failed += 1

        # put_item reports errors only through its return value; surface them
        # here so a partially seeded table does not go unnoticed.
        if failed:
            tqdm.write(f'{failed} item(s) from {file} were not written to {table_name}')

    # Every document is also added to the 'pytest' index, alongside its
    # per-table index (presumably consumed by the test-suite -- confirm).
    pytest_index = meili_client.index('pytest')

    # Scan DynamoDB tables and index the data into Meilisearch
    for file in tqdm(JSONL_FILES, desc='Scanning tables'):
        table_name = file.removesuffix('.jsonl')
        index = meili_client.index(table_name)

        # Only items whose `sk` attribute equals the string '0' are indexed.
        for doc in tqdm(
            scan_table(
                table_name,
                dynamodb_client,
                FilterExpression='sk = :sk',
                ExpressionAttributeValues={':sk': {'S': '0'}},
            ),
            desc=f'Indexing {table_name}',
        ):
            doc = deserialize(doc)
            # One document per request, as before.
            index.add_documents([doc], serializer=JSONEncoder)
            pytest_index.add_documents([doc], serializer=JSONEncoder)

        # Configure each table's index (not just the last one processed).
        index.update_settings(
            {
                'sortableAttributes': ['create_date', 'createDate', 'created_at'],
                'filterableAttributes': ['tenant_id', 'status', 'cnpj'],
            }
        )
|