import logging
import math
from typing import Any

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

MAX_PAGE_SIZE = 100

logger = logging.getLogger(__name__)


def search(
    index: str,
    *,
    query: dict,
    page_size: int = 25,
    elastic_client: Elasticsearch,
) -> dict[str, Any]:
    """Run *query* against *index* and return one page of hits.

    Args:
        index: Name of the index to search.
        query: Serialized search body; applied to the request via
            ``Search.update_from_dict``.
        page_size: Number of hits to request. Clamped to the range
            ``1..MAX_PAGE_SIZE`` so the page count is always computable.
        elastic_client: Client used to execute the search.

    Returns:
        A dict with three keys:

        * ``'total_hits'``: the ``hits.total.value`` reported by the response.
        * ``'total_pages'``: ``total_hits`` divided by the effective page
          size, rounded up.
        * ``'hits'``: each hit in the response converted with ``to_dict()``.

        If executing the search raises, the error is logged and an empty
        result (``0`` / ``0`` / ``[]``) is returned instead of propagating.
    """
    s = Search(
        using=elastic_client,
        index=index,
    )
    # update_from_dict mutates the Search in place, so no rebinding is needed.
    s.update_from_dict(query)

    # Keep the size within [1, MAX_PAGE_SIZE]; the lower bound also guards the
    # division used for 'total_pages' below.
    page_size = max(1, min(page_size, MAX_PAGE_SIZE))

    # Search.extra returns a modified copy rather than mutating, so the result
    # must be rebound or the size is silently dropped from the request.
    s = s.extra(size=page_size)

    try:
        r = s.execute()
    except Exception:
        # Deliberate best-effort: callers get an empty page rather than an
        # exception, but the failure is recorded so it is not invisible.
        logger.exception('Elasticsearch search failed for index %r', index)
        return {
            'total_hits': 0,
            'total_pages': 0,
            'hits': [],
        }
    else:
        total_hits = r.hits.total.value  # type: ignore
        return {
            'total_hits': total_hits,
            'total_pages': math.ceil(total_hits / page_size),
            'hits': [hit.to_dict() for hit in r],
        }