Skip to content

elasticsearch

ElasticCollection

Source code in optimade/server/entry_collections/elasticsearch.py (lines 22–202)
class ElasticCollection(EntryCollection):
    """Entry collection stored in a single Elasticsearch index.

    The index name is the collection ``name``; filters are translated into
    Elasticsearch queries by an ``ElasticTransformer`` built from the
    collection's resource mapper.

    """

    def __init__(
        self,
        name: str,
        resource_cls: EntryResource,
        resource_mapper: BaseResourceMapper,
        client: Optional["Elasticsearch"] = None,
    ):
        """Initialize the ElasticCollection for the given parameters.

        Parameters:
            name: The name of the collection (also used as the index name).
            resource_cls: The type of entry resource that is stored by the collection.
            resource_mapper: A resource mapper object that handles aliases and
                format changes between deserialization and response.
            client: A preconfigured Elasticsearch client. If not given (or falsy),
                the module-level default ``CLIENT`` is used.

        """
        super().__init__(
            resource_cls=resource_cls,
            resource_mapper=resource_mapper,
            transformer=ElasticTransformer(mapper=resource_mapper),
        )

        self.client = client if client else CLIENT
        self.name = name

        # If we are creating a new collection from scratch, also create the index,
        # otherwise assume it has already been created externally
        if CONFIG.insert_test_data:
            self.create_optimade_index()

    def count(self, *args, **kwargs) -> int:
        """Not implemented for the Elasticsearch backend.

        Raises:
            NotImplementedError: Always.

        """
        raise NotImplementedError

    def create_optimade_index(self) -> None:
        """Load or create an index that can handle aliased OPTIMADE fields and attach it
        to the current client.

        If a pre-defined index exists for this collection name it is used;
        otherwise a fallback index is generated from the resource mapper. The
        mapping's field names are converted to backend (aliased) names before the
        index is created. HTTP 400 responses from the create call are ignored
        (presumably so an already-existing index is tolerated).

        """
        body = self.predefined_index.get(self.name)
        if body is None:
            body = self.create_elastic_index_from_mapper(
                self.resource_mapper, self.all_fields
            )

        # Re-key the mapping so it uses backend field names instead of OPTIMADE names.
        properties = {
            self.resource_mapper.get_backend_field(field): spec
            for field, spec in body["mappings"]["properties"].items()
        }
        properties["id"] = {"type": "keyword"}
        body["mappings"]["properties"] = properties
        self.client.indices.create(index=self.name, body=body, ignore=400)

        LOGGER.debug(f"Created Elastic index for {self.name!r} with body {body}")

    @property
    def predefined_index(self) -> Dict[str, Any]:
        """Loads and returns the default pre-defined index.

        ``elastic_indexes.json`` (next to this module) is re-read on every access,
        so each caller gets a fresh dictionary it may mutate freely.

        Returns:
            A mapping from collection name to the index ``body`` for that collection.

        """
        index_path = Path(__file__).parent.joinpath("elastic_indexes.json")
        # JSON text is UTF-8 (RFC 8259); do not depend on the platform's locale encoding.
        with open(index_path, encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def create_elastic_index_from_mapper(
        resource_mapper: BaseResourceMapper, fields: Iterable[str]
    ) -> Dict[str, Any]:
        """Create a fallback elastic index based on a resource mapper.

        Every field is mapped as a ``keyword``.

        Arguments:
            resource_mapper: The resource mapper to create the index for.
            fields: The list of fields to use in the index.

        Returns:
            The `body` parameter to pass to `client.indices.create(..., body=...)`.

        """
        properties = {
            resource_mapper.get_optimade_field(field): {"type": "keyword"}
            for field in fields
        }
        properties["id"] = {"type": "keyword"}
        return {"mappings": {"properties": properties}}

    def __len__(self) -> int:
        """Returns the total number of entries in the collection."""
        # Without ``track_total_hits`` Elasticsearch 7+ reports at most 10,000 hits
        # (a lower bound), which would under-count large collections.
        search = Search(using=self.client, index=self.name).extra(track_total_hits=True)
        return search.execute().hits.total.value

    def insert(self, data: List[EntryResource]) -> None:
        """Add the given entries to the underlying database.

        Warning:
            No validation is performed on the incoming data.

        Note:
            Each item is treated as a mapping, and any ``_id`` key is removed from
            it in place before indexing.

        Arguments:
            data: The entry resource objects to add to the database.

        """

        def get_id(item):
            """Return the document id for ``item`` (``None`` lets ES assign one)
            and drop ``item["_id"]`` so it is not sent as part of the source."""
            if self.name == "links":
                # Links are identified by both id and type.
                id_ = f"{item['id']}-{item['type']}"
            elif "id" in item:
                id_ = item["id"]
            elif "_id" in item:
                # use the existing MongoDB ids in the test data
                id_ = str(item["_id"])
            else:
                # ES will generate ids
                id_ = None
            item.pop("_id", None)
            return id_

        # NOTE(review): mapping types (``_type``) are deprecated in Elasticsearch 7+;
        # confirm this is still required for the targeted server version.
        bulk(
            self.client,
            [
                {
                    "_index": self.name,
                    "_id": get_id(item),
                    "_type": "_doc",
                    "_source": item,
                }
                for item in data
            ],
        )

    def _run_db_query(
        self, criteria: Dict[str, Any], single_entry=False
    ) -> Tuple[Union[List[Dict[str, Any]], Dict[str, Any]], int, bool]:
        """Run the query on the backend and collect the results.

        Arguments:
            criteria: A dictionary representation of the query parameters. The keys
                read here are ``filter``, ``skip``, ``limit`` and ``sort``; ``sort``
                is an iterable of ``(field, direction)`` pairs where a direction of
                ``-1`` means descending and anything else ascending.
            single_entry: Whether or not the caller is expecting a single entry response.

        Returns:
            The list of entries from the database (without any re-mapping), the total number of
            entries matching the query and a boolean for whether or not there is more data available.

        """

        search = Search(using=self.client, index=self.name)

        if criteria.get("filter", False):
            search = search.query(criteria["filter"])

        page_offset = criteria.get("skip", 0)
        limit = criteria.get("limit", CONFIG.page_limit)

        # Only return the backend (aliased) versions of the collection's fields.
        all_aliased_fields = [
            self.resource_mapper.get_backend_field(field) for field in self.all_fields
        ]
        search = search.source(includes=all_aliased_fields)

        # A missing or ``None`` sort is treated as "no explicit sort".
        elastic_sort = [
            {field: {"order": "desc" if sort_dir == -1 else "asc"}}
            for field, sort_dir in (criteria.get("sort") or ())
        ]
        if not elastic_sort:
            # Fall back to ascending backend ``id`` so results have a deterministic
            # order. This must be a *list* of sort clauses: unpacking a bare dict
            # into ``sort()`` would pass only its keys and drop the ``order`` spec.
            elastic_sort = [
                {self.resource_mapper.get_backend_field("id"): {"order": "asc"}}
            ]

        search = search.sort(*elastic_sort)

        search = search[page_offset : page_offset + limit]
        # Ask for an exact total; by default ES caps ``hits.total`` at 10,000.
        search = search.extra(track_total_hits=True)
        response = search.execute()

        results = [hit.to_dict() for hit in response.hits]

        if not single_entry:
            data_returned = response.hits.total.value
            more_data_available = page_offset + limit < data_returned
        else:
            # SingleEntryQueryParams, e.g., /structures/{entry_id}
            data_returned = len(results)
            more_data_available = False

        return results, data_returned, more_data_available

__init__(name, resource_cls, resource_mapper, client=None)

Initialize the ElasticCollection for the given parameters.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| name | str | The name of the collection. | required |
| resource_cls | EntryResource | The type of entry resource that is stored by the collection. | required |
| resource_mapper | BaseResourceMapper | A resource mapper object that handles aliases and format changes between deserialization and response. | required |
| client | Optional[Elasticsearch] | A preconfigured Elasticsearch client. | None |
Source code in optimade/server/entry_collections/elasticsearch.py (lines 23–52)
def __init__(
    self,
    name: str,
    resource_cls: EntryResource,
    resource_mapper: BaseResourceMapper,
    client: Optional["Elasticsearch"] = None,
):
    """Set up an Elasticsearch-backed entry collection.

    Parameters:
        name: Collection name; also used as the name of the Elasticsearch index.
        resource_cls: The type of entry resource that is stored by the collection.
        resource_mapper: A resource mapper object that handles aliases and
            format changes between deserialization and response.
        client: A preconfigured Elasticsearch client. When it is omitted (or
            falsy), the module-level default ``CLIENT`` is used instead.

    """
    super().__init__(
        resource_cls=resource_cls,
        resource_mapper=resource_mapper,
        transformer=ElasticTransformer(mapper=resource_mapper),
    )

    self.client = client or CLIENT
    self.name = name

    # Build the index only when test data will be inserted; otherwise the
    # index is expected to have been created externally.
    if CONFIG.insert_test_data:
        self.create_optimade_index()

__len__()

Returns the total number of entries in the collection.

Source code in optimade/server/entry_collections/elasticsearch.py (lines 107–109)
def __len__(self) -> int:
    """Returns the total number of entries in the collection.

    An exact count is requested with ``track_total_hits=True``. Without it,
    Elasticsearch 7+ reports at most 10,000 hits, which under-counts large
    collections.
    """
    search = Search(using=self.client, index=self.name).extra(track_total_hits=True)
    return search.execute().hits.total.value

create_elastic_index_from_mapper(resource_mapper, fields) staticmethod

Create a fallback elastic index based on a resource mapper.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| resource_mapper | BaseResourceMapper | The resource mapper to create the index for. | required |
| fields | Iterable[str] | The list of fields to use in the index. | required |

Returns:

| Type | Description |
|------|-------------|
| Dict[str, Any] | The `body` parameter to pass to `client.indices.create(..., body=...)`. |

Source code in optimade/server/entry_collections/elasticsearch.py (lines 86–105)
@staticmethod
def create_elastic_index_from_mapper(
    resource_mapper: BaseResourceMapper, fields: Iterable[str]
) -> Dict[str, Any]:
    """Build a fallback index body in which every field is a ``keyword``.

    Arguments:
        resource_mapper: The resource mapper to create the index for.
        fields: The list of fields to use in the index.

    Returns:
        The `body` parameter to pass to `client.indices.create(..., body=...)`,
        shaped as ``{"mappings": {"properties": {...}}}``.

    """
    properties: Dict[str, Any] = {}
    for name in fields:
        properties[resource_mapper.get_optimade_field(name)] = {"type": "keyword"}
    # ``id`` is always mapped as a keyword, whether or not it was in ``fields``.
    properties["id"] = {"type": "keyword"}
    return {"mappings": {"properties": properties}}

create_optimade_index()

Load or create an index that can handle aliased OPTIMADE fields and attach it to the current client.

Source code in optimade/server/entry_collections/elasticsearch.py (lines 57–77)
def create_optimade_index(self) -> None:
    """Create this collection's Elasticsearch index using backend field names.

    The pre-defined index for ``self.name`` is used if one exists; otherwise a
    keyword-only fallback is generated from the resource mapper. Each mapped
    field is renamed to its backend alias and ``id`` is forced to ``keyword``
    before the create call, which ignores HTTP 400 responses.

    """
    body = self.predefined_index.get(self.name)
    if body is None:
        body = self.create_elastic_index_from_mapper(
            self.resource_mapper, self.all_fields
        )

    source_properties = body["mappings"]["properties"]
    aliased = {}
    # Move every entry out of the original mapping, keyed by its backend name.
    for optimade_name in list(source_properties):
        aliased[self.resource_mapper.get_backend_field(optimade_name)] = source_properties.pop(optimade_name)
    aliased["id"] = {"type": "keyword"}
    body["mappings"]["properties"] = aliased
    self.client.indices.create(index=self.name, body=body, ignore=400)

    LOGGER.debug(f"Created Elastic index for {self.name!r} with body {body}")

insert(data)

Add the given entries to the underlying database.

Warning

No validation is performed on the incoming data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | List[EntryResource] | The entry resource objects to add to the database. | required |
Source code in optimade/server/entry_collections/elasticsearch.py (lines 111–147)
def insert(self, data: List[EntryResource]) -> None:
    """Bulk-index the given entries into this collection's Elasticsearch index.

    Warning:
        No validation is performed on the incoming data.

    Note:
        Items are treated as mappings; any ``_id`` key is removed from each
        item in place before it is sent as the document source.

    Arguments:
        data: The entry resource objects to add to the database.

    """

    def document_id(entry):
        # Decide the ES document id, then strip ``_id`` from the source.
        if self.name == "links":
            # Links are identified by both their id and their type.
            doc_id = "%s-%s" % (entry["id"], entry["type"])
        elif "id" in entry:
            doc_id = entry["id"]
        elif "_id" in entry:
            # reuse the MongoDB ids that ship with the test data
            doc_id = str(entry["_id"])
        else:
            # None lets Elasticsearch assign an id itself
            doc_id = None
        entry.pop("_id", None)
        return doc_id

    actions = []
    for entry in data:
        actions.append(
            {
                "_index": self.name,
                "_id": document_id(entry),
                "_type": "_doc",
                "_source": entry,
            }
        )
    bulk(self.client, actions)

predefined_index() property

Loads and returns the default pre-defined index.

Source code in optimade/server/entry_collections/elasticsearch.py (lines 79–84)
@property
def predefined_index(self) -> Dict[str, Any]:
    """Loads and returns the default pre-defined index.

    ``elastic_indexes.json`` (next to this module) is re-read on every access,
    so each caller gets a fresh dictionary it may mutate freely.

    Returns:
        A mapping from collection name to the index ``body`` for that collection.
    """
    index_path = Path(__file__).parent.joinpath("elastic_indexes.json")
    # JSON text is UTF-8 (RFC 8259); do not depend on the platform's locale encoding.
    with open(index_path, encoding="utf-8") as f:
        index = json.load(f)
    return index