Skip to content

elasticsearch

CLIENT = Elasticsearch(hosts=CONFIG.elastic_hosts) module-attribute

CONFIG: ServerConfig = ServerConfig() module-attribute

This singleton loads the config from a hierarchy of sources (see customise_sources) and makes it importable in the server code.

LOGGER = logging.getLogger('optimade') module-attribute

BaseResourceMapper

Generic Resource Mapper that defines and performs the mapping between objects in the database and the resource objects defined by the specification.

Attributes:

Name Type Description
ALIASES tuple[tuple[str, str], ...]

a tuple of aliases between OPTIMADE field names and the field names in the database, e.g. (("elements", "custom_elements_field")).

LENGTH_ALIASES tuple[tuple[str, str], ...]

a tuple of aliases between a field name and another field that defines its length, to be used when querying, e.g. (("elements", "nelements")).

ENTRY_RESOURCE_CLASS type[EntryResource]

The entry type that this mapper corresponds to.

PROVIDER_FIELDS tuple[str, ...]

a tuple of extra field names that this mapper should support when querying with the database prefix.

TOP_LEVEL_NON_ATTRIBUTES_FIELDS set[str]

the set of top-level field names common to all endpoints.

SUPPORTED_PREFIXES set[str]

The set of prefixes registered by this mapper.

ALL_ATTRIBUTES set[str]

The set of attributes defined across the entry resource class and the server configuration.

ENTRY_RESOURCE_ATTRIBUTES dict[str, Any]

A dictionary of attributes and their definitions defined by the schema of the entry resource class.

ENDPOINT str

The expected endpoint name for this resource, as defined by the type in the schema of the entry resource class.

Source code in optimade/server/mappers/entries.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
class BaseResourceMapper:
    """Generic Resource Mapper that defines and performs the mapping
    between objects in the database and the resource objects defined by
    the specification.

    All functionality is exposed through class attributes, class methods and
    class properties; the class is used without being instantiated.

    Attributes:
        ALIASES: a tuple of aliases between
            OPTIMADE field names and the field names in the database,
            e.g. `(("elements", "custom_elements_field"))`.
        LENGTH_ALIASES: a tuple of aliases between
            a field name and another field that defines its length, to be used
            when querying, e.g. `(("elements", "nelements"))`.
        ENTRY_RESOURCE_CLASS: The entry type that this mapper corresponds to.
        PROVIDER_FIELDS: a tuple of extra field names that this
            mapper should support when querying with the database prefix.
        TOP_LEVEL_NON_ATTRIBUTES_FIELDS: the set of top-level
            field names common to all endpoints.
        SUPPORTED_PREFIXES: The set of prefixes registered by this mapper.
        ALL_ATTRIBUTES: The set of attributes defined across the entry
            resource class and the server configuration.
        ENTRY_RESOURCE_ATTRIBUTES: A dictionary of attributes and their definitions
            defined by the schema of the entry resource class.
        ENDPOINT: The expected endpoint name for this resource, as defined by
            the `type` in the schema of the entry resource class.

    """

    # The provider list is optional: fall back to an empty mapping when it
    # cannot be imported, which leaves `KNOWN_PROVIDER_PREFIXES` empty.
    try:
        from optimade.server.data import providers as PROVIDERS  # type: ignore
    except (ImportError, ModuleNotFoundError):
        PROVIDERS = {}

    # The `id` of every entry under the "data" key of the provider list.
    KNOWN_PROVIDER_PREFIXES: set[str] = {
        prov["id"] for prov in PROVIDERS.get("data", [])
    }
    ALIASES: tuple[tuple[str, str], ...] = ()
    LENGTH_ALIASES: tuple[tuple[str, str], ...] = ()
    PROVIDER_FIELDS: tuple[str, ...] = ()
    ENTRY_RESOURCE_CLASS: type[EntryResource] = EntryResource
    RELATIONSHIP_ENTRY_TYPES: set[str] = {"references", "structures"}
    TOP_LEVEL_NON_ATTRIBUTES_FIELDS: set[str] = {"id", "type", "relationships", "links"}

    @classmethod
    @lru_cache(maxsize=NUM_ENTRY_TYPES)
    def all_aliases(cls) -> Iterable[tuple[str, str]]:
        """Returns all of the associated aliases for this entry type,
        including those defined by the server config. The first member
        of each tuple is the OPTIMADE-compliant field name, the second
        is the backend-specific field name.

        The result is concatenated in this order: string provider fields from
        the config, dictionary provider fields from the config (by their
        `"name"`), the mapper's own `PROVIDER_FIELDS`, the config aliases for
        this endpoint, and finally `ALIASES`. Provider fields that do not
        already start with an underscore get the `_<provider prefix>_` prefix
        on the OPTIMADE side; those that do are aliased to themselves.

        Returns:
            A tuple of alias tuples.

        """
        from optimade.server.config import CONFIG

        return (
            tuple(
                (f"_{CONFIG.provider.prefix}_{field}", field)
                if not field.startswith("_")
                else (field, field)
                for field in CONFIG.provider_fields.get(cls.ENDPOINT, [])
                if isinstance(field, str)
            )
            + tuple(
                (f"_{CONFIG.provider.prefix}_{field['name']}", field["name"])
                if not field["name"].startswith("_")
                else (field["name"], field["name"])
                for field in CONFIG.provider_fields.get(cls.ENDPOINT, [])
                if isinstance(field, dict)
            )
            + tuple(
                (f"_{CONFIG.provider.prefix}_{field}", field)
                if not field.startswith("_")
                else (field, field)
                for field in cls.PROVIDER_FIELDS
            )
            + tuple(CONFIG.aliases.get(cls.ENDPOINT, {}).items())
            + cls.ALIASES
        )

    @classproperty
    @lru_cache(maxsize=1)
    def SUPPORTED_PREFIXES(cls) -> set[str]:
        """A set of prefixes handled by this entry type.

        !!! note
            This implementation only includes the provider prefix,
            but in the future this property may be extended to include other
            namespaces (for serving fields from, e.g., other providers or
            domain-specific terms).

        """
        from optimade.server.config import CONFIG

        return {CONFIG.provider.prefix}

    @classproperty
    def ALL_ATTRIBUTES(cls) -> set[str]:
        """Returns all attributes served by this entry.

        This is the union of the entry resource attributes, the provider
        fields configured for this endpoint (given as plain strings or as
        dictionaries with a `"name"` key) and the mapper's `PROVIDER_FIELDS`,
        with each provider field passed through `get_optimade_field()`.

        """
        from optimade.server.config import CONFIG

        return (
            set(cls.ENTRY_RESOURCE_ATTRIBUTES)
            .union(
                cls.get_optimade_field(field)
                for field in CONFIG.provider_fields.get(cls.ENDPOINT, ())
                if isinstance(field, str)
            )
            .union(
                cls.get_optimade_field(field["name"])
                for field in CONFIG.provider_fields.get(cls.ENDPOINT, ())
                if isinstance(field, dict)
            )
            .union({cls.get_optimade_field(field) for field in cls.PROVIDER_FIELDS})
        )

    @classproperty
    def ENTRY_RESOURCE_ATTRIBUTES(cls) -> dict[str, Any]:
        """Returns the dictionary of attributes defined by the underlying entry resource class."""
        from optimade.server.schemas import retrieve_queryable_properties

        return retrieve_queryable_properties(cls.ENTRY_RESOURCE_CLASS)

    @classproperty
    @lru_cache(maxsize=NUM_ENTRY_TYPES)
    def ENDPOINT(cls) -> str:
        """Returns the expected endpoint for this mapper, corresponding
        to the `type` property of the resource class.

        Raises:
            ValueError: If the default of the `type` field is both falsy and
                not a string.

        """
        endpoint = cls.ENTRY_RESOURCE_CLASS.model_fields["type"].default
        # NOTE(review): because the two checks are joined with `and`, an empty
        # string or a truthy non-string default is returned as-is rather than
        # rejected -- confirm whether this leniency is intended.
        if not endpoint and not isinstance(endpoint, str):
            raise ValueError("Type not set for this entry type!")
        return endpoint

    @classmethod
    @lru_cache(maxsize=NUM_ENTRY_TYPES)
    def all_length_aliases(cls) -> tuple[tuple[str, str], ...]:
        """Returns all of the associated length aliases for this class,
        including those defined by the server config.

        The mapper's own `LENGTH_ALIASES` come first, followed by the
        configured length aliases for this endpoint.

        Returns:
            A tuple of length alias tuples.

        """
        from optimade.server.config import CONFIG

        return cls.LENGTH_ALIASES + tuple(
            CONFIG.length_aliases.get(cls.ENDPOINT, {}).items()
        )

    @classmethod
    @lru_cache(maxsize=128)
    def length_alias_for(cls, field: str) -> Optional[str]:
        """Returns the length alias for the particular field,
        or `None` if no such alias is found.

        Parameters:
            field: OPTIMADE field name.

        Returns:
            Aliased field as found in [`all_length_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_length_aliases].

        """
        return dict(cls.all_length_aliases()).get(field, None)

    @classmethod
    @lru_cache(maxsize=128)
    def get_backend_field(cls, optimade_field: str) -> str:
        """Return the field name configured for the particular
        underlying database for the passed OPTIMADE field name, that would
        be used in an API filter.

        Aliases are read from
        [`all_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_aliases].

        If a dot-separated OPTIMADE field is provided, e.g., `species.mass`, only the first part will be mapped.
        This means for an (OPTIMADE, DB) alias of (`species`, `kinds`), `get_backend_field("species.mass")`
        will return `kinds.mass`.

        Arguments:
            optimade_field: The OPTIMADE field to attempt to map to the backend-specific field.

        Examples:
            >>> get_backend_field("chemical_formula_anonymous")
            'formula_anon'
            >>> get_backend_field("formula_anon")
            'formula_anon'
            >>> get_backend_field("_exmpl_custom_provider_field")
            'custom_provider_field'

        Returns:
            The mapped field name to be used in the query to the backend.

        """
        split = optimade_field.split(".")
        alias = dict(cls.all_aliases()).get(split[0], None)
        if alias is not None:
            # Re-attach the untouched remainder of the dotted path, if any.
            return alias + ("." + ".".join(split[1:]) if len(split) > 1 else "")
        return optimade_field

    @classmethod
    @lru_cache(maxsize=128)
    def alias_for(cls, field: str) -> str:
        """Return aliased field name.

        !!! warning "Deprecated"
            This method is deprecated and could be removed without further warning. Please use
            [`get_backend_field()`][optimade.server.mappers.entries.BaseResourceMapper.get_backend_field].

        Parameters:
            field: OPTIMADE field name.

        Returns:
            Aliased field as found in [`all_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_aliases].

        """
        warnings.warn(
            "The `.alias_for(...)` method is deprecated, please use `.get_backend_field(...)`.",
            DeprecationWarning,
        )
        return cls.get_backend_field(field)

    @classmethod
    @lru_cache(maxsize=128)
    def get_optimade_field(cls, backend_field: str) -> str:
        """Return the corresponding OPTIMADE field name for the underlying database field,
        ready to be used to construct the OPTIMADE-compliant JSON response.

        Aliases are read from
        [`all_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_aliases].

        Arguments:
            backend_field: The backend field to attempt to map to an OPTIMADE field.

        Examples:
            >>> get_optimade_field("chemical_formula_anonymous")
            'chemical_formula_anonymous'
            >>> get_optimade_field("formula_anon")
            'chemical_formula_anonymous'
            >>> get_optimade_field("custom_provider_field")
            '_exmpl_custom_provider_field'

        Returns:
            The mapped field name to be used in an OPTIMADE-compliant response.

        """
        # Invert the (OPTIMADE, backend) pairs; if several aliases share a
        # backend field, the last one listed wins.
        return {alias: real for real, alias in cls.all_aliases()}.get(
            backend_field, backend_field
        )

    @classmethod
    @lru_cache(maxsize=128)
    def alias_of(cls, field: str) -> str:
        """Return de-aliased field name, if it exists,
        otherwise return the input field name.

        !!! warning "Deprecated"
            This method is deprecated and could be removed without further warning. Please use
            [`get_optimade_field()`][optimade.server.mappers.entries.BaseResourceMapper.get_optimade_field].

        Parameters:
            field: Field name to be de-aliased.

        Returns:
            De-aliased field name, falling back to returning `field`.

        """
        warnings.warn(
            "The `.alias_of(...)` method is deprecated, please use `.get_optimade_field(...)`.",
            DeprecationWarning,
        )
        return cls.get_optimade_field(field)

    @classmethod
    @lru_cache(maxsize=NUM_ENTRY_TYPES)
    def get_required_fields(cls) -> set:
        """Get REQUIRED response fields.

        Returns:
            REQUIRED response fields. This is the
            `TOP_LEVEL_NON_ATTRIBUTES_FIELDS` set object itself, not a copy.

        """
        return cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS

    @classmethod
    def map_back(cls, doc: dict) -> dict:
        """Map properties from MongoDB to OPTIMADE.

        Starting from a MongoDB document `doc`, map the DB fields to the corresponding OPTIMADE fields.
        Then, the fields are all added to the top-level field "attributes",
        with the exception of other top-level fields, defined in `cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS`.
        All fields not in `cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS` + "attributes" will be removed.
        Finally, the `type` is given the value of the specified `cls.ENDPOINT`.

        The input `doc` is not modified.

        Parameters:
            doc: A resource object in MongoDB format.

        Returns:
            A resource object in OPTIMADE format.

        Raises:
            Exception: If the mapped document already has an "attributes" key.

        """
        mapping = ((real, alias) for alias, real in cls.all_aliases())
        newdoc = {}
        reals = {real for _, real in cls.all_aliases()}
        # Keys that are not the backend side of any alias are copied as-is.
        for key in doc:
            if key not in reals:
                newdoc[key] = doc[key]
        # Aliased backend keys are stored under their OPTIMADE names.
        for real, alias in mapping:
            if real in doc:
                newdoc[alias] = doc[real]

        if "attributes" in newdoc:
            raise Exception("Will overwrite doc field!")
        attributes = newdoc.copy()

        # Strip the top-level fields out of `attributes`. The re-assignment
        # into `newdoc` stores a value that is already there, because
        # `attributes` started as a copy of `newdoc`.
        for field in cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS:
            value = attributes.pop(field, None)
            if value is not None:
                newdoc[field] = value
        # Everything that is not a top-level field now lives only in `attributes`.
        for field in list(newdoc.keys()):
            if field not in cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS:
                del newdoc[field]

        newdoc["type"] = cls.ENDPOINT
        newdoc["attributes"] = attributes

        return newdoc

    @classmethod
    def deserialize(
        cls, results: Union[dict, Iterable[dict]]
    ) -> Union[list[EntryResource], EntryResource]:
        """Converts the raw database entries for this class into serialized models,
        mapping the data along the way.

        Parameters:
            results: A single raw entry as a dictionary, or an iterable of them.

        Returns:
            A single `ENTRY_RESOURCE_CLASS` instance when `results` is a
            dictionary, otherwise a list with one instance per entry.

        """
        if isinstance(results, dict):
            return cls.ENTRY_RESOURCE_CLASS(**cls.map_back(results))

        return [cls.ENTRY_RESOURCE_CLASS(**cls.map_back(doc)) for doc in results]

ALL_ATTRIBUTES()

Returns all attributes served by this entry.

Source code in optimade/server/mappers/entries.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@classproperty
def ALL_ATTRIBUTES(cls) -> set[str]:
    """Collect the names of every attribute served for this entry type.

    The result combines the attributes of the entry resource class with the
    provider fields configured for this endpoint (plain strings, or
    dictionaries carrying a `"name"` key) and the mapper's own
    `PROVIDER_FIELDS`. Each provider field is translated with
    `get_optimade_field()` first.

    Returns:
        The set of attribute names.

    """
    from optimade.server.config import CONFIG

    attributes = set(cls.ENTRY_RESOURCE_ATTRIBUTES)

    for configured in CONFIG.provider_fields.get(cls.ENDPOINT, ()):
        if isinstance(configured, str):
            attributes.add(cls.get_optimade_field(configured))
        elif isinstance(configured, dict):
            attributes.add(cls.get_optimade_field(configured["name"]))

    for own_field in cls.PROVIDER_FIELDS:
        attributes.add(cls.get_optimade_field(own_field))

    return attributes

ENDPOINT() cached

Returns the expected endpoint for this mapper, corresponding to the type property of the resource class.

Source code in optimade/server/mappers/entries.py
161
162
163
164
165
166
167
168
169
170
171
@classproperty
@lru_cache(maxsize=NUM_ENTRY_TYPES)
def ENDPOINT(cls) -> str:
    """Endpoint name for this mapper, read from the default of the `type`
    field of the entry resource class.

    Returns:
        The default value of the `type` field.

    Raises:
        ValueError: If that default is both falsy and not a string.

    """
    type_default = cls.ENTRY_RESOURCE_CLASS.model_fields["type"].default
    # Only a falsy, non-string default (e.g. `None`) is rejected; any string
    # and any truthy value is passed through unchanged.
    if not (type_default or isinstance(type_default, str)):
        raise ValueError("Type not set for this entry type!")
    return type_default

ENTRY_RESOURCE_ATTRIBUTES()

Returns the dictionary of attributes defined by the underlying entry resource class.

Source code in optimade/server/mappers/entries.py
154
155
156
157
158
159
@classproperty
def ENTRY_RESOURCE_ATTRIBUTES(cls) -> dict[str, Any]:
    """Attributes of the underlying entry resource class, as returned by
    `retrieve_queryable_properties()` for `ENTRY_RESOURCE_CLASS`.

    """
    from optimade.server.schemas import retrieve_queryable_properties

    resource_class = cls.ENTRY_RESOURCE_CLASS
    return retrieve_queryable_properties(resource_class)

SUPPORTED_PREFIXES() cached

A set of prefixes handled by this entry type.

Note

This implementation only includes the provider prefix, but in the future this property may be extended to include other namespaces (for serving fields from, e.g., other providers or domain-specific terms).

Source code in optimade/server/mappers/entries.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@classproperty
@lru_cache(maxsize=1)
def SUPPORTED_PREFIXES(cls) -> set[str]:
    """The prefixes that this entry type handles.

    !!! note
        Only the configured provider prefix is included for now. Other
        namespaces (for example fields served from other providers, or
        domain-specific terms) may be added to this set in the future.

    """
    from optimade.server.config import CONFIG

    provider_prefix = CONFIG.provider.prefix
    return {provider_prefix}

alias_for(field) cached classmethod

Return aliased field name.

Deprecated

This method is deprecated and could be removed without further warning. Please use get_backend_field().

Parameters:

Name Type Description Default
field str

OPTIMADE field name.

required

Returns:

Type Description
str

Aliased field as found in all_aliases().

Source code in optimade/server/mappers/entries.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
@classmethod
@lru_cache(maxsize=128)
def alias_for(cls, field: str) -> str:
    """Return aliased field name.

    !!! warning "Deprecated"
        This method is deprecated and could be removed without further warning. Please use
        [`get_backend_field()`][optimade.server.mappers.entries.BaseResourceMapper.get_backend_field].

    Because the method is wrapped in `lru_cache`, the deprecation warning is
    only emitted when the result is not already cached.

    Parameters:
        field: OPTIMADE field name.

    Returns:
        Aliased field as found in [`all_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_aliases].

    """
    warnings.warn(
        "The `.alias_for(...)` method is deprecated, please use `.get_backend_field(...)`.",
        DeprecationWarning,
        # Attribute the warning to the calling code rather than to this line,
        # so that it points at the usage that needs to be updated.
        stacklevel=2,
    )
    return cls.get_backend_field(field)

alias_of(field) cached classmethod

Return de-aliased field name, if it exists, otherwise return the input field name.

Deprecated

This method is deprecated and could be removed without further warning. Please use get_optimade_field().

Parameters:

Name Type Description Default
field str

Field name to be de-aliased.

required

Returns:

Type Description
str

De-aliased field name, falling back to returning field.

Source code in optimade/server/mappers/entries.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
@classmethod
@lru_cache(maxsize=128)
def alias_of(cls, field: str) -> str:
    """Return de-aliased field name, if it exists,
    otherwise return the input field name.

    !!! warning "Deprecated"
        This method is deprecated and could be removed without further warning. Please use
        [`get_optimade_field()`][optimade.server.mappers.entries.BaseResourceMapper.get_optimade_field].

    Because the method is wrapped in `lru_cache`, the deprecation warning is
    only emitted when the result is not already cached.

    Parameters:
        field: Field name to be de-aliased.

    Returns:
        De-aliased field name, falling back to returning `field`.

    """
    warnings.warn(
        "The `.alias_of(...)` method is deprecated, please use `.get_optimade_field(...)`.",
        DeprecationWarning,
        # Attribute the warning to the calling code rather than to this line,
        # so that it points at the usage that needs to be updated.
        stacklevel=2,
    )
    return cls.get_optimade_field(field)

all_aliases() cached classmethod

Returns all of the associated aliases for this entry type, including those defined by the server config. The first member of each tuple is the OPTIMADE-compliant field name, the second is the backend-specific field name.

Returns:

Type Description
Iterable[tuple[str, str]]

A tuple of alias tuples.

Source code in optimade/server/mappers/entries.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@classmethod
@lru_cache(maxsize=NUM_ENTRY_TYPES)
def all_aliases(cls) -> Iterable[tuple[str, str]]:
    """Gather every alias known for this entry type, including those that
    come from the server config.

    Each pair is `(OPTIMADE field name, backend field name)`. The pairs are
    concatenated in this order: string provider fields from the config,
    dictionary provider fields from the config (by their `"name"`), the
    mapper's `PROVIDER_FIELDS`, the configured aliases for this endpoint and
    finally `ALIASES`.

    Returns:
        A tuple of alias tuples.

    """
    from optimade.server.config import CONFIG

    def provider_alias(name):
        # Names that already carry a leading underscore are aliased to
        # themselves; all others gain the provider prefix on the OPTIMADE side.
        if name.startswith("_"):
            return (name, name)
        return (f"_{CONFIG.provider.prefix}_{name}", name)

    configured_fields = CONFIG.provider_fields.get(cls.ENDPOINT, [])

    string_fields = tuple(
        provider_alias(entry) for entry in configured_fields if isinstance(entry, str)
    )
    dict_fields = tuple(
        provider_alias(entry["name"])
        for entry in configured_fields
        if isinstance(entry, dict)
    )
    mapper_fields = tuple(provider_alias(entry) for entry in cls.PROVIDER_FIELDS)
    config_aliases = tuple(CONFIG.aliases.get(cls.ENDPOINT, {}).items())

    return string_fields + dict_fields + mapper_fields + config_aliases + cls.ALIASES

all_length_aliases() cached classmethod

Returns all of the associated length aliases for this class, including those defined by the server config.

Returns:

Type Description
tuple[tuple[str, str], ...]

A tuple of length alias tuples.

Source code in optimade/server/mappers/entries.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
@classmethod
@lru_cache(maxsize=NUM_ENTRY_TYPES)
def all_length_aliases(cls) -> tuple[tuple[str, str], ...]:
    """Gather every length alias for this class: the mapper's own
    `LENGTH_ALIASES` followed by those configured for this endpoint.

    Returns:
        A tuple of length alias tuples.

    """
    from optimade.server.config import CONFIG

    own_aliases = cls.LENGTH_ALIASES
    configured = CONFIG.length_aliases.get(cls.ENDPOINT, {})
    return own_aliases + tuple(configured.items())

deserialize(results) classmethod

Converts the raw database entries for this class into serialized models, mapping the data along the way.

Source code in optimade/server/mappers/entries.py
367
368
369
370
371
372
373
374
375
376
377
378
@classmethod
def deserialize(
    cls, results: Union[dict, Iterable[dict]]
) -> Union[list[EntryResource], EntryResource]:
    """Turn raw database entries into instances of `ENTRY_RESOURCE_CLASS`,
    passing each entry through `map_back()` on the way.

    Parameters:
        results: A single raw entry as a dictionary, or an iterable of them.

    Returns:
        One resource when `results` is a dictionary, otherwise a list of
        resources, one per entry.

    """
    if not isinstance(results, dict):
        return [cls.ENTRY_RESOURCE_CLASS(**cls.map_back(entry)) for entry in results]

    mapped = cls.map_back(results)
    return cls.ENTRY_RESOURCE_CLASS(**mapped)

get_backend_field(optimade_field) cached classmethod

Return the field name configured for the particular underlying database for the passed OPTIMADE field name, that would be used in an API filter.

Aliases are read from all_aliases().

If a dot-separated OPTIMADE field is provided, e.g., species.mass, only the first part will be mapped. This means for an (OPTIMADE, DB) alias of (species, kinds), get_backend_field("species.mass") will return kinds.mass.

Parameters:

Name Type Description Default
optimade_field str

The OPTIMADE field to attempt to map to the backend-specific field.

required

Examples:

>>> get_backend_field("chemical_formula_anonymous")
'formula_anon'
>>> get_backend_field("formula_anon")
'formula_anon'
>>> get_backend_field("_exmpl_custom_provider_field")
'custom_provider_field'

Returns:

Type Description
str

The mapped field name to be used in the query to the backend.

Source code in optimade/server/mappers/entries.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
@classmethod
@lru_cache(maxsize=128)
def get_backend_field(cls, optimade_field: str) -> str:
    """Translate an OPTIMADE field name, as it would appear in an API filter,
    into the field name used by the underlying database.

    Aliases are read from
    [`all_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_aliases].

    For a dot-separated field such as `species.mass`, only the part before
    the first dot is translated. With an (OPTIMADE, DB) alias of
    (`species`, `kinds`), `get_backend_field("species.mass")` therefore
    returns `kinds.mass`.

    Arguments:
        optimade_field: The OPTIMADE field to attempt to map to the backend-specific field.

    Examples:
        >>> get_backend_field("chemical_formula_anonymous")
        'formula_anon'
        >>> get_backend_field("formula_anon")
        'formula_anon'
        >>> get_backend_field("_exmpl_custom_provider_field")
        'custom_provider_field'

    Returns:
        The mapped field name to be used in the query to the backend, or the
        input unchanged when no alias matches.

    """
    head, dot, remainder = optimade_field.partition(".")
    backend_head = dict(cls.all_aliases()).get(head)
    if backend_head is None:
        return optimade_field
    # `dot` and `remainder` are empty when the field has no nested part.
    return backend_head + dot + remainder

get_optimade_field(backend_field) cached classmethod

Return the corresponding OPTIMADE field name for the underlying database field, ready to be used to construct the OPTIMADE-compliant JSON response.

Aliases are read from all_aliases().

Parameters:

Name Type Description Default
backend_field str

The backend field to attempt to map to an OPTIMADE field.

required

Examples:

>>> get_optimade_field("chemical_formula_anonymous")
'chemical_formula_anonymous'
>>> get_optimade_field("formula_anon")
'chemical_formula_anonymous'
>>> get_optimade_field("custom_provider_field")
'_exmpl_custom_provider_field'

Returns:

Type Description
str

The mapped field name to be used in an OPTIMADE-compliant response.

Source code in optimade/server/mappers/entries.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
@classmethod
@lru_cache(maxsize=128)
def get_optimade_field(cls, backend_field: str) -> str:
    """Translate a database field name into the OPTIMADE field name to use
    when building the OPTIMADE-compliant JSON response.

    Aliases are read from
    [`all_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_aliases].

    Arguments:
        backend_field: The backend field to attempt to map to an OPTIMADE field.

    Examples:
        >>> get_optimade_field("chemical_formula_anonymous")
        'chemical_formula_anonymous'
        >>> get_optimade_field("formula_anon")
        'chemical_formula_anonymous'
        >>> get_optimade_field("custom_provider_field")
        '_exmpl_custom_provider_field'

    Returns:
        The mapped field name to be used in an OPTIMADE-compliant response,
        or the input unchanged when no alias matches.

    """
    backend_to_optimade = {}
    # Later aliases overwrite earlier ones that share a backend field.
    for optimade_name, backend_name in cls.all_aliases():
        backend_to_optimade[backend_name] = optimade_name
    return backend_to_optimade.get(backend_field, backend_field)

get_required_fields() cached classmethod

Get REQUIRED response fields.

Returns:

Type Description
set

REQUIRED response fields.

Source code in optimade/server/mappers/entries.py
312
313
314
315
316
317
318
319
320
321
@classmethod
@lru_cache(maxsize=NUM_ENTRY_TYPES)
def get_required_fields(cls) -> set:
    """Return the REQUIRED response fields.

    Returns:
        The `TOP_LEVEL_NON_ATTRIBUTES_FIELDS` set of this mapper (the same
        object, not a copy).

    """
    required_fields = cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
    return required_fields

length_alias_for(field) cached classmethod

Returns the length alias for the particular field, or None if no such alias is found.

Parameters:

Name Type Description Default
field str

OPTIMADE field name.

required

Returns:

Type Description
Optional[str]

Aliased field as found in all_length_aliases().

Source code in optimade/server/mappers/entries.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@classmethod
@lru_cache(maxsize=128)
def length_alias_for(cls, field: str) -> Optional[str]:
    """Look up the length alias of a field.

    Parameters:
        field: OPTIMADE field name.

    Returns:
        The aliased field as found in
        [`all_length_aliases()`][optimade.server.mappers.entries.BaseResourceMapper.all_length_aliases],
        or `None` when the field has no length alias.

    """
    length_aliases = dict(cls.all_length_aliases())
    return length_aliases.get(field)

map_back(doc) classmethod

Map properties from MongoDB to OPTIMADE.

Starting from a MongoDB document doc, map the DB fields to the corresponding OPTIMADE fields. Then, the fields are all added to the top-level field "attributes", with the exception of other top-level fields, defined in cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS. All fields not in cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS + "attributes" will be removed. Finally, the type is given the value of the specified cls.ENDPOINT.

Parameters:

Name Type Description Default
doc dict

A resource object in MongoDB format.

required

Returns:

Type Description
dict

A resource object in OPTIMADE format.

Source code in optimade/server/mappers/entries.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
@classmethod
def map_back(cls, doc: dict) -> dict:
    """Convert a database document into an OPTIMADE resource object.

    Keys of `doc` that are the backend side of an alias are renamed to their
    OPTIMADE names; all other keys are kept as they are. The fields listed in
    `cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS` stay at the top level, everything
    else is collected under "attributes", and `type` is set to
    `cls.ENDPOINT`. The input `doc` is left untouched.

    Parameters:
        doc: A resource object in MongoDB format.

    Returns:
        A resource object in OPTIMADE format.

    Raises:
        Exception: If the renamed document already contains an "attributes"
            key, since it would be overwritten.

    """
    alias_pairs = tuple(cls.all_aliases())
    backend_names = {backend for _, backend in alias_pairs}

    # Un-aliased keys pass straight through; aliased ones are renamed.
    renamed = {key: doc[key] for key in doc if key not in backend_names}
    for optimade_name, backend in alias_pairs:
        if backend in doc:
            renamed[optimade_name] = doc[backend]

    if "attributes" in renamed:
        raise Exception("Will overwrite doc field!")

    top_level = cls.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
    attributes = {
        key: value for key, value in renamed.items() if key not in top_level
    }
    resource = {key: value for key, value in renamed.items() if key in top_level}

    resource["type"] = cls.ENDPOINT
    resource["attributes"] = attributes

    return resource

ElasticCollection

Bases: EntryCollection

Source code in optimade/server/entry_collections/elasticsearch.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class ElasticCollection(EntryCollection):
    """Entry collection whose entries live in an Elasticsearch index.

    The index is named after the collection (`self.name`). It is queried with
    `Search` objects and filled through `bulk`, both going through
    `self.client`.

    """

    # Pagination mechanism declared for this collection ("page_offset").
    pagination_mechanism = PaginationMechanism("page_offset")

    def __init__(
        self,
        name: str,
        resource_cls: type[EntryResource],
        resource_mapper: type[BaseResourceMapper],
        client: Optional["Elasticsearch"] = None,
    ):
        """Initialize the ElasticCollection for the given parameters.

        Parameters:
            name: The name of the collection.
            resource_cls: The type of entry resource that is stored by the collection.
            resource_mapper: A resource mapper object that handles aliases and
                format changes between deserialization and response.
            client: A preconfigured Elasticsearch client.

        """
        super().__init__(
            resource_cls=resource_cls,
            resource_mapper=resource_mapper,
            transformer=ElasticTransformer(mapper=resource_mapper),
        )

        # Fall back to the module-level CLIENT when no (or a falsy) client is passed.
        self.client = client if client else CLIENT
        self.name = name

        # If we are creating a new collection from scratch, also create the index,
        # otherwise assume it has already been created externally
        if CONFIG.insert_test_data:
            self.create_optimade_index()

    def count(self, *args, **kwargs) -> int:
        """Not implemented for this collection; always raises `NotImplementedError`."""
        raise NotImplementedError

    def create_optimade_index(self) -> None:
        """Load or create an index that can handle aliased OPTIMADE fields and attach it
        to the current client.

        """
        # Prefer the pre-defined index for this collection name; otherwise build
        # a fallback from the mapper and the collection's fields.
        body = self.predefined_index.get(self.name)
        if body is None:
            body = self.create_elastic_index_from_mapper(
                self.resource_mapper, self.all_fields
            )

        # Re-key every property from its OPTIMADE name to its backend name.
        properties = {}
        for field in list(body["mappings"]["properties"].keys()):
            properties[self.resource_mapper.get_backend_field(field)] = body[
                "mappings"
            ]["properties"].pop(field)
        # "id" is always mapped as a keyword, whatever the index defined for it.
        properties["id"] = {"type": "keyword"}
        body["mappings"]["properties"] = properties
        # NOTE(review): `ignore=400` presumably tolerates an already existing
        # index - confirm against the Elasticsearch client in use.
        self.client.indices.create(index=self.name, ignore=400, **body)

        LOGGER.debug(f"Created Elastic index for {self.name!r} with parameters {body}")

    @property
    def predefined_index(self) -> dict[str, Any]:
        """Loads and returns the default pre-defined index."""
        # The JSON file sits next to this module and is re-read on every access.
        with open(Path(__file__).parent.joinpath("elastic_indexes.json")) as f:
            index = json.load(f)
        return index

    @staticmethod
    def create_elastic_index_from_mapper(
        resource_mapper: type[BaseResourceMapper], fields: Iterable[str]
    ) -> dict[str, Any]:
        """Create a fallback elastic index based on a resource mapper.

        Arguments:
            resource_mapper: The resource mapper to create the index for.
            fields: The list of fields to use in the index.

        Returns:
            The parameters to pass to `client.indices.create(...)` (previously
                the 'body' parameters).

        """
        # Every field is mapped as a plain keyword under its OPTIMADE name.
        properties = {
            resource_mapper.get_optimade_field(field): {"type": "keyword"}
            for field in fields
        }
        properties["id"] = {"type": "keyword"}
        return {"mappings": {"properties": properties}}

    def __len__(self):
        """Returns the total number of entries in the collection."""
        # An unfiltered search is executed and its reported hit total returned.
        return Search(using=self.client, index=self.name).execute().hits.total.value

    def insert(self, data: list[EntryResource]) -> None:
        """Add the given entries to the underlying database.

        Warning:
            No validation is performed on the incoming data.

        Arguments:
            data: The entry resource objects to add to the database.

        """

        def get_id(item):
            """Return the document id to use for `item`, removing any `_id` key from it."""
            if self.name == "links":
                # Link ids are combined with the link type.
                id_ = f"{item['id']}-{item['type']}"
            elif "id" in item:
                id_ = item["id"]
            elif "_id" in item:
                # use the existing MongoDB ids in the test data
                id_ = str(item["_id"])
            else:
                # ES will generate ids
                id_ = None
            # The item itself is modified: "_id" does not end up in the stored source.
            item.pop("_id", None)
            return id_

        # One action per item is generated lazily and handed to `bulk`.
        bulk(
            self.client,
            (
                {
                    "_index": self.name,
                    "_id": get_id(item),
                    "_source": item,
                }
                for item in data
            ),
        )

    def _run_db_query(
        self, criteria: dict[str, Any], single_entry=False
    ) -> tuple[list[dict[str, Any]], int, bool]:
        """Run the query on the backend and collect the results.

        Arguments:
            criteria: A dictionary representation of the query parameters.
            single_entry: Whether or not the caller is expecting a single entry response.

        Returns:
            The list of entries from the database (without any re-mapping), the total number of
            entries matching the query and a boolean for whether or not there is more data available.

        """

        search = Search(using=self.client, index=self.name)

        # An empty or missing filter leaves the search unrestricted.
        if criteria.get("filter", False):
            search = search.query(criteria["filter"])

        page_offset = criteria.get("skip", None)
        page_above = criteria.get("page_above", None)

        limit = criteria.get("limit", CONFIG.page_limit)

        # Only the collection's own fields, under their backend names, are
        # requested in the returned source.
        all_aliased_fields = [
            self.resource_mapper.get_backend_field(field) for field in self.all_fields
        ]
        search = search.source(includes=all_aliased_fields)

        # A direction of -1 means descending; any other value sorts ascending.
        elastic_sort = [
            {field: {"order": "desc" if sort_dir == -1 else "asc"}}
            for field, sort_dir in criteria.get("sort", {})
        ]
        # Without an explicit sort, order by the backend "id" field, ascending.
        if not elastic_sort:
            elastic_sort = [
                {self.resource_mapper.get_backend_field("id"): {"order": "asc"}}
            ]

        search = search.sort(*elastic_sort)

        # A non-zero offset takes precedence over `page_above`.
        if page_offset:
            search = search[page_offset : page_offset + limit]

        elif page_above:
            # NOTE(review): Elasticsearch names its page-size parameter `size`;
            # confirm that the extra `limit` key is accepted by the server here.
            search = search.extra(search_after=page_above, limit=limit)

        else:
            # No pagination requested (or an offset of 0): take the first page.
            search = search[0:limit]
            page_offset = 0

        # Request the total hit count; it is read below via `response.hits.total.value`.
        search = search.extra(track_total_hits=True)
        response = search.execute()

        results = [hit.to_dict() for hit in response.hits]

        more_data_available = False
        if not single_entry:
            data_returned = response.hits.total.value
            if page_above is not None:
                # With `page_above`, a full page is taken as a sign of more data,
                # unless the total number of hits equals the page size.
                more_data_available = len(results) == limit and data_returned != limit
            else:
                # With offsets, more data exists while this page ends before the total.
                more_data_available = page_offset + limit < data_returned
        else:
            # SingleEntryQueryParams, e.g., /structures/{entry_id}
            data_returned = len(results)

        return results, data_returned, more_data_available

all_fields: set[str] property

Get the set of all fields handled in this collection, from attribute fields in the schema, provider fields and top-level OPTIMADE fields.

The set of all fields are lazily created and then cached. This means the set is created the first time the property is requested and then cached.

Returns:

Type Description
set[str]

All fields handled in this collection.

predefined_index: dict[str, Any] property

Loads and returns the default pre-defined index.

__init__(name, resource_cls, resource_mapper, client=None)

Initialize the ElasticCollection for the given parameters.

Parameters:

Name Type Description Default
name str

The name of the collection.

required
resource_cls type[EntryResource]

The type of entry resource that is stored by the collection.

required
resource_mapper type[BaseResourceMapper]

A resource mapper object that handles aliases and format changes between deserialization and response.

required
client Optional[Elasticsearch]

A preconfigured Elasticsearch client.

None
Source code in optimade/server/entry_collections/elasticsearch.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    name: str,
    resource_cls: type[EntryResource],
    resource_mapper: type[BaseResourceMapper],
    client: Optional["Elasticsearch"] = None,
):
    """Set up an Elasticsearch-backed collection.

    Parameters:
        name: The name of the collection.
        resource_cls: The type of entry resource that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and
            format changes between deserialization and response.
        client: A preconfigured Elasticsearch client. When omitted (or falsy),
            the module-level `CLIENT` is used instead.

    """
    filter_transformer = ElasticTransformer(mapper=resource_mapper)
    super().__init__(
        resource_cls=resource_cls,
        resource_mapper=resource_mapper,
        transformer=filter_transformer,
    )

    self.client = client or CLIENT
    self.name = name

    # The index is only created from here when test data is going to be
    # inserted; in every other case it is assumed to exist already.
    if not CONFIG.insert_test_data:
        return
    self.create_optimade_index()

__len__()

Returns the total number of entries in the collection.

Source code in optimade/server/entry_collections/elasticsearch.py
110
111
112
def __len__(self):
    """Count every entry held in this collection's index.

    An unfiltered search is executed against the index and the hit total
    reported with its response is returned.
    """
    unfiltered = Search(using=self.client, index=self.name)
    response = unfiltered.execute()
    return response.hits.total.value

create_elastic_index_from_mapper(resource_mapper, fields) staticmethod

Create a fallback elastic index based on a resource mapper.

Parameters:

Name Type Description Default
resource_mapper type[BaseResourceMapper]

The resource mapper to create the index for.

required
fields Iterable[str]

The list of fields to use in the index.

required

Returns:

Type Description
dict[str, Any]

The parameters to pass to client.indices.create(...) (previously the 'body' parameters).

Source code in optimade/server/entry_collections/elasticsearch.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@staticmethod
def create_elastic_index_from_mapper(
    resource_mapper: type[BaseResourceMapper], fields: Iterable[str]
) -> dict[str, Any]:
    """Build fallback index parameters in which every field is a keyword.

    Arguments:
        resource_mapper: The resource mapper to create the index for.
        fields: The list of fields to use in the index.

    Returns:
        The parameters to pass to `client.indices.create(...)` (previously
            the 'body' parameters).

    """
    keyword_properties = {}
    for name in fields:
        optimade_name = resource_mapper.get_optimade_field(name)
        keyword_properties[optimade_name] = {"type": "keyword"}

    # "id" is always part of the mapping, whether or not it was listed in `fields`.
    keyword_properties["id"] = {"type": "keyword"}
    return {"mappings": {"properties": keyword_properties}}

create_optimade_index()

Load or create an index that can handle aliased OPTIMADE fields and attach it to the current client.

Source code in optimade/server/entry_collections/elasticsearch.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def create_optimade_index(self) -> None:
    """Create the index for this collection, keyed by backend field names.

    The index definition comes from the pre-defined indexes when one exists
    under the collection name, and is otherwise generated from the resource
    mapper. Its properties are re-keyed from OPTIMADE names to backend names
    before the index is created through the current client.

    """
    index_params = self.predefined_index.get(self.name)
    if index_params is None:
        index_params = self.create_elastic_index_from_mapper(
            self.resource_mapper, self.all_fields
        )

    to_backend = self.resource_mapper.get_backend_field
    optimade_properties = index_params["mappings"]["properties"]
    backend_properties = {
        to_backend(name): spec for name, spec in optimade_properties.items()
    }
    # "id" is always mapped as a keyword, replacing any definition found above.
    backend_properties["id"] = {"type": "keyword"}
    index_params["mappings"]["properties"] = backend_properties

    self.client.indices.create(index=self.name, ignore=400, **index_params)

    LOGGER.debug(
        f"Created Elastic index for {self.name!r} with parameters {index_params}"
    )

find(params)

Fetches results and indicates if more data is available.

Also gives the total number of data available in the absence of page_limit. See EntryListingQueryParams for more information.

Returns a list of the mapped database response.

If no results match the query, then results is set to None.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

Entry listing URL query params.

required

Returns:

Type Description
Optional[Union[dict[str, Any], list[dict[str, Any]]]]

A tuple of various relevant values:

Optional[int]

(results, data_returned, more_data_available, exclude_fields, include_fields).

Source code in optimade/server/entry_collections/entry_collections.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def find(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> tuple[
    Optional[Union[dict[str, Any], list[dict[str, Any]]]],
    Optional[int],
    bool,
    set[str],
    set[str],
]:
    """
    Run the query described by `params` and map the matching entries.

    The query parameters are converted with `handle_query_params`, the
    backend is queried through `_run_db_query`, and every returned document
    is passed through the resource mapper's `map_back`.

    For a single-entry request the first mapped entry is returned on its own
    rather than inside a list. If no results match the query, then `results`
    is set to `None`.

    See [`EntryListingQueryParams`][optimade.server.query_params.EntryListingQueryParams]
    for more information.

    Parameters:
        params: Entry listing URL query params.

    Raises:
        BadRequest: If `response_fields` names an unknown, non-prefixed field.
        NotFound: If a single entry was requested, more than one was found and
            `CONFIG.validate_api_response` is enabled.

    Returns:
        A tuple of various relevant values:
        (`results`, `data_returned`, `more_data_available`, `exclude_fields`, `include_fields`).

    """
    query = self.handle_query_params(params)
    expects_one = isinstance(params, SingleEntryQueryParams)
    requested: set[str] = query.pop("fields")

    docs, data_returned, more_data_available = self._run_db_query(
        query, expects_one
    )

    mapper = self.resource_mapper
    exclude_fields = self.all_fields - requested
    include_fields = requested - mapper.TOP_LEVEL_NON_ATTRIBUTES_FIELDS

    # Sort the requested-but-unknown fields into two groups: fields carrying
    # one of this mapper's prefixes, and fields without any leading underscore.
    # Underscore-prefixed fields with an unsupported prefix are ignored.
    own_prefixes = tuple(f"_{prefix}_" for prefix in mapper.SUPPORTED_PREFIXES)
    known_attributes: set[str] = mapper.ALL_ATTRIBUTES
    unknown = [name for name in include_fields if name not in known_attributes]
    bad_provider_fields: set[str] = {
        name
        for name in unknown
        if name.startswith("_") and name.startswith(own_prefixes)
    }
    bad_optimade_fields: set[str] = {
        name for name in unknown if not name.startswith("_")
    }

    if bad_provider_fields:
        warnings.warn(
            message=f"Unrecognised field(s) for this provider requested in `response_fields`: {bad_provider_fields}.",
            category=UnknownProviderProperty,
        )

    if bad_optimade_fields:
        raise BadRequest(
            detail=f"Unrecognised OPTIMADE field(s) in requested `response_fields`: {bad_optimade_fields}."
        )

    results: Optional[Union[list[dict[str, Any]], dict[str, Any]]] = None

    if docs:
        results = [mapper.map_back(doc) for doc in docs]

        if expects_one:
            results = results[0]

            too_many = (
                CONFIG.validate_api_response
                and data_returned is not None
                and data_returned > 1
            )
            if too_many:
                raise NotFound(
                    detail=f"Instead of a single entry, {data_returned} entries were found",
                )
            data_returned = 1

    return (
        results,
        data_returned,
        more_data_available,
        exclude_fields,
        include_fields,
    )

get_attribute_fields()

Get the set of attribute fields

Return only the first-level attribute fields from the schema of the resource class, resolving references along the way if needed.

Note

It is not needed to take care of other special OpenAPI schema keys than allOf, since only allOf will be found in this context. Other special keys can be found in the Swagger documentation.

Returns:

Type Description
set[str]

Property names.

Source code in optimade/server/entry_collections/entry_collections.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def get_attribute_fields(self) -> set[str]:
    """Collect the names of the attribute fields of the resource class.

    Only the _first-level_ fields of the model annotated on the resource
    class's `attributes` field are returned.

    Note:
        It is not needed to take care of other special OpenAPI schema keys than `allOf`,
        since only `allOf` will be found in this context.
        Other special keys can be found in [the Swagger documentation](https://swagger.io/docs/specification/data-models/oneof-anyof-allof-not/).

    Raises:
        TypeError: If the `attributes` annotation does not resolve to a
            subclass of `Attributes`.

    Returns:
        Property names.

    """
    attributes_field = self.resource_cls.model_fields["attributes"]
    attributes_model = _get_origin_type(attributes_field.annotation)

    # The "missing" check comes first so that `issubclass` never sees a non-class.
    is_missing = attributes_model in (None, NoneType)
    if is_missing or not issubclass(attributes_model, Attributes):
        raise TypeError(
            "resource class 'attributes' field must be a subclass of 'EntryResourceAttributes'"
        )

    return set(attributes_model.model_fields)  # type: ignore[attr-defined]

get_next_query_params(params, results)

Provides url query pagination parameters that will be used in the next link.

Parameters:

Name Type Description Default
results Optional[Union[dict[str, Any], list[dict[str, Any]]]]

The results produced by find.

required
params EntryListingQueryParams

The parsed request params produced by handle_query_params.

required

Returns:

Type Description
dict[str, list[str]]

A dictionary with the necessary query parameters.

Source code in optimade/server/entry_collections/entry_collections.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def get_next_query_params(
    self,
    params: EntryListingQueryParams,
    results: Optional[Union[dict[str, Any], list[dict[str, Any]]]],
) -> dict[str, list[str]]:
    """Build the pagination query parameters for the `next` link.

    Nothing is produced unless `results` is a non-empty list. Otherwise the
    pagination key is the first of `page_offset`, `page_number` and
    `page_above` that is set on `params`, with offset pagination as the
    fallback; only offset pagination yields an entry in the returned
    dictionary.

    Arguments:
        results: The results produced by find.
        params: The parsed request params produced by handle_query_params.

    Returns:
        A dictionary with the necessary query parameters.

    """
    query: dict[str, list[str]] = {}
    if not (isinstance(results, list) and results):
        return query

    # Keep using whichever pagination key the user supplied, if any.
    supplied_key = next(
        (
            key
            for key in ("page_offset", "page_number", "page_above")
            if getattr(params, key, None) is not None
        ),
        None,
    )
    if supplied_key is None:
        mechanism = PaginationMechanism.OFFSET
    else:
        mechanism = PaginationMechanism(supplied_key)

    if mechanism == PaginationMechanism.OFFSET:
        next_offset = params.page_offset + len(results)
        query["page_offset"] = [str(next_offset)]  # type: ignore[list-item]

    return query

handle_query_params(params)

Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.

Note

Currently this method returns the pymongo interpretation of the parameters, which will need modification for other backends.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

The initialized query parameter model from the server.

required

Raises:

Type Description
Forbidden

If too large of a page limit is provided.

BadRequest

If an invalid request is made, e.g., with incorrect fields or response format.

Returns:

Type Description
dict[str, Any]

A dictionary representation of the query parameters.

Source code in optimade/server/entry_collections/entry_collections.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def handle_query_params(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> dict[str, Any]:
    """Translate the backend-agnostic query parameter model into a dictionary
    of criteria that the specific backend can use.

    The returned dictionary always holds `filter`, `limit`, `projection` and
    `fields`; `sort`, `skip` and `page_above` are only present when the
    matching query parameters were supplied.

    Note:
        Currently this method returns the pymongo interpretation of the parameters,
        which will need modification for other backends.

        When several of `page_offset`, `page_number` and `page_above` are
        supplied, only the first one in that order is applied and a
        `QueryParamNotUsed` warning is issued.

    Parameters:
        params: The initialized query parameter model from the server.

    Raises:
        Forbidden: If too large of a page limit is provided.
        BadRequest: If an invalid request is made, e.g., with incorrect fields
            or response format.

    Returns:
        A dictionary representation of the query parameters.

    """
    criteria: dict[str, Any] = {}

    # filter: parsed and transformed when given, an empty filter otherwise
    if getattr(params, "filter", False):
        parsed_filter = self.parser.parse(params.filter)  # type: ignore[union-attr]
        criteria["filter"] = self.transformer.transform(parsed_filter)
    else:
        criteria["filter"] = {}

    # response_format: only "json" is supported
    response_format = getattr(params, "response_format", False)
    if response_format and response_format != "json":
        raise BadRequest(
            detail=f"Response format {response_format} is not supported, please use response_format='json'"
        )

    # page_limit: capped by the configured maximum, with a configured default
    requested_limit = getattr(params, "page_limit", False)
    if requested_limit:
        if requested_limit > CONFIG.page_limit_max:
            raise Forbidden(
                detail=f"Max allowed page_limit is {CONFIG.page_limit_max}, you requested {requested_limit}",
            )
        criteria["limit"] = requested_limit
    else:
        criteria["limit"] = CONFIG.page_limit

    # projection: every field of the collection, under its backend name
    to_backend = self.resource_mapper.get_backend_field
    criteria["projection"] = dict.fromkeys(
        (f"{to_backend(name)}" for name in self.all_fields), True
    )

    # response_fields: the requested fields plus the required ones, or all fields
    if getattr(params, "response_fields", False):
        response_fields = (
            set(params.response_fields.split(","))
            | self.resource_mapper.get_required_fields()
        )
    else:
        response_fields = self.all_fields.copy()

    criteria["fields"] = response_fields

    # sort
    if getattr(params, "sort", False):
        criteria["sort"] = self.parse_sort_params(params.sort)  # type: ignore[union-attr]

    # pagination: the first key that is set wins, any later one is only reported
    winning_key: Optional[str] = None
    extra_keys_ignored = False

    if getattr(params, "page_offset", False):
        winning_key = "page_offset"
        criteria["skip"] = params.page_offset  # type: ignore[union-attr]

    page_number = getattr(params, "page_number", None)
    if isinstance(page_number, int):
        if winning_key is not None:
            extra_keys_ignored = True
        else:
            winning_key = "page_number"
            if page_number < 1:
                warnings.warn(
                    message=f"'page_number' is 1-based, using 'page_number=1' instead of {page_number}",
                    category=QueryParamNotUsed,
                )
                page_number = 1
            criteria["skip"] = (page_number - 1) * criteria["limit"]

    page_above = getattr(params, "page_above", None)
    if isinstance(page_above, str):
        if winning_key is not None:
            extra_keys_ignored = True
        else:
            winning_key = "page_above"
            criteria["page_above"] = page_above

    if extra_keys_ignored:
        warnings.warn(
            message="Multiple pagination keys were provided, only using the first one of 'page_offset', 'page_number' or 'page_above'",
            category=QueryParamNotUsed,
        )

    return criteria

insert(data)

Add the given entries to the underlying database.

Warning

No validation is performed on the incoming data.

Parameters:

Name Type Description Default
data list[EntryResource]

The entry resource objects to add to the database.

required
Source code in optimade/server/entry_collections/elasticsearch.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def insert(self, data: list[EntryResource]) -> None:
    """Bulk-insert the given entries into this collection's index.

    Warning:
        No validation is performed on the incoming data. Any `_id` key is
        removed from the entries themselves before they are stored.

    Arguments:
        data: The entry resource objects to add to the database.

    """

    def document_id(entry):
        """Pick the id to store `entry` under and drop its `_id` key."""
        if self.name == "links":
            chosen = f"{entry['id']}-{entry['type']}"
        elif "id" in entry:
            chosen = entry["id"]
        else:
            # Reuse the existing MongoDB id from the test data when there is
            # one; with `None`, ES will generate the id.
            chosen = str(entry["_id"]) if "_id" in entry else None
        entry.pop("_id", None)
        return chosen

    actions = (
        {
            "_index": self.name,
            "_id": document_id(entry),
            "_source": entry,
        }
        for entry in data
    )
    bulk(self.client, actions)

parse_sort_params(sort_params)

Handles any sort parameters passed to the collection, resolving aliases and dealing with any invalid fields.

Raises:

Type Description
BadRequest

if an invalid sort is requested.

Returns:

Type Description
Iterable[tuple[str, int]]

A list of tuples containing the aliased field name and

Iterable[tuple[str, int]]

sort direction encoded as 1 (ascending) or -1 (descending).

Source code in optimade/server/entry_collections/entry_collections.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def parse_sort_params(self, sort_params: str) -> Iterable[tuple[str, int]]:
    """Turn a comma-separated `sort` parameter into a backend sort specification.

    Each field may be prefixed with `-` to request descending order. Field
    names are resolved to their backend aliases, and fields unknown to this
    collection are dropped from the result.

    Raises:
        BadRequest: if an invalid sort is requested, i.e., at least one unknown
            field does not look like a field of another provider.

    Returns:
        A list of tuples containing the aliased field name and
        sort direction encoded as 1 (ascending) or -1 (descending).

    """
    to_backend = self.resource_mapper.get_backend_field

    sort_spec: list[tuple[str, int]] = []
    for token in sort_params.split(","):
        descending = token.startswith("-")
        name = token[1:] if descending else token
        sort_spec.append((to_backend(name), -1 if descending else 1))

    unknown_fields = [
        field
        for field, _ in sort_spec
        if self.resource_mapper.get_optimade_field(field) not in self.all_fields
    ]

    if unknown_fields:
        error_detail = "Unable to sort on unknown field{} '{}'".format(
            "s" if len(unknown_fields) > 1 else "",
            "', '".join(unknown_fields),
        )

        # Unknown fields are tolerated, with a warning, only when every one of
        # them carries the prefix of some other provider.
        only_other_providers = all(
            (
                re.match(r"_[a-z_0-9]+_[a-z_0-9]*", field)
                and not field.startswith(f"_{self.provider_prefix}_")
            )
            for field in unknown_fields
        )
        if not only_other_providers:
            # Anything else is answered with 400: Bad Request
            raise BadRequest(detail=error_detail)
        warnings.warn(error_detail, FieldValueNotRecognized)

    # Whatever valid fields remain are the ones used for sorting
    return [entry for entry in sort_spec if entry[0] not in unknown_fields]

ElasticTransformer

Bases: BaseTransformer

Transformer that transforms v0.10.1/v1.0 grammar parse trees into Elasticsearch queries.

Uses elasticsearch_dsl and will produce an elasticsearch_dsl.Q instance.

Source code in optimade/filtertransformers/elasticsearch.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
class ElasticTransformer(BaseTransformer):
    """Transformer that transforms ``v0.10.1``/`v1.0` grammar parse
    trees into Elasticsearch queries.

    Uses elasticsearch_dsl and will produce an `elasticsearch_dsl.Q` instance.

    """

    # Maps the OPTIMADE ordering operators onto the keys of an
    # Elasticsearch `range` query.
    operator_map = {
        "<": "lt",
        "<=": "lte",
        ">": "gt",
        ">=": "gte",
    }

    _quantity_type: type[ElasticsearchQuantity] = ElasticsearchQuantity

    def __init__(
        self,
        mapper: type[BaseResourceMapper],
        quantities: Optional[dict[str, Quantity]] = None,
    ):
        """Initialise the transformer.

        Parameters:
            mapper: The resource mapper passed on to the base transformer.
            quantities: If given, assigned to `self.quantities` before the
                base class initialiser runs.

        """
        if quantities is not None:
            self.quantities = quantities

        super().__init__(mapper=mapper)

    def _field(
        self, quantity: Union[str, Quantity], nested: Optional[Quantity] = None
    ) -> str:
        """Used to unwrap from `property` to the string backend field name.

        If passed a `Quantity` (or a derived `ElasticsearchQuantity`), this method
        returns the backend field name, modulo some handling of nested fields.

        If passed a string quantity name:
        - Check that the name does not match a relationship type,
          raising a `NotImplementedError` if it does.
        - If the string is prefixed by an underscore, assume this is a
          provider-specific field from another provider and simply return it.
          The original `property` rule would have already filtered out provider
          fields for this backend appropriately as `Quantity` objects.

        Parameters:
            quantity: The quantity (or bare field name) to resolve.
            nested: If given, the returned name is
                `<nested.backend_field>.<quantity.name>`.

        Returns:
            The field name to use for database queries.

        Raises:
            NotImplementedError: If `quantity` is a string naming a
                relationship entry type.

        """

        if isinstance(quantity, str):
            if quantity in self.mapper.RELATIONSHIP_ENTRY_TYPES:  # type: ignore[union-attr]
                raise NotImplementedError(
                    f"Unable to filter on relationships with type {quantity!r}"
                )

            # In this case, the property rule has already filtered out fields
            # that do not match this provider, so this indicates an "other provider"
            # field that should be passed over
            if quantity.startswith("_"):
                return quantity

        if nested is not None:
            return f"{nested.backend_field}.{quantity.name}"  # type: ignore[union-attr]

        return quantity.backend_field  # type: ignore[union-attr, return-value]

    def _query_op(
        self,
        quantity: Union[ElasticsearchQuantity, str],
        op: str,
        value: Union[str, float, int],
        nested: Optional[ElasticsearchQuantity] = None,
    ) -> Q:
        """Return a range, match, or term query for the given quantity, comparison
        operator, and value.

        Parameters:
            quantity: The quantity, or the bare name of an "other provider" field.
            op: The comparison operator; an empty string is treated like `=`.
            value: The value to compare against.
            nested: Optional enclosing quantity, forwarded to `_field`.

        Returns:
            An elasticsearch_dsl query.

        Raises:
            NotImplementedError: If the Elasticsearch mapping type of the
                quantity is not one of `Text`, `Keyword` or `Integer`, or if
                the operator is not recognised.
        """
        field = self._field(quantity, nested=nested)
        if op in self.operator_map:
            return Q("range", **{field: {self.operator_map[op]: value}})

        # If quantity is an "other provider" field then use Keyword as the default
        # mapping type. These queries should not match on anything as the field
        # is not present in the index.
        elastic_mapping_type = Keyword
        if isinstance(quantity, ElasticsearchQuantity):
            elastic_mapping_type = quantity.elastic_mapping_type

        if elastic_mapping_type == Text:
            query_type = "match"
        elif elastic_mapping_type in [Keyword, Integer]:
            query_type = "term"
        else:
            raise NotImplementedError("Quantity has unsupported ES field type")

        if op in ["=", ""]:
            return Q(query_type, **{field: value})

        if op == "!=":
            # != queries must also include an existence check
            # Note that for MongoDB, `$exists` will include null-valued fields,
            # where as in ES `exists` excludes them.
            # pylint: disable=invalid-unary-operand-type
            return ~Q(query_type, **{field: value}) & Q("exists", field=field)

        # Fail loudly rather than implicitly returning `None`, which would
        # only surface later as an obscure error when queries are combined.
        raise NotImplementedError(f"Unsupported comparison operator {op!r}.")

    def _has_query_op(self, quantities, op, predicate_zip_list):
        """Returns a bool query that combines the operator calls `_query_op`
        for each predicate and zipped quantity predicate combination.

        Parameters:
            quantities: The quantities that each predicate tuple is matched against.
            op: One of `HAS`, `HAS ALL`, `HAS ANY` or `HAS ONLY`.
            predicate_zip_list: A list of predicate tuples, one `_has_query`
                call is made per entry.

        Raises:
            NotImplementedError: For `HAS ONLY` and for any unrecognised operation.
        """
        if op == "HAS ONLY":
            # HAS ONLY comes with heavy limitations, because there is no such thing
            # in elastic search. Only supported for elements, where we can construct
            # an anonymous "formula" based on elements sorted by order number and
            # can do a = comparison to check if all elements are contained

            # @ml-evs: Disabling this HAS ONLY workaround as tests are not passing
            raise NotImplementedError(
                "HAS ONLY queries are not currently supported by the Elasticsearch backend."
            )

        # In case of HAS we do a must over the "list" of the one given element
        bool_kinds = {"HAS": "must", "HAS ALL": "must", "HAS ANY": "should"}
        if op not in bool_kinds:
            raise NotImplementedError(f"Unrecognised operation {op}.")
        kind = bool_kinds[op]

        queries = [
            self._has_query(quantities, predicates) for predicates in predicate_zip_list
        ]
        return Q("bool", **{kind: queries})

    def _has_query(self, quantities, predicates):
        """
        Returns a bool query that combines the operator queries (`_query_op`)
        for each quantity predicate combination.

        Parameters:
            quantities: The quantities to query, one per predicate.
            predicates: `(operator, value)` pairs, one per quantity.

        Raises:
            ValueError: If the number of quantities and predicates differ.
            NotImplementedError: If several quantities are given that do not
                all share the same (non-`None`) nested quantity.
        """
        # Quantities may be `Quantity` objects or bare strings, so build
        # printable labels up front for use in error messages.
        names = [str(getattr(q, "name", q)) for q in quantities]

        if len(quantities) != len(predicates):
            raise ValueError(
                "Tuple length does not match: %s <o> %s "
                % (":".join(names), ":".join(str(p) for p in predicates))
            )

        if len(quantities) == 1:
            o, value = predicates[0]
            return self._query_op(quantities[0], o, value)

        nested_quantity = getattr(quantities[0], "nested_quantity", None)
        mixed_nesting = any(
            getattr(q, "nested_quantity", None) != nested_quantity for q in quantities
        )
        if nested_quantity is None or mixed_nesting:
            raise NotImplementedError(
                "Expression with tuples are only supported for %s" % ", ".join(names)
            )

        queries = [
            self._query_op(quantity, o, value, nested=nested_quantity)
            for quantity, (o, value) in zip(quantities, predicates)
        ]

        return Q(
            "nested",
            path=self._field(nested_quantity),
            query=dict(bool=dict(must=queries)),
        )

    def __default__(self, tree, children, *args, **kwargs):
        """Default behavior for rules that only replace one symbol with another"""
        return children[0]

    def filter(self, args):
        """Return the single expression as is, or a `bool`/`must` query over `args`."""
        # filter: expression*
        if len(args) == 1:
            return args[0]
        return Q("bool", **{"must": args})

    def expression_clause(self, args):
        """Combine all phrases in `args` with `&`."""
        # expression_clause: expression_phrase ( _AND expression_phrase )*
        result = args[0]
        for arg in args[1:]:
            result &= arg
        return result

    def expression(self, args):
        """Combine all clauses in `args` with `|`."""
        # expression: expression_clause ( _OR expression_clause )*
        result = args[0]
        for arg in args[1:]:
            result |= arg
        return result

    def expression_phrase(self, args):
        """Negate the query with `~` if it is preceded by `NOT`, else return it as is."""
        # expression_phrase: [ NOT ] ( operator | "(" expression ")" )
        if args[0] == "NOT":
            return ~args[1]
        return args[0]

    @v_args(inline=True)
    def property_first_comparison(self, quantity, query):
        """Apply the right-hand-side query builder to the quantity."""
        # property_first_comparison: property *_rhs
        return query(quantity)

    @v_args(inline=True)
    def constant_first_comparison(self, value, op, quantity):
        """Build the query for `constant OPERATOR quantity`, looking the operator
        up in `self._reversed_operator_map` before calling `_query_op`.

        Raises:
            TypeError: If `quantity` is not an `ElasticsearchQuantity`.
        """
        # constant_first_comparison: constant OPERATOR ( non_string_value | ...not_implemented_string )
        if not isinstance(quantity, ElasticsearchQuantity):
            raise TypeError("Only quantities can be compared to constant values.")

        return self._query_op(quantity, self._reversed_operator_map[op], value)

    @v_args(inline=True)
    def value_op_rhs(self, op, value):
        """Return a function that builds the `_query_op` query for a given quantity."""
        # value_op_rhs: OPERATOR value
        return lambda quantity: self._query_op(quantity, op, value)

    def length_op_rhs(self, args):
        """Return a function that builds a query on the length quantity of a
        given quantity; the operator defaults to `=` when omitted.
        """
        # length_op_rhs: LENGTH [ OPERATOR ] signed_int
        value = args[-1]
        if len(args) == 3:
            op = args[1]
        else:
            op = "="

        def query(quantity):
            # This is only the case if quantity is an "other" provider's field,
            # in which case, we should treat it as unknown and try to do a null query
            if isinstance(quantity, str):
                return self._query_op(quantity, op, value)

            if quantity.length_quantity is None:
                raise NotImplementedError(
                    f"LENGTH is not supported for {quantity.name!r}"
                )
            quantity = quantity.length_quantity
            return self._query_op(quantity, op, value)

        return query

    @v_args(inline=True)
    def known_op_rhs(self, _, value):
        """Return a function that builds an `exists` query (negated for `UNKNOWN`)."""
        # known_op_rhs: IS ( KNOWN | UNKNOWN )

        def query(quantity):
            exists_query = Q("exists", field=self._field(quantity))
            if value == "KNOWN":
                return exists_query
            elif value == "UNKNOWN":
                return ~exists_query  # pylint: disable=invalid-unary-operand-type
            raise NotImplementedError

        return query

    def set_op_rhs(self, args):
        """Return a function that builds the `HAS ...` query for a given quantity."""
        # set_op_rhs: HAS ( [ OPERATOR ] value | ALL value_list | ... )
        values = args[-1]
        has_modifier = len(args) == 3

        # A bare value (rather than a value list) is wrapped into a
        # one-element list of `(operator, value)` pairs.
        if not isinstance(values, list):
            values = [(args[1] if has_modifier else "=", values)]

        op = ("HAS " + args[1]) if has_modifier else "HAS"

        return lambda quantity: self._has_query_op(
            [quantity], op, [[value] for value in values]
        )

    def set_zip_op_rhs(self, args):
        """Return a function that builds the zipped `HAS ...` query for a given quantity."""
        # set_zip_op_rhs: property_zip_addon HAS ( value_zip | ONLY value_zip_list | ALL value_zip_list | ANY value_zip_list )
        add_on = args[0]
        values = args[-1]
        if len(args) == 4:
            op = "HAS " + args[2]
        else:
            op = "HAS"
            values = [values]

        return lambda quantity: self._has_query_op([quantity] + add_on, op, values)

    def property_zip_addon(self, args):
        """Always raises, as correlated list queries are not supported."""
        raise NotImplementedError("Correlated list queries are not supported.")

    def value_zip(self, args):
        """Always raises, as correlated list queries are not supported."""
        raise NotImplementedError("Correlated list queries are not supported.")

    def value_zip_list(self, args):
        """Always raises, as correlated list queries are not supported."""
        raise NotImplementedError("Correlated list queries are not supported.")

    def value_list(self, args):
        """Pair each value in `args` with the operator that directly precedes
        it, defaulting to `=` for values without one.

        Returns:
            A list of `(operator, value)` tuples.
        """
        result = []
        op = "="
        for arg in args:
            if arg in ["<", "<=", ">", ">=", "!=", "="]:
                op = arg
            else:
                result.append(
                    (
                        op,
                        arg,
                    )
                )
                # The operator only applies to the value right after it.
                op = "="
        return result

    def fuzzy_string_op_rhs(self, args):
        """Return a function that builds a `wildcard` query for `CONTAINS`,
        `STARTS` or `ENDS`.

        Raises:
            NotImplementedError: If the first argument is none of the
                operators above.
        """
        op = args[0]
        value = args[-1]
        if op == "CONTAINS":
            wildcard = "*%s*" % value
        elif op == "STARTS":
            wildcard = "%s*" % value
        elif op == "ENDS":
            wildcard = "*%s" % value
        else:
            # Previously `wildcard` was simply left unbound in this case.
            raise NotImplementedError(f"Unrecognised fuzzy string operator {op!r}.")

        return lambda quantity: Q("wildcard", **{self._field(quantity): wildcard})

    @v_args(inline=True)
    def string(self, string):
        """Strip the double quotes from both ends of the string token."""
        # string: ESCAPED_STRING
        return string.strip('"')

    @v_args(inline=True)
    def signed_int(self, number):
        """Convert the token to an `int`."""
        # signed_int : SIGNED_INT
        return int(number)

    @v_args(inline=True)
    def number(self, number):
        """Convert the token to an `int` or a `float` depending on its type.

        Raises:
            ValueError: If the token type is neither `SIGNED_INT` nor `SIGNED_FLOAT`.
        """
        # number: SIGNED_INT | SIGNED_FLOAT
        if TYPE_CHECKING:  # pragma: no cover
            type_: Union[type[int], type[float]]

        if number.type == "SIGNED_INT":
            type_ = int
        elif number.type == "SIGNED_FLOAT":
            type_ = float
        else:
            # Previously `type_` was simply left unbound in this case.
            raise ValueError(f"Unexpected token type {number.type!r} for a number.")
        return type_(number)

backend_mapping: dict[str, Quantity] property

A mapping between backend field names (aliases) and the corresponding Quantity object.

__default__(tree, children, *args, **kwargs)

Default behavior for rules that only replace one symbol with another

Source code in optimade/filtertransformers/elasticsearch.py
295
296
297
def __default__(self, tree, children, *args, **kwargs):
    """Default behavior for rules that only replace one symbol with another.

    Returns the first element of `children`; `tree` and any additional
    arguments are not used.
    """
    return children[0]

comparison(value)

comparison: constant_first_comparison | property_first_comparison

Source code in optimade/filtertransformers/base_transformer.py
300
301
302
303
304
@v_args(inline=True)
def comparison(self, value):
    """comparison: constant_first_comparison | property_first_comparison

    Returns:
        The given `value`, unchanged.

    """
    # Note: Return as is.
    return value

constant(value)

constant: string | number

Source code in optimade/filtertransformers/base_transformer.py
198
199
200
201
202
@v_args(inline=True)
def constant(self, value):
    """constant: string | number

    Returns:
        The given `value`, unchanged.

    """
    # Note: Return as is.
    return value

non_string_value(value)

non_string_value: number | property

Source code in optimade/filtertransformers/base_transformer.py
210
211
212
213
214
@v_args(inline=True)
def non_string_value(self, value):
    """non_string_value: number | property

    Returns:
        The given `value`, unchanged.

    """
    # Note: Return as is.
    return value

not_implemented_string(value)

not_implemented_string: value

Raises:

Type Description
NotImplementedError

For further information, see Materials-Consortia/OPTIMADE issue 157: https://github.com/Materials-Consortia/OPTIMADE/issues/157

Source code in optimade/filtertransformers/base_transformer.py
216
217
218
219
220
221
222
223
224
225
@v_args(inline=True)
def not_implemented_string(self, value):
    """not_implemented_string: value

    The given `value` is never inspected; this rule unconditionally raises.

    Raises:
        NotImplementedError: For further information, see Materials-Consortia/OPTIMADE issue 157:
            https://github.com/Materials-Consortia/OPTIMADE/issues/157

    """
    raise NotImplementedError("Comparing strings is not yet implemented.")

postprocess(query)

Post-process the query according to the rules defined for the backend, returning the backend-specific query.

Source code in optimade/filtertransformers/base_transformer.py
174
175
176
177
178
179
def postprocess(self, query) -> Any:
    """Post-process the query according to the rules defined for
    the backend, returning the backend-specific query.

    This implementation applies no rules and returns `query` unchanged.

    Parameters:
        query: The query to post-process.

    Returns:
        The given `query`.

    """
    return query

property(args)

property: IDENTIFIER ( "." IDENTIFIER )*

If this transformer has an associated mapper, the property will be compared to possible relationship entry types and for any supported provider prefixes. If there is a match, this rule will return a string and not a dereferenced Quantity.

Raises:

Type Description
BadRequest

If the property does not match any of the above rules.

Source code in optimade/filtertransformers/base_transformer.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def property(self, args: list) -> Any:
    """property: IDENTIFIER ( "." IDENTIFIER )*

    Resolves the first identifier to either a plain string or a
    [`Quantity`][optimade.filtertransformers.base_transformer.Quantity]:

    - With an associated mapper, a name matching one of the mapper's
      relationship entry types is returned as a string.
    - An unknown, underscore-prefixed name whose prefix is not supported
      by the mapper is returned as a string, with a warning if the prefix
      is not one of the mapper's known provider prefixes either.
    - Otherwise the registered quantity is returned, or a new instance of
      `self._quantity_type` when none is registered under that name.

    Raises:
        BadRequest: If the property does not match any
            of the above rules.

    """
    name = str(args[0])
    mapper = self.mapper

    # A relationship filter: hand back the bare name, the inherited property
    # must then handle any further nested identifiers.
    if mapper and name in mapper.RELATIONSHIP_ENTRY_TYPES:
        return name

    registered = self.quantities
    if registered and name not in registered:
        # Provider-specific fields that belong to another provider are treated
        # as unknown, following
        # [Handling unknown property names](https://github.com/Materials-Consortia/OPTIMADE/blob/master/optimade.rst#handling-unknown-property-names)
        if mapper and name.startswith("_"):
            prefix = name.split("_")[1]
            if prefix not in mapper.SUPPORTED_PREFIXES:
                # Only warn when the prefix is not that of a known provider.
                if prefix not in mapper.KNOWN_PROVIDER_PREFIXES:
                    warnings.warn(
                        UnknownProviderProperty(
                            f"Field {name!r} has an unrecognised prefix: this property has been treated as UNKNOWN."
                        )
                    )

                return name

        raise BadRequest(detail=f"'{name}' is not a known or searchable quantity")

    found = registered.get(name, None)
    if found is not None:
        return found

    return self._quantity_type(name=str(name))

transform(tree)

Transform the query using the Lark Transformer then run the backend-specific post-processing methods.

Source code in optimade/filtertransformers/base_transformer.py
181
182
183
184
185
186
def transform(self, tree: Tree) -> Any:
    """Transform the query using the Lark `Transformer` then run the
    backend-specific post-processing methods.

    Parameters:
        tree: The parse tree to transform.

    Returns:
        The result of `self.postprocess` applied to the output of the
        parent class `transform`.

    """
    return self.postprocess(super().transform(tree))

value(value)

value: string | number | property

Source code in optimade/filtertransformers/base_transformer.py
204
205
206
207
208
@v_args(inline=True)
def value(self, value):
    """value: string | number | property

    Returns:
        The given `value`, unchanged.

    """
    # Note: Return as is.
    return value

EntryCollection

Bases: ABC

Backend-agnostic base class for querying collections of EntryResources.

Source code in optimade/server/entry_collections/entry_collections.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
class EntryCollection(ABC):
    """Backend-agnostic base class for querying collections of
    [`EntryResource`][optimade.models.entries.EntryResource]s."""

    pagination_mechanism = PaginationMechanism("page_offset")
    """The default pagination mechansim to use with a given collection,
    if the user does not provide any pagination query parameters.
    """

    def __init__(
        self,
        resource_cls: type[EntryResource],
        resource_mapper: type[BaseResourceMapper],
        transformer: Transformer,
    ):
        """Set up the collection and read the provider settings from `CONFIG`.

        Parameters:
            resource_cls (EntryResource): The `EntryResource` model
                that is stored by the collection.
            resource_mapper (BaseResourceMapper): A resource mapper
                object that handles aliases and format changes between
                deserialization and response.
            transformer (Transformer): The Lark `Transformer` used to
                interpret the filter.

        """
        self.parser = LarkParser()
        self.resource_cls = resource_cls
        self.resource_mapper = resource_mapper
        self.transformer = transformer

        self.provider_prefix = CONFIG.provider.prefix

        # Entries configured for this endpoint are either plain names or
        # mappings carrying the name under the "name" key.
        configured = CONFIG.provider_fields.get(resource_mapper.ENDPOINT, [])
        self.provider_fields = [
            entry["name"] if not isinstance(entry, str) else entry
            for entry in configured
        ]

        # Filled in on first access of the `all_fields` property.
        self._all_fields: set[str] = set()

    @abstractmethod
    def __len__(self) -> int:
        """Returns the total number of entries in the collection.

        This method is abstract and has no implementation here.
        """

    @abstractmethod
    def insert(self, data: list[EntryResource]) -> None:
        """Add the given entries to the underlying database.

        This method is abstract and has no implementation here.

        Arguments:
            data: The entry resource objects to add to the database.

        """

    @abstractmethod
    def count(self, **kwargs: Any) -> Optional[int]:
        """Returns the number of entries matching the query specified
        by the keyword arguments.

        This method is abstract and has no implementation here.

        Parameters:
            **kwargs: Query parameters as keyword arguments.

        Returns:
            The number of matching entries; the annotation also allows `None`.

        """

    def find(
        self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
    ) -> tuple[
        Optional[Union[dict[str, Any], list[dict[str, Any]]]],
        Optional[int],
        bool,
        set[str],
        set[str],
    ]:
        """
        Fetches results and indicates if more data is available.

        Also gives the total number of data available in the absence of `page_limit`.
        See [`EntryListingQueryParams`][optimade.server.query_params.EntryListingQueryParams]
        for more information.

        The database documents are passed through the resource mapper's
        `map_back` before being returned. For single-entry queries only the
        first mapped document is returned instead of a list.

        If no results match the query, then `results` is set to `None`.

        Parameters:
            params: Entry listing URL query params.

        Returns:
            A tuple of various relevant values:
            (`results`, `data_returned`, `more_data_available`, `exclude_fields`, `include_fields`).

        Raises:
            BadRequest: If a requested response field is neither a known
                attribute nor underscore-prefixed.
            NotFound: If a single-entry query matched more than one entry
                while `CONFIG.validate_api_response` is enabled.

        """
        criteria = self.handle_query_params(params)
        single_entry = isinstance(params, SingleEntryQueryParams)
        response_fields: set[str] = criteria.pop("fields")

        raw_results, data_returned, more_data_available = self._run_db_query(
            criteria, single_entry
        )

        exclude_fields = self.all_fields - response_fields
        include_fields = (
            response_fields - self.resource_mapper.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
        )

        own_prefixes = tuple(
            f"_{prefix}_" for prefix in self.resource_mapper.SUPPORTED_PREFIXES
        )
        known_attributes: set[str] = self.resource_mapper.ALL_ATTRIBUTES
        unrecognised = [
            field for field in include_fields if field not in known_attributes
        ]

        # Unknown fields carrying one of this provider's prefixes only trigger
        # a warning; unknown fields with any other underscore prefix are
        # silently accepted; everything else is an error.
        bad_provider_fields: set[str] = {
            field for field in unrecognised if field.startswith(own_prefixes)
        }
        bad_optimade_fields: set[str] = {
            field for field in unrecognised if not field.startswith("_")
        }

        if bad_provider_fields:
            warnings.warn(
                message=f"Unrecognised field(s) for this provider requested in `response_fields`: {bad_provider_fields}.",
                category=UnknownProviderProperty,
            )

        if bad_optimade_fields:
            raise BadRequest(
                detail=f"Unrecognised OPTIMADE field(s) in requested `response_fields`: {bad_optimade_fields}."
            )

        results: Optional[Union[list[dict[str, Any]], dict[str, Any]]] = None

        if raw_results:
            mapped = [self.resource_mapper.map_back(doc) for doc in raw_results]

            if not single_entry:
                results = mapped
            else:
                results = mapped[0]

                too_many_entries = (
                    CONFIG.validate_api_response
                    and data_returned is not None
                    and data_returned > 1
                )
                if too_many_entries:
                    raise NotFound(
                        detail=f"Instead of a single entry, {data_returned} entries were found",
                    )
                data_returned = 1

        return (
            results,
            data_returned,
            more_data_available,
            exclude_fields,
            include_fields,
        )

    @abstractmethod
    def _run_db_query(
        self, criteria: dict[str, Any], single_entry: bool = False
    ) -> tuple[list[dict[str, Any]], Optional[int], bool]:
        """Run the query on the backend and collect the results.

        This method is abstract and has no implementation here.

        Arguments:
            criteria: A dictionary representation of the query parameters.
            single_entry: Whether or not the caller is expecting a single entry response.

        Returns:
            The list of entries from the database (without any re-mapping), the total number of
            entries matching the query and a boolean for whether or not there is more data available.

        """

    @property
    def all_fields(self) -> set[str]:
        """Get the set of all fields handled in this collection,
        from attribute fields in the schema, provider fields and top-level OPTIMADE fields.

        The set of all fields are lazily created and then cached.
        This means the set is created the first time the property is requested and then cached.
        The cache is only written once the full set has been built, so that a
        failure while collecting the fields does not leave an incomplete set behind.

        Returns:
            All fields handled in this collection.

        """
        if self._all_fields:
            return self._all_fields

        # All OPTIMADE fields (copied, so the mapper's own set is left untouched)
        fields = self.resource_mapper.TOP_LEVEL_NON_ATTRIBUTES_FIELDS.copy()
        fields |= self.get_attribute_fields()
        # All provider-specific fields; names that already start with an
        # underscore are taken as is, all others get the provider prefix.
        fields |= {
            f"_{self.provider_prefix}_{field_name}"
            if not field_name.startswith("_")
            else field_name
            for field_name in self.provider_fields
        }

        self._all_fields = fields
        return self._all_fields

    def get_attribute_fields(self) -> set[str]:
        """Get the set of attribute fields

        Looks up the annotation of the `attributes` field on the resource
        class, passes it through `_get_origin_type` and returns the names of
        the model fields declared on the resulting type, i.e., only the
        _first-level_ attribute fields.

        Raises:
            TypeError: If the resolved annotation is `None`/`NoneType` or is
                not a subclass of `Attributes`.

        Returns:
            Property names.

        """
        attributes_field = self.resource_cls.model_fields["attributes"]
        annotation = _get_origin_type(attributes_field.annotation)

        # The `None` check has to come first, as `issubclass` only accepts classes.
        is_missing = annotation in (None, NoneType)
        if is_missing or not issubclass(annotation, Attributes):
            raise TypeError(
                "resource class 'attributes' field must be a subclass of 'EntryResourceAttributes'"
            )

        field_names = set(annotation.model_fields)  # type: ignore[attr-defined]
        return field_names

    def handle_query_params(
        self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
    ) -> dict[str, Any]:
        """Parse and interpret the backend-agnostic query parameter models into a dictionary
        that can be used by the specific backend.

        Note:
            Currently this method returns the pymongo interpretation of the parameters,
            which will need modification for modified for other backends.

        Parameters:
            params: The initialized query parameter model from the server.

        Raises:
            Forbidden: If too large of a page limit is provided.
            BadRequest: If an invalid request is made, e.g., with incorrect fields
                or response format.

        Returns:
            A dictionary representation of the query parameters.

        """
        cursor_kwargs = {}

        # filter
        if getattr(params, "filter", False):
            cursor_kwargs["filter"] = self.transformer.transform(
                self.parser.parse(params.filter)  # type: ignore[union-attr]
            )
        else:
            cursor_kwargs["filter"] = {}

        # response_format
        if (
            getattr(params, "response_format", False)
            and params.response_format != "json"
        ):
            raise BadRequest(
                detail=f"Response format {params.response_format} is not supported, please use response_format='json'"
            )

        # page_limit
        if getattr(params, "page_limit", False):
            limit = params.page_limit  # type: ignore[union-attr]
            if limit > CONFIG.page_limit_max:
                raise Forbidden(
                    detail=f"Max allowed page_limit is {CONFIG.page_limit_max}, you requested {limit}",
                )
            cursor_kwargs["limit"] = limit
        else:
            cursor_kwargs["limit"] = CONFIG.page_limit

        # response_fields
        cursor_kwargs["projection"] = {
            f"{self.resource_mapper.get_backend_field(f)}": True
            for f in self.all_fields
        }

        if getattr(params, "response_fields", False):
            response_fields = set(params.response_fields.split(","))
            response_fields |= self.resource_mapper.get_required_fields()
        else:
            response_fields = self.all_fields.copy()

        cursor_kwargs["fields"] = response_fields

        # sort
        if getattr(params, "sort", False):
            cursor_kwargs["sort"] = self.parse_sort_params(params.sort)  # type: ignore[union-attr]

        # warn if multiple pagination keys are present, and only use the first from this list
        received_pagination_option = False
        warn_multiple_keys = False

        if getattr(params, "page_offset", False):
            received_pagination_option = True
            cursor_kwargs["skip"] = params.page_offset  # type: ignore[union-attr]

        if isinstance(getattr(params, "page_number", None), int):
            if received_pagination_option:
                warn_multiple_keys = True
            else:
                received_pagination_option = True
                if params.page_number < 1:  # type: ignore[union-attr]
                    warnings.warn(
                        message=f"'page_number' is 1-based, using 'page_number=1' instead of {params.page_number}",  # type: ignore[union-attr]
                        category=QueryParamNotUsed,
                    )
                    page_number = 1
                else:
                    page_number = params.page_number  # type: ignore[union-attr]
                cursor_kwargs["skip"] = (page_number - 1) * cursor_kwargs["limit"]

        if isinstance(getattr(params, "page_above", None), str):
            if received_pagination_option:
                warn_multiple_keys = True
            else:
                received_pagination_option = True
                cursor_kwargs["page_above"] = params.page_above  # type: ignore[union-attr]

        if warn_multiple_keys:
            warnings.warn(
                message="Multiple pagination keys were provided, only using the first one of 'page_offset', 'page_number' or 'page_above'",
                category=QueryParamNotUsed,
            )

        return cursor_kwargs

    def parse_sort_params(self, sort_params: str) -> Iterable[tuple[str, int]]:
        """Translate an OPTIMADE `sort` string into a backend sort specification.

        The comma-separated `sort_params` string is split into individual
        fields; a leading `-` marks a field as descending. Every field is
        mapped to its backend name through the resource mapper.

        Fields whose OPTIMADE name is not part of `self.all_fields` are treated
        as unknown. If every unknown field looks like a property of another
        provider (prefixed, but not with this server's own prefix), a warning
        is issued and those fields are dropped from the result. Otherwise the
        request is rejected.

        Parameters:
            sort_params: The raw value of the `sort` URL query parameter.

        Raises:
            BadRequest: If at least one unknown field is not a property
                carrying another provider's prefix.

        Returns:
            A list of `(backend_field, direction)` tuples, where `direction`
            is 1 for ascending and -1 for descending order.

        """
        mapper = self.resource_mapper

        requested: list[tuple[str, int]] = []
        for token in sort_params.split(","):
            descending = token.startswith("-")
            name = token[1:] if descending else token
            requested.append((mapper.get_backend_field(name), -1 if descending else 1))

        unrecognised = [
            backend_name
            for backend_name, _ in requested
            if mapper.get_optimade_field(backend_name) not in self.all_fields
        ]

        if unrecognised:
            plural = "s" if len(unrecognised) > 1 else ""
            error_detail = "Unable to sort on unknown field{} '{}'".format(
                plural,
                "', '".join(unrecognised),
            )

            # An unknown field is tolerated only if it carries a provider prefix
            # that is not this server's own prefix.
            own_prefix = f"_{self.provider_prefix}_"
            only_other_providers = True
            for backend_name in unrecognised:
                if not re.match(
                    r"_[a-z_0-9]+_[a-z_0-9]*", backend_name
                ) or backend_name.startswith(own_prefix):
                    only_other_providers = False
                    break

            # Any other kind of unknown field results in 400: Bad Request
            if not only_other_providers:
                raise BadRequest(detail=error_detail)

            warnings.warn(error_detail, FieldValueNotRecognized)

        # Keep only the fields that were recognised (possibly none)
        return [entry for entry in requested if entry[0] not in unrecognised]

    def get_next_query_params(
        self,
        params: EntryListingQueryParams,
        results: Optional[Union[dict[str, Any], list[dict[str, Any]]]],
    ) -> dict[str, list[str]]:
        """Build the URL query pagination parameters used in the `next` link.

        Parameters are only produced when `results` is a non-empty list. The
        pagination mechanism is taken from the first of `page_offset`,
        `page_number` and `page_above` that is set on `params`, falling back to
        offset-based pagination. This implementation only fills in parameters
        for offset-based pagination; for any other mechanism the returned
        dictionary is empty.

        Arguments:
            params: The parsed request params produced by handle_query_params.
            results: The results produced by find.

        Returns:
            A dictionary with the necessary query parameters.

        """
        # Without a non-empty list of results there is nothing to paginate from.
        if not isinstance(results, list) or not results:
            return dict()

        # If a user passed a particular pagination mechanism, keep using it;
        # otherwise fall back to offset-based pagination.
        explicit_key = next(
            (
                key
                for key in ("page_offset", "page_number", "page_above")
                if getattr(params, key, None) is not None
            ),
            None,
        )
        if explicit_key is None:
            mechanism = PaginationMechanism.OFFSET
        else:
            mechanism = PaginationMechanism(explicit_key)

        next_params: dict[str, list[str]] = dict()
        if mechanism == PaginationMechanism.OFFSET:
            following_offset = params.page_offset + len(results)
            next_params["page_offset"] = [str(following_offset)]  # type: ignore[list-item]

        return next_params

all_fields: set[str] property

Get the set of all fields handled in this collection, from attribute fields in the schema, provider fields and top-level OPTIMADE fields.

The set of all fields is lazily created and then cached: it is built the first time the property is requested and reused on every later access.

Returns:

Type Description
set[str]

All fields handled in this collection.

pagination_mechanism = PaginationMechanism('page_offset') class-attribute instance-attribute

The default pagination mechanism to use with a given collection, if the user does not provide any pagination query parameters.

__init__(resource_cls, resource_mapper, transformer)

Initialize the collection for the given parameters.

Parameters:

Name Type Description Default
resource_cls EntryResource

The EntryResource model that is stored by the collection.

required
resource_mapper BaseResourceMapper

A resource mapper object that handles aliases and format changes between deserialization and response.

required
transformer Transformer

The Lark Transformer used to interpret the filter.

required
Source code in optimade/server/entry_collections/entry_collections.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def __init__(
    self,
    resource_cls: type[EntryResource],
    resource_mapper: type[BaseResourceMapper],
    transformer: Transformer,
):
    """Set up the collection for a resource model, its mapper and a filter transformer.

    Besides storing the arguments, this creates a `LarkParser` for filter
    strings, reads the provider prefix and the provider fields configured
    for the mapper's endpoint from `CONFIG`, and starts with an empty cache
    for the set of all fields.

    Parameters:
        resource_cls (EntryResource): The `EntryResource` model
            that is stored by the collection.
        resource_mapper (BaseResourceMapper): A resource mapper
            object that handles aliases and format changes between
            deserialization and response.
        transformer (Transformer): The Lark `Transformer` used to
            interpret the filter.

    """
    self.parser = LarkParser()
    self.resource_cls = resource_cls
    self.resource_mapper = resource_mapper
    self.transformer = transformer

    self.provider_prefix = CONFIG.provider.prefix

    # Provider fields may be configured either as plain names or as
    # mappings that carry the name under the "name" key.
    configured_fields = CONFIG.provider_fields.get(resource_mapper.ENDPOINT, [])
    self.provider_fields = [
        entry if isinstance(entry, str) else entry["name"]
        for entry in configured_fields
    ]

    # Backing store for the lazily populated set of all fields.
    self._all_fields: set[str] = set()

__len__() abstractmethod

Returns the total number of entries in the collection.

Source code in optimade/server/entry_collections/entry_collections.py
117
118
119
@abstractmethod
def __len__(self) -> int:
    """Return the total number of entries in the collection.

    Abstract method: this base declaration provides no implementation.

    Returns:
        The number of entries in the collection.

    """

count(**kwargs) abstractmethod

Returns the number of entries matching the query specified by the keyword arguments.

Parameters:

Name Type Description Default
**kwargs Any

Query parameters as keyword arguments.

{}
Source code in optimade/server/entry_collections/entry_collections.py
130
131
132
133
134
135
136
137
138
@abstractmethod
def count(self, **kwargs: Any) -> Optional[int]:
    """Return the number of entries matching the query specified
    by the keyword arguments.

    Abstract method: this base declaration provides no implementation.

    Parameters:
        **kwargs: Query parameters as keyword arguments.

    Returns:
        The number of matching entries. The annotation also permits `None`;
        presumably for backends that cannot report a count (TODO confirm
        against the concrete implementations).

    """

find(params)

Fetches results and indicates if more data is available.

Also gives the total number of data available in the absence of page_limit. See EntryListingQueryParams for more information.

Returns a list of the mapped database response.

If no results match the query, then results is set to None.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

Entry listing URL query params.

required

Returns:

Type Description
Optional[Union[dict[str, Any], list[dict[str, Any]]]]

A tuple of various relevant values:

Optional[int]

(results, data_returned, more_data_available, exclude_fields, include_fields).

Source code in optimade/server/entry_collections/entry_collections.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def find(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> tuple[
    Optional[Union[dict[str, Any], list[dict[str, Any]]]],
    Optional[int],
    bool,
    set[str],
    set[str],
]:
    """
    Fetches results and indicates if more data is available.

    Also gives the total number of data available in the absence of `page_limit`.
    See [`EntryListingQueryParams`][optimade.server.query_params.EntryListingQueryParams]
    for more information.

    Returns a list of the mapped database response, or a single mapped entry
    when `params` is a `SingleEntryQueryParams`.

    If no results match the query, then `results` is set to `None`.

    The requested `response_fields` are validated before the database is
    queried, so that a request which is going to be rejected does not cost
    a database round-trip.

    Parameters:
        params: Entry listing URL query params.

    Raises:
        BadRequest: If `response_fields` contains an unrecognised field that
            does not start with an underscore (i.e., an unknown OPTIMADE field).
        NotFound: If a single entry was requested, `CONFIG.validate_api_response`
            is enabled and the database reports more than one match.

    Returns:
        A tuple of various relevant values:
        (`results`, `data_returned`, `more_data_available`, `exclude_fields`, `include_fields`).

    """
    criteria = self.handle_query_params(params)
    single_entry = isinstance(params, SingleEntryQueryParams)
    response_fields: set[str] = criteria.pop("fields")

    exclude_fields = self.all_fields - response_fields
    include_fields = (
        response_fields - self.resource_mapper.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
    )

    # Validate the requested fields first: this needs no database access.
    bad_optimade_fields: set[str] = set()
    bad_provider_fields: set[str] = set()
    supported_prefixes = self.resource_mapper.SUPPORTED_PREFIXES
    all_attributes: set[str] = self.resource_mapper.ALL_ATTRIBUTES
    for field in include_fields:
        if field in all_attributes:
            continue
        if not field.startswith("_"):
            bad_optimade_fields.add(field)
        elif any(field.startswith(f"_{prefix}_") for prefix in supported_prefixes):
            # Unknown field under one of this server's own prefixes.
            # Unknown fields with other (foreign) prefixes are silently ignored.
            bad_provider_fields.add(field)

    if bad_provider_fields:
        warnings.warn(
            message=f"Unrecognised field(s) for this provider requested in `response_fields`: {bad_provider_fields}.",
            category=UnknownProviderProperty,
        )

    if bad_optimade_fields:
        raise BadRequest(
            detail=f"Unrecognised OPTIMADE field(s) in requested `response_fields`: {bad_optimade_fields}."
        )

    raw_results, data_returned, more_data_available = self._run_db_query(
        criteria, single_entry
    )

    results: Optional[Union[list[dict[str, Any]], dict[str, Any]]] = None

    if raw_results:
        results = [self.resource_mapper.map_back(doc) for doc in raw_results]

        if single_entry:
            # A single-entry request returns the bare entry, not a list.
            results = results[0]

            if (
                CONFIG.validate_api_response
                and data_returned is not None
                and data_returned > 1
            ):
                raise NotFound(
                    detail=f"Instead of a single entry, {data_returned} entries were found",
                )
            else:
                data_returned = 1

    return (
        results,
        data_returned,
        more_data_available,
        exclude_fields,
        include_fields,
    )

get_attribute_fields()

Get the set of attribute fields

Return only the first-level attribute fields from the schema of the resource class, resolving references along the way if needed.

Note

It is not needed to take care of other special OpenAPI schema keys than allOf, since only allOf will be found in this context. Other special keys can be found in the Swagger documentation.

Returns:

Type Description
set[str]

Property names.

Source code in optimade/server/entry_collections/entry_collections.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def get_attribute_fields(self) -> set[str]:
    """Get the set of attribute fields

    Looks up the annotation of the `attributes` field on the resource class,
    resolves it with `_get_origin_type`, and returns the names of that
    model's own fields, i.e. only the _first-level_ attribute fields.

    Raises:
        TypeError: If the resolved annotation is missing (`None`/`NoneType`)
            or is not a subclass of `Attributes`.

    Returns:
        Property names.

    """
    attributes_field = self.resource_cls.model_fields["attributes"]
    attributes_model = _get_origin_type(attributes_field.annotation)

    is_missing = attributes_model in (None, NoneType)
    if is_missing or not issubclass(attributes_model, Attributes):
        raise TypeError(
            "resource class 'attributes' field must be a subclass of 'EntryResourceAttributes'"
        )

    field_names = set(attributes_model.model_fields)  # type: ignore[attr-defined]
    return field_names

get_next_query_params(params, results)

Provides url query pagination parameters that will be used in the next link.

Parameters:

Name Type Description Default
results Optional[Union[dict[str, Any], list[dict[str, Any]]]]

The results produced by find.

required
params EntryListingQueryParams

The parsed request params produced by handle_query_params.

required

Returns:

Type Description
dict[str, list[str]]

A dictionary with the necessary query parameters.

Source code in optimade/server/entry_collections/entry_collections.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def get_next_query_params(
    self,
    params: EntryListingQueryParams,
    results: Optional[Union[dict[str, Any], list[dict[str, Any]]]],
) -> dict[str, list[str]]:
    """Build the URL query pagination parameters used in the `next` link.

    Parameters are only produced when `results` is a non-empty list. The
    pagination mechanism is taken from the first of `page_offset`,
    `page_number` and `page_above` that is set on `params`, falling back to
    offset-based pagination. This implementation only fills in parameters
    for offset-based pagination; for any other mechanism the returned
    dictionary is empty.

    Arguments:
        params: The parsed request params produced by handle_query_params.
        results: The results produced by find.

    Returns:
        A dictionary with the necessary query parameters.

    """
    # Without a non-empty list of results there is nothing to paginate from.
    if not isinstance(results, list) or not results:
        return dict()

    # If a user passed a particular pagination mechanism, keep using it;
    # otherwise fall back to offset-based pagination.
    explicit_key = next(
        (
            key
            for key in ("page_offset", "page_number", "page_above")
            if getattr(params, key, None) is not None
        ),
        None,
    )
    if explicit_key is None:
        mechanism = PaginationMechanism.OFFSET
    else:
        mechanism = PaginationMechanism(explicit_key)

    next_params: dict[str, list[str]] = dict()
    if mechanism == PaginationMechanism.OFFSET:
        following_offset = params.page_offset + len(results)
        next_params["page_offset"] = [str(following_offset)]  # type: ignore[list-item]

    return next_params

handle_query_params(params)

Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.

Note

Currently this method returns the pymongo interpretation of the parameters, which will need to be modified for other backends.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

The initialized query parameter model from the server.

required

Raises:

Type Description
Forbidden

If too large of a page limit is provided.

BadRequest

If an invalid request is made, e.g., with incorrect fields or response format.

Returns:

Type Description
dict[str, Any]

A dictionary representation of the query parameters.

Source code in optimade/server/entry_collections/entry_collections.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def handle_query_params(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> dict[str, Any]:
    """Convert the backend-agnostic query parameter model into a dictionary
    of query keyword arguments for the specific backend.

    The returned dictionary always contains the keys `filter`, `limit`,
    `projection` and `fields`. The keys `sort`, `skip` and `page_above` are
    only added when the corresponding query parameters were supplied. When
    several pagination parameters are given, only the first of `page_offset`,
    `page_number` and `page_above` is used and a warning is issued.

    Note:
        Currently this method returns the pymongo interpretation of the parameters,
        which will need to be modified for other backends.

    Parameters:
        params: The initialized query parameter model from the server.

    Raises:
        Forbidden: If too large of a page limit is provided.
        BadRequest: If an invalid request is made, e.g., with incorrect fields
            or response format.

    Returns:
        A dictionary representation of the query parameters.

    """
    mapper = self.resource_mapper
    query_kwargs: dict[str, Any] = {}

    # filter: an absent or empty filter string yields an empty filter dictionary
    query_kwargs["filter"] = (
        self.transformer.transform(self.parser.parse(params.filter))  # type: ignore[union-attr]
        if getattr(params, "filter", False)
        else {}
    )

    # response_format: only JSON is supported
    requested_format = getattr(params, "response_format", False)
    if requested_format and requested_format != "json":
        raise BadRequest(
            detail=f"Response format {params.response_format} is not supported, please use response_format='json'"
        )

    # page_limit: fall back to the configured default, enforce the configured maximum
    limit = getattr(params, "page_limit", False)
    if not limit:
        limit = CONFIG.page_limit
    elif limit > CONFIG.page_limit_max:
        raise Forbidden(
            detail=f"Max allowed page_limit is {CONFIG.page_limit_max}, you requested {limit}",
        )
    query_kwargs["limit"] = limit

    # response_fields: the projection always covers every known field; the
    # fields requested by the client are passed along separately under "fields"
    query_kwargs["projection"] = {
        f"{mapper.get_backend_field(name)}": True for name in self.all_fields
    }

    requested_fields = getattr(params, "response_fields", False)
    if requested_fields:
        response_fields = set(requested_fields.split(",")) | mapper.get_required_fields()
    else:
        response_fields = self.all_fields.copy()
    query_kwargs["fields"] = response_fields

    # sort
    sort_string = getattr(params, "sort", False)
    if sort_string:
        query_kwargs["sort"] = self.parse_sort_params(sort_string)

    # pagination: warn if multiple pagination keys are present, and only use
    # the first from the list 'page_offset', 'page_number', 'page_above'
    offset = getattr(params, "page_offset", False)
    number = getattr(params, "page_number", None)
    above = getattr(params, "page_above", None)

    pagination_chosen = bool(offset)
    extra_keys_ignored = False

    if offset:
        query_kwargs["skip"] = offset

    if isinstance(number, int):
        if pagination_chosen:
            extra_keys_ignored = True
        else:
            pagination_chosen = True
            if number < 1:
                warnings.warn(
                    message=f"'page_number' is 1-based, using 'page_number=1' instead of {params.page_number}",  # type: ignore[union-attr]
                    category=QueryParamNotUsed,
                )
                number = 1
            query_kwargs["skip"] = (number - 1) * query_kwargs["limit"]

    if isinstance(above, str):
        if pagination_chosen:
            extra_keys_ignored = True
        else:
            pagination_chosen = True
            query_kwargs["page_above"] = above

    if extra_keys_ignored:
        warnings.warn(
            message="Multiple pagination keys were provided, only using the first one of 'page_offset', 'page_number' or 'page_above'",
            category=QueryParamNotUsed,
        )

    return query_kwargs

insert(data) abstractmethod

Add the given entries to the underlying database.

Parameters:

Name Type Description Default
data list[EntryResource]

The entry resource objects to add to the database.

required
Source code in optimade/server/entry_collections/entry_collections.py
121
122
123
124
125
126
127
128
@abstractmethod
def insert(self, data: list[EntryResource]) -> None:
    """Add the given entries to the underlying database.

    Abstract method: this base declaration provides no implementation.

    Parameters:
        data: The entry resource objects to add to the database.

    """

parse_sort_params(sort_params)

Handles any sort parameters passed to the collection, resolving aliases and dealing with any invalid fields.

Raises:

Type Description
BadRequest

if an invalid sort is requested.

Returns:

Type Description
Iterable[tuple[str, int]]

A list of tuples containing the aliased field name and

Iterable[tuple[str, int]]

sort direction encoded as 1 (ascending) or -1 (descending).

Source code in optimade/server/entry_collections/entry_collections.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def parse_sort_params(self, sort_params: str) -> Iterable[tuple[str, int]]:
    """Translate an OPTIMADE `sort` string into a backend sort specification.

    The comma-separated `sort_params` string is split into individual
    fields; a leading `-` marks a field as descending. Every field is
    mapped to its backend name through the resource mapper.

    Fields whose OPTIMADE name is not part of `self.all_fields` are treated
    as unknown. If every unknown field looks like a property of another
    provider (prefixed, but not with this server's own prefix), a warning
    is issued and those fields are dropped from the result. Otherwise the
    request is rejected.

    Parameters:
        sort_params: The raw value of the `sort` URL query parameter.

    Raises:
        BadRequest: If at least one unknown field is not a property
            carrying another provider's prefix.

    Returns:
        A list of `(backend_field, direction)` tuples, where `direction`
        is 1 for ascending and -1 for descending order.

    """
    mapper = self.resource_mapper

    requested: list[tuple[str, int]] = []
    for token in sort_params.split(","):
        descending = token.startswith("-")
        name = token[1:] if descending else token
        requested.append((mapper.get_backend_field(name), -1 if descending else 1))

    unrecognised = [
        backend_name
        for backend_name, _ in requested
        if mapper.get_optimade_field(backend_name) not in self.all_fields
    ]

    if unrecognised:
        plural = "s" if len(unrecognised) > 1 else ""
        error_detail = "Unable to sort on unknown field{} '{}'".format(
            plural,
            "', '".join(unrecognised),
        )

        # An unknown field is tolerated only if it carries a provider prefix
        # that is not this server's own prefix.
        own_prefix = f"_{self.provider_prefix}_"
        only_other_providers = True
        for backend_name in unrecognised:
            if not re.match(
                r"_[a-z_0-9]+_[a-z_0-9]*", backend_name
            ) or backend_name.startswith(own_prefix):
                only_other_providers = False
                break

        # Any other kind of unknown field results in 400: Bad Request
        if not only_other_providers:
            raise BadRequest(detail=error_detail)

        warnings.warn(error_detail, FieldValueNotRecognized)

    # Keep only the fields that were recognised (possibly none)
    return [entry for entry in requested if entry[0] not in unrecognised]

EntryResource

Bases: Resource

The base model for an entry resource.

Source code in optimade/models/entries.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class EntryResource(Resource):
    """The base model for an entry resource."""

    # Entry identifier: a required string, declared with MUST-level support
    # and MUST-level queryability.
    id: Annotated[
        str,
        OptimadeField(
            description="""An entry's ID as defined in section Definition of Terms.

- **Type**: string.

- **Requirements/Conventions**:
    - **Support**: MUST be supported by all implementations, MUST NOT be `null`.
    - **Query**: MUST be a queryable property with support for all mandatory filter features.
    - **Response**: REQUIRED in the response.

- **Examples**:
    - `"db/1234567"`
    - `"cod/2000000"`
    - `"cod/2000000@1234567"`
    - `"nomad/L1234567890"`
    - `"42"`""",
            support=SupportLevel.MUST,
            queryable=SupportLevel.MUST,
        ),
    ]

    # Entry type name: a required string, declared with MUST-level support
    # and MUST-level queryability.
    type: Annotated[
        str,
        OptimadeField(
            description="""The name of the type of an entry.

- **Type**: string.

- **Requirements/Conventions**:
    - **Support**: MUST be supported by all implementations, MUST NOT be `null`.
    - **Query**: MUST be a queryable property with support for all mandatory filter features.
    - **Response**: REQUIRED in the response.
    - MUST be an existing entry type.
    - The entry of type `<type>` and ID `<id>` MUST be returned in response to a request for `/<type>/<id>` under the versioned base URL.

- **Example**: `"structures"`""",
            support=SupportLevel.MUST,
            queryable=SupportLevel.MUST,
        ),
    ]

    # The entry's properties (everything except `type` and `id`), held in an
    # `EntryResourceAttributes` model. Required: no default is given.
    attributes: Annotated[
        EntryResourceAttributes,
        StrictField(
            description="""A dictionary, containing key-value pairs representing the entry's properties, except for `type` and `id`.
Database-provider-specific properties need to include the database-provider-specific prefix (see section on Database-Provider-Specific Namespace Prefixes).""",
        ),
    ]

    # Optional references to other entries; defaults to `None` when absent.
    relationships: Annotated[
        Optional[EntryRelationships],
        StrictField(
            description="""A dictionary containing references to other entries according to the description in section Relationships encoded as [JSON API Relationships](https://jsonapi.org/format/1.0/#document-resource-object-relationships).
The OPTIONAL human-readable description of the relationship MAY be provided in the `description` field inside the `meta` dictionary of the JSON API resource identifier object.""",
        ),
    ] = None

PaginationMechanism

Bases: Enum

The supported pagination mechanisms.

Source code in optimade/server/entry_collections/entry_collections.py
67
68
69
70
71
72
73
74
class PaginationMechanism(enum.Enum):
    """The supported pagination mechanisms."""

    # Each member's value is a string shaped like a URL query parameter name.
    # `get_next_query_params` relies on this: it builds a member by value from
    # the keys "page_offset", "page_number" and "page_above".
    OFFSET = "page_offset"
    NUMBER = "page_number"
    CURSOR = "page_cursor"
    ABOVE = "page_above"
    BELOW = "page_below"