12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class BaseResourceMapper:
    """Instance-based resource mapper.

    Translates field names between their OPTIMADE-facing form and the form
    used by the backend, and reshapes backend documents into the layout
    expected by ``ENTRY_RESOURCE_CLASS``.

    Create one instance per CONFIG. Subclasses still set class-level
    constants like ``ENTRY_RESOURCE_CLASS``.

    Alias pairs are always ordered ``(optimade_field, backend_field)``.
    """

    # Class-level knobs that subclasses may override.
    ALIASES: tuple[tuple[str, str], ...] = ()
    LENGTH_ALIASES: tuple[tuple[str, str], ...] = ()
    PROVIDER_FIELDS: tuple[str, ...] = ()
    ENTRY_RESOURCE_CLASS: type["EntryResource"] = EntryResource
    RELATIONSHIP_ENTRY_TYPES: set[str] = {"references", "structures"}
    TOP_LEVEL_NON_ATTRIBUTES_FIELDS: set[str] = {"id", "type", "relationships", "links"}

    def __init__(self, config: ServerConfig | None = None):
        """Bind the mapper to a server configuration.

        Args:
            config: Server CONFIG-like object (must expose:
                .provider.prefix, .provider_fields, .aliases, .length_aliases).
                A default ``ServerConfig()`` is created when omitted.
        """
        if config is None:
            config = ServerConfig()
        self.config = config

        # The providers data is optional; fall back to an empty mapping when
        # it cannot be imported. ModuleNotFoundError is a subclass of
        # ImportError, so catching ImportError covers both.
        try:
            from optimade.server.data import providers as PROVIDERS  # type: ignore
        except ImportError:
            PROVIDERS = {}
        self.providers = PROVIDERS
        self.KNOWN_PROVIDER_PREFIXES: set[str] = {
            prov["id"] for prov in self.providers.get("data", [])
        }

    # ---- Computed, cached once per instance ----

    @cached_property
    def ENDPOINT(self) -> Literal["links", "references", "structures"]:
        """Endpoint name, taken from the default of the resource class' ``type`` field.

        Raises:
            ValueError: If the default is empty or not a string.
        """
        endpoint = self.ENTRY_RESOURCE_CLASS.model_fields["type"].default
        if not endpoint or not isinstance(endpoint, str):
            raise ValueError("Type not set for this entry type!")
        return endpoint

    @cached_property
    def SUPPORTED_PREFIXES(self) -> set[str]:
        """Set containing only the configured provider prefix."""
        return {self.config.provider.prefix}

    def _provider_alias(self, field: str) -> tuple[str, str]:
        """Return the ``(optimade_field, backend_field)`` pair for a provider field.

        Names already starting with ``_`` map to themselves; any other name is
        exposed as ``_<provider prefix>_<name>``.
        """
        if field.startswith("_"):
            return (field, field)
        return (f"_{self.config.provider.prefix}_{field}", field)

    @cached_property
    def all_aliases(self) -> tuple[tuple[str, str], ...]:
        """All ``(optimade_field, backend_field)`` pairs for this endpoint.

        The pairs are concatenated in this order: configured provider fields
        given as strings, configured provider fields given as dicts (by their
        ``"name"``), the class-level ``PROVIDER_FIELDS``, the configured
        aliases, and finally the class-level ``ALIASES``. The order matters
        because later pairs win when the pairs are turned into a lookup dict.
        """
        ep = self.ENDPOINT
        provider_fields = self.config.provider_fields.get(ep) or []

        string_field_aliases = tuple(
            self._provider_alias(field)
            for field in provider_fields
            if isinstance(field, str)
        )
        dict_field_aliases = tuple(
            self._provider_alias(field["name"])
            for field in provider_fields
            if isinstance(field, dict)
        )
        explicit_provider_fields = tuple(
            self._provider_alias(field) for field in self.PROVIDER_FIELDS
        )
        config_aliases = tuple(self.config.aliases.get(ep, {}).items())

        return (
            string_field_aliases
            + dict_field_aliases
            + explicit_provider_fields
            + config_aliases
            + self.ALIASES
        )

    @cached_property
    def all_length_aliases(self) -> tuple[tuple[str, str], ...]:
        """Class-level ``LENGTH_ALIASES`` followed by the configured ones for this endpoint."""
        return self.LENGTH_ALIASES + tuple(
            self.config.length_aliases.get(self.ENDPOINT, {}).items()
        )

    # Lookup tables derived from the (already cached) alias tuples, so the
    # per-field accessors below do not rebuild a dict on every call.

    @cached_property
    def _backend_field_by_optimade(self) -> dict[str, str]:
        """Mapping from OPTIMADE field name to backend field name."""
        return dict(self.all_aliases)

    @cached_property
    def _optimade_field_by_backend(self) -> dict[str, str]:
        """Mapping from backend field name to OPTIMADE field name."""
        return {backend: optimade for optimade, backend in self.all_aliases}

    @cached_property
    def _length_alias_by_field(self) -> dict[str, str]:
        """Mapping from field name to its length alias."""
        return dict(self.all_length_aliases)

    @cached_property
    def ENTRY_RESOURCE_ATTRIBUTES_MAP(self) -> dict[str, Any]:
        """Queryable properties of ``ENTRY_RESOURCE_CLASS``."""
        from optimade.server.schemas import retrieve_queryable_properties

        return retrieve_queryable_properties(self.ENTRY_RESOURCE_CLASS)

    @cached_property
    def ALL_ATTRIBUTES(self) -> set[str]:
        """Queryable property names plus the OPTIMADE names of all provider fields."""
        # `or ()` mirrors `all_aliases`: a missing or empty entry means "no fields".
        provider_fields = self.config.provider_fields.get(self.ENDPOINT) or ()

        attrs = set(self.ENTRY_RESOURCE_ATTRIBUTES_MAP)
        attrs.update(
            self.get_optimade_field(field)
            for field in provider_fields
            if isinstance(field, str)
        )
        attrs.update(
            self.get_optimade_field(field["name"])
            for field in provider_fields
            if isinstance(field, dict)
        )
        attrs.update(self.get_optimade_field(field) for field in self.PROVIDER_FIELDS)
        return attrs

    # ---- Instance methods that use the cached properties ----

    def length_alias_for(self, field: str) -> str | None:
        """Return the length alias registered for ``field``, or ``None``."""
        return self._length_alias_by_field.get(field)

    def get_backend_field(self, optimade_field: str) -> str:
        """Translate an OPTIMADE field name to its backend name.

        Only the part before the first ``.`` is looked up; the remainder of a
        dotted name is appended unchanged. Names without an alias are returned
        as-is.
        """
        head, dot, tail = optimade_field.partition(".")
        alias = self._backend_field_by_optimade.get(head)
        if alias is None:
            return optimade_field
        return alias + dot + tail

    def get_optimade_field(self, backend_field: str) -> str:
        """Translate a backend field name to its OPTIMADE name (identity if unaliased)."""
        return self._optimade_field_by_backend.get(backend_field, backend_field)

    def alias_for(self, field: str) -> str:
        """Deprecated alias of :meth:`get_backend_field`."""
        warnings.warn(
            "`.alias_for(...)` is deprecated; use `.get_backend_field(...)`.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller
        )
        return self.get_backend_field(field)

    def alias_of(self, field: str) -> str:
        """Deprecated alias of :meth:`get_optimade_field`."""
        warnings.warn(
            "`.alias_of(...)` is deprecated; use `.get_optimade_field(...)`.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller
        )
        return self.get_optimade_field(field)

    def get_required_fields(self) -> set[str]:
        """Return the top-level (non-attribute) field names.

        A copy is returned so callers cannot mutate the class-level constant.
        """
        return set(self.TOP_LEVEL_NON_ATTRIBUTES_FIELDS)

    def map_back(self, doc: dict) -> dict:
        """Reshape a backend document into a resource-style dict.

        Backend field names are renamed to their OPTIMADE names, then the
        result is split: keys in ``TOP_LEVEL_NON_ATTRIBUTES_FIELDS`` stay at
        the top level, everything else is moved under ``"attributes"``.
        ``"type"`` is always set to this mapper's ``ENDPOINT``.

        Args:
            doc: Backend document. It is not modified.

        Returns:
            A new dict with the top-level fields, ``"type"`` and ``"attributes"``.

        Raises:
            Exception: If the renamed document already has an ``"attributes"`` key.
        """
        aliases = self.all_aliases
        backend_names = {backend for _, backend in aliases}

        # Keys without an alias are carried over unchanged ...
        renamed = {key: value for key, value in doc.items() if key not in backend_names}
        # ... and aliased keys are stored under their OPTIMADE name.
        for optimade_name, backend_name in aliases:
            if backend_name in doc:
                renamed[optimade_name] = doc[backend_name]

        if "attributes" in renamed:
            raise Exception("Will overwrite doc field!")

        top_level = self.TOP_LEVEL_NON_ATTRIBUTES_FIELDS
        attributes = {
            key: value for key, value in renamed.items() if key not in top_level
        }
        newdoc = {key: value for key, value in renamed.items() if key in top_level}
        newdoc["type"] = self.ENDPOINT
        newdoc["attributes"] = attributes
        return newdoc

    def deserialize(
        self, results: dict | Iterable[dict]
    ) -> "EntryResource | list[EntryResource]":
        """Build ``ENTRY_RESOURCE_CLASS`` instance(s) from backend document(s).

        A single dict yields a single resource; any other iterable yields a
        list with one resource per document.
        """
        if isinstance(results, dict):
            return self.ENTRY_RESOURCE_CLASS(**self.map_back(results))
        return [self.ENTRY_RESOURCE_CLASS(**self.map_back(doc)) for doc in results]
|