This commit is contained in:
2025-06-06 18:32:09 -03:00
parent 3e44761e69
commit 53613d0a48
34 changed files with 939 additions and 752 deletions

View File

@@ -1,62 +0,0 @@
import pprint
from meilisearch import Client as Meilisearch
import os

# Credentials are taken from the environment instead of being hard-coded in
# the source; each falls back to an empty string when the variable is unset.
MASTER_KEY = os.environ.get('MEILI_MASTER_KEY', '')
API_KEY = os.environ.get('MEILI_API_KEY', '')

# The client talks to a local Meilisearch instance without a key. The
# commented arguments show the remote alternative (URL plus API key).
client = Meilisearch(
    # 'https://meili.vps.eduseg.com.br',
    # API_KEY,
    'http://localhost:7700'
)
pp = pprint.PrettyPrinter(indent=4)

# Make the 'test-courses' index sortable by creation date and filterable by
# the tenant organisation id.
courses = client.index('test-courses')
courses.update_settings(
    {
        'sortableAttributes': ['create_date'],
        'filterableAttributes': ['tenant__org_id'],
    }
)
# with open('cli/search-results.json') as fp:
# docs = json.load(fp)
# courses.add_documents(docs)
# pp.pprint(courses.search(''))
# client.create_index('betaeducacao-prod-orders', {'primaryKey': 'id'})
# client.create_index('betaeducacao-prod-enrollments', {'primaryKey': 'id'})
# client.create_index('betaeducacao-prod-users_d2o3r5gmm4it7j', {'primaryKey': 'id'})
# An index is where the documents are stored.
# index = client.index('users')
# pp.pprint(index.search(query='*'))
# documents = [
# {'id': 1, 'title': 'Carol', 'genres': ['Romance', 'Drama']},
# {'id': 2, 'title': 'Wonder Woman', 'genres': ['Action', 'Adventure']},
# {'id': 3, 'title': 'Life of Pi', 'genres': ['Adventure', 'Drama']},
# {
# 'id': 4,
# 'title': 'Mad Max: Fury Road',
# 'genres': ['Adventure', 'Science Fiction'],
# },
# {'id': 5, 'title': 'Moana', 'genres': ['Fantasy', 'Action']},
# {'id': 6, 'title': 'Philadelphia', 'genres': ['Drama']},
# ]
# # # If the index 'movies' does not exist, Meilisearch creates it when you first add the documents.
# index.add_documents(documents
#
# )
# pp.pprint(client.get_keys({'limit': 3}).model_dump_json())
#
# uid='fdbdda56-00dd-4f53-934f-6629f3b08ee3' name=None description='Add, get documents and search' actions=['documents.add', 'documents.get', 'search'] indexes=['users', 'courses', 'enrollments', 'orders'] expires_at=None key='1aa4c720611269e9425e8467df7e802f3a20ad6c5f31fe875ac886fc4efa2c83' created_at=datetime.datetime(2025, 4, 1, 0, 46, 27, 751365) updated_at=datetime.datetime(2025, 4, 1, 0, 46, 27, 751365)

View File

@@ -1,8 +0,0 @@
import json
from layercake.dynamodb import serialize
# Load the saved search results. JSON text is decoded explicitly as UTF-8 so
# the result does not depend on the platform's default locale encoding.
with open('cli/search-results.json', encoding='utf-8') as fp:
    docs = json.load(fp)

# Print every document as one JSON line after passing it through
# layercake.dynamodb.serialize.
for doc in docs:
    print(json.dumps(serialize(doc)))

View File

@@ -0,0 +1,66 @@
import json
import sqlite3
from functools import partial
from pathlib import Path
from typing import Generator
import jsonlines
from aws_lambda_powertools.shared.json_encoder import Encoder
from layercake.dynamodb import deserialize
from tqdm import tqdm
class JSONEncoder(Encoder):
    """Encoder that also accepts sets, emitting them as JSON arrays."""

    def default(self, obj):
        """Return a list for a set; defer to the base encoder otherwise."""
        return list(obj) if isinstance(obj, set) else super().default(obj)
def readlines(dirpath: Path) -> Generator:
    """Yield the deserialized 'Item' of every record found under *dirpath*.

    Each regular file directly inside *dirpath* is opened with
    ``jsonlines.open``; entries that are not files (e.g. subfolders) are
    skipped. The ``'Item'`` value of every record is passed through
    ``deserialize`` before being yielded.
    """
    for entry in dirpath.iterdir():
        if entry.is_file():
            with jsonlines.open(entry) as reader:
                yield from (deserialize(row['Item']) for row in reader)
# Let sqlite3 accept dict parameters by dumping them to JSON text with
# JSONEncoder before they are bound to a statement.
_dict_to_json = partial(json.dumps, cls=JSONEncoder)
sqlite3.register_adapter(dict, _dict_to_json)
def quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQLite identifier.

    Table names cannot be bound as statement parameters, so the name has to
    be spliced into the SQL text. Wrapping it in double quotes (and doubling
    any embedded double quote) keeps names such as ``test-orders`` valid and
    prevents the typed value from being interpreted as SQL.
    """
    return '"' + name.replace('"', '""') + '"'


if __name__ == '__main__':
    # Interactive entry point: ask for a folder and a table name, then insert
    # every record yielded by readlines() into that table of 'mydatabase.db'.
    try:
        input_dirpath = Path(input('📂 Path to the folder with .jsonl files: '))
        if not input_dirpath.exists() or not input_dirpath.is_dir():
            print(f'❌ Directory "{input_dirpath}" not found or is not a folder.')
            # SystemExit is not an Exception subclass, so it is not swallowed
            # by the generic handler below.
            raise SystemExit(1)

        table_name = input('💾 Enter the name of the table (e.g., users): ')
        table = quote_identifier(table_name)

        conn = sqlite3.connect('mydatabase.db')
        try:
            # The connection context manager commits on success and rolls
            # back on error, but does not close the connection; that is done
            # in the finally clause.
            with conn:
                cursor = conn.cursor()
                cursor.execute(
                    f'CREATE TABLE IF NOT EXISTS {table} '
                    '(id TEXT, sk TEXT, json JSON)'
                )

                for record in tqdm(
                    readlines(input_dirpath),
                    desc=f'⏳ Inserting into table {table_name}',
                ):
                    # Values go through named placeholders; the dict in
                    # 'json' is converted by the registered dict adapter.
                    cursor.execute(
                        f'INSERT INTO {table} (id, sk, json) '
                        'VALUES (:id, :sk, :json)',
                        {
                            'id': record['id'],
                            'sk': record['sk'],
                            'json': record,
                        },
                    )
        finally:
            conn.close()
    except KeyboardInterrupt:
        print('\n👋 Cancelled by user')
    except Exception as e:
        print(f'💥 Error: {e}')

View File

@@ -2,12 +2,23 @@ from typing import Any, Generator
import boto3
import jsonlines
from aws_lambda_powertools.shared.json_encoder import Encoder
from elasticsearch import Elasticsearch
from layercake.dynamodb import deserialize
from meilisearch import Client as Meilisearch
from tqdm import tqdm
# Module-level clients for the three services this script talks to, each
# addressed on 127.0.0.1 at its own port.
elastic_client = Elasticsearch('http://127.0.0.1:9200')
dynamodb_client = boto3.client('dynamodb', endpoint_url='http://127.0.0.1:8000')
meili_client = Meilisearch('http://127.0.0.1:7700')
class JSONEncoder(Encoder):
    """JSON encoder that additionally serializes sets as lists."""

    def default(self, obj):
        """Return a list for a set; otherwise defer to the base encoder."""
        if isinstance(obj, set):
            return list(obj)
        # Zero-argument super() is the idiomatic Python 3 form and resolves
        # to the same class as the explicit super(__class__, self).
        return super().default(obj)
jsonl_files = (
'test-orders.jsonl',
@@ -99,9 +110,24 @@ if __name__ == '__main__':
put_item(line, table_name, dynamodb_client) # type: ignore
# Scan DynamoDB tables and index the data into Elasticsearch
# for file in tqdm(jsonl_files, desc='Scanning tables'):
# table_name = file.removesuffix('.jsonl')
# elastic.delete_index(table_name)
# for doc in tqdm(
# scan_table(
# table_name,
# dynamodb_client,
# FilterExpression='sk = :sk',
# ExpressionAttributeValues={':sk': {'S': '0'}},
# ),
# desc=f'Indexing {table_name}',
# ):
# elastic.index_item(id=doc['id'], index=table_name, doc=doc)
# Scan DynamoDB tables and index the data into Meilisearch
for file in tqdm(jsonl_files, desc='Scanning tables'):
table_name = file.removesuffix('.jsonl')
elastic.delete_index(table_name)
for doc in tqdm(
scan_table(
@@ -112,4 +138,16 @@ if __name__ == '__main__':
),
desc=f'Indexing {table_name}',
):
elastic.index_item(id=doc['id'], index=table_name, doc=doc)
meili_client.index(table_name).add_documents([doc], serializer=JSONEncoder)
if table_name == 'test-enrollments':
print('a')
print(doc)
index = meili_client.index(table_name)
index.update_settings(
{
'sortableAttributes': ['create_date'],
'filterableAttributes': ['metadata__tenant_id'],
}
)

View File

@@ -1,3 +1,10 @@
"""
Extracts all .gz files from a given directory and writes the uncompressed
outputs to an 'out/' subfolder within that directory.
Useful for simple, batch decompression of Gzip files via CLI.
"""
import gzip
from pathlib import Path
@@ -9,15 +16,11 @@ def unzip_gzip(file: Path, target: str):
def unzip_gzfiles(dirpath: Path) -> None:
"""Unzip the .gz files from a dir."""
if not dirpath.exists() or not dirpath.is_dir():
print(f'"{dirpath}" not found')
return None
"""Unzips all .gz files from a given directory into an 'out/' folder."""
gzfiles = list(dirpath.glob('*.gz'))
if not gzfiles:
print(f'No .gz files found in "{dirpath}"')
print(f' No .gz files found in "{dirpath}"')
return None
# Create a directory to output the files
@@ -30,12 +33,19 @@ def unzip_gzfiles(dirpath: Path) -> None:
continue
if unzip_gzip(file, f'{dirpath}/out/{filename}'):
print(f'Unzipped "{file.name}"')
print(f'Unzipped: {file.name}')
if __name__ == '__main__':
try:
dirpath = input('Type the directory path\n')
unzip_gzfiles(Path(dirpath))
except (KeyboardInterrupt, Exception):
print('See ya')
dirpath = Path(input('📂 Enter the directory containing .gz files: '))
if not dirpath.exists() or not dirpath.is_dir():
print(f'❌ Directory "{dirpath}" not found or is not a folder.')
exit(1)
unzip_gzfiles(dirpath)
except KeyboardInterrupt:
print('\n👋 Cancelled by user')
except Exception as e:
print(f'💥 Error: {e}')