collection¶
MongoDB collection for entry-endpoint resources.
The AsyncMongoCollection represents an asynchronous version of the equivalent MongoDB collection in optimade: MongoCollection.
AsyncMongoCollection (EntryCollection)
¶
MongoDB Collection for use with asyncio
The asynchronicity is implemented using motor
and
asyncio
.
__init__(self, name, resource_cls, resource_mapper)
special
¶
Initialize the AsyncMongoCollection for the given parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the collection. |
required |
resource_cls |
EntryResource |
The `EntryResource` model that is stored by the collection. |
required |
resource_mapper |
BaseResourceMapper |
A resource mapper object that handles aliases and format changes between deserialization and response. |
required |
Source code in optimade_gateway/mongo/collection.py
def __init__(
    self,
    name: str,
    resource_cls: "EntryResource",
    resource_mapper: "BaseResourceMapper",
):
    """Set up the AsyncMongoCollection for the given parameters.

    Parameters:
        name: The name of the collection; used as the key into `MONGO_DB`.
        resource_cls: The `EntryResource` model that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and format
            changes between deserialization and response.

    """
    # Deliberately imported at call time instead of at module level (hence the
    # pylint exemption).
    from optimade_gateway.mongo.database import (  # pylint: disable=import-outside-toplevel
        MONGO_DB,
    )

    super().__init__(
        resource_cls=resource_cls,
        resource_mapper=resource_mapper,
        transformer=MongoTransformer(mapper=resource_mapper),
    )

    self.parser = LarkParser(version=(1, 0, 0), variant="default")
    self.collection: MongoCollection = MONGO_DB[name]

    # Check aliases do not clash with mongo operators
    field_aliases = self.resource_mapper.all_aliases()
    self._check_aliases(field_aliases)
    length_aliases = self.resource_mapper.all_length_aliases()
    self._check_aliases(length_aliases)
acount(self, params=None, **kwargs)
async
¶
Count documents in Collection.
This is the asynchronous version of the parent class method named count()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]] |
URL query parameters, either from a general entry endpoint or a single-entry endpoint. |
None |
kwargs |
dict |
Query parameters as keyword arguments. Valid keys will be passed to the AsyncIOMotorCollection.count_documents method. |
{} |
Returns:
Type | Description |
---|---|
int |
The number of entries matching the query specified by the keyword arguments. |
Source code in optimade_gateway/mongo/collection.py
async def acount(
    self,
    params: "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]]" = None,
    **kwargs,
) -> int:
    """Count documents in Collection.

    This is the asynchronous version of the parent class method named `count()`.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or a
            single-entry endpoint. Mutually exclusive with `kwargs`.
        kwargs (dict): Query parameters as keyword arguments. Valid keys will be
            passed to the
            [`AsyncIOMotorCollection.count_documents`](https://motor.readthedocs.io/en/stable/api-asyncio/asyncio_motor_collection.html#motor.motor_asyncio.AsyncIOMotorCollection.count_documents)
            method; all other keys are silently dropped.

    Raises:
        ValueError: If both `params` and keyword arguments are supplied.

    Returns:
        int: The number of entries matching the query specified by the keyword
            arguments.

    """
    if params is not None:
        if kwargs:
            raise ValueError(
                "When 'params' is supplied, no other parameters can be supplied."
            )
        # The parsed query parameters take the place of the keyword arguments.
        kwargs = await self.ahandle_query_params(params)

    accepted_keys = (
        "filter",
        "skip",
        "limit",
        "hint",
        "maxTimeMS",
        "collation",
        "session",
    )

    count_arguments = {}
    for key in accepted_keys:
        if key in kwargs:
            count_arguments[key] = kwargs[key]

    # A missing (or `None`) filter is replaced by an empty filter document.
    if count_arguments.get("filter") is None:
        count_arguments["filter"] = {}

    return await self.collection.count_documents(**count_arguments)
afind(self, params=None, criteria=None)
async
¶
Perform the query on the underlying MongoDB Collection, handling projection and pagination of the output.
This is the asynchronous version of the parent class method named find().
Either provide params
or criteria
. Not both, but at least one.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]] |
URL query parameters, either from a general entry endpoint or a single-entry endpoint. |
None |
criteria |
Optional[Dict[str, Any]] |
Already handled/parsed URL query parameters. |
None |
Returns:
Type | Description |
---|---|
Tuple[Union[List[EntryResource], EntryResource, None], int, bool, Set[str], Set[str]] |
A list of entry resource objects, how much data was returned for the query, whether more data is available with pagination, and fields (excluded, included). |
Source code in optimade_gateway/mongo/collection.py
async def afind(
    self,
    params: "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]]" = None,
    criteria: "Optional[Dict[str, Any]]" = None,
) -> "Tuple[Union[List[EntryResource], EntryResource, None], int, bool, Set[str], Set[str]]":
    """Perform the query on the underlying MongoDB Collection, handling projection
    and pagination of the output.

    This is the asynchronous version of the parent class method named `find()`.

    Either provide `params` or `criteria`. Not both, but at least one.

    Parameters:
        params: URL query parameters, either from a general entry endpoint or a
            single-entry endpoint.
        criteria: Already handled/parsed URL query parameters. Note that the
            `"fields"` key, if present, is popped from the passed dictionary.

    Raises:
        ValueError: If neither or both of `params` and `criteria` are given.
        NotFound: If `params` is a `SingleEntryQueryParams` and the query matched
            more than one entry.
        BadRequest: If unrecognised OPTIMADE fields are requested through
            `response_fields`.

    Returns:
        A list of entry resource objects (or a single entry resource / `None`
        for single-entry `params`), how much data was returned for the query,
        whether more data is available with pagination, and fields (excluded,
        included).

    """
    if (params is None and criteria is None) or (
        params is not None and criteria is not None
    ):
        raise ValueError(
            "Exacly one of either `params` and `criteria` must be specified."
        )

    if criteria is None:
        # `params` is guaranteed to be set here, so this is the only place where
        # it is meaningful to ask whether a single entry was requested.
        criteria = await self.ahandle_query_params(params)
        single_entry = isinstance(params, SingleEntryQueryParams)
    else:
        # With pre-parsed criteria it is unknown whether a single entry is
        # wanted - better to then get a list of results.
        single_entry = False

    response_fields = criteria.pop("fields", self.all_fields)

    results, data_returned, more_data_available = await self._arun_db_query(
        criteria=criteria,
        single_entry=single_entry,
    )

    if single_entry:
        results = results[0] if results else None  # type: ignore[assignment]

        if data_returned > 1:
            raise NotFound(
                detail=(
                    f"Instead of a single entry, {data_returned} entries were found"
                ),
            )

    include_fields = (
        response_fields - self.resource_mapper.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
    )

    # Sort unknown requested fields into provider-specific ones (only warned
    # about) and OPTIMADE ones (rejected). Unknown underscore-prefixed fields
    # with an unsupported prefix are deliberately ignored.
    bad_optimade_fields = set()
    bad_provider_fields = set()
    for field in include_fields:
        if field not in self.resource_mapper.ALL_ATTRIBUTES:
            if field.startswith("_"):
                if any(
                    field.startswith(f"_{prefix}_")
                    for prefix in self.resource_mapper.SUPPORTED_PREFIXES
                ):
                    bad_provider_fields.add(field)
            else:
                bad_optimade_fields.add(field)

    if bad_provider_fields:
        warn(
            UnknownProviderProperty(
                detail=(
                    "Unrecognised field(s) for this provider requested in "
                    f"`response_fields`: {bad_provider_fields}."
                )
            )
        )

    if bad_optimade_fields:
        raise BadRequest(
            detail=(
                "Unrecognised OPTIMADE field(s) in requested `response_fields`: "
                f"{bad_optimade_fields}."
            )
        )

    if results:
        results = await self.resource_mapper.adeserialize(results)

    return (  # type: ignore[return-value]
        results,
        data_returned,
        more_data_available,
        self.all_fields - response_fields,
        include_fields,
    )
ahandle_query_params(self, params)
async
¶
Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.
This is the asynchronous version of the parent class method named
handle_query_params()
.
Note
Currently this method returns the pymongo interpretation of the parameters, which will need modification for other backends.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
Union[EntryListingQueryParams, SingleEntryQueryParams] |
The initialized query parameter model from the server. |
required |
Exceptions:
Type | Description |
---|---|
Forbidden |
If too large of a page limit is provided. |
BadRequest |
If an invalid request is made, e.g., with incorrect fields or response format. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary representation of the query parameters. |
Source code in optimade_gateway/mongo/collection.py
async def ahandle_query_params(
    self, params: "Union[EntryListingQueryParams, SingleEntryQueryParams]"
) -> "Dict[str, Any]":
    """Turn the backend-agnostic query parameter model into a dictionary that
    can be used by the specific backend.

    This is the asynchronous version of the parent class method named
    `handle_query_params()`.

    Note:
        Currently this method returns the pymongo interpretation of the
        parameters, which will need modification for other backends.

    Parameters:
        params: The initialized query parameter model from the server.

    Raises:
        Forbidden: If too large of a page limit is provided.
        BadRequest: If an invalid request is made, e.g., with incorrect fields or
            response format.

    Returns:
        A dictionary representation of the query parameters.

    """
    # Go to the parent implementation explicitly: this class's own
    # `handle_query_params()` only raises `NotImplementedError`.
    handled_params = super().handle_query_params(params)
    return handled_params
ainsert(self, data)
async
¶
Add the given entries to the underlying database.
This is the asynchronous version of the parent class method named insert()
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
List[EntryResource] |
The entry resource objects to add to the database. |
required |
Source code in optimade_gateway/mongo/collection.py
async def ainsert(self, data: "List[EntryResource]") -> None:
    """Insert the given entries into the underlying database.

    This is the asynchronous version of the parent class method named `insert()`.

    Parameters:
        data: The entry resource objects to add to the database.

    """
    # The entries are passed through `clean_python_types()` before being handed
    # to the collection.
    documents = await clean_python_types(data)
    await self.collection.insert_many(documents)
count(self, **kwargs)
¶
Returns the number of entries matching the query specified by the keyword arguments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs |
dict |
Query parameters as keyword arguments. |
{} |
Source code in optimade_gateway/mongo/collection.py
def count(self, **kwargs) -> int:
    """Unsupported synchronous counterpart of `acount()`.

    Parameters:
        kwargs (dict): Query parameters as keyword arguments (ignored).

    Raises:
        NotImplementedError: Always; use the asynchronous `acount()` instead.

    """
    message = (
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `acount(params: "
        "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]], "
        "**kwargs)`."
    )
    raise NotImplementedError(message)
create_one(self, resource)
async
¶
Create a new document in the MongoDB collection based on query parameters.
Update the newly created document with an "id"
field.
The value will be the string representation of the "_id"
field.
This will only be done if "id"
is not already present in resource
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
resource |
EntryResourceCreate |
The resource to be created. |
required |
Returns:
Type | Description |
---|---|
EntryResource |
The newly created document as a pydantic model entry resource. |
Source code in optimade_gateway/mongo/collection.py
async def create_one(self, resource: "EntryResourceCreate") -> "EntryResource":
    """Insert `resource` as a new document in the MongoDB collection.

    The resource's `last_modified` is set to the current UTC time before it is
    stored.

    If `resource` has no `"id"`, the newly created document is updated with an
    `"id"` field whose value is the string representation of its `"_id"` field.

    Parameters:
        resource: The resource to be created.

    Returns:
        The newly created document as a pydantic model entry resource.

    """
    resource.last_modified = datetime.utcnow()

    document = await clean_python_types(resource.dict(exclude_unset=True))
    result = await self.collection.insert_one(document)
    LOGGER.debug(
        "Inserted resource %r in DB collection %s with ID %s",
        resource,
        self.collection.name,
        result.inserted_id,
    )

    selector = {"_id": result.inserted_id}

    if not resource.id:
        LOGGER.debug("Updating resource with an `id` field equal to str(id_).")
        await self.collection.update_one(
            selector, {"$set": {"id": str(result.inserted_id)}}
        )

    # Read the document back so the returned model reflects what is stored,
    # including an `id` that may just have been set.
    stored_document = await self.collection.find_one(selector)
    return self.resource_cls(**self.resource_mapper.map_back(stored_document))
exists(self, entry_id)
async
¶
Assert whether entry_id exists in the collection (value of "id"
)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entry_id |
str |
The `"id"` value of the entry. |
required |
Source code in optimade_gateway/mongo/collection.py
async def exists(self, entry_id: str) -> bool:
    """Tell whether an entry with the given `"id"` value is in the collection.

    Parameters:
        entry_id: The `"id"` value of the entry.

    Returns:
        `True` if at least one document has `entry_id` as its `"id"`, otherwise
        `False`.

    """
    matching_documents = await self.collection.count_documents({"id": entry_id})
    return bool(matching_documents)
find(self, params)
¶
Fetches results and indicates if more data is available.
Also gives the total number of data available in the absence of page_limit
.
See
EntryListingQueryParams
for more information.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
Union[EntryListingQueryParams, SingleEntryQueryParams] |
Entry listing URL query params. |
required |
Returns:
Type | Description |
---|---|
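Tuple[Union[List[EntryResource], EntryResource, None], int, bool, Set[str], Set[str]] |
A tuple of various relevant values: (results, data_returned, more_data_available, exclude_fields, include_fields). |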
Source code in optimade_gateway/mongo/collection.py
def find(
    self, params: "Union[EntryListingQueryParams, SingleEntryQueryParams]"
) -> "Tuple[Union[List[EntryResource], EntryResource, None], int, bool, Set[str], Set[str]]":
    """Unsupported synchronous counterpart of `afind()`.

    In the parent class this method fetches results and indicates if more data
    is available, and gives the total number of data available in the absence
    of `page_limit`.
    See
    [`EntryListingQueryParams`](https://www.optimade.org/optimade-python-tools/api_reference/server/query_params/#optimade.server.query_params.EntryListingQueryParams)
    for more information.

    Parameters:
        params: Entry listing URL query params (ignored).

    Raises:
        NotImplementedError: Always; use the asynchronous `afind()` instead.

    Returns:
        Never returns. The annotated tuple describes the parent class contract:
        (`results`, `data_returned`, `more_data_available`, `exclude_fields`,
        `include_fields`).

    """
    message = (
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `afind(params: "
        "Optional[Union[EntryListingQueryParams, SingleEntryQueryParams]], criteria: "
        "Optional[Dict[str, Any]])`."
    )
    raise NotImplementedError(message)
get_multiple(self, **criteria)
async
¶
Get a list of resources based on criteria
Warning
This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a list of resources.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria |
Dict[str, Any] |
Already handled/parsed URL query parameters. |
{} |
Returns:
Type | Description |
---|---|
List[EntryResource] |
A list of resources from the MongoDB (mapped to pydantic models). |
Source code in optimade_gateway/mongo/collection.py
async def get_multiple(self, **criteria: "Dict[str, Any]") -> "List[EntryResource]":
    """Retrieve every resource matching `criteria` as a list.

    Warning:
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a list of resources.

    Parameters:
        criteria: Already handled/parsed URL query parameters. They are filtered
            through `_valid_find_keys()` before being passed to the collection's
            `find()`.

    Returns:
        A list of resources from the MongoDB (mapped to pydantic models).

    """
    criteria = criteria or {}
    cursor = self.collection.find(**self._valid_find_keys(**criteria))
    return [
        self.resource_cls(**self.resource_mapper.map_back(document))
        async for document in cursor
    ]
get_one(self, **criteria)
async
¶
Get one resource based on criteria
Warning
This is not to be used for creating a REST API response, but is rather a utility function to easily retrieve a single resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
criteria |
Dict[str, Any] |
Already handled/parsed URL query parameters. |
{} |
Returns:
Type | Description |
---|---|
EntryResource |
A single resource from the MongoDB (mapped to pydantic models). |
Source code in optimade_gateway/mongo/collection.py
async def get_one(self, **criteria: "Dict[str, Any]") -> "EntryResource":
    """Retrieve a single resource matching `criteria`.

    Warning:
        This is not to be used for creating a REST API response,
        but is rather a utility function to easily retrieve a single resource.

    Parameters:
        criteria: Already handled/parsed URL query parameters. They are filtered
            through `_valid_find_keys()` before being passed to the collection's
            `find_one()`.

    Returns:
        A single resource from the MongoDB (mapped to pydantic models).

    """
    criteria = criteria or {}
    find_arguments = self._valid_find_keys(**criteria)
    # NOTE(review): whatever `find_one()` yields is handed straight to
    # `map_back()`; there is no handling here for a query that matches nothing.
    document = await self.collection.find_one(**find_arguments)
    mapped_document = self.resource_mapper.map_back(document)
    return self.resource_cls(**mapped_document)
handle_query_params(self, params)
¶
Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.
Note
Currently this method returns the pymongo interpretation of the parameters, which will need modification for other backends.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
Union[EntryListingQueryParams, SingleEntryQueryParams] |
The initialized query parameter model from the server. |
required |
Exceptions:
Type | Description |
---|---|
Forbidden |
If too large of a page limit is provided. |
BadRequest |
If an invalid request is made, e.g., with incorrect fields or response format. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary representation of the query parameters. |
Source code in optimade_gateway/mongo/collection.py
def handle_query_params(
    self, params: "Union[EntryListingQueryParams, SingleEntryQueryParams]"
) -> "Dict[str, Any]":
    """Unsupported synchronous counterpart of `ahandle_query_params()`.

    In the parent class this method parses and interprets the backend-agnostic
    query parameter models into a dictionary that can be used by the specific
    backend.

    Note:
        Currently the parent method returns the pymongo interpretation of the
        parameters, which will need modification for other backends.

    Parameters:
        params: The initialized query parameter model from the server (ignored).

    Raises:
        NotImplementedError: Always; use the asynchronous
            `ahandle_query_params()` instead.

    Returns:
        Never returns. The annotation describes the parent class contract: a
        dictionary representation of the query parameters.

    """
    message = (
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `ahandle_query_params(params: "
        "Union[EntryListingQueryParams, SingleEntryQueryParams])`."
    )
    raise NotImplementedError(message)
insert(self, data)
¶
Add the given entries to the underlying database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
List[EntryResource] |
The entry resource objects to add to the database. |
required |
Source code in optimade_gateway/mongo/collection.py
def insert(self, data: "List[EntryResource]") -> None:
    """Unsupported synchronous counterpart of `ainsert()`.

    Parameters:
        data: The entry resource objects to add to the database (ignored).

    Raises:
        NotImplementedError: Always; use the asynchronous `ainsert()` instead.

    """
    message = (
        "This method cannot be used with this class and is a remnant from the parent "
        "class. Use instead the asynchronous method `ainsert(data: "
        "List[EntryResource])`."
    )
    raise NotImplementedError(message)