Skip to content

perform

Perform OPTIMADE queries

db_find(database, endpoint, response_model, query_params='', raw_url=None)

Imitate Collection.find() for any given database for entry-resource endpoints

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `database` | `Union[LinksResource, Dict[str, Any]]` | The OPTIMADE implementation to be queried. It **must** have a valid base URL and id. | *required* |
| `endpoint` | `str` | The entry-listing endpoint, e.g., `"structures"`. | *required* |
| `response_model` | `Union[EntryResponseMany, EntryResponseOne]` | The expected OPTIMADE pydantic response model, e.g., `optimade.models.StructureResponseMany`. | *required* |
| `query_params` | `str` | URL query parameters to pass to the database. | `''` |
| `raw_url` | `str` | A raw URL to use straight up instead of deriving a URL from `database`, `endpoint`, and `query_params`. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[Union[ErrorResponse, EntryResponseMany, EntryResponseOne], str]` | Response as an `optimade` pydantic model and the `database`'s ID. |

Source code in optimade_gateway/queries/perform.py
def _db_find_error_response(
    errors: "List[Dict[str, Any]]",
    database: "Union[LinksResource, Dict[str, Any]]",
    endpoint: str,
    query_params: str,
) -> "Tuple[ErrorResponse, str]":
    """Build the `(ErrorResponse, database ID)` tuple returned by `db_find()` on failure.

    Parameters:
        errors: The OPTIMADE error objects to report.
        database: The OPTIMADE implementation that was queried.
        endpoint: The entry-listing endpoint that was queried.
        query_params: The URL query parameters that were sent.

    Returns:
        An `ErrorResponse` whose `meta` describes the attempted query, and the
        `database`'s ID.

    """
    return (
        ErrorResponse(
            errors=errors,
            meta={
                "query": {"representation": f"/{endpoint.strip('/')}?{query_params}"},
                "api_version": __api_version__,
                "more_data_available": False,
            },
        ),
        get_resource_attribute(database, "id"),
    )


def db_find(
    database: "Union[LinksResource, Dict[str, Any]]",
    endpoint: str,
    response_model: "Union[EntryResponseMany, EntryResponseOne]",
    query_params: str = "",
    raw_url: str = None,
) -> "Tuple[Union[ErrorResponse, EntryResponseMany, EntryResponseOne], str]":
    """Imitate `Collection.find()` for any given database for entry-resource endpoints

    A single, blocking HTTP GET request (60 second timeout) is sent to the database.
    The JSON payload is parsed into `response_model`, or, failing that, into an
    `ErrorResponse`. The function returns a gateway-created `ErrorResponse` instead of
    raising when the payload:

    - is not valid JSON,
    - is not a JSON object, or
    - validates as neither model.

    Network errors raised by `httpx` are *not* caught here.

    Parameters:
        database: The OPTIMADE implementation to be queried.
            It **must** have a valid base URL and id.
        endpoint: The entry-listing endpoint, e.g., `"structures"`.
        response_model: The expected OPTIMADE pydantic response model, e.g.,
            `optimade.models.StructureResponseMany`.
        query_params: URL query parameters to pass to the database.
        raw_url: A raw URL to use straight up instead of deriving a URL from `database`,
            `endpoint`, and `query_params`.

    Returns:
        Response as an `optimade` pydantic model and the `database`'s ID.

    """
    if TYPE_CHECKING or bool(os.getenv("MKDOCS_BUILD", "")):  # pragma: no cover
        response: "Union[httpx.Response, Dict[str, Any], EntryResponseMany, EntryResponseOne, ErrorResponse]"  # pylint: disable=line-too-long

    if raw_url:
        url = raw_url
    else:
        url = (
            f"{str(get_resource_attribute(database, 'attributes.base_url')).strip('/')}"
            f"{BASE_URL_PREFIXES['major']}/{endpoint.strip('/')}?{query_params}"
        )
    response = httpx.get(url, timeout=60)

    try:
        response = response.json()
    except json.JSONDecodeError:
        return _db_find_error_response(
            [
                {
                    "detail": f"Could not JSONify response from {url}",
                    "id": "OPTIMADE_GATEWAY_DB_FIND_MANY_JSONDECODEERROR",
                }
            ],
            database,
            endpoint,
            query_params,
        )

    not_parsable = (
        f"Could not pass response from {url} as either a "
        f"{response_model.__name__!r} or 'ErrorResponse'."
    )

    if not isinstance(response, dict):
        # Every OPTIMADE response is a JSON object. Unpacking anything else (a list, a
        # string, a number, ...) with `**` below would raise a `TypeError` rather than
        # a `ValidationError`, and it would escape this function.
        return _db_find_error_response(
            [
                {
                    "detail": f"{not_parsable} The response is not a JSON object.",
                    "id": "OPTIMADE_GATEWAY_DB_FINDS_MANY_VALIDATIONERRORS",
                }
            ],
            database,
            endpoint,
            query_params,
        )

    try:
        response = response_model(**response)
    except ValidationError:
        try:
            response = ErrorResponse(**response)
        except ValidationError as exc:
            gateway_error = {
                "detail": f"{not_parsable} ValidationError: {exc}",
                "id": "OPTIMADE_GATEWAY_DB_FINDS_MANY_VALIDATIONERRORS",
            }
            # If it's an error and `meta` is missing, it is not a valid OPTIMADE
            # response. This happens a lot, so it is worth an edge-case: keep the
            # database's own errors next to the gateway's error.
            upstream_errors = response.get("errors")
            if isinstance(upstream_errors, list):
                try:
                    return _db_find_error_response(
                        [*upstream_errors, gateway_error],
                        database,
                        endpoint,
                        query_params,
                    )
                except ValidationError:
                    # The database's error objects are malformed themselves; fall
                    # back to reporting only the gateway's own error.
                    pass
            return _db_find_error_response(
                [gateway_error], database, endpoint, query_params
            )

    return response, get_resource_attribute(database, "id")

db_get_all_resources(database, endpoint, response_model, query_params='', raw_url=None) async

Recursively retrieve all resources from an entry-listing endpoint

This function keeps pulling the links.next link if meta.more_data_available is True to ultimately retrieve all entries for endpoint.

Warning

This function can be dangerous if an endpoint with hundreds or thousands of entries is requested.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `database` | `Union[LinksResource, Dict[str, Any]]` | The OPTIMADE implementation to be queried. It **must** have a valid base URL and id. | *required* |
| `endpoint` | `str` | The entry-listing endpoint, e.g., `"structures"`. | *required* |
| `response_model` | `EntryResponseMany` | The expected OPTIMADE pydantic response model, e.g., `optimade.models.StructureResponseMany`. | *required* |
| `query_params` | `str` | URL query parameters to pass to the database. | `''` |
| `raw_url` | `str` | A raw URL to use straight up instead of deriving a URL from `database`, `endpoint`, and `query_params`. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[List[Union[EntryResource, Dict[str, Any]]], Union[LinksResource, Dict[str, Any]]]` | A collected list of the successful responses' `data` values, and the queried `database` itself (the same object that was passed in). |

Source code in optimade_gateway/queries/perform.py
async def db_get_all_resources(
    database: "Union[LinksResource, Dict[str, Any]]",
    endpoint: str,
    response_model: "EntryResponseMany",
    query_params: str = "",
    raw_url: str = None,
) -> "Tuple[List[Union[EntryResource, Dict[str, Any]]], Union[LinksResource, Dict[str, Any]]]":  # pylint: disable=line-too-long
    """Retrieve all resources from an entry-listing endpoint, following pagination

    This function keeps pulling the `links.next` link if `meta.more_data_available` is
    `True` to ultimately retrieve *all* entries for `endpoint`.

    Pages are requested one after another in a loop, not through recursion, so long
    pagination chains cannot exhaust the recursion limit. A `next` link that was
    already requested stops the loop, to guard against servers whose pagination never
    ends. Each blocking `db_find()` call runs in the event loop's default executor, so
    the event loop is not blocked while waiting on the network.

    !!! warning
        This function can be dangerous if an endpoint with hundreds or thousands of
        entries is requested.

    Parameters:
        database: The OPTIMADE implementation to be queried.
            It **must** have a valid base URL and id.
        endpoint: The entry-listing endpoint, e.g., `"structures"`.
        response_model: The expected OPTIMADE pydantic response model, e.g.,
            `optimade.models.StructureResponseMany`.
        query_params: URL query parameters to pass to the database.
        raw_url: A raw URL to use straight up instead of deriving a URL from `database`,
            `endpoint`, and `query_params`.

    Returns:
        A collected list of successful responses' `data` values and the queried
        `database` itself (the same object that was passed in).
        If a page errors, the resources collected from the previous pages are still
        returned.

    """
    loop = asyncio.get_running_loop()
    resulting_resources = []
    requested_next_links = set()
    page_url = raw_url

    while True:
        response, _ = await loop.run_in_executor(
            None,
            functools.partial(
                db_find,
                database=database,
                endpoint=endpoint,
                response_model=response_model,
                query_params=query_params,
                raw_url=page_url,
            ),
        )

        if isinstance(response, ErrorResponse):
            # An errored response will result in no further resources from a provider.
            LOGGER.error(
                "Error while querying database (id=%r). Full response: %s",
                get_resource_attribute(database, "id"),
                response.json(indent=2),
            )
            return resulting_resources, database

        resulting_resources.extend(response.data)

        if not response.meta.more_data_available:
            return resulting_resources, database

        next_page = get_resource_attribute(response, "links.next")
        # `links.next` may be a plain URL or a JSON:API links object with an `href`.
        next_page = getattr(next_page, "href", next_page)
        if not next_page:
            LOGGER.error(
                "Could not find a 'next' link for an OPTIMADE query request to %r "
                "(id=%r). Cannot get all resources from /%s, even though this was asked "
                "and `more_data_available` is `True` in the response.",
                get_resource_attribute(database, "attributes.name", "N/A"),
                get_resource_attribute(database, "id"),
                endpoint,
            )
            return resulting_resources, database

        next_page = str(next_page)
        if next_page in requested_next_links:
            LOGGER.error(
                "The 'next' link %r for an OPTIMADE query request to %r (id=%r) was "
                "already requested. Stopping to avoid an endless pagination loop for "
                "/%s.",
                next_page,
                get_resource_attribute(database, "attributes.name", "N/A"),
                get_resource_attribute(database, "id"),
                endpoint,
            )
            return resulting_resources, database

        requested_next_links.add(next_page)
        page_url = next_page

perform_query(url, query) async

Perform OPTIMADE query with gateway.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `url` | `URL` | Original request URL. | *required* |
| `query` | `QueryResource` | The query to be performed. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Union[EntryResponseMany, ErrorResponse, GatewayQueryResponse]` | This function returns the final response; a `GatewayQueryResponse`. |

Source code in optimade_gateway/queries/perform.py
async def perform_query(
    url: "URL",
    query: "QueryResource",
) -> "Union[EntryResponseMany, ErrorResponse, GatewayQueryResponse]":
    """Perform OPTIMADE query with gateway.

    The query's `state` is moved through `STARTED`, `IN_PROGRESS` and `FINISHED`.
    Every database of the query's gateway is queried with `db_find()` in a thread pool.
    Each database response is handed to `process_db_response()` in the order the
    databases are listed in the gateway, not in the order the requests complete.

    Parameters:
        url: Original request URL.
        query: The query to be performed.

    Returns:
        This function returns the final response; a
        [`GatewayQueryResponse`][optimade_gateway.models.queries.GatewayQueryResponse].

    """
    await update_query(query, "state", QueryState.STARTED)

    gateway: GatewayResource = await get_valid_resource(
        await collection_factory(CONFIG.gateways_collection),
        query.attributes.gateway_id,
    )

    filter_queries = await prepare_query_filter(
        database_ids=[_.id for _ in gateway.attributes.databases],
        filter_query=query.attributes.query_parameters.filter,
    )

    url = url.replace(path=f"{url.path.rstrip('/')}/{query.id}")
    # Store an empty response and mark the query as in progress in the same update.
    await update_query(
        query,
        "response",
        GatewayQueryResponse(
            data={},
            links=ToplevelLinks(next=None),
            meta=meta_values(
                url=url,
                data_available=0,
                data_returned=0,
                more_data_available=False,
            ),
        ),
        operator=None,
        **{"$set": {"state": QueryState.IN_PROGRESS}},
    )

    # `ThreadPoolExecutor` raises a `ValueError` for `max_workers=0`. The upper bound
    # evaluates to 0 for a gateway without any databases, so clamp it to at least 1.
    max_workers = max(
        1, min(32, (os.cpu_count() or 0) + 4, len(gateway.attributes.databases))
    )

    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Run OPTIMADE DB queries in a thread pool, i.e., not using the main OS thread,
        # where the asyncio event loop is running.
        query_tasks = []
        for database in gateway.attributes.databases:
            query_params = await get_query_params(
                query_parameters=query.attributes.query_parameters,
                database_id=database.id,
                filter_mapping=filter_queries,
            )
            query_tasks.append(
                loop.run_in_executor(
                    executor=executor,
                    func=functools.partial(
                        db_find,
                        database=database,
                        endpoint=query.attributes.endpoint.value,
                        response_model=query.attributes.endpoint.get_response_model(),
                        query_params=query_params,
                    ),
                )
            )

        # Results are processed in submission order; all requests already run
        # concurrently in the pool.
        for query_task in query_tasks:
            (db_response, db_id) = await query_task

            await process_db_response(
                response=db_response,
                database_id=db_id,
                query=query,
                gateway=gateway,
            )

    # Pagination
    #
    # NOTE: Not implemented yet. The commented-out sketch below is not executed.
    #
    # if isinstance(results, list) and get_resource_attribute(
    #     query,
    #     "attributes.response.meta.more_data_available",
    #     False,
    #     disambiguate=False,  # Extremely minor speed-up
    # ):
    #     # Deduce the `next` link from the current request
    #     query_string = urllib.parse.parse_qs(url.query)
    #     query_string["page_offset"] = [
    #         int(query_string.get("page_offset", [0])[0])  # type: ignore[list-item]
    #         + len(results[: query.attributes.query_parameters.page_limit])
    #     ]
    #     urlencoded = urllib.parse.urlencode(query_string, doseq=True)
    #     base_url = get_base_url(url)

    #     links = ToplevelLinks(next=f"{base_url}{url.path}?{urlencoded}")

    #     await update_query(query, "response.links", links)

    await update_query(query, "state", QueryState.FINISHED)
    return query.attributes.response
Back to top