Skip to content

mongo

This submodule implements the MongoTransformer, which takes the parsed filter and converts it to a valid pymongo/BSON query.

MongoTransformer (BaseTransformer)

A filter transformer for the MongoDB backend.

Parses a lark tree into a dictionary representation to be used by pymongo or mongomock. Uses post-processing functions to handle some specific edge-cases for MongoDB.

Attributes:

Name Type Description
operator_map Dict[str, Optional[str]]

A map from comparison operators to the mongoDB specific versions.

inverse_operator_map

A map from operators to their logical inverse.

mapper Optional[Type[optimade.server.mappers.entries.BaseResourceMapper]]

A resource mapper object that defines the expected fields and acts as a container for various field-related configuration.

Source code in optimade/filtertransformers/mongo.py
class MongoTransformer(BaseTransformer):
    """A filter transformer for the MongoDB backend.

    Parses a lark tree into a dictionary representation to be
    used by pymongo or mongomock. Uses post-processing functions
    to handle some specific edge-cases for MongoDB.

    Attributes:
        operator_map: A map from comparison operators
            to the mongoDB specific versions.
        inverse_operator_map: A map from operators to their
            logical inverse.
        mapper: A resource mapper object that defines the
            expected fields and acts as a container for
            various field-related configuration.

    """

    operator_map = {
        "<": "$lt",
        "<=": "$lte",
        ">": "$gt",
        ">=": "$gte",
        "!=": "$ne",
        "=": "$eq",
    }

    inverse_operator_map = {
        "$lt": "$gte",
        "$lte": "$gt",
        "$gt": "$lte",
        "$gte": "$lt",
        "$ne": "$eq",
        "$eq": "$ne",
        "$in": "$nin",
        "$nin": "$in",
    }

    def postprocess(self, query: Dict[str, Any]):
        """Used to post-process the nested dictionary of the parsed query.

        The passes run in the fixed order below; each pass receives the
        query returned by the previous one.
        """
        query = self._apply_relationship_filtering(query)
        query = self._apply_length_operators(query)
        query = self._apply_unknown_or_null_filter(query)
        query = self._apply_has_only_filter(query)
        query = self._apply_mongo_id_filter(query)
        query = self._apply_mongo_date_filter(query)
        return query

    def value_list(self, arg):
        """Return the parsed value list unchanged, rejecting embedded operators.

        Raises:
            NotImplementedError: If any entry, converted to a string, is one
                of the keys of `operator_map`.
        """
        # value_list: [ OPERATOR ] value ( "," [ OPERATOR ] value )*
        # NOTE: no support for optional OPERATOR, yet, so this takes the
        # parsed values and returns an error if that is being attempted
        for value in arg:
            if str(value) in self.operator_map:
                raise NotImplementedError(
                    f"OPERATOR {value} inside value_list {arg} not implemented."
                )

        return arg

    def value_zip(self, arg):
        """Correlated list queries are not supported; always raises `NotImplementedError`."""
        # value_zip: [ OPERATOR ] value ":" [ OPERATOR ] value (":" [ OPERATOR ] value)*
        raise NotImplementedError("Correlated list queries are not supported.")

    def value_zip_list(self, arg):
        """Correlated list queries are not supported; always raises `NotImplementedError`."""
        # value_zip_list: value_zip ( "," value_zip )*
        raise NotImplementedError("Correlated list queries are not supported.")

    def expression(self, arg):
        """Combine the parsed clauses with `$or`, or pass a single clause through."""
        # expression: expression_clause ( OR expression_clause )*
        # expression with and without 'OR'
        return {"$or": arg} if len(arg) > 1 else arg[0]

    def expression_clause(self, arg):
        """Combine the parsed phrases with `$and`, or pass a single phrase through."""
        # expression_clause: expression_phrase ( AND expression_phrase )*
        # expression_clause with and without 'AND'
        return {"$and": arg} if len(arg) > 1 else arg[0]

    def expression_phrase(self, arg):
        """Resolve an optionally negated phrase via `_recursive_expression_phrase`."""
        # expression_phrase: [ NOT ] ( comparison | "(" expression ")" )
        return self._recursive_expression_phrase(arg)

    @v_args(inline=True)
    def property_first_comparison(self, quantity, query):
        """Build the filter for `quantity` from the right-hand-side `query` dict.

        Parameters:
            quantity: The (backend) field name being filtered on.
            query: The dictionary produced by one of the `*_op_rhs` rules.

        Returns:
            The filter dictionary; `$ne` queries are wrapped in an `$and`
            with a not-null check, and `$size` queries are redirected to the
            length alias of the quantity when one is defined.
        """
        # property_first_comparison: property ( value_op_rhs | known_op_rhs | fuzzy_string_op_rhs | set_op_rhs |
        # set_zip_op_rhs | length_op_rhs )

        # Awkwardly, MongoDB will match null fields in $ne filters,
        # so we need to add a check for null equality in every $ne query.
        if "$ne" in query:
            return {"$and": [{quantity: query}, {quantity: {"$ne": None}}]}

        # Check if a $size query is being made (indicating a length_op_rhs filter); if so, check for
        # a defined length alias to replace the $size call with the corresponding filter on the
        # length quantity then carefully merge the two queries.
        #
        # e.g. `("elements", {"$size": 2, "$all": ["Ag", "Au"]})` should become
        # `{"elements": {"$all": ["Ag", "Au"]}, "nelements": 2}` if the `elements` -> `nelements`
        # length alias is defined.
        if "$size" in query:
            if (
                getattr(self.backend_mapping.get(quantity), "length_quantity", None)
                is not None
            ):
                size_query = {
                    self.backend_mapping[
                        quantity
                    ].length_quantity.backend_field: query.pop("$size")
                }

                final_query = {}
                if query:
                    final_query = {quantity: query}
                for q in size_query:
                    if q in final_query:
                        final_query[q].update(size_query[q])
                    else:
                        final_query[q] = size_query[q]

                return final_query

        return {quantity: query}

    def constant_first_comparison(self, arg):
        """Rewrite `constant OPERATOR property` as a property-first comparison.

        The operator is looked up in `self._reversed_operator_map` (defined
        outside this class body) before being mapped to its MongoDB form.
        """
        # constant_first_comparison: constant OPERATOR ( non_string_value | not_implemented_string )
        return self.property_first_comparison(
            arg[2], {self.operator_map[self._reversed_operator_map[arg[1]]]: arg[0]}
        )

    @v_args(inline=True)
    def value_op_rhs(self, operator, value):
        """Map a comparison operator and its value to `{<mongo operator>: value}`."""
        # value_op_rhs: OPERATOR value
        return {self.operator_map[operator]: value}

    def known_op_rhs(self, arg):
        """Return the placeholder `{"#known": bool}` for IS KNOWN / IS UNKNOWN."""
        # known_op_rhs: IS ( KNOWN | UNKNOWN )
        # The OPTIMADE spec also required a type comparison with null, this must be post-processed
        # so here we use a special key "#known" which will get replaced in post-processing with the
        # expanded dict
        return {"#known": arg[1] == "KNOWN"}

    def fuzzy_string_op_rhs(self, arg):
        """Translate CONTAINS / STARTS [WITH] / ENDS [WITH] into a `$regex` query.

        The filter value is a literal (sub)string, so it is escaped before
        being embedded in the regular expression; otherwise characters such
        as `.`, `(` or `*` in the value would be interpreted as a pattern.

        Raises:
            NotImplementedError: If the leading keyword is not one of
                CONTAINS, STARTS or ENDS.
        """
        # fuzzy_string_op_rhs: CONTAINS value | STARTS [ WITH ] value | ENDS [ WITH ] value
        import re

        # The WITH keyword may be omitted; when present it is a token whose
        # `type` is "WITH" and the value follows it.
        if getattr(arg[1], "type", None) == "WITH":
            pattern = arg[2]
        else:
            pattern = arg[1]

        literal = re.escape(str(pattern))

        if arg[0] == "CONTAINS":
            regex = literal
        elif arg[0] == "STARTS":
            regex = f"^{literal}"
        elif arg[0] == "ENDS":
            regex = f"{literal}$"
        else:
            raise NotImplementedError(
                f"Fuzzy string operator {arg[0]} not implemented."
            )
        return {"$regex": regex}

    def set_op_rhs(self, arg):
        """Translate HAS / HAS ALL / HAS ANY / HAS ONLY into a MongoDB set query.

        HAS ONLY is emitted as the placeholder key `"#only"`, which is
        expanded later by `_apply_has_only_filter`.

        Raises:
            NotImplementedError: If HAS is used together with an OPERATOR.
        """
        # set_op_rhs: HAS ( [ OPERATOR ] value | ALL value_list | ANY value_list | ONLY value_list )

        if len(arg) == 2:
            # only value without OPERATOR
            return {"$in": arg[1:]}

        if arg[1] == "ALL":
            return {"$all": arg[2]}

        if arg[1] == "ANY":
            return {"$in": arg[2]}

        if arg[1] == "ONLY":
            return {"#only": arg[2]}

        # value with OPERATOR
        raise NotImplementedError(
            f"set_op_rhs not implemented for use with OPERATOR. Given: {arg}"
        )

    def property(self, args):
        """Resolve a (possibly nested) property to its dotted backend field name.

        The parent rule may return a `Quantity`, in which case its
        `backend_field` is used as the first path component.
        """
        # property: IDENTIFIER ( "." IDENTIFIER )*
        quantity = super().property(args)
        if isinstance(quantity, Quantity):
            quantity = quantity.backend_field

        return ".".join([quantity] + args[1:])

    def length_op_rhs(self, arg):
        """Translate a LENGTH filter into a `$size` query.

        Plain equality yields a valid `{"$size": value}`. The other supported
        comparisons yield `{"$size": {<operator>: value}}`, which MongoDB
        rejects and which `_apply_length_operators` rewrites afterwards.

        Raises:
            NotImplementedError: For `!=` or an operator missing from
                `operator_map`.
        """
        # length_op_rhs: LENGTH [ OPERATOR ] value
        if len(arg) == 2 or (len(arg) == 3 and arg[1] == "="):
            return {"$size": arg[-1]}

        if arg[1] in self.operator_map and arg[1] != "!=":
            # create an invalid query that needs to be post-processed
            # e.g. {'$size': {'$gt': 2}}, which is not allowed by Mongo.
            return {"$size": {self.operator_map[arg[1]]: arg[-1]}}

        raise NotImplementedError(
            f"Operator {arg[1]} not implemented for LENGTH filter."
        )

    def set_zip_op_rhs(self, arg):
        """Correlated list queries are not supported; always raises `NotImplementedError`."""
        # set_zip_op_rhs: property_zip_addon HAS ( value_zip | ONLY value_zip_list | ALL value_zip_list |
        # ANY value_zip_list )
        raise NotImplementedError("Correlated list queries are not supported.")

    def property_zip_addon(self, arg):
        """Correlated list queries are not supported; always raises `NotImplementedError`."""
        # property_zip_addon: ":" property (":" property)*
        raise NotImplementedError("Correlated list queries are not supported.")

    def _recursive_expression_phrase(self, arg: List) -> Dict[str, Any]:
        """Helper function for parsing `expression_phrase`. Recursively sorts out
        the correct precedence for `$not`, `$and` and `$or`.

        Parameters:
            arg: A list containing the expression to be evaluated and whether it
                is negated, e.g., `["NOT", expr]` or just `[expr]`.

        Returns:
             The evaluated filter as a nested dictionary.

        """

        def handle_not_and(arg: Dict[str, List]) -> Dict[str, List]:
            """Handle the case of `~(A & B) -> (~A | ~B)`.

            We have to check for the special case in which the "and" was created
            by a previous NOT, e.g.,
            `NOT (NOT ({"a": {"$eq": 6}})) -> NOT({"$and": [{"a": {"$ne": 6}},{"a": {"$ne": None}}]})`

            Parameters:
                arg: A dictionary with key `"$and"` containing a list of expressions.

            Returns:
                A dictionary with key `"$or"` containing a list of the appropriate negated expressions.
            """

            clauses = arg["$and"]
            # The "generated by a previous NOT" shortcut only applies to a pair
            # of clauses; with more clauses every one of them must be negated.
            if len(clauses) == 2:
                expr1, expr2 = clauses
                if expr1.keys() == expr2.keys():
                    key = list(expr1.keys())[0]
                    for e, f in itertools.permutations((expr1, expr2)):
                        if e.get(key) == {"$ne": None}:
                            return self._recursive_expression_phrase(["NOT", f])

            return {
                "$or": [
                    self._recursive_expression_phrase(["NOT", subdict])
                    for subdict in clauses
                ]
            }

        def handle_not_or(arg: Dict[str, List]) -> Dict[str, List]:
            """Handle the case of ~(A | B) -> (~A & ~B).

            !!! note
            Although the MongoDB `$nor` could be used here, it is not convenient as it
            will also return documents where the filtered field is missing when testing
            for inequality.

            Parameters:
                arg: A dictionary with key `"$or"` containing a list of expressions.

            Returns:
                A dictionary with key `"$and"` that lists the appropriate negated expressions.
            """

            return {
                "$and": [
                    self._recursive_expression_phrase(["NOT", subdict])
                    for subdict in arg["$or"]
                ]
            }

        if len(arg) == 1:
            # without NOT
            return arg[0]

        if "$or" in arg[1]:
            return handle_not_or(arg[1])

        if "$and" in arg[1]:
            return handle_not_and(arg[1])

        prop, expr = next(iter(arg[1].items()))
        if not isinstance(expr, dict):
            # The length-alias rewrite in `property_first_comparison` can yield
            # a bare value, e.g. {"nelements": 2}; treat it as an equality test
            # so that it can be negated like any other comparison.
            expr = {"$eq": expr}
        operator, value = next(iter(expr.items()))
        if operator == "$not":  # Case of double negation e.g. NOT("$not":{ ...})
            return {prop: value}

        # If the NOT operator occurs at the lowest nesting level,
        # the expression can be simplified by using the opposite operator and removing the not.
        if operator in self.inverse_operator_map:
            filter_ = {prop: {self.inverse_operator_map[operator]: value}}
            if operator in ("$in", "$eq"):
                filter_ = {"$and": [filter_, {prop: {"$ne": None}}]}  # type: ignore[dict-item]
            return filter_

        filter_ = {prop: {"$not": expr}}
        if "#known" in expr:
            return filter_
        return {"$and": [filter_, {prop: {"$ne": None}}]}

    def _apply_length_operators(self, filter_: dict) -> dict:
        """Check for any invalid pymongo queries that involve applying a
        comparison operator to the length of a field, and transform
        them into a test for existence of the relevant entry, e.g.
        "list LENGTH > 3" becomes "does the 4th list entry (index 3) exist?".

        """

        def check_for_length_op_filter(_, expr):
            return (
                isinstance(expr, dict)
                and "$size" in expr
                and isinstance(expr["$size"], dict)
            )

        def apply_length_op(subdict, prop, expr):
            # assumes that the dictionary only has one element by design
            # (we just made it above in the transformer)
            operator, value = list(expr["$size"].items())[0]

            # MongoDB addresses array entries with zero-based positions in dot
            # notation, so "the list has at least n entries" is the same as
            # "the entry at index n - 1 exists".
            if operator == "$gt":
                # length > value  <=>  entry at index `value` exists
                index, existence = value, True
            elif operator == "$gte":
                # length >= value  <=>  entry at index `value - 1` exists
                index, existence = value - 1, True
            elif operator == "$lt":
                # length < value  <=>  entry at index `value - 1` is absent
                index, existence = value - 1, False
            elif operator == "$lte":
                # length <= value  <=>  entry at index `value` is absent
                index, existence = value, False
            else:
                # Any other operator (e.g. "$eq") is left untouched.
                return subdict

            # A non-positive bound would give a negative position, which is
            # not a valid array index; fall back to testing the first entry.
            index = max(index, 0)
            subdict.pop(prop)
            subdict[f"{prop}.{index}"] = {"$exists": existence}

            return subdict

        return recursive_postprocessing(
            filter_,
            check_for_length_op_filter,
            apply_length_op,
        )

    def _apply_relationship_filtering(self, filter_: dict) -> dict:
        """Check query for property names that match the entry
        types, and transform them as relationship filters rather than
        property filters.

        """

        def check_for_entry_type(prop, _):
            return str(prop).count(".") == 1 and str(prop).split(".")[0] in (
                "structures",
                "references",
            )

        def replace_with_relationship(subdict, prop, expr):
            _prop, _field = str(prop).split(".")
            if _field != "id":
                raise NotImplementedError(
                    f'Cannot filter relationships by field "{_field}", only "id" is supported.'
                )

            subdict[f"relationships.{_prop}.data.{_field}"] = expr
            subdict.pop(prop)
            return subdict

        return recursive_postprocessing(
            filter_, check_for_entry_type, replace_with_relationship
        )

    def _apply_has_only_filter(self, filter_: dict) -> dict:
        """This method loops through the query and replaces the magic key `"#only"`
        with the proper 'HAS ONLY' query.
        """

        def check_for_only_filter(_, expr):
            """Find cases where the magic key `"#only"` is in the query."""
            return isinstance(expr, dict) and ("#only" in expr)

        def replace_only_filter(subdict: dict, prop: str, expr: dict):
            """Replace the magic key `"#only"` (added by this transformer) with an `$elemMatch`-based query.

            The first part of the query selects all the documents that contain any value that does not
            match any target values for the property `prop`.
            Subsequently, this selection is inverted, to get the documents that only have
            the allowed values.
            This inversion also selects documents with edge-case values such as null or empty lists;
            these are removed in the second part of the query that makes sure that only documents
            with lists that have at least one value are selected.

            """

            if "$and" not in subdict:
                subdict["$and"] = []

            if prop.startswith("relationships."):
                if prop not in (
                    "relationships.references.data.id",
                    "relationships.structures.data.id",
                ):
                    raise BadRequest(f"Unable to query on unrecognised field {prop}.")
                first_part_prop = ".".join(prop.split(".")[:-1])
                subdict["$and"].append(
                    {
                        first_part_prop: {
                            "$not": {"$elemMatch": {"id": {"$nin": expr["#only"]}}}
                        }
                    }
                )
                subdict["$and"].append({first_part_prop + ".0": {"$exists": True}})

            else:
                subdict["$and"].append(
                    {prop: {"$not": {"$elemMatch": {"$nin": expr["#only"]}}}}
                )
                subdict["$and"].append({prop + ".0": {"$exists": True}})

            subdict.pop(prop)
            return subdict

        return recursive_postprocessing(
            filter_, check_for_only_filter, replace_only_filter
        )

    def _apply_unknown_or_null_filter(self, filter_: dict) -> dict:
        """This method loops through the query and replaces the check for
        KNOWN with a check for existence and a check for not null, and the
        inverse for UNKNOWN.

        """

        def check_for_known_filter(_, expr):
            """Find cases where the query dict looks like
            `{"field": {"#known": T/F}}` or
            `{"field": "$not": {"#known": T/F}}`, which is a magic word
            for KNOWN/UNKNOWN filters in this transformer.

            """
            return isinstance(expr, dict) and (
                "#known" in expr or "#known" in expr.get("$not", {})
            )

        def replace_known_filter_with_or(subdict, prop, expr):
            """Replace magic key `"#known"` (added by this transformer) with the appropriate
            combination of `$exists` and `$eq`/`$ne` null.

            """
            not_ = set(expr.keys()) == {"$not"}
            if not_:
                expr = expr["$not"]

            # A negated KNOWN is UNKNOWN and vice versa.
            exists = expr["#known"] ^ not_

            top_level_key = "$or"
            comparison_operator = "$eq"
            if exists:
                top_level_key = "$and"
                comparison_operator = "$ne"

            if top_level_key not in subdict:
                subdict[top_level_key] = []

            subdict[top_level_key].append({prop: {"$exists": exists}})
            subdict[top_level_key].append({prop: {comparison_operator: None}})

            subdict.pop(prop)

            return subdict

        return recursive_postprocessing(
            filter_, check_for_known_filter, replace_known_filter_with_or
        )

    def _apply_mongo_id_filter(self, filter_: dict) -> dict:
        """This method loops through the query and replaces any operations
        on the special Mongodb `_id` key with the corresponding operation
        on a BSON `ObjectId` type.
        """

        def check_for_id_key(prop, _):
            """Find cases where the query dict is operating on the `_id` field."""
            return prop == "_id"

        def replace_str_id_with_objectid(subdict, prop, expr):
            """Convert string values compared against `_id` into `ObjectId`s.

            Only `$eq` and `$ne` are accepted; any other operator raises
            `NotImplementedError`.
            """
            from bson import ObjectId

            for operator in subdict[prop]:
                val = subdict[prop][operator]
                if operator not in ("$eq", "$ne"):
                    # Report the field under the name returned by the mapper, if any.
                    if self.mapper is not None:
                        prop = self.mapper.get_optimade_field(prop)
                    raise NotImplementedError(
                        f"Operator {operator} not supported for query on field {prop!r}, can only test for equality"
                    )
                if isinstance(val, str):
                    subdict[prop][operator] = ObjectId(val)
            return subdict

        return recursive_postprocessing(
            filter_, check_for_id_key, replace_str_id_with_objectid
        )

    def _apply_mongo_date_filter(self, filter_: dict) -> dict:
        """This method loops through the query and replaces any operations
        on suspected timestamp properties with the corresponding operation
        on a BSON `DateTime` type.
        """

        def check_for_timestamp_field(prop, _):
            """Find cases where the query dict is operating on a timestamp field."""
            if self.mapper is not None:
                prop = self.mapper.get_optimade_field(prop)
            return prop == "last_modified"

        def replace_str_date_with_datetime(subdict, prop, expr):
            """Encode suspected dates in with BSON."""
            import bson.json_util

            for operator in subdict[prop]:
                # Round-trip through extended JSON so that bson parses the
                # timestamp string into a timezone-aware (UTC) datetime.
                query_datetime = bson.json_util.loads(
                    bson.json_util.dumps({"$date": subdict[prop][operator]}),
                    json_options=bson.json_util.DEFAULT_JSON_OPTIONS.with_options(
                        tz_aware=True, tzinfo=bson.tz_util.utc
                    ),
                )
                if query_datetime.microsecond != 0:
                    warnings.warn(
                        f"Query for timestamp {subdict[prop][operator]!r} for field {prop!r} contained microseconds, which is not RFC3339 compliant. "
                        "This may cause undefined behaviour for the underlying database.",
                        TimestampNotRFCCompliant,
                    )

                subdict[prop][operator] = query_datetime

            return subdict

        return recursive_postprocessing(
            filter_, check_for_timestamp_field, replace_str_date_with_datetime
        )

constant_first_comparison(self, arg)

constant_first_comparison: constant OPERATOR ( non_string_value | not_implemented_string )

Source code in optimade/filtertransformers/mongo.py
def constant_first_comparison(self, arg):
    """Rewrite a constant-first comparison as a property-first one.

    Grammar rule:
    `constant_first_comparison: constant OPERATOR ( non_string_value | not_implemented_string )`

    `arg` holds `[constant, operator, property]`. The operator is looked up
    in `self._reversed_operator_map`, translated through
    `self.operator_map`, and the result is handed to
    `self.property_first_comparison` together with the property.
    """
    quantity = arg[2]
    swapped_operator = self._reversed_operator_map[arg[1]]
    mongo_operator = self.operator_map[swapped_operator]
    rhs = {mongo_operator: arg[0]}
    return self.property_first_comparison(quantity, rhs)

expression(self, arg)

expression: expression_clause ( OR expression_clause )*

Source code in optimade/filtertransformers/mongo.py
def expression(self, arg):
    """Join the parsed clauses of an `expression` with `$or`.

    Grammar rule: `expression: expression_clause ( OR expression_clause )*`

    A single clause is returned as-is; several clauses are wrapped as
    `{"$or": [clause, ...]}`.
    """
    if len(arg) > 1:
        return {"$or": arg}
    return arg[0]

expression_clause(self, arg)

expression_clause: expression_phrase ( AND expression_phrase )*

Source code in optimade/filtertransformers/mongo.py
def expression_clause(self, arg):
    """Join the parsed phrases of an `expression_clause` with `$and`.

    Grammar rule: `expression_clause: expression_phrase ( AND expression_phrase )*`

    A single phrase is returned as-is; several phrases are wrapped as
    `{"$and": [phrase, ...]}`.
    """
    if len(arg) > 1:
        return {"$and": arg}
    return arg[0]

expression_phrase(self, arg)

expression_phrase: [ NOT ] ( comparison | "(" expression ")" )

Source code in optimade/filtertransformers/mongo.py
def expression_phrase(self, arg):
    """Resolve an optionally negated phrase.

    Grammar rule: `expression_phrase: [ NOT ] ( comparison | "(" expression ")" )`

    All of the work is delegated to `self._recursive_expression_phrase`.
    """
    resolved = self._recursive_expression_phrase(arg)
    return resolved

fuzzy_string_op_rhs(self, arg)

fuzzy_string_op_rhs: CONTAINS value | STARTS [ WITH ] value | ENDS [ WITH ] value

Source code in optimade/filtertransformers/mongo.py
def fuzzy_string_op_rhs(self, arg):
    """Translate CONTAINS / STARTS [WITH] / ENDS [WITH] into a `$regex` query.

    Grammar rule:
    `fuzzy_string_op_rhs: CONTAINS value | STARTS [ WITH ] value | ENDS [ WITH ] value`

    The filter value is a literal (sub)string, so it is escaped before being
    embedded in the regular expression; otherwise characters such as `.`,
    `(` or `*` in the value would be interpreted as a pattern.

    Raises:
        NotImplementedError: If the leading keyword is not one of CONTAINS,
            STARTS or ENDS.
    """
    import re

    # The WITH keyword may be omitted; when present it is a token whose
    # `type` is "WITH" and the value follows it.
    if getattr(arg[1], "type", None) == "WITH":
        pattern = arg[2]
    else:
        pattern = arg[1]

    literal = re.escape(str(pattern))

    if arg[0] == "CONTAINS":
        regex = literal
    elif arg[0] == "STARTS":
        regex = f"^{literal}"
    elif arg[0] == "ENDS":
        regex = f"{literal}$"
    else:
        raise NotImplementedError(f"Fuzzy string operator {arg[0]} not implemented.")
    return {"$regex": regex}

known_op_rhs(self, arg)

known_op_rhs: IS ( KNOWN | UNKNOWN )

Source code in optimade/filtertransformers/mongo.py
def known_op_rhs(self, arg):
    """Return the placeholder for an IS KNOWN / IS UNKNOWN filter.

    Grammar rule: `known_op_rhs: IS ( KNOWN | UNKNOWN )`

    The OPTIMADE spec also requires a comparison with null, which cannot be
    expressed at this point. The special key `"#known"` is emitted instead
    and is replaced with the expanded dictionary during post-processing.
    """
    is_known = arg[1] == "KNOWN"
    return {"#known": is_known}

length_op_rhs(self, arg)

length_op_rhs: LENGTH [ OPERATOR ] value

Source code in optimade/filtertransformers/mongo.py
def length_op_rhs(self, arg):
    """Translate a LENGTH filter into a `$size` query.

    Grammar rule: `length_op_rhs: LENGTH [ OPERATOR ] value`

    Plain equality (no operator, or `=`) gives `{"$size": value}`. Any other
    operator from `self.operator_map` except `!=` gives
    `{"$size": {<mongo operator>: value}}`; MongoDB does not accept that
    form, so it has to be rewritten in post-processing.

    Raises:
        NotImplementedError: For `!=` or an operator that is not a key of
            `self.operator_map`.
    """
    target = arg[-1]

    plain_equality = len(arg) == 2 or (len(arg) == 3 and arg[1] == "=")
    if plain_equality:
        return {"$size": target}

    operator = arg[1]
    if operator not in self.operator_map or operator == "!=":
        raise NotImplementedError(
            f"Operator {operator} not implemented for LENGTH filter."
        )

    # Deliberately invalid for Mongo, e.g. {'$size': {'$gt': 2}}.
    return {"$size": {self.operator_map[operator]: target}}

postprocess(self, query)

Used to post-process the nested dictionary of the parsed query.

Source code in optimade/filtertransformers/mongo.py
def postprocess(self, query: Dict[str, Any]):
    """Used to post-process the nested dictionary of the parsed query.

    Each pass receives the query returned by the previous one; the order
    below is the order in which they are applied.
    """
    passes = (
        self._apply_relationship_filtering,
        self._apply_length_operators,
        self._apply_unknown_or_null_filter,
        self._apply_has_only_filter,
        self._apply_mongo_id_filter,
        self._apply_mongo_date_filter,
    )
    for apply_pass in passes:
        query = apply_pass(query)
    return query

property(self, args)

property: IDENTIFIER ( "." IDENTIFIER )*

If this transformer has an associated mapper, the property will be checked against the possible relationship entry types and any supported provider prefixes. If there is a match, this rule will return a string and not a dereferenced Quantity.

Exceptions:

Type Description
BadRequest

If the property does not match any of the above rules.

Source code in optimade/filtertransformers/mongo.py
def property(self, args):
    """Resolve a (possibly nested) property to a dotted field name.

    Grammar rule: `property: IDENTIFIER ( "." IDENTIFIER )*`

    The parent class's `property` rule is applied first. When it returns a
    `Quantity`, its `backend_field` is used as the first path component;
    the remaining identifiers in `args` are appended, separated by dots.
    """
    resolved = super().property(args)
    head = resolved.backend_field if isinstance(resolved, Quantity) else resolved
    return ".".join([head, *args[1:]])

property_zip_addon(self, arg)

property_zip_addon: ":" property (":" property)*

Source code in optimade/filtertransformers/mongo.py
def property_zip_addon(self, arg):
    """Reject correlated list queries.

    Grammar rule: `property_zip_addon: ":" property (":" property)*`

    Raises:
        NotImplementedError: Always.
    """
    message = "Correlated list queries are not supported."
    raise NotImplementedError(message)

set_op_rhs(self, arg)

set_op_rhs: HAS ( [ OPERATOR ] value | ALL value_list | ANY value_list | ONLY value_list )

Source code in optimade/filtertransformers/mongo.py
def set_op_rhs(self, arg):
    """Translate HAS / HAS ALL / HAS ANY / HAS ONLY into a MongoDB set query.

    Grammar rule:
    `set_op_rhs: HAS ( [ OPERATOR ] value | ALL value_list | ANY value_list | ONLY value_list )`

    HAS ONLY is emitted under the special key `"#only"` rather than a MongoDB
    operator.

    Raises:
        NotImplementedError: If HAS is combined with an OPERATOR.
    """
    if len(arg) == 2:
        # Bare `HAS value`, without an OPERATOR.
        return {"$in": arg[1:]}

    keyword_to_key = (("ALL", "$all"), ("ANY", "$in"), ("ONLY", "#only"))
    for keyword, key in keyword_to_key:
        if arg[1] == keyword:
            return {key: arg[2]}

    # Anything else is a value preceded by an OPERATOR.
    raise NotImplementedError(
        f"set_op_rhs not implemented for use with OPERATOR. Given: {arg}"
    )

set_zip_op_rhs(self, arg)

set_zip_op_rhs: property_zip_addon HAS ( value_zip | ONLY value_zip_list | ALL value_zip_list | ANY value_zip_list )

Source code in optimade/filtertransformers/mongo.py
def set_zip_op_rhs(self, arg):
    """Reject correlated list queries.

    Grammar rule:
    `set_zip_op_rhs: property_zip_addon HAS ( value_zip | ONLY value_zip_list |
    ALL value_zip_list | ANY value_zip_list )`

    Raises:
        NotImplementedError: Always.
    """
    message = "Correlated list queries are not supported."
    raise NotImplementedError(message)

value_list(self, arg)

value_list: [ OPERATOR ] value ( "," [ OPERATOR ] value )*

Source code in optimade/filtertransformers/mongo.py
def value_list(self, arg):
    """Return the parsed value list unchanged, rejecting embedded operators.

    Grammar rule: `value_list: [ OPERATOR ] value ( "," [ OPERATOR ] value )*`

    The optional OPERATOR is not supported yet, so any entry whose string
    form is a key of `self.operator_map` is reported as an error.

    Raises:
        NotImplementedError: If an operator is found inside the list.
    """
    for candidate in arg:
        if str(candidate) not in self.operator_map:
            continue
        raise NotImplementedError(
            f"OPERATOR {candidate} inside value_list {arg} not implemented."
        )

    return arg

value_zip(self, arg)

value_zip: [ OPERATOR ] value ":" [ OPERATOR ] value (":" [ OPERATOR ] value)*

Source code in optimade/filtertransformers/mongo.py
def value_zip(self, arg):
    """Reject correlated list queries.

    Grammar rule:
    `value_zip: [ OPERATOR ] value ":" [ OPERATOR ] value (":" [ OPERATOR ] value)*`

    Raises:
        NotImplementedError: Always.
    """
    message = "Correlated list queries are not supported."
    raise NotImplementedError(message)

value_zip_list(self, arg)

value_zip_list: value_zip ( "," value_zip )*

Source code in optimade/filtertransformers/mongo.py
def value_zip_list(self, arg):
    """Reject correlated list queries.

    Grammar rule: `value_zip_list: value_zip ( "," value_zip )*`

    Raises:
        NotImplementedError: Always.
    """
    message = "Correlated list queries are not supported."
    raise NotImplementedError(message)

recursive_postprocessing(filter_, condition, replacement)

Recursively descend into the query, checking each dictionary (contained in a list, or as an entry in another dictionary) for the condition passed. If the condition is true, apply the replacement to the dictionary.

Parameters:

Name Type Description Default
filter_

the filter_ to process.

required
condition callable

a function that returns True if the replacement function should be applied. It should take as arguments the property and expression from the filter_, as would be returned by iterating over filter_.items().

required
replacement callable

a function that returns the processed dictionary. It should take as arguments the dictionary to modify, the property and the expression (as described above).

required

Examples:

For the simple case of replacing one field name with another, the following functions could be used:

def condition(prop, expr):
    return prop == "field_name_old"

def replacement(d, prop, expr):
    d["field_name_new"] = d.pop(prop)
    return d

filter_ = recursive_postprocessing(
    filter_, condition, replacement
)
Source code in optimade/filtertransformers/mongo.py
def recursive_postprocessing(filter_: Union[Dict, List], condition, replacement):
    """Walk the query and apply `replacement` wherever `condition` holds.

    Lists are processed element by element. For a dictionary, every
    `(property, expression)` pair is tested with `condition`; matching pairs
    are passed to `replacement`, and non-matching pairs whose expression is
    a list have each list element processed recursively. Any other value is
    returned unchanged.

    Parameters:
        filter_ : the filter_ to process.
        condition (callable): a function that returns True if the
            replacement function should be applied. It should take
            as arguments the property and expression from the filter_,
            as would be returned by iterating over `filter_.items()`.
        replacement (callable): a function that returns the processed
            dictionary. It should take as arguments the dictionary
            to modify, the property and the expression (as described
            above).

    Returns:
        The processed filter. Dictionaries are deep-copied before being
        modified, so the dictionary passed in is not the one handed to
        `replacement`.

    Example:
        For the simple case of replacing one field name with
        another, the following functions could be used:

        ```python
        def condition(prop, expr):
            return prop == "field_name_old"

        def replacement(d, prop, expr):
            d["field_name_new"] = d.pop(prop)
            return d

        filter_ = recursive_postprocessing(
            filter_, condition, replacement
        )

        ```

    """

    def descend(node):
        return recursive_postprocessing(node, condition, replacement)

    if isinstance(filter_, list):
        return [descend(item) for item in filter_]

    if not isinstance(filter_, dict):
        return filter_

    # A deep copy is taken at every dictionary level that is visited, which
    # can use a lot of memory if the filter_ is *heavily* nested.
    processed = copy.deepcopy(filter_)
    for prop, expr in filter_.items():
        if condition(prop, expr):
            processed = replacement(processed, prop, expr)
        elif isinstance(expr, list):
            processed[prop] = [descend(item) for item in expr]
    return processed