Skip to content

collection

MongoDB collection for entry-endpoint resources.

The AsyncMongoCollection represents an asynchronous version of the equivalent MongoDB collection in optimade: MongoCollection.

AsyncMongoCollection (EntryCollection)

MongoDB Collection for use with asyncio

The asynchronicity is implemented using motor and asyncio.

__init__(self, name, resource_cls, resource_mapper) special

Initialize the AsyncMongoCollection for the given parameters.

Parameters:

Name Type Description Default
name str

The name of the collection.

required
resource_cls EntryResource

The EntryResource model that is stored by the collection.

required
resource_mapper BaseResourceMapper

A resource mapper object that handles aliases and format changes between deserialization and response.

required
Source code in optimade_gateway/mongo/collection.py
def __init__(
    self,
    name: str,
    resource_cls: "EntryResource",
    resource_mapper: "BaseResourceMapper",
):
    """Initialize the AsyncMongoCollection for the given parameters.

    Parameters:
        name: The name of the collection.
        resource_cls: The `EntryResource` model that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and format
            changes between deserialization and response.

    """
    from optimade_gateway.mongo.database import (  # pylint: disable=import-outside-toplevel
        MONGO_DB,
    )

    super().__init__(
        resource_cls=resource_cls,
        resource_mapper=resource_mapper,
        transformer=MongoTransformer(mapper=resource_mapper),
    )

    self.parser = LarkParser(version=(1, 0, 0), variant="default")
    self.collection: MongoCollection = MONGO_DB[name]

    # Check aliases do not clash with mongo operators
    self._check_aliases(self.resource_mapper.all_aliases())
    self._check_aliases(self.resource_mapper.all_length_aliases())

acount(self, params=None, **kwargs) async

Count documents in Collection.

This is the asynchronous version of the parent class method named count().

Parameters:

Name Type Description Default
params Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]]

URL query parameters, either from a general entry endpoint or a single-entry endpoint.

None
kwargs dict

Query parameters as keyword arguments. Valid keys will be passed to the AsyncIOMotorCollection.count_documents method.

{}

Returns:

Type Description
int

The number of entries matching the query specified by the keyword arguments.

Source code in optimade_gateway/mongo/collection.py
async def acount(
    self,
    params: "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]]" = None,
    **kwargs,
) -> int:
    """Count documents in Collection.

    This is the asynchronous version of the parent class method named `count()`.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or a
            single-entry endpoint.
        kwargs (dict): Query parameters as keyword arguments. Valid keys will be passed
            to the
            [`AsyncIOMotorCollection.count_documents`](https://motor.readthedocs.io/en/stable/api-asyncio/asyncio_motor_collection.html#motor.motor_asyncio.AsyncIOMotorCollection.count_documents)
            method.

    Returns:
        int: The number of entries matching the query specified by the keyword
            arguments.

    """
    if params is not None and kwargs:
        raise ValueError(
            "When 'params' is supplied, no other parameters can be supplied."
        )

    if params is not None:
        kwargs = await self.ahandle_query_params(params)

    valid_method_keys = (
        "filter",
        "skip",
        "limit",
        "hint",
        "maxTimeMS",
        "collation",
        "session",
    )
    criteria = {key: kwargs[key] for key in valid_method_keys if key in kwargs}

    if criteria.get("filter") is None:
        criteria["filter"] = {}

    return await self.collection.count_documents(**criteria)

afind(self, params=None, criteria=None) async

Perform the query on the underlying MongoDB Collection, handling projection and pagination of the output.

This is the asynchronous version of the parent class method named count().

Either provide params or criteria. Not both, but at least one.

Parameters:

Name Type Description Default
params Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]]

URL query parameters, either from a general entry endpoint or a single-entry endpoint.

None
criteria Optional[Dict[str, Any]]

Already handled/parsed URL query parameters.

None

Returns:

Type Description
Tuple[Union[List[EntryResource], EntryResource, None], int, bool, Set[str], Set[str]]

A list of entry resource objects, how much data was returned for the query, whether more data is available with pagination, and fields (excluded, included).

Source code in optimade_gateway/mongo/collection.py
async def afind(
    self,
    params: "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]]" = None,
    criteria: "Optional[Dict[str, Any]]" = None,
) -> "Tuple[Union[List[EntryResource], EntryResource, None], int, bool, Set[str], Set[str]]":
    """Perform the query on the underlying MongoDB Collection, handling projection
    and pagination of the output.

    This is the asynchronous version of the parent class method named `count()`.

    Either provide `params` or `criteria`. Not both, but at least one.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or a
            single-entry endpoint.
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A list of entry resource objects, how much data was returned for the query,
        whether more data is available with pagination, and fields (excluded,
        included).

    """
    if (params is None and criteria is None) or (
        params is not None and criteria is not None
    ):
        raise ValueError(
            "Exacly one of either `params` and `criteria` must be specified."
        )

    # Set single_entry to False, this is done since if criteria is defined,
    # this is an unknown factor - better to then get a list of results.
    single_entry = False
    if criteria is None:
        criteria = await self.ahandle_query_params(params)
    else:
        single_entry = isinstance(params, SingleEntryQueryParams)

    response_fields = criteria.pop("fields", self.all_fields)

    results, data_returned, more_data_available = await self._arun_db_query(
        criteria=criteria,
        single_entry=single_entry,
    )

    if single_entry:
        results = results[0] if results else None  # type: ignore[assignment]

        if data_returned > 1:
            raise NotFound(
                detail=(
                    f"Instead of a single entry, {data_returned} entries were found"
                ),
            )

    include_fields = (
        response_fields - self.resource_mapper.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
    )
    bad_optimade_fields = set()
    bad_provider_fields = set()
    for field in include_fields:
        if field not in self.resource_mapper.ALL_ATTRIBUTES:
            if field.startswith("_"):
                if any(
                    field.startswith(f"_{prefix}_")
                    for prefix in self.resource_mapper.SUPPORTED_PREFIXES
                ):
                    bad_provider_fields.add(field)
            else:
                bad_optimade_fields.add(field)

    if bad_provider_fields:
        warn(
            UnknownProviderProperty(
                detail=(
                    "Unrecognised field(s) for this provider requested in "
                    f"`response_fields`: {bad_provider_fields}."
                )
            )
        )

    if bad_optimade_fields:
        raise BadRequest(
            detail=(
                "Unrecognised OPTIMADE field(s) in requested `response_fields`: "
                f"{bad_optimade_fields}."
            )
        )

    if results:
        results = await self.resource_mapper.adeserialize(results)

    return (  # type: ignore[return-value]
        results,
        data_returned,
        more_data_available,
        self.all_fields - response_fields,
        include_fields,
    )

ahandle_query_params(self, params) async

Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.

This is the asynchronous version of the parent class method named handle_query_params().

Note

Currently this method returns the pymongo interpretation of the parameters, which will need modification for modified for other backends.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

The initialized query parameter model from the server.

required

Exceptions:

Type Description
Forbidden

If too large of a page limit is provided.

BadRequest

If an invalid request is made, e.g., with incorrect fields or response format.

Returns:

Type Description
Dict[str, Any]

A dictionary representation of the query parameters.

Source code in optimade_gateway/mongo/collection.py
async def ahandle_query_params(
    self, params: "Union[EntryListingQueryParams, SingleEntryQueryParams]"
) -> "Dict[str, Any]":
    """Parse and interpret the backend-agnostic query parameter models into a
    dictionary that can be used by the specific backend.

    This is the asynchronous version of the parent class method named
    `handle_query_params()`.

    Note:
        Currently this method returns the pymongo interpretation of the parameters,
        which will need modification for modified for other backends.

    Parameters:
        params: The initialized query parameter model from the server.

    Raises:
        Forbidden: If too large of a page limit is provided.
        BadRequest: If an invalid request is made, e.g., with incorrect fields or
            response format.

    Returns:
        A dictionary representation of the query parameters.

    """
    return super().handle_query_params(params)

ainsert(self, data) async

Add the given entries to the underlying database.

This is the asynchronous version of the parent class method named insert().

Parameters:

Name Type Description Default
data List[EntryResource]

The entry resource objects to add to the database.

required
Source code in optimade_gateway/mongo/collection.py
async def ainsert(self, data: "List[EntryResource]") -> None:
    """Add the given entries to the underlying database.

    This is the asynchronous version of the parent class method named `insert()`.

    Arguments:
        data: The entry resource objects to add to the database.

    """
    await self.collection.insert_many(await clean_python_types(data))

count(self, **kwargs)

Returns the number of entries matching the query specified by the keyword arguments.

Parameters:

Name Type Description Default
kwargs dict

Query parameters as keyword arguments.

{}
Source code in optimade_gateway/mongo/collection.py
def count(self, **kwargs) -> int:
    raise NotImplementedError(
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `acount(params: "
        "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]], "
        "**kwargs)`."
    )

create_one(self, resource) async

Create a new document in the MongoDB collection based on query parameters.

Update the newly created document with an "id" field. The value will be the string representation of the "_id" field. This will only be done if "id" is not already present in resource.

Parameters:

Name Type Description Default
resource EntryResourceCreate

The resource to be created.

required

Returns:

Type Description
EntryResource

The newly created document as a pydantic model entry resource.

Source code in optimade_gateway/mongo/collection.py
async def create_one(self, resource: "EntryResourceCreate") -> "EntryResource":
    """Create a new document in the MongoDB collection based on query parameters.

    Update the newly created document with an `"id"` field.
    The value will be the string representation of the `"_id"` field.
    This will only be done if `"id"` is not already present in `resource`.

    Parameters:
        resource: The resource to be created.

    Returns:
        The newly created document as a pydantic model entry resource.

    """
    resource.last_modified = datetime.utcnow()
    result = await self.collection.insert_one(
        await clean_python_types(resource.dict(exclude_unset=True))
    )
    LOGGER.debug(
        "Inserted resource %r in DB collection %s with ID %s",
        resource,
        self.collection.name,
        result.inserted_id,
    )

    if not resource.id:
        LOGGER.debug("Updating resource with an `id` field equal to str(id_).")
        await self.collection.update_one(
            {"_id": result.inserted_id}, {"$set": {"id": str(result.inserted_id)}}
        )

    return self.resource_cls(
        **self.resource_mapper.map_back(
            await self.collection.find_one({"_id": result.inserted_id})
        )
    )

exists(self, entry_id) async

Assert whether entry_id exists in the collection (value of "id")

Parameters:

Name Type Description Default
entry_id str

The "id" value of the entry.

required
Source code in optimade_gateway/mongo/collection.py
async def exists(self, entry_id: str) -> bool:
    """Assert whether entry_id exists in the collection (value of `"id"`)

    Parameters:
        entry_id: The `"id"` value of the entry.

    """
    return bool(await self.collection.count_documents({"id": entry_id}))

find(self, params)

Fetches results and indicates if more data is available.

Also gives the total number of data available in the absence of page_limit. See EntryListingQueryParams for more information.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

Entry listing URL query params.

required

Returns:

Type Description
A tuple of various relevant values

(results, data_returned, more_data_available, exclude_fields, include_fields).

Source code in optimade_gateway/mongo/collection.py
def find(
    self, params: "Union[EntryListingQueryParams, SingleEntryQueryParams]"
) -> "Tuple[Union[List[EntryResource], EntryResource, None], int, bool, Set[str], Set[str]]":
    """
    Fetches results and indicates if more data is available.

    Also gives the total number of data available in the absence of `page_limit`.
    See
    [`EntryListingQueryParams`](https://www.optimade.org/optimade-python-tools/api_reference/server/query_params/#optimade.server.query_params.EntryListingQueryParams)
    for more information.

    Parameters:
        params: Entry listing URL query params.

    Returns:
        A tuple of various relevant values:
        (`results`, `data_returned`, `more_data_available`, `exclude_fields`,
        `include_fields`).

    """
    raise NotImplementedError(
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `afind(params: "
        "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]], criteria: "
        "Optional[Dict[str, Any]])`."
    )

get_multiple(self, **criteria) async

Get a list of resources based on criteria

Warning

This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a list of resources.

Parameters:

Name Type Description Default
criteria Dict[str, Any]

Already handled/parsed URL query parameters.

{}

Returns:

Type Description
List[EntryResource]

A list of resources from the MongoDB (mapped to pydantic models).

Source code in optimade_gateway/mongo/collection.py
async def get_multiple(self, **criteria: "Dict[str, Any]") -> "List[EntryResource]":
    """Get a list of resources based on criteria

    Warning:
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a list of resources.

    Parameters:
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A list of resources from the MongoDB (mapped to pydantic models).

    """
    criteria = criteria or {}

    results = []
    async for document in self.collection.find(**self._valid_find_keys(**criteria)):
        results.append(self.resource_cls(**self.resource_mapper.map_back(document)))

    return results

get_one(self, **criteria) async

Get one resource based on criteria

Warning

This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a single resource.

Parameters:

Name Type Description Default
criteria Dict[str, Any]

Already handled/parsed URL query parameters.

{}

Returns:

Type Description
EntryResource

A single resource from the MongoDB (mapped to pydantic models).

Source code in optimade_gateway/mongo/collection.py
async def get_one(self, **criteria: "Dict[str, Any]") -> "EntryResource":
    """Get one resource based on criteria

    Warning:
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a single resource.

    Parameters:
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A single resource from the MongoDB (mapped to pydantic models).

    """
    criteria = criteria or {}

    return self.resource_cls(
        **self.resource_mapper.map_back(
            await self.collection.find_one(**self._valid_find_keys(**criteria))
        )
    )

handle_query_params(self, params)

Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.

Note

Currently this method returns the pymongo interpretation of the parameters, which will need modification for modified for other backends.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

The initialized query parameter model from the server.

required

Exceptions:

Type Description
Forbidden

If too large of a page limit is provided.

BadRequest

If an invalid request is made, e.g., with incorrect fields or response format.

Returns:

Type Description
Dict[str, Any]

A dictionary representation of the query parameters.

Source code in optimade_gateway/mongo/collection.py
def handle_query_params(
    self, params: "Union[EntryListingQueryParams, SingleEntryQueryParams]"
) -> "Dict[str, Any]":
    """Parse and interpret the backend-agnostic query parameter models into a
    dictionary that can be used by the specific backend.

    Note:
        Currently this method returns the pymongo interpretation of the parameters,
        which will need modification for modified for other backends.

    Parameters:
        params: The initialized query parameter model from the server.

    Raises:
        Forbidden: If too large of a page limit is provided.
        BadRequest: If an invalid request is made, e.g., with incorrect fields
            or response format.

    Returns:
        A dictionary representation of the query parameters.

    """
    raise NotImplementedError(
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `ahandle_query_params(params: "
        "Union[EntryListingQueryParams, SingleEntryQueryParams])`."
    )

insert(self, data)

Add the given entries to the underlying database.

Parameters:

Name Type Description Default
data List[EntryResource]

The entry resource objects to add to the database.

required
Source code in optimade_gateway/mongo/collection.py
def insert(self, data: "List[EntryResource]") -> None:
    raise NotImplementedError(
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `ainsert(data: "
        "List[EntryResource])`."
    )
Back to top