Skip to content

collection

AsyncMongoCollection

MongoDB Collection for use with asyncio

The asynchronicity is implemented using motor and asyncio.

__init__(self, name, resource_cls, resource_mapper) special

Initialize the AsyncMongoCollection for the given parameters.

Parameters:

Name Type Description Default
name str

The name of the collection.

required
resource_cls EntryResource

The EntryResource model that is stored by the collection.

required
resource_mapper BaseResourceMapper

A resource mapper object that handles aliases and format changes between deserialization and response.

required
Source code in optimade_gateway/mongo/collection.py
def __init__(
    self,
    name: str,
    resource_cls: EntryResource,
    resource_mapper: BaseResourceMapper,
):
    """Set up an asynchronous, MongoDB-backed entry collection.

    Parameters:
        name: The name of the collection.
        resource_cls: The `EntryResource` model that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and
            format changes between deserialization and response.

    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import - TODO confirm).
    from optimade_gateway.mongo.database import MONGO_DB

    filter_transformer = MongoTransformer(mapper=resource_mapper)
    super().__init__(
        resource_cls=resource_cls,
        resource_mapper=resource_mapper,
        transformer=filter_transformer,
    )

    self.parser = LarkParser(version=(1, 0, 0), variant="default")
    self.collection: MongoCollection = MONGO_DB[name]

    # Check that neither the plain nor the length aliases clash with mongo operators
    mapper = self.resource_mapper
    for collect_aliases in (mapper.all_aliases, mapper.all_length_aliases):
        self._check_aliases(collect_aliases())

__len__(self) special

Length of collection

Source code in optimade_gateway/mongo/collection.py
def __len__(self) -> int:
    """Warn that `len()` is unsupported for this collection and return 0.

    The number of documents has to be retrieved with `count()` instead.

    Returns:
        Always `0`.

    """
    import warnings

    message = (
        "Cannot calculate length of collection using `len()`. "
        "Use `count()` instead."
    )
    warnings.warn(message, OptimadeGatewayWarning)
    return 0

__repr__(self) special

Representation of instance

Source code in optimade_gateway/mongo/collection.py
def __repr__(self) -> str:
    """Representation of instance

    The result mirrors the constructor call: the class name followed by the
    `name`, `resource_cls` and `resource_mapper` arguments in parentheses.

    Returns:
        The constructor-like representation, with balanced parentheses.

    """
    return (
        f"{self.__class__.__name__}("
        f"name={self.collection.name!r}, "
        f"resource_cls={self.resource_cls!r}, "
        f"resource_mapper={self.resource_mapper!r})"
    )

__str__(self) special

Standard printing result for an instance

Source code in optimade_gateway/mongo/collection.py
def __str__(self) -> str:
    """Human-readable summary of the instance.

    Returns:
        A string naming the class, the resource model, the mapper's endpoint
        and the name of the underlying database collection.

    """
    class_name = self.__class__.__name__
    resource_name = self.resource_cls.__name__
    endpoint = self.resource_mapper.ENDPOINT
    db_collection = self.collection.name
    return (
        f"<{class_name}: resource={resource_name} "
        f"endpoint(mapper)={endpoint} DB_collection={db_collection}>"
    )

count(self, params=None, **kwargs) async

Count documents in Collection

Parameters:

Name Type Description Default
params Union[optimade.server.query_params.EntryListingQueryParams, optimade.server.query_params.SingleEntryQueryParams]

URL query parameters, either from a general entry endpoint or a single-entry endpoint.

None
kwargs dict

Query parameters as keyword arguments. Valid keys will be passed to the AsyncIOMotorCollection.count_documents method.

{}

Returns:

Type Description
int

int: The number of entries matching the query specified by the keyword arguments.

Source code in optimade_gateway/mongo/collection.py
async def count(
    self,
    params: Union[EntryListingQueryParams, SingleEntryQueryParams] = None,
    **kwargs,
) -> int:
    """Count the documents in the collection that match a query.

    The query is given either as `params` or as keyword arguments, never both.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or a single-entry endpoint.
        kwargs (dict): Query parameters as keyword arguments. Valid keys will be passed
            to the
            [`AsyncIOMotorCollection.count_documents`](https://motor.readthedocs.io/en/stable/api-asyncio/asyncio_motor_collection.html#motor.motor_asyncio.AsyncIOMotorCollection.count_documents)
            method.

    Returns:
        int: The number of entries matching the query.

    Raises:
        ValueError: If both `params` and keyword arguments are supplied.

    """
    if params is not None:
        if kwargs:
            raise ValueError(
                "When 'params' is supplied, no other parameters can be supplied."
            )
        kwargs = await self.handle_query_params(params)

    # Only these keys are forwarded to `count_documents`; anything else is dropped.
    query = {}
    for key in (
        "filter",
        "skip",
        "limit",
        "hint",
        "maxTimeMS",
        "collation",
        "session",
    ):
        if key in kwargs:
            query[key] = kwargs[key]

    # A missing or `None` filter is replaced by an empty filter document.
    if query.get("filter") is None:
        query["filter"] = {}

    total = await self.collection.count_documents(**query)
    return total

create_one(self, resource) async

Create a new document in the MongoDB collection based on query parameters.

Update the newly created document with an "id" field. The value will be the string representation of the "_id" field. This will only be done if "id" is not already present in resource.

Parameters:

Name Type Description Default
resource EntryResourceCreate

The resource to be created.

required

Returns:

Type Description
EntryResource

The newly created document as a pydantic model entry resource.

Source code in optimade_gateway/mongo/collection.py
async def create_one(self, resource: EntryResourceCreate) -> EntryResource:
    """Insert `resource` as a new document and return it as an entry resource.

    The resource's `last_modified` is set to the current UTC time before insertion.
    If `resource` has no `"id"`, the stored document is updated with an `"id"`
    field holding the string representation of its `"_id"` field.

    Parameters:
        resource: The resource to be created.

    Returns:
        The newly created document as a pydantic model entry resource.

    """
    resource.last_modified = datetime.utcnow()

    document = await clean_python_types(resource.dict(exclude_unset=True))
    insertion = await self.collection.insert_one(document)
    new_id = insertion.inserted_id
    LOGGER.debug(
        "Inserted resource %r in DB collection %s with ID %s",
        resource,
        self.collection.name,
        new_id,
    )

    if not resource.id:
        LOGGER.debug("Updating resource with an `id` field equal to str(id_).")
        await self.collection.update_one(
            {"_id": new_id}, {"$set": {"id": str(new_id)}}
        )

    # Re-read the document so the returned model reflects what is stored.
    stored = await self.collection.find_one({"_id": new_id})
    return self.resource_cls(**self.resource_mapper.map_back(stored))

exists(self, entry_id) async

Assert whether entry_id exists in the collection (value of "id")

Parameters:

Name Type Description Default
entry_id str

The "id" value of the entry.

required
Source code in optimade_gateway/mongo/collection.py
async def exists(self, entry_id: str) -> bool:
    """Check whether a document with the given `"id"` is in the collection.

    Parameters:
        entry_id: The `"id"` value of the entry.

    Returns:
        `True` if at least one document has `"id"` equal to `entry_id`,
        otherwise `False`.

    """
    id_query = {"id": entry_id}
    matches = await self.collection.count_documents(id_query)
    return bool(matches)

find(self, params=None, criteria=None) async

Perform the query on the underlying MongoDB Collection, handling projection and pagination of the output.

Either provide params or criteria. Not both, but at least one.

Parameters:

Name Type Description Default
params Union[optimade.server.query_params.EntryListingQueryParams, optimade.server.query_params.SingleEntryQueryParams]

URL query parameters, either from a general entry endpoint or a single-entry endpoint.

None
criteria Dict[str, Any]

Already handled/parsed URL query parameters.

None

Returns:

Type Description
Tuple[Union[List[optimade.models.entries.EntryResource], optimade.models.entries.EntryResource], bool, set]

A list of entry resource objects, whether more data is available with pagination, and fields.

Source code in optimade_gateway/mongo/collection.py
async def find(
    self,
    params: Union[EntryListingQueryParams, SingleEntryQueryParams] = None,
    criteria: Dict[str, Any] = None,
) -> Tuple[Union[List[EntryResource], EntryResource], bool, set]:
    """Perform the query on the underlying MongoDB Collection, handling projection
    and pagination of the output.

    Either provide `params` or `criteria`. Not both, but at least one.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or a single-entry endpoint.
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A tuple of three items: the results, whether more data is available
        with pagination, and the set of fields in `all_fields` that were not
        requested. For `SingleEntryQueryParams` the results item is a single
        entry resource, or `None` if nothing matched; otherwise it is a list
        of entry resources.

    Raises:
        ValueError: If neither or both of `params` and `criteria` are given.
        HTTPException: With status code 404, if a single-entry query matches
            more than one document.

    """
    # Exactly one of the two must be given: reject "neither" and "both".
    if (params is None) == (criteria is None):
        raise ValueError(
            "Exactly one of either `params` or `criteria` must be specified."
        )

    if criteria is None:
        criteria = await self.handle_query_params(params)

    fields = criteria.get("fields", self.all_fields)

    # Loop-invariant, so evaluated once. `or {}` also tolerates an explicit
    # `"projection": None` in `criteria`.
    stringify_id = bool((criteria.get("projection") or {}).get("_id"))

    results = []
    async for document in self.collection.find(**self._valid_find_keys(**criteria)):
        if stringify_id:
            document["_id"] = str(document["_id"])
        results.append(self.resource_cls(**self.resource_mapper.map_back(document)))

    if params is None or isinstance(params, EntryListingQueryParams):
        # Compare against the count without "limit" to detect further pages.
        criteria_nolimit = criteria.copy()
        criteria_nolimit.pop("limit", None)
        more_data_available = len(results) < await self.count(**criteria_nolimit)
    else:
        # SingleEntryQueryParams, e.g., /structures/{entry_id}
        more_data_available = False
        if len(results) > 1:
            raise HTTPException(
                status_code=404,
                detail=f"Instead of a single entry, {len(results)} entries were found",
            )
        results = results[0] if results else None

    return results, more_data_available, self.all_fields - fields

get_multiple(self, **criteria) async

Get a list of resources based on criteria

Warning

This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a list of resources.

Parameters:

Name Type Description Default
criteria Dict[str, Any]

Already handled/parsed URL query parameters.

{}

Returns:

Type Description
List[optimade.models.entries.EntryResource]

A list of resources from the MongoDB (mapped to pydantic models).

Source code in optimade_gateway/mongo/collection.py
async def get_multiple(self, **criteria: Dict[str, Any]) -> List[EntryResource]:
    """Retrieve every resource matching `criteria` as a list.

    !!! warning
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a list of resources.

    Parameters:
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A list of resources from the MongoDB (mapped to pydantic models).

    """
    find_kwargs = self._valid_find_keys(**(criteria or {}))
    cursor = self.collection.find(**find_kwargs)
    return [
        self.resource_cls(**self.resource_mapper.map_back(document))
        async for document in cursor
    ]

get_one(self, **criteria) async

Get one resource based on criteria

Warning

This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a single resource.

Parameters:

Name Type Description Default
criteria Dict[str, Any]

Already handled/parsed URL query parameters.

{}

Returns:

Type Description
EntryResource

A single resource from the MongoDB (mapped to pydantic models).

Source code in optimade_gateway/mongo/collection.py
async def get_one(self, **criteria: Dict[str, Any]) -> EntryResource:
    """Retrieve a single resource matching `criteria`.

    !!! warning
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a single resource.

    Parameters:
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A single resource from the MongoDB (mapped to pydantic models).

    """
    find_kwargs = self._valid_find_keys(**(criteria or {}))
    document = await self.collection.find_one(**find_kwargs)

    # NOTE(review): `find_one` is documented to give `None` when nothing matches;
    # that value is handed to `map_back` as-is - confirm callers guarantee a match.
    return self.resource_cls(**self.resource_mapper.map_back(document))

handle_query_params(self, params) async

Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.

Note

Currently this method returns the pymongo interpretation of the parameters, which will need modification for other backends.

Parameters:

Name Type Description Default
params Union[optimade.server.query_params.EntryListingQueryParams, optimade.server.query_params.SingleEntryQueryParams]

The initialized query parameter model from the server.

required

Exceptions:

Type Description
Forbidden

If too large of a page limit is provided.

BadRequest

If an invalid request is made, e.g., with incorrect fields or response format.

Returns:

Type Description
dict

A dictionary representation of the query parameters.

Source code in optimade_gateway/mongo/collection.py
async def handle_query_params(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> dict:
    """Turn the query parameter model into a dictionary usable by the backend.

    The parent class' interpretation of `params` is used as the starting point.
    If `params` carries a non-empty `response_fields`, the `"fields"` entry is
    replaced by the requested fields together with the resource mapper's
    required fields.

    Parameters:
        params: The initialized query parameter model from the server.

    Returns:
        A dictionary representation of the query parameters.

    """
    cursor_kwargs = super().handle_query_params(params)

    requested = getattr(params, "response_fields", False)
    if requested:
        cursor_kwargs["fields"] = (
            set(requested.split(",")) | self.resource_mapper.get_required_fields()
        )
    # Otherwise cursor_kwargs["fields"] is already set to self.all_fields

    return cursor_kwargs

insert(self, data) async

Add the given entries to the underlying database.

Warning

No validation is performed on the incoming data.

Parameters:

Name Type Description Default
data List[optimade.models.entries.EntryResource]

The entry resource objects to add to the database.

required
Source code in optimade_gateway/mongo/collection.py
async def insert(self, data: List[EntryResource]) -> None:
    """Insert the given entries into the underlying database collection.

    Warning:
        No validation is performed on the incoming data.

    Parameters:
        data: The entry resource objects to add to the database.

    """
    documents = await clean_python_types(data)
    await self.collection.insert_many(documents)
Back to top