Skip to content

elasticsearch

ElasticCollection

Bases: EntryCollection

Source code in optimade/server/entry_collections/elasticsearch.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class ElasticCollection(EntryCollection):
    """Entry collection whose entries are stored in, and queried from, an
    Elasticsearch index named after the collection.

    Queries are built with `Search` objects bound to `self.client` and
    `self.name`; filters are produced by an `ElasticTransformer`.

    """

    pagination_mechanism = PaginationMechanism("page_offset")

    def __init__(
        self,
        name: str,
        resource_cls: type[EntryResource],
        resource_mapper: type[BaseResourceMapper],
        client: Optional["Elasticsearch"] = None,
    ):
        """Initialize the ElasticCollection for the given parameters.

        Parameters:
            name: The name of the collection.
            resource_cls: The type of entry resource that is stored by the collection.
            resource_mapper: A resource mapper object that handles aliases and
                format changes between deserialization and response.
            client: A preconfigured Elasticsearch client. When omitted (or falsy),
                the module-level `CLIENT` is used.

        """
        super().__init__(
            resource_cls=resource_cls,
            resource_mapper=resource_mapper,
            transformer=ElasticTransformer(mapper=resource_mapper),
        )

        self.client = client if client else CLIENT
        self.name = name

        # If we are creating a new collection from scratch, also create the index,
        # otherwise assume it has already been created externally
        if CONFIG.insert_test_data:
            self.create_optimade_index()

    def count(self, *args, **kwargs) -> int:
        """Not implemented for this backend; always raises.

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError

    def create_optimade_index(self) -> None:
        """Load or create an index that can handle aliased OPTIMADE fields and attach it
        to the current client.

        The index definition is taken from the pre-defined indexes when one exists
        for this collection's name, otherwise a fallback definition is generated
        from the resource mapper. Property names are then translated to their
        backend (aliased) names before the index is created.

        """
        body = self.predefined_index.get(self.name)
        if body is None:
            body = self.create_elastic_index_from_mapper(
                self.resource_mapper, self.all_fields
            )

        # Re-key every property by its backend field name, preserving order.
        properties = {
            self.resource_mapper.get_backend_field(field): definition
            for field, definition in list(body["mappings"]["properties"].items())
        }
        properties["id"] = {"type": "keyword"}
        body["mappings"]["properties"] = properties

        # `ignore=400` tolerates the "bad request" response returned by the
        # create call (presumably when the index already exists -- TODO confirm).
        self.client.indices.create(index=self.name, ignore=400, **body)

        LOGGER.debug(f"Created Elastic index for {self.name!r} with parameters {body}")

    @property
    def predefined_index(self) -> dict[str, Any]:
        """Loads and returns the default pre-defined index.

        The definitions are re-read from `elastic_indexes.json` (located next to
        this module) on every access, so callers receive a fresh dictionary.

        """
        index_path = Path(__file__).parent.joinpath("elastic_indexes.json")
        # JSON is UTF-8 by specification; do not rely on the locale's default encoding.
        with open(index_path, encoding="utf-8") as f:
            index = json.load(f)
        return index

    @staticmethod
    def create_elastic_index_from_mapper(
        resource_mapper: type[BaseResourceMapper], fields: Iterable[str]
    ) -> dict[str, Any]:
        """Create a fallback elastic index based on a resource mapper.

        Every field is mapped as a `keyword` property, keyed by the name returned
        from `resource_mapper.get_optimade_field`; an `id` keyword property is
        always present.

        Arguments:
            resource_mapper: The resource mapper to create the index for.
            fields: The list of fields to use in the index.

        Returns:
            The parameters to pass to `client.indices.create(...)` (previously
                the 'body' parameters).

        """
        properties = {
            resource_mapper.get_optimade_field(field): {"type": "keyword"}
            for field in fields
        }
        properties["id"] = {"type": "keyword"}
        return {"mappings": {"properties": properties}}

    def __len__(self):
        """Returns the total number of entries in the collection."""
        # Without `track_total_hits=True` the reported total is capped at the
        # server's default tracking threshold; `size=0` avoids fetching documents
        # that are never used here.
        search = Search(using=self.client, index=self.name).extra(
            track_total_hits=True, size=0
        )
        return search.execute().hits.total.value

    def insert(self, data: list[EntryResource]) -> None:
        """Add the given entries to the underlying database.

        Warning:
            No validation is performed on the incoming data.

        Arguments:
            data: The entry resource objects to add to the database. Each item is
                accessed like a mapping and any `_id` key is removed from it in place.

        """

        def get_id(item):
            """Pick the document ID for `item` and strip its `_id` key."""
            if self.name == "links":
                # The "links" collection combines the `id` and `type` values.
                id_ = f"{item['id']}-{item['type']}"
            elif "id" in item:
                id_ = item["id"]
            elif "_id" in item:
                # use the existing MongoDB ids in the test data
                id_ = str(item["_id"])
            else:
                # ES will generate ids
                id_ = None
            item.pop("_id", None)
            return id_

        bulk(
            self.client,
            (
                {
                    "_index": self.name,
                    "_id": get_id(item),
                    "_source": item,
                }
                for item in data
            ),
        )

    def _run_db_query(
        self, criteria: dict[str, Any], single_entry=False
    ) -> tuple[list[dict[str, Any]], int, bool]:
        """Run the query on the backend and collect the results.

        Arguments:
            criteria: A dictionary representation of the query parameters.
            single_entry: Whether or not the caller is expecting a single entry response.

        Returns:
            The list of entries from the database (without any re-mapping), the total number of
            entries matching the query and a boolean for whether or not there is more data available.

        """

        search = Search(using=self.client, index=self.name)

        if criteria.get("filter", False):
            search = search.query(criteria["filter"])

        page_offset = criteria.get("skip", None)
        page_above = criteria.get("page_above", None)

        limit = criteria.get("limit", CONFIG.page_limit)

        # Only request the (aliased) fields this collection knows about.
        all_aliased_fields = [
            self.resource_mapper.get_backend_field(field) for field in self.all_fields
        ]
        search = search.source(includes=all_aliased_fields)

        elastic_sort = [
            {field: {"order": "desc" if sort_dir == -1 else "asc"}}
            for field, sort_dir in criteria.get("sort", {})
        ]
        if not elastic_sort:
            # Fall back to sorting on the ID so that paging order is defined.
            elastic_sort = [
                {self.resource_mapper.get_backend_field("id"): {"order": "asc"}}
            ]

        search = search.sort(*elastic_sort)

        if page_offset:
            search = search[page_offset : page_offset + limit]

        elif page_above:
            # `search_after` must be an array of sort values, and the page size
            # key of a search body is `size` (there is no `limit` key).
            search_after = (
                list(page_above)
                if isinstance(page_above, (list, tuple))
                else [page_above]
            )
            search = search.extra(search_after=search_after, size=limit)

        else:
            search = search[0:limit]
            page_offset = 0

        # Ask for the exact total rather than a capped lower bound.
        search = search.extra(track_total_hits=True)
        response = search.execute()

        results = [hit.to_dict() for hit in response.hits]

        more_data_available = False
        if not single_entry:
            data_returned = response.hits.total.value
            if page_above is not None:
                more_data_available = len(results) == limit and data_returned != limit
            else:
                more_data_available = page_offset + limit < data_returned
        else:
            # SingleEntryQueryParams, e.g., /structures/{entry_id}
            data_returned = len(results)

        return results, data_returned, more_data_available

all_fields: set[str] property

Get the set of all fields handled in this collection, from attribute fields in the schema, provider fields and top-level OPTIMADE fields.

The set of all fields are lazily created and then cached. This means the set is created the first time the property is requested and then cached.

Returns:

Type Description
set[str]

All fields handled in this collection.

predefined_index: dict[str, Any] property

Loads and returns the default pre-defined index.

__init__(name, resource_cls, resource_mapper, client=None)

Initialize the ElasticCollection for the given parameters.

Parameters:

Name Type Description Default
name str

The name of the collection.

required
resource_cls type[EntryResource]

The type of entry resource that is stored by the collection.

required
resource_mapper type[BaseResourceMapper]

A resource mapper object that handles aliases and format changes between deserialization and response.

required
client Optional[Elasticsearch]

A preconfigured Elasticsearch client.

None
Source code in optimade/server/entry_collections/elasticsearch.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    name: str,
    resource_cls: type[EntryResource],
    resource_mapper: type[BaseResourceMapper],
    client: Optional["Elasticsearch"] = None,
):
    """Set up an Elasticsearch-backed collection.

    Parameters:
        name: The name of the collection.
        resource_cls: The type of entry resource that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and
            format changes between deserialization and response.
        client: A preconfigured Elasticsearch client; the module-level
            `CLIENT` is used when none (or a falsy value) is given.

    """
    filter_transformer = ElasticTransformer(mapper=resource_mapper)
    super().__init__(
        resource_cls=resource_cls,
        resource_mapper=resource_mapper,
        transformer=filter_transformer,
    )

    self.client = client or CLIENT
    self.name = name

    # When test data is going to be inserted the collection is being built
    # from scratch, so the index is created here too; in every other case
    # the index is assumed to have been created externally.
    if CONFIG.insert_test_data:
        self.create_optimade_index()

__len__()

Returns the total number of entries in the collection.

Source code in optimade/server/entry_collections/elasticsearch.py
110
111
112
def __len__(self):
    """Returns the total number of entries in the collection."""
    # Without `track_total_hits=True` the reported total is capped at the
    # server's default tracking threshold; `size=0` avoids fetching documents
    # that are never used here.
    search = Search(using=self.client, index=self.name).extra(
        track_total_hits=True, size=0
    )
    return search.execute().hits.total.value

create_elastic_index_from_mapper(resource_mapper, fields) staticmethod

Create a fallback elastic index based on a resource mapper.

Parameters:

Name Type Description Default
resource_mapper type[BaseResourceMapper]

The resource mapper to create the index for.

required
fields Iterable[str]

The list of fields to use in the index.

required

Returns:

Type Description
dict[str, Any]

The parameters to pass to client.indices.create(...) (previously the 'body' parameters).

Source code in optimade/server/entry_collections/elasticsearch.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
@staticmethod
def create_elastic_index_from_mapper(
    resource_mapper: type[BaseResourceMapper], fields: Iterable[str]
) -> dict[str, Any]:
    """Build a fallback index definition in which every field is a keyword.

    Arguments:
        resource_mapper: The resource mapper to create the index for; its
            `get_optimade_field` is used to name each property.
        fields: The list of fields to use in the index.

    Returns:
        The parameters to pass to `client.indices.create(...)` (previously
            the 'body' parameters).

    """
    keyword_properties = {}
    for name in fields:
        keyword_properties[resource_mapper.get_optimade_field(name)] = {
            "type": "keyword"
        }

    # The ID is always indexed as a keyword, whatever `fields` contained.
    keyword_properties["id"] = {"type": "keyword"}

    mappings = {"properties": keyword_properties}
    return {"mappings": mappings}

create_optimade_index()

Load or create an index that can handle aliased OPTIMADE fields and attach it to the current client.

Source code in optimade/server/entry_collections/elasticsearch.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def create_optimade_index(self) -> None:
    """Create this collection's index on the current client.

    The index definition comes from the pre-defined indexes when one is
    registered under this collection's name, and is otherwise generated
    from the resource mapper. Its property names are translated to the
    backend (aliased) field names before the index is created.

    """
    index_body = self.predefined_index.get(self.name)
    if index_body is None:
        index_body = self.create_elastic_index_from_mapper(
            self.resource_mapper, self.all_fields
        )

    # Re-key each property by its backend field name, keeping the order.
    aliased_properties = {
        self.resource_mapper.get_backend_field(name): definition
        for name, definition in list(index_body["mappings"]["properties"].items())
    }
    aliased_properties["id"] = {"type": "keyword"}
    index_body["mappings"]["properties"] = aliased_properties

    self.client.indices.create(index=self.name, ignore=400, **index_body)

    LOGGER.debug(f"Created Elastic index for {self.name!r} with parameters {index_body}")

find(params)

Fetches results and indicates if more data is available.

Also gives the total number of data available in the absence of page_limit. See EntryListingQueryParams for more information.

Returns a list of the mapped database response.

If no results match the query, then results is set to None.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

Entry listing URL query params.

required

Returns:

Type Description
Optional[Union[dict[str, Any], list[dict[str, Any]]]]

A tuple of various relevant values:

Optional[int]

(results, data_returned, more_data_available, exclude_fields, include_fields).

Source code in optimade/server/entry_collections/entry_collections.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def find(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> tuple[
    Optional[Union[dict[str, Any], list[dict[str, Any]]]],
    Optional[int],
    bool,
    set[str],
    set[str],
]:
    """
    Run the query described by `params` and return the mapped results
    together with pagination and field-selection information.

    Also gives the total number of data available in the absence of `page_limit`.
    See [`EntryListingQueryParams`][optimade.server.query_params.EntryListingQueryParams]
    for more information.

    For listing queries `results` is a list of mapped entries; for single
    entry queries it is the first mapped entry. If no results match the
    query, then `results` is set to `None`.

    Parameters:
        params: Entry listing URL query params.

    Raises:
        BadRequest: If `response_fields` contains unrecognised OPTIMADE fields.
        NotFound: If a single entry was requested, response validation is
            enabled and more than one entry was found.

    Returns:
        A tuple of various relevant values:
        (`results`, `data_returned`, `more_data_available`, `exclude_fields`, `include_fields`).

    """
    query = self.handle_query_params(params)
    expects_single = isinstance(params, SingleEntryQueryParams)
    requested_fields: set[str] = query.pop("fields")

    documents, data_returned, more_data_available = self._run_db_query(
        query, expects_single
    )

    exclude_fields = self.all_fields - requested_fields
    include_fields = (
        requested_fields - self.resource_mapper.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
    )

    supported_prefixes = self.resource_mapper.SUPPORTED_PREFIXES
    all_attributes: set[str] = self.resource_mapper.ALL_ATTRIBUTES

    # Requested fields that are not attributes of this resource type.
    unrecognised = [name for name in include_fields if name not in all_attributes]

    # Un-prefixed names are OPTIMADE fields and are rejected below.
    bad_optimade_fields: set[str] = {
        name for name in unrecognised if not name.startswith("_")
    }
    # Names carrying one of this server's supported prefixes only trigger a
    # warning; names with any other prefix are silently accepted.
    bad_provider_fields: set[str] = {
        name
        for name in unrecognised
        if name.startswith("_")
        and any(name.startswith(f"_{prefix}_") for prefix in supported_prefixes)
    }

    if bad_provider_fields:
        warnings.warn(
            message=f"Unrecognised field(s) for this provider requested in `response_fields`: {bad_provider_fields}.",
            category=UnknownProviderProperty,
        )

    if bad_optimade_fields:
        raise BadRequest(
            detail=f"Unrecognised OPTIMADE field(s) in requested `response_fields`: {bad_optimade_fields}."
        )

    results: Optional[Union[list[dict[str, Any]], dict[str, Any]]] = None

    if documents:
        mapped = [self.resource_mapper.map_back(doc) for doc in documents]

        if not expects_single:
            results = mapped
        else:
            results = mapped[0]

            too_many_matches = (
                CONFIG.validate_api_response
                and data_returned is not None
                and data_returned > 1
            )
            if too_many_matches:
                raise NotFound(
                    detail=f"Instead of a single entry, {data_returned} entries were found",
                )
            data_returned = 1

    return (
        results,
        data_returned,
        more_data_available,
        exclude_fields,
        include_fields,
    )

get_attribute_fields()

Get the set of attribute fields

Return only the first-level attribute fields from the schema of the resource class, resolving references along the way if needed.

Note

It is not needed to take care of other special OpenAPI schema keys than allOf, since only allOf will be found in this context. Other special keys can be found in the Swagger documentation.

Returns:

Type Description
set[str]

Property names.

Source code in optimade/server/entry_collections/entry_collections.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def get_attribute_fields(self) -> set[str]:
    """Collect the names of the resource class's attribute fields.

    Only the _first-level_ attribute fields from the schema of the resource class
    are returned, resolving references along the way if needed.

    Note:
        It is not needed to take care of other special OpenAPI schema keys than `allOf`,
        since only `allOf` will be found in this context.
        Other special keys can be found in [the Swagger documentation](https://swagger.io/docs/specification/data-models/oneof-anyof-allof-not/).

    Raises:
        TypeError: If the annotation of the resource class's `attributes`
            field does not resolve to a subclass of `Attributes`.

    Returns:
        Property names.

    """
    attributes_field = self.resource_cls.model_fields["attributes"]
    attributes_model = _get_origin_type(attributes_field.annotation)

    is_attributes_model = attributes_model not in (None, NoneType) and issubclass(
        attributes_model, Attributes
    )
    if not is_attributes_model:
        raise TypeError(
            "resource class 'attributes' field must be a subclass of 'EntryResourceAttributes'"
        )

    return set(attributes_model.model_fields)  # type: ignore[attr-defined]

get_next_query_params(params, results)

Provides url query pagination parameters that will be used in the next link.

Parameters:

Name Type Description Default
results Optional[Union[dict[str, Any], list[dict[str, Any]]]]

The results produced by find.

required
params EntryListingQueryParams

The parsed request params produced by handle_query_params.

required

Returns:

Type Description
dict[str, list[str]]

A dictionary with the necessary query parameters.

Source code in optimade/server/entry_collections/entry_collections.py
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
def get_next_query_params(
    self,
    params: EntryListingQueryParams,
    results: Optional[Union[dict[str, Any], list[dict[str, Any]]]],
) -> dict[str, list[str]]:
    """Provides url query pagination parameters that will be used in the next
    link.

    Only offset-based pagination produces a parameter here: the next
    `page_offset` is the current offset plus the number of results returned.
    Nothing is produced when `results` is not a non-empty list.

    Arguments:
        results: The results produced by find.
        params: The parsed request params produced by handle_query_params.

    Returns:
        A dictionary with the necessary query parameters.

    """
    query: dict[str, list[str]] = dict()
    if isinstance(results, list) and results:
        # If a user passed a particular pagination mechanism, keep using it
        # Otherwise, use the default pagination mechanism of the collection
        # (falling back to offset-based pagination if the collection does not
        # declare one).
        pagination_mechanism = getattr(
            self, "pagination_mechanism", PaginationMechanism.OFFSET
        )
        for pagination_key in (
            "page_offset",
            "page_number",
            "page_above",
        ):
            if getattr(params, pagination_key, None) is not None:
                pagination_mechanism = PaginationMechanism(pagination_key)
                break

        if pagination_mechanism == PaginationMechanism.OFFSET:
            # `page_offset` may be unset when the mechanism came from the
            # collection default; treat that as starting from the beginning.
            current_offset = getattr(params, "page_offset", None) or 0
            query["page_offset"] = [
                str(current_offset + len(results))  # type: ignore[list-item]
            ]

    return query

handle_query_params(params)

Parse and interpret the backend-agnostic query parameter models into a dictionary that can be used by the specific backend.

Note

Currently this method returns the pymongo interpretation of the parameters, which will need modification for other backends.

Parameters:

Name Type Description Default
params Union[EntryListingQueryParams, SingleEntryQueryParams]

The initialized query parameter model from the server.

required

Raises:

Type Description
Forbidden

If too large of a page limit is provided.

BadRequest

If an invalid request is made, e.g., with incorrect fields or response format.

Returns:

Type Description
dict[str, Any]

A dictionary representation of the query parameters.

Source code in optimade/server/entry_collections/entry_collections.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def handle_query_params(
    self, params: Union[EntryListingQueryParams, SingleEntryQueryParams]
) -> dict[str, Any]:
    """Parse and interpret the backend-agnostic query parameter models into a dictionary
    that can be used by the specific backend.

    The returned dictionary always contains the keys `filter`, `limit`,
    `projection` and `fields`; `sort`, `skip` and `page_above` are only
    present when the corresponding query parameters were provided.

    Note:
        Currently this method returns the pymongo interpretation of the parameters,
        which will need modification for other backends.

    Parameters:
        params: The initialized query parameter model from the server.

    Raises:
        Forbidden: If too large of a page limit is provided.
        BadRequest: If an invalid request is made, e.g., with incorrect fields
            or response format.

    Returns:
        A dictionary representation of the query parameters.

    """
    cursor_kwargs = {}

    # filter
    # (a missing or empty filter becomes an empty dict)
    if getattr(params, "filter", False):
        cursor_kwargs["filter"] = self.transformer.transform(
            self.parser.parse(params.filter)  # type: ignore[union-attr]
        )
    else:
        cursor_kwargs["filter"] = {}

    # response_format
    # (only "json" is accepted; an unset/empty value is not checked)
    if (
        getattr(params, "response_format", False)
        and params.response_format != "json"
    ):
        raise BadRequest(
            detail=f"Response format {params.response_format} is not supported, please use response_format='json'"
        )

    # page_limit
    # (falls back to the configured default when unset or zero)
    if getattr(params, "page_limit", False):
        limit = params.page_limit  # type: ignore[union-attr]
        if limit > CONFIG.page_limit_max:
            raise Forbidden(
                detail=f"Max allowed page_limit is {CONFIG.page_limit_max}, you requested {limit}",
            )
        cursor_kwargs["limit"] = limit
    else:
        cursor_kwargs["limit"] = CONFIG.page_limit

    # response_fields
    # The projection always covers every known field (by backend name); the
    # fields actually requested are tracked separately under "fields".
    cursor_kwargs["projection"] = {
        f"{self.resource_mapper.get_backend_field(f)}": True
        for f in self.all_fields
    }

    if getattr(params, "response_fields", False):
        response_fields = set(params.response_fields.split(","))
        # required fields are always included, whatever was requested
        response_fields |= self.resource_mapper.get_required_fields()
    else:
        response_fields = self.all_fields.copy()

    cursor_kwargs["fields"] = response_fields

    # sort
    if getattr(params, "sort", False):
        cursor_kwargs["sort"] = self.parse_sort_params(params.sort)  # type: ignore[union-attr]

    # warn if multiple pagination keys are present, and only use the first from this list
    received_pagination_option = False
    warn_multiple_keys = False

    # A `page_offset` of 0 is falsy and therefore does not count as a
    # provided pagination option.
    if getattr(params, "page_offset", False):
        received_pagination_option = True
        cursor_kwargs["skip"] = params.page_offset  # type: ignore[union-attr]

    if isinstance(getattr(params, "page_number", None), int):
        if received_pagination_option:
            warn_multiple_keys = True
        else:
            received_pagination_option = True
            if params.page_number < 1:  # type: ignore[union-attr]
                warnings.warn(
                    message=f"'page_number' is 1-based, using 'page_number=1' instead of {params.page_number}",  # type: ignore[union-attr]
                    category=QueryParamNotUsed,
                )
                page_number = 1
            else:
                page_number = params.page_number  # type: ignore[union-attr]
            # convert the 1-based page number into a number of entries to skip
            cursor_kwargs["skip"] = (page_number - 1) * cursor_kwargs["limit"]

    if isinstance(getattr(params, "page_above", None), str):
        if received_pagination_option:
            warn_multiple_keys = True
        else:
            received_pagination_option = True
            cursor_kwargs["page_above"] = params.page_above  # type: ignore[union-attr]

    if warn_multiple_keys:
        warnings.warn(
            message="Multiple pagination keys were provided, only using the first one of 'page_offset', 'page_number' or 'page_above'",
            category=QueryParamNotUsed,
        )

    return cursor_kwargs

insert(data)

Add the given entries to the underlying database.

Warning

No validation is performed on the incoming data.

Parameters:

Name Type Description Default
data list[EntryResource]

The entry resource objects to add to the database.

required
Source code in optimade/server/entry_collections/elasticsearch.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def insert(self, data: list[EntryResource]) -> None:
    """Bulk-index the given entries into this collection's index.

    Warning:
        No validation is performed on the incoming data.

    Arguments:
        data: The entry resource objects to add to the database. Each item is
            accessed like a mapping and any `_id` key is removed from it in place.

    """

    def _resolve_id(document):
        """Choose the document ID, then drop any `_id` key from `document`."""
        if self.name == "links":
            resolved = f"{document['id']}-{document['type']}"
        elif "id" in document:
            resolved = document["id"]
        else:
            # Reuse the existing MongoDB ids in the test data when present,
            # otherwise leave it to ES to generate one.
            resolved = str(document["_id"]) if "_id" in document else None
        document.pop("_id", None)
        return resolved

    actions = (
        {
            "_index": self.name,
            "_id": _resolve_id(document),
            "_source": document,
        }
        for document in data
    )
    bulk(self.client, actions)

parse_sort_params(sort_params)

Handles any sort parameters passed to the collection, resolving aliases and dealing with any invalid fields.

Raises:

Type Description
BadRequest

if an invalid sort is requested.

Returns:

Type Description
Iterable[tuple[str, int]]

A list of tuples containing the aliased field name and

Iterable[tuple[str, int]]

sort direction encoded as 1 (ascending) or -1 (descending).

Source code in optimade/server/entry_collections/entry_collections.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def parse_sort_params(self, sort_params: str) -> Iterable[tuple[str, int]]:
    """Turn a comma-separated `sort` parameter into backend sort instructions,
    resolving aliases and dealing with any invalid fields.

    A leading `-` on a field requests descending order.

    Raises:
        BadRequest: if an invalid sort is requested.

    Returns:
        A list of tuples containing the aliased field name and
        sort direction encoded as 1 (ascending) or -1 (descending).

    """
    mapper = self.resource_mapper

    parsed: list[tuple[str, int]] = []
    for token in sort_params.split(","):
        descending = token.startswith("-")
        name = token[1:] if descending else token
        parsed.append((mapper.get_backend_field(name), -1 if descending else 1))

    unknown_fields = [
        name
        for name, _ in parsed
        if mapper.get_optimade_field(name) not in self.all_fields
    ]

    if unknown_fields:
        plural = "s" if len(unknown_fields) > 1 else ""
        error_detail = "Unable to sort on unknown field{} '{}'".format(
            plural,
            "', '".join(unknown_fields),
        )

        def _belongs_to_other_provider(name):
            """Whether `name` looks prefixed, but not with this provider's prefix."""
            return re.match(
                r"_[a-z_0-9]+_[a-z_0-9]*", name
            ) and not name.startswith(f"_{self.provider_prefix}_")

        # Anything other than "every unknown field belongs to another
        # provider" is an error: 400 Bad Request
        if not all(_belongs_to_other_provider(name) for name in unknown_fields):
            raise BadRequest(detail=error_detail)

        # Only "other" provider-specific fields are unknown: warn and carry on
        warnings.warn(error_detail, FieldValueNotRecognized)

    # Sort on whatever recognised fields remain
    return [
        (name, direction) for name, direction in parsed if name not in unknown_fields
    ]