Skip to content

mongo

MongoCollection (EntryCollection)

Class for querying MongoDB collections (implemented by either pymongo or mongomock) containing serialized EntryResource objects.

Source code in optimade/server/entry_collections/mongo.py
class MongoCollection(EntryCollection):
    """Class for querying MongoDB collections (implemented by either pymongo or mongomock)
    containing serialized [`EntryResource`][optimade.models.entries.EntryResource] objects.

    """

    def __init__(
        self,
        name: str,
        resource_cls: type[EntryResource],
        resource_mapper: type[BaseResourceMapper],
        database: str = CONFIG.mongo_database,
    ):
        """Initialize the MongoCollection for the given parameters.

        Parameters:
            name: The name of the collection.
            resource_cls: The type of entry resource that is stored by the collection.
            resource_mapper: A resource mapper object that handles aliases and
                format changes between deserialization and response.
            database: The name of the underlying MongoDB database to connect to.

        Raises:
            RuntimeError: If any alias defined by the resource mapper starts with `$`.

        """
        super().__init__(
            resource_cls,
            resource_mapper,
            MongoTransformer(mapper=resource_mapper),
        )

        self.collection = CLIENT[database][name]

        # check aliases do not clash with mongo operators
        self._check_aliases(self.resource_mapper.all_aliases())
        self._check_aliases(self.resource_mapper.all_length_aliases())

    def __len__(self) -> int:
        """Returns the estimated total number of entries in the collection."""
        return self.collection.estimated_document_count()

    def count(self, **kwargs: Any) -> Union[int, None]:
        """Returns the number of entries matching the query specified
        by the keyword arguments, or `None` if the count timed out.

        If no `filter` is given, the estimated document count of the whole
        collection is returned instead. If no `maxTimeMS` is given, it is set
        to `1000 * CONFIG.mongo_count_timeout`.

        Parameters:
            **kwargs: Query parameters as keyword arguments. The keys
                'filter', 'skip', 'limit', 'hint' and 'maxTimeMS' will be passed
                to the `pymongo.collection.Collection.count_documents` method.
                All other keys are ignored.

        """
        allowed = ("filter", "skip", "limit", "hint", "maxTimeMS")
        count_kwargs = {key: value for key, value in kwargs.items() if key in allowed}

        if "filter" not in count_kwargs:
            return self.collection.estimated_document_count()

        if "maxTimeMS" not in count_kwargs:
            count_kwargs["maxTimeMS"] = 1000 * CONFIG.mongo_count_timeout
        try:
            return self.collection.count_documents(**count_kwargs)
        except ExecutionTimeout:
            return None

    def insert(self, data: list[EntryResource]) -> None:
        """Add the given entries to the underlying database.

        Warning:
            No validation is performed on the incoming data.

        Arguments:
            data: The entry resource objects to add to the database.

        """
        self.collection.insert_many(data)

    def handle_query_params(
        self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
    ) -> dict[str, Any]:
        """Parse and interpret the backend-agnostic query parameter models into a dictionary
        that can be used by MongoDB.

        This Mongo-specific method calls the base `EntryCollection.handle_query_params` method
        and adds additional handling of the MongoDB ObjectID type.

        Parameters:
            params: The initialized query parameter model from the server.

        Raises:
            Forbidden: If too large of a page limit is provided.
            BadRequest: If an invalid request is made, e.g., with incorrect fields
                or response format.
            NotImplementedError: If `page_above` is present in the parsed criteria.

        Returns:
            A dictionary representation of the query parameters.

        """
        criteria = super().handle_query_params(params)

        if "page_above" in criteria:
            raise NotImplementedError(
                "`page_above` is not implemented for this backend."
            )

        # Make sure there is a projection dictionary to write into, so that a
        # missing (or `None`) projection does not raise below.
        projection = criteria.get("projection")
        if projection is None:
            projection = criteria["projection"] = {}

        # Handle MongoDB ObjectIDs:
        # - If they were not requested, then explicitly remove them
        # - If they were requested, then cast them to strings in the response
        if "_id" not in projection:
            projection["_id"] = False
        elif projection["_id"]:
            projection["_id"] = {"$toString": "$_id"}

        return criteria

    def _run_db_query(
        self, criteria: dict[str, Any], single_entry: bool = False
    ) -> tuple[list[dict[str, Any]], Optional[int], bool]:
        """Run the query on the backend and collect the results.

        Arguments:
            criteria: A dictionary representation of the query parameters.
            single_entry: Whether or not the caller is expecting a single entry response.

        Returns:
            The list of entries from the database (without any re-mapping), the total number of
            entries matching the query (`None` if the count timed out) and a boolean for
            whether or not there is more data available.

        """
        results = list(self.collection.find(**criteria))

        if CONFIG.database_backend == SupportedBackend.MONGOMOCK and (
            criteria.get("projection") or {}
        ).get("_id"):
            # mongomock does not support `$toString` in projection, so we have to do it manually
            for doc in results:
                doc["_id"] = str(doc["_id"])

        nresults_now = len(results)
        if single_entry:
            # SingleEntryQueryParams, e.g., /structures/{entry_id}
            return results, nresults_now, False

        criteria_nolimit = criteria.copy()
        criteria_nolimit.pop("limit", None)
        skip = criteria_nolimit.pop("skip", 0)
        data_returned = self.count(**criteria_nolimit)
        if data_returned is None:
            # The count timed out, so fall back on comparing the page size with the limit.
            # Only correct most of the time: if the total number of remaining results is
            # exactly the page limit then this will incorrectly say there is more data.
            # Without a positive limit nothing was truncated, so there cannot be more data.
            limit = criteria.get("limit") or 0
            more_data_available = limit > 0 and nresults_now == limit
        else:
            more_data_available = nresults_now + skip < data_returned

        return results, data_returned, more_data_available

    def _check_aliases(self, aliases) -> None:
        """Check that aliases do not clash with mongo keywords.

        Arguments:
            aliases: The aliases to check; the first two items of each alias
                are tested for a leading `$`.

        Raises:
            RuntimeError: If either of the first two items of any alias starts with `$`.

        """
        if any(
            alias[0].startswith("$") or alias[1].startswith("$") for alias in aliases
        ):
            raise RuntimeError(f"Cannot define an alias starting with a '$': {aliases}")

__init__(self, name, resource_cls, resource_mapper, database='optimade') special

Initialize the MongoCollection for the given parameters.

Parameters:

Name Type Description Default
name str

The name of the collection.

required
resource_cls type

The type of entry resource that is stored by the collection.

required
resource_mapper type

A resource mapper object that handles aliases and format changes between deserialization and response.

required
database str

The name of the underlying MongoDB database to connect to.

'optimade'
Source code in optimade/server/entry_collections/mongo.py
def __init__(
    self,
    name: str,
    resource_cls: type[EntryResource],
    resource_mapper: type[BaseResourceMapper],
    database: str = CONFIG.mongo_database,
):
    """Set up a collection backed by the MongoDB collection `name` in `database`.

    Parameters:
        name: The name of the collection.
        resource_cls: The type of entry resource that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and
            format changes between deserialization and response.
        database: The name of the underlying MongoDB database to connect to.

    """
    transformer = MongoTransformer(mapper=resource_mapper)
    super().__init__(resource_cls, resource_mapper, transformer)

    mongo_database = CLIENT[database]
    self.collection = mongo_database[name]

    # Neither the plain aliases nor the length aliases may clash with mongo operators.
    mapper = self.resource_mapper
    self._check_aliases(mapper.all_aliases())
    self._check_aliases(mapper.all_length_aliases())

count(self, **kwargs)

Returns the number of entries matching the query specified by the keyword arguments, or None if the count timed out.

Parameters:

Name Type Description Default
**kwargs Any

Query parameters as keyword arguments. The keys 'filter', 'skip', 'limit', 'hint' and 'maxTimeMS' will be passed to the pymongo.collection.Collection.count_documents method.

{}
Source code in optimade/server/entry_collections/mongo.py
def count(self, **kwargs: Any) -> Union[int, None]:
    """Count the entries matching the query given as keyword arguments.

    Returns `None` if the count timed out. When no `filter` is supplied,
    the estimated document count of the collection is returned instead.

    Parameters:
        **kwargs: Query parameters as keyword arguments. Only the keys
            'filter', 'skip', 'limit', 'hint' and 'maxTimeMS' are kept and
            passed to the `pymongo.collection.Collection.count_documents` method.

    """
    accepted = ("filter", "skip", "limit", "hint", "maxTimeMS")
    query = {key: value for key, value in kwargs.items() if key in accepted}

    # Without a filter, fall back to the collection-wide estimate.
    if "filter" not in query:
        return self.collection.estimated_document_count()

    # Apply the configured timeout unless the caller chose one.
    if "maxTimeMS" not in query:
        query["maxTimeMS"] = 1000 * CONFIG.mongo_count_timeout

    try:
        return self.collection.count_documents(**query)
    except ExecutionTimeout:
        return None

handle_query_params(self, params)

Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by MongoDB.

This Mongo-specific method calls the base EntryCollection.handle_query_params method and adds additional handling of the MongoDB ObjectID type.

Parameters:

Name Type Description Default
params Union[optimade.server.query_params.EntryListingQueryParams, optimade.server.query_params.SingleEntryQueryParams]

The initialized query parameter model from the server.

required

Exceptions:

Type Description
Forbidden

If too large of a page limit is provided.

BadRequest

If an invalid request is made, e.g., with incorrect fields or response format.

Returns:

Type Description
dict

A dictionary representation of the query parameters.

Source code in optimade/server/entry_collections/mongo.py
def handle_query_params(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> dict[str, Any]:
    """Parse and interpret the backend-agnostic query parameter models into a dictionary
    that can be used by MongoDB.

    This Mongo-specific method calls the base `EntryCollection.handle_query_params` method
    and adds additional handling of the MongoDB ObjectID type.

    Parameters:
        params: The initialized query parameter model from the server.

    Raises:
        Forbidden: If too large of a page limit is provided.
        BadRequest: If an invalid request is made, e.g., with incorrect fields
            or response format.
        NotImplementedError: If `page_above` is present in the parsed criteria.

    Returns:
        A dictionary representation of the query parameters.

    """
    criteria = super().handle_query_params(params)

    if "page_above" in criteria:
        raise NotImplementedError(
            "`page_above` is not implemented for this backend."
        )

    # Make sure there is a projection dictionary to write into, so that a
    # missing (or `None`) projection does not raise below.
    projection = criteria.get("projection")
    if projection is None:
        projection = criteria["projection"] = {}

    # Handle MongoDB ObjectIDs:
    # - If they were not requested, then explicitly remove them
    # - If they were requested, then cast them to strings in the response
    if "_id" not in projection:
        projection["_id"] = False
    elif projection["_id"]:
        projection["_id"] = {"$toString": "$_id"}

    return criteria

insert(self, data)

Add the given entries to the underlying database.

Warning

No validation is performed on the incoming data.

Parameters:

Name Type Description Default
data list

The entry resource objects to add to the database.

required
Source code in optimade/server/entry_collections/mongo.py
def insert(self, data: list[EntryResource]) -> None:
    """Write the given entries to the underlying database in a single `insert_many` call.

    Warning:
        No validation is performed on the incoming data.

    Arguments:
        data: The entry resource objects to add to the database.

    """
    target_collection = self.collection
    target_collection.insert_many(data)