collection¶
AsyncMongoCollection
¶
MongoDB Collection for use with asyncio
The asynchronicity is implemented using `motor` and `asyncio`.
__init__(self, name, resource_cls, resource_mapper)
special
¶
Initialize the AsyncMongoCollection for the given parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | The name of the collection. | required |
resource_cls | EntryResource | The `EntryResource` model that is stored by the collection. | required |
resource_mapper | BaseResourceMapper | A resource mapper object that handles aliases and format changes between deserialization and response. | required |
Source code in optimade_gateway/mongo/collection.py
def __init__(
    self,
    name: str,
    resource_cls: EntryResource,
    resource_mapper: BaseResourceMapper,
):
    """Set up the collection wrapper around the named MongoDB collection.

    Parameters:
        name: The name of the collection.
        resource_cls: The `EntryResource` model that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and
            format changes between deserialization and response.

    """
    # Imported at call time, not at module import time.
    from optimade_gateway.mongo.database import MONGO_DB

    query_transformer = MongoTransformer(mapper=resource_mapper)
    super().__init__(
        resource_cls=resource_cls,
        resource_mapper=resource_mapper,
        transformer=query_transformer,
    )

    self.parser = LarkParser(version=(1, 0, 0), variant="default")
    self.collection: MongoCollection = MONGO_DB[name]

    # Check aliases do not clash with mongo operators
    for alias_getter in (
        self.resource_mapper.all_aliases,
        self.resource_mapper.all_length_aliases,
    ):
        self._check_aliases(alias_getter())
__len__(self)
special
¶
Length of collection
Source code in optimade_gateway/mongo/collection.py
def __len__(self) -> int:
    """Warn that `len()` is not supported for this collection and return 0.

    Returns:
        Always `0`; an `OptimadeGatewayWarning` pointing to `count()` is emitted.

    """
    import warnings

    message = (
        "Cannot calculate length of collection using `len()`. Use `count()` instead."
    )
    warnings.warn(message, OptimadeGatewayWarning)
    return 0
__repr__(self)
special
¶
Representation of instance
Source code in optimade_gateway/mongo/collection.py
def __repr__(self) -> str:
    """Representation of instance

    Returns:
        A constructor-like string showing the collection name, the resource
        class and the resource mapper, with balanced parentheses.

    """
    return (
        f"{self.__class__.__name__}(name={self.collection.name!r}, "
        f"resource_cls={self.resource_cls!r}, "
        # The closing parenthesis was missing from the original string.
        f"resource_mapper={self.resource_mapper!r})"
    )
__str__(self)
special
¶
Standard printing result for an instance
Source code in optimade_gateway/mongo/collection.py
def __str__(self) -> str:
    """Human-readable summary of the instance.

    Returns:
        A string naming the class, the resource class, the mapper's endpoint
        and the underlying database collection.

    """
    class_name = self.__class__.__name__
    resource_name = self.resource_cls.__name__
    endpoint = self.resource_mapper.ENDPOINT
    collection_name = self.collection.name
    return (
        f"<{class_name}: resource={resource_name} "
        f"endpoint(mapper)={endpoint} DB_collection={collection_name}>"
    )
count(self, params=None, **kwargs)
async
¶
Count documents in Collection
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params | Union[optimade.server.query_params.EntryListingQueryParams, optimade.server.query_params.SingleEntryQueryParams] | URL query parameters, either from a general entry endpoint or a single-entry endpoint. | None |
kwargs | dict | Query parameters as keyword arguments. Valid keys will be passed to the `AsyncIOMotorCollection.count_documents` method. | {} |
Returns:
Type | Description |
---|---|
int | int: The number of entries matching the query specified by the keyword arguments. |
Source code in optimade_gateway/mongo/collection.py
async def count(
    self,
    params: Union[EntryListingQueryParams, SingleEntryQueryParams] = None,
    **kwargs,
) -> int:
    """Count the documents in the collection that match a query.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or
            a single-entry endpoint. Cannot be combined with `kwargs`.
        kwargs (dict): Query parameters as keyword arguments. Valid keys will be
            passed to the
            [`AsyncIOMotorCollection.count_documents`](https://motor.readthedocs.io/en/stable/api-asyncio/asyncio_motor_collection.html#motor.motor_asyncio.AsyncIOMotorCollection.count_documents)
            method; all other keys are dropped.

    Returns:
        int: The number of entries matching the query specified by the keyword arguments.

    Raises:
        ValueError: If both `params` and keyword arguments are supplied.

    """
    if params is not None:
        if kwargs:
            raise ValueError(
                "When 'params' is supplied, no other parameters can be supplied."
            )
        kwargs = await self.handle_query_params(params)

    accepted_keys = (
        "filter",
        "skip",
        "limit",
        "hint",
        "maxTimeMS",
        "collation",
        "session",
    )
    query = {}
    for key in accepted_keys:
        if key in kwargs:
            query[key] = kwargs[key]

    # A missing (or `None`) filter is replaced by an empty filter document.
    if query.get("filter") is None:
        query["filter"] = {}

    return await self.collection.count_documents(**query)
create_one(self, resource)
async
¶
Create a new document in the MongoDB collection based on query parameters.
Update the newly created document with an "id"
field. The value will be the string representation of the "_id"
field. This will only be done if "id"
is not already present in resource
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource | EntryResourceCreate | The resource to be created. | required |
Returns:
Type | Description |
---|---|
EntryResource | The newly created document as a pydantic model entry resource. |
Source code in optimade_gateway/mongo/collection.py
async def create_one(self, resource: EntryResourceCreate) -> EntryResource:
    """Insert `resource` as a new document and return it as an entry resource.

    The resource's `last_modified` is set to the current UTC time before the
    insert. If `resource` has no truthy `"id"`, the stored document is updated
    with an `"id"` field equal to the string representation of its `"_id"`.

    Parameters:
        resource: The resource to be created.

    Returns:
        The newly created document as a pydantic model entry resource.

    """
    resource.last_modified = datetime.utcnow()

    document = await clean_python_types(resource.dict(exclude_unset=True))
    insert_result = await self.collection.insert_one(document)
    new_id = insert_result.inserted_id
    LOGGER.debug(
        "Inserted resource %r in DB collection %s with ID %s",
        resource,
        self.collection.name,
        new_id,
    )

    if not resource.id:
        LOGGER.debug("Updating resource with an `id` field equal to str(id_).")
        await self.collection.update_one(
            {"_id": new_id}, {"$set": {"id": str(new_id)}}
        )

    # Re-read the document so the returned model reflects what is stored.
    stored = await self.collection.find_one({"_id": new_id})
    return self.resource_cls(**self.resource_mapper.map_back(stored))
exists(self, entry_id)
async
¶
Assert whether entry_id exists in the collection (value of "id"
)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entry_id | str | The `"id"` value of the entry. | required |
Source code in optimade_gateway/mongo/collection.py
async def exists(self, entry_id: str) -> bool:
    """Tell whether a document with the given `"id"` is in the collection.

    Parameters:
        entry_id: The `"id"` value of the entry.

    Returns:
        `True` if at least one document has `"id"` equal to `entry_id`,
        otherwise `False`.

    """
    n_matches = await self.collection.count_documents({"id": entry_id})
    return bool(n_matches)
find(self, params=None, criteria=None)
async
¶
Perform the query on the underlying MongoDB Collection, handling projection and pagination of the output.
Either provide params
or criteria
. Not both, but at least one.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params | Union[optimade.server.query_params.EntryListingQueryParams, optimade.server.query_params.SingleEntryQueryParams] | URL query parameters, either from a general entry endpoint or a single-entry endpoint. | None |
criteria | Dict[str, Any] | Already handled/parsed URL query parameters. | None |
Returns:
Type | Description |
---|---|
Tuple[Union[List[optimade.models.entries.EntryResource], optimade.models.entries.EntryResource], bool, set] | A list of entry resource objects, whether more data is available with pagination, and fields. |
Source code in optimade_gateway/mongo/collection.py
async def find(
    self,
    params: Union[EntryListingQueryParams, SingleEntryQueryParams] = None,
    criteria: Dict[str, Any] = None,
) -> Tuple[Union[List[EntryResource], EntryResource, None], bool, set]:
    """Perform the query on the underlying MongoDB Collection, handling projection
    and pagination of the output.

    Either provide `params` or `criteria`. Not both, but at least one.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or
            a single-entry endpoint.
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A tuple of three items:

        - the results: a list of entry resource objects, or, when `params` is
          not `None` and not an `EntryListingQueryParams`, a single entry
          resource (`None` if nothing was found);
        - whether more data is available with pagination;
        - the set of all fields minus the requested fields.

    Raises:
        ValueError: If neither or both of `params` and `criteria` are given.
        HTTPException: With status 404 if a single-entry query matched more
            than one document.

    """
    # Exactly one of the two inputs must be given.
    if (params is None) == (criteria is None):
        raise ValueError(
            "Exactly one of either `params` and `criteria` must be specified."
        )

    if criteria is None:
        criteria = await self.handle_query_params(params)

    fields = criteria.get("fields", self.all_fields)

    results = []
    async for document in self.collection.find(**self._valid_find_keys(**criteria)):
        # Only when "_id" is explicitly projected is it kept, as a string.
        if criteria.get("projection", {}).get("_id"):
            document["_id"] = str(document["_id"])
        results.append(self.resource_cls(**self.resource_mapper.map_back(document)))

    if params is None or isinstance(params, EntryListingQueryParams):
        # Compare against the count of the same query without the page limit.
        criteria_nolimit = criteria.copy()
        criteria_nolimit.pop("limit", None)
        more_data_available = len(results) < await self.count(**criteria_nolimit)
    else:
        # SingleEntryQueryParams, e.g., /structures/{entry_id}
        more_data_available = False
        if len(results) > 1:
            raise HTTPException(
                status_code=404,
                detail=f"Instead of a single entry, {len(results)} entries were found",
            )
        results = results[0] if results else None

    return results, more_data_available, self.all_fields - fields
get_multiple(self, **criteria)
async
¶
Get a list of resources based on criteria
Warning
This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a list of resources.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict[str, Any] | Already handled/parsed URL query parameters. | {} |
Returns:
Type | Description |
---|---|
List[optimade.models.entries.EntryResource] | A list of resources from the MongoDB (mapped to pydantic models). |
Source code in optimade_gateway/mongo/collection.py
async def get_multiple(self, **criteria: Dict[str, Any]) -> List[EntryResource]:
    """Fetch every resource matching `criteria` as a list.

    !!! warning
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a list of resources.

    Parameters:
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A list of resources from the MongoDB (mapped to pydantic models).

    """
    find_kwargs = self._valid_find_keys(**(criteria or {}))
    cursor = self.collection.find(**find_kwargs)
    return [
        self.resource_cls(**self.resource_mapper.map_back(found))
        async for found in cursor
    ]
get_one(self, **criteria)
async
¶
Get one resource based on criteria
Warning
This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a single resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria | Dict[str, Any] | Already handled/parsed URL query parameters. | {} |
Returns:
Type | Description |
---|---|
EntryResource | A single resource from the MongoDB (mapped to pydantic models). |
Source code in optimade_gateway/mongo/collection.py
async def get_one(self, **criteria: Dict[str, Any]) -> EntryResource:
    """Fetch a single resource matching `criteria`.

    !!! warning
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a single resource.

    Parameters:
        criteria: Already handled/parsed URL query parameters.

    Returns:
        A single resource from the MongoDB (mapped to pydantic models).

    """
    find_kwargs = self._valid_find_keys(**(criteria or {}))
    # NOTE(review): whatever `find_one` returns is handed to the mapper as-is,
    # including any "no match" value - confirm callers expect that.
    document = await self.collection.find_one(**find_kwargs)
    mapped = self.resource_mapper.map_back(document)
    return self.resource_cls(**mapped)
handle_query_params(self, params)
async
¶
Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.
Note
Currently this method returns the pymongo interpretation of the parameters, which will need modification for other backends.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params | Union[optimade.server.query_params.EntryListingQueryParams, optimade.server.query_params.SingleEntryQueryParams] | The initialized query parameter model from the server. | required |
Exceptions:
Type | Description |
---|---|
Forbidden | If too large of a page limit is provided. |
BadRequest | If an invalid request is made, e.g., with incorrect fields or response format. |
Returns:
Type | Description |
---|---|
dict | A dictionary representation of the query parameters. |
Source code in optimade_gateway/mongo/collection.py
async def handle_query_params(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> dict:
    """Turn the query parameter model into a dictionary for the backend.

    The parent class' `handle_query_params` does the parsing. This override
    only replaces the `"fields"` entry when `response_fields` was requested.

    Parameters:
        params: The initialized query parameter model from the server.

    Returns:
        A dictionary representation of the query parameters.

    """
    cursor_kwargs = super().handle_query_params(params)

    # Without `response_fields`, cursor_kwargs["fields"] is already set to
    # self.all_fields, so there is nothing to adjust.
    if not getattr(params, "response_fields", False):
        return cursor_kwargs

    # Requested fields plus the fields the mapper reports as required.
    requested = set(params.response_fields.split(","))
    cursor_kwargs["fields"] = requested | self.resource_mapper.get_required_fields()
    return cursor_kwargs
insert(self, data)
async
¶
Add the given entries to the underlying database.
Warning
No validation is performed on the incoming data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | List[optimade.models.entries.EntryResource] | The entry resource objects to add to the database. | required |
Source code in optimade_gateway/mongo/collection.py
async def insert(self, data: List[EntryResource]) -> None:
    """Add the given entries to the underlying database.

    Warning:
        No validation is performed on the incoming data.

    Parameters:
        data: The entry resource objects to add to the database.

    """
    cleaned = await clean_python_types(data)
    await self.collection.insert_many(cleaned)