Files
saladeaula.digital/http-api/elastic.py
2025-03-21 16:18:40 -03:00

47 lines
981 B
Python

import logging
import math
from typing import TypedDict

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
# Upper bound applied to the `page_size` argument of `search`.
MAX_PAGE_SIZE = 100
class PaginatedResult(TypedDict):
    """Dictionary shape returned by `search`."""

    # Total hit count reported for the query.
    total_items: int
    # Page count derived from `total_items` and the page size.
    total_pages: int
    # Hits of the returned page, each converted to a plain dict.
    items: list[dict]
def search(
    index: str,
    *,
    query: dict,
    page_size: int = 25,
    elastic_client: Elasticsearch,
) -> PaginatedResult:
    """Run `query` against `index` and return one page of hits.

    Args:
        index: Name of the index to search.
        query: Serialized search body, applied to the request with
            `Search.update_from_dict`.
        page_size: Requested number of hits per page. Clamped to the range
            1..MAX_PAGE_SIZE; takes precedence over any `size` carried in
            `query`.
        elastic_client: Client used to execute the search.

    Returns:
        The total hit count, the page count derived from it, and the hits of
        the page as plain dicts. If executing the search raises, the error is
        logged and an empty result (zero totals, no items) is returned.
    """
    # Clamp on both sides: the upper bound is MAX_PAGE_SIZE, and the lower
    # bound keeps the page-count division below from dividing by zero.
    page_size = max(1, min(page_size, MAX_PAGE_SIZE))

    s = Search(using=elastic_client, index=index)
    s.update_from_dict(query)  # modifies `s` in place
    # `extra` returns a modified copy instead of mutating `s`; without
    # rebinding, the requested size would never reach Elasticsearch.
    s = s.extra(size=page_size)

    try:
        r = s.execute()
    except Exception:
        # Best-effort contract: callers get an empty page rather than an
        # exception, but the failure is recorded instead of being hidden.
        logging.getLogger(__name__).exception(
            'Elasticsearch search failed for index %r', index
        )
        return {
            'total_items': 0,
            'total_pages': 0,
            'items': [],
        }

    total_items = r.hits.total.value  # type: ignore
    return {
        'total_items': total_items,
        'total_pages': math.ceil(total_items / page_size),
        'items': [hit.to_dict() for hit in r],
    }