41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541 | class ImplementationValidator:
"""Class used to make a series of checks against a particular
OPTIMADE implementation over HTTP.
Uses the pydantic models in [`optimade.models`][optimade.models] to
validate the response from the server and crawl through the
available endpoints.
Attributes:
valid: whether or not the implementation was deemed valid, with
`None` signifying that tests did not run.
Caution:
Only works for current version of the specification as defined
by [`optimade.models`][optimade.models].
"""
valid: Optional[bool]
def __init__(
self,
client: Optional[Any] = None,
base_url: Optional[str] = None,
verbosity: int = 0,
respond_json: bool = False,
page_limit: int = 4,
max_retries: int = 5,
run_optional_tests: bool = True,
fail_fast: bool = False,
as_type: Optional[str] = None,
index: bool = False,
minimal: bool = False,
http_headers: Optional[dict[str, str]] = None,
timeout: float = DEFAULT_CONN_TIMEOUT,
read_timeout: float = DEFAULT_READ_TIMEOUT,
):
"""Set up the tests to run, based on constants in this module
for required endpoints.
Arguments:
client: A client that has a `.get()` method to obtain the
response from the implementation. If `None`, then
[`Client`][optimade.validator.utils.Client] will be used.
base_url: The URL of the implementation to validate. Unless
performing "as_type" validation, this should point to the
base of the OPTIMADE implementation.
verbosity: The verbosity of the output and logging as an integer
(`0`: critical, `1`: warning, `2`: info, `3`: debug).
respond_json: If `True`, print only a JSON representation of the
results of validation to stdout.
page_limit: The default page limit to apply to filters.
max_retries: Argument is passed to the client for how many
attempts to make for a request before failing.
run_optional_tests: Whether to run the tests on optional
OPTIMADE features.
fail_fast: Whether to exit validation after the first failure
of a mandatory test.
as_type: An OPTIMADE entry or endpoint type to coerce the response
from implementation into, e.g. "structures". Requires `base_url`
to be pointed to the corresponding endpoint.
index: Whether to validate the implementation as an index meta-database.
minimal: Whether or not to run only a minimal test set.
http_headers: Dictionary of additional headers to add to every request.
timeout: The connection timeout to use for all requests (in seconds).
read_timeout: The read timeout to use for all requests (in seconds).
"""
self.verbosity = verbosity
self.max_retries = max_retries
self.page_limit = page_limit
self.index = index
self.run_optional_tests = run_optional_tests
self.fail_fast = fail_fast
self.respond_json = respond_json
self.minimal = minimal
if as_type is None:
self.as_type_cls = None
elif self.index:
if as_type not in CONF.response_classes_index:
raise RuntimeError(
f"Provided as_type='{as_type}' not allowed for an Index meta-database."
)
self.as_type_cls = CONF.response_classes_index[as_type]
elif as_type in ("structure", "reference"):
self.as_type_cls = CONF.response_classes[f"{as_type}s/"]
else:
self.as_type_cls = CONF.response_classes[as_type]
if client is None and base_url is None:
raise RuntimeError(
"Need at least a URL or a client to initialize validator."
)
if base_url and client:
raise RuntimeError("Please specify at most one of base_url or client.")
if client:
self.client = client
self.base_url = self.client.base_url
# If a custom client has been provided, try to set custom headers if they have been specified,
# but do not overwrite any existing attributes
if http_headers:
if not hasattr(self.client, "headers"):
self.client.headers = http_headers
else:
print_warning(
f"Not using specified request headers {http_headers} with custom client {self.client}."
)
else:
while base_url.endswith("/"): # type: ignore[union-attr]
base_url = base_url[:-1] # type: ignore[index]
self.base_url = base_url
self.client = Client(
self.base_url, # type: ignore[arg-type]
max_retries=self.max_retries,
headers=http_headers,
timeout=timeout,
read_timeout=read_timeout,
)
self._setup_log()
self._response_classes = (
CONF.response_classes_index if self.index else CONF.response_classes
)
# some simple checks on base_url
self.base_url = str(self.base_url)
self.base_url_parsed = urllib.parse.urlparse(self.base_url)
# only allow filters/endpoints if we are working in "as_type" mode
if self.as_type_cls is None and self.base_url_parsed.query:
raise SystemExit(
f"Base URL {self.base_url} not appropriate: should not contain a filter."
)
self.valid = None
self._test_id_by_type: dict[str, Any] = {}
self._entry_info_by_type: dict[str, Any] = {}
self.results = ValidatorResults(verbosity=self.verbosity)
def _setup_log(self):
"""Define stdout log based on given verbosity."""
self._log = logging.getLogger("optimade").getChild("validator")
self._log.handlers = []
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s | %(levelname)8s: %(message)s")
)
if not self.respond_json:
self._log.addHandler(stdout_handler)
else:
self.verbosity = -1
if self.verbosity == 0:
self._log.setLevel(logging.CRITICAL)
elif self.verbosity == 1:
self._log.setLevel(logging.WARNING)
elif self.verbosity == 2:
self._log.setLevel(logging.INFO)
elif self.verbosity > 0:
self._log.setLevel(logging.DEBUG)
def print_summary(self):
"""Print a summary of the results of validation."""
if self.respond_json:
print(json.dumps(dataclasses.asdict(self.results), indent=2))
return
if self.results.failure_messages:
print("\n\nFAILURES")
print("========\n")
for message in self.results.failure_messages:
print_failure(message[0])
for line in message[1].split("\n"):
print_warning("\t" + line)
if self.results.optional_failure_messages:
print("\n\nOPTIONAL TEST FAILURES")
print("======================\n")
for message in self.results.optional_failure_messages:
print_notify(message[0])
for line in message[1].split("\n"):
print_warning("\t" + line)
if self.results.internal_failure_messages:
print("\n\nINTERNAL FAILURES")
print("=================\n")
print(
"There were internal validator failures associated with this run.\n"
"If this problem persists, please report it at:\n"
"https://github.com/Materials-Consortia/optimade-python-tools/issues/new\n"
)
for message in self.results.internal_failure_messages:
print_warning(message[0])
for line in message[1].split("\n"):
print_warning("\t" + line)
if self.valid or (not self.valid and not self.fail_fast):
final_message = f"\n\nPassed {self.results.success_count} out of {self.results.success_count + self.results.failure_count + self.results.internal_failure_count} tests."
if not self.valid:
print_failure(final_message)
else:
print_success(final_message)
if self.run_optional_tests and not self.fail_fast:
print(
f"Additionally passed {self.results.optional_success_count} out of "
f"{self.results.optional_success_count + self.results.optional_failure_count} optional tests."
)
def validate_implementation(self):
"""Run all the test cases on the implementation, or the single type test,
depending on what options were provided on initialiation.
Sets the `self.valid` attribute to `True` or `False` depending on the
outcome of the tests.
Raises:
RuntimeError: If it was not possible to start the validation process.
"""
# If a single "as type" has been set, only run that test
if self.as_type_cls is not None:
self._log.debug(
"Validating response of %s with model %s",
self.base_url,
self.as_type_cls,
)
self._test_as_type()
self.valid = not bool(self.results.failure_count)
self.print_summary()
return
# Test entire implementation
if self.verbosity >= 0:
print(f"Testing entire implementation at {self.base_url}")
info_endp = CONF.info_endpoint
self._log.debug("Testing base info endpoint of %s", info_endp)
# Get and validate base info to find endpoints
# If this is not possible, then exit at this stage
base_info = self._test_info_or_links_endpoint(info_endp)
if not base_info:
self._log.critical(
f"Unable to deserialize response from introspective {info_endp!r} endpoint. "
"This is required for all further validation, so the validator will now exit."
)
# Set valid to False to ensure error code 1 is raised at CLI
self.valid = False
self.print_summary()
return
# Grab the provider prefix from base info and use it when looking for provider fields
self.provider_prefix = None
meta = base_info.get("meta", {})
if meta.get("provider") is not None:
self.provider_prefix = meta["provider"].get("prefix")
# Set the response class for all `/info/entry` endpoints based on `/info` response
self.available_json_endpoints, _ = self._get_available_endpoints(
base_info, request=info_endp
)
for endp in self.available_json_endpoints:
self._response_classes[f"{info_endp}/{endp}"] = EntryInfoResponse
# Run some tests on the versions endpoint
self._log.debug("Testing versions endpoint %s", CONF.versions_endpoint)
self._test_versions_endpoint()
self._test_bad_version_returns_553()
# Test that entry info endpoints deserialize correctly
# If they do not, the corresponding entry in _entry_info_by_type
# is set to False, which must be checked for further validation
for endp in self.available_json_endpoints:
entry_info_endpoint = f"{info_endp}/{endp}"
self._log.debug("Testing expected info endpoint %s", entry_info_endpoint)
self._entry_info_by_type[endp] = self._test_info_or_links_endpoint(
entry_info_endpoint
)
# Test that the results from multi-entry-endpoints obey, e.g. page limits,
# and that all entries can be deserialized with the patched models.
# These methods also set the test_ids for each type of entry, which are validated
# in the next loop.
for endp in self.available_json_endpoints:
self._log.debug("Testing multiple entry endpoint of %s", endp)
self._test_multi_entry_endpoint(endp)
# Test that the single IDs scraped earlier work with the single entry endpoint
for endp in self.available_json_endpoints:
self._log.debug("Testing single entry request of type %s", endp)
self._test_single_entry_endpoint(endp)
# Use the _entry_info_by_type to construct filters on the relevant endpoints
if not self.minimal:
for endp in self.available_json_endpoints:
self._log.debug("Testing queries on JSON entry endpoint of %s", endp)
self._recurse_through_endpoint(endp)
# Test that the links endpoint can be serialized correctly
self._log.debug("Testing %s endpoint", CONF.links_endpoint)
self._test_info_or_links_endpoint(CONF.links_endpoint)
self.valid = not (
self.results.failure_count or self.results.internal_failure_count
)
self.print_summary()
@test_case
def _recurse_through_endpoint(self, endp: str) -> tuple[Optional[bool], str]:
"""For a given endpoint (`endp`), get the entry type
and supported fields, testing that all mandatory fields
are supported, then test queries on every property according
to the reported type, with optionality decided by the
specification-level support level for that field.
Parameters:
endp: Endpoint to be tested.
Returns:
`True` if endpoint passed the tests, and a string summary.
"""
entry_info = self._entry_info_by_type.get(endp)
if not entry_info:
raise ResponseError(
f"Unable to generate filters for endpoint {endp}: 'info/{endp}' response was malformed."
)
_impl_properties = self._check_entry_info(entry_info, endp)
prop_list = list(_impl_properties.keys())
self._check_response_fields(endp, prop_list)
chosen_entry, _ = self._get_archetypal_entry(endp, prop_list)
if not chosen_entry:
return (
None,
f"Unable to generate filters for endpoint {endp}: no valid entries found.",
)
for prop in _impl_properties:
# check support level of property
prop_type = _impl_properties[prop]["type"]
sortable = _impl_properties[prop]["sortable"]
optional = (
CONF.entry_schemas[endp].get(prop, {}).get("queryable")
== SupportLevel.OPTIONAL
)
if optional and not self.run_optional_tests:
continue
self._construct_queries_for_property(
prop,
prop_type,
sortable,
endp,
chosen_entry,
request=f"; testing queries for {endp}->{prop}",
optional=optional,
)
self._test_unknown_provider_property(endp)
self._test_completely_unknown_property(endp)
return True, f"successfully recursed through endpoint {endp}."
@test_case
def _test_completely_unknown_property(self, endp):
request = f"{endp}?filter=crazyfield = 2"
response, _ = self._get_endpoint(
request,
expected_status_code=400,
)
return True, "unknown field returned 400 Bad Request, as expected"
@test_case
def _test_unknown_provider_property(self, endp):
dummy_provider_field = "_crazyprovider_field"
request = f"{endp}?filter={dummy_provider_field}=2"
response, _ = self._get_endpoint(
request,
multistage=True,
request=request,
)
if response is not None:
deserialized, _ = self._deserialize_response(
response, CONF.response_classes[endp], request=request, multistage=True
)
return (
True,
"Unknown provider field was ignored when filtering, as expected",
)
raise ResponseError(
"Failed to handle field from unknown provider; should return without affecting filter results"
)
def _check_entry_info(
self, entry_info: dict[str, Any], endp: str
) -> dict[str, dict[str, Any]]:
"""Checks that `entry_info` contains all the required properties,
and returns the property list for the endpoint.
Parameters:
entry_info: JSON representation of the response from the
entry info endpoint.
endp: The name of the entry endpoint.
Returns:
The list of property names supported by this implementation.
"""
properties = entry_info.get("data", {}).get("properties", [])
self._test_must_properties(
properties, endp, request=f"{CONF.info_endpoint}/{endp}"
)
return properties
@test_case
def _test_must_properties(
self, properties: list[str], endp: str
) -> tuple[bool, str]:
"""Check that the entry info lists all properties with the "MUST"
support level for this endpoint.
Parameters:
properties: The list of property names supported by the endpoint.
endp: The endpoint.
Returns:
`True` if the properties were found, and a string summary.
"""
must_props = {
prop
for prop in CONF.entry_schemas.get(endp, {})
if CONF.entry_schemas[endp].get(prop, {}).get("support")
== SupportLevel.MUST
}
must_props_supported = {prop for prop in properties if prop in must_props}
missing = must_props - must_props_supported
if len(missing) != 0:
raise ResponseError(
f"Some 'MUST' properties were missing from info/{endp}: {missing}"
)
return True, f"Found all required properties in entry info for endpoint {endp}"
@test_case
def _get_archetypal_entry(
self, endp: str, properties: list[str]
) -> tuple[Optional[dict[str, Any]], str]:
"""Get a random entry from the first page of results for this
endpoint.
Parameters:
endp: The endpoint to query.
Returns:
The JSON representation of the chosen entry and the summary message.
"""
response, message = self._get_endpoint(endp, multistage=True)
if response:
data = response.json().get("data", [])
data_returned = len(data)
if data_returned < 1:
return (
None,
"Endpoint {endp!r} returned no entries, cannot get archetypal entry or test filtering.",
)
archetypal_entry = response.json()["data"][
random.randint(0, data_returned - 1)
]
if "id" not in archetypal_entry:
raise ResponseError(
f"Chosen archetypal entry did not have an ID, cannot proceed: {archetypal_entry!r}"
)
return (
archetypal_entry,
f"set archetypal entry for {endp} with ID {archetypal_entry['id']}.",
)
raise ResponseError(f"Failed to get archetypal entry. Details: {message}")
@test_case
def _check_response_fields(
self, endp: str, fields: list[str]
) -> tuple[Optional[bool], str]:
"""Check that the response field query parameter is obeyed.
Parameters:
endp: The endpoint to query.
fields: The known fields for this endpoint to test.
Returns:
Bool indicating success and a summary message.
"""
subset_fields = random.sample(fields, min(len(fields) - 1, 3))
test_query = f"{endp}?response_fields={','.join(subset_fields)}&page_limit=1"
response, _ = self._get_endpoint(test_query, multistage=True)
if response and len(response.json()["data"]) > 0:
doc = response.json()["data"][0]
expected_fields = set(subset_fields)
expected_fields -= CONF.top_level_non_attribute_fields
if "attributes" not in doc:
raise ResponseError(
f"Entries are missing `attributes` key.\nReceived: {doc}"
)
returned_fields = set(sorted(list(doc.get("attributes", {}).keys())))
returned_fields -= CONF.top_level_non_attribute_fields
if expected_fields != returned_fields:
raise ResponseError(
f"Response fields not obeyed by {endp!r}:\nExpected: {expected_fields}\nReturned: {returned_fields}"
)
return True, "Successfully limited response fields"
return (
None,
f"Unable to test adherence to response fields as no entries were returned for endpoint {endp!r}.",
)
@test_case
def _construct_queries_for_property(
self,
prop: str,
prop_type: DataType,
sortable: bool,
endp: str,
chosen_entry: dict[str, Any],
) -> tuple[Optional[bool], str]:
"""For the given property, property type and chose entry, this method
runs a series of queries for each field in the entry, testing that the
initial document is returned where expected.
Parameters:
prop: The property name.
prop_type: The property type.
sortable: Whether the implementation has indicated that the field is sortable.
endp: The corresponding entry endpoint.
chosen_entry: A JSON respresentation of the chosen entry that will be used to
construct the filters.
Returns:
Boolean indicating success (`True`) or failure/irrelevance
(`None`) and the string summary of the test case.
"""
# Explicitly handle top level keys that do not have types in info
if not chosen_entry:
raise ResponseError(
f"Chosen entry of endpoint '/{endp}' failed validation."
)
if prop == "type":
if chosen_entry.get("type") == endp:
return True, f"Successfully validated {prop}"
raise ResponseError(
f"Chosen entry of endpoint '{endp}' had unexpected or missing type: {chosen_entry.get('type')!r}."
)
prop_type = (
CONF.entry_schemas[endp].get(prop, {}).get("type")
if prop_type is None
else prop_type
)
if prop_type is None:
raise ResponseError(
f"Cannot validate queries on {prop!r} as field type was not reported in `/info/{endp}`"
)
# this is the case of a provider field
if prop not in CONF.entry_schemas[endp]:
if self.provider_prefix is None:
raise ResponseError(
f"Found unknown field {prop!r} in `/info/{endp}` and no provider prefix was provided in `/info`"
)
elif not prop.startswith(f"_{self.provider_prefix}_"):
raise ResponseError(
f"Found unknown field {prop!r} that did not start with provider prefix '_{self.provider_prefix}_'"
)
return (
None,
f"Found provider field {prop!r}, will not test queries as they are strictly optional.",
)
query_optional = (
CONF.entry_schemas[endp].get(prop, {}).get("queryable")
== SupportLevel.OPTIONAL
)
return self._construct_single_property_filters(
prop, prop_type, sortable, endp, chosen_entry, query_optional
)
@staticmethod
def _format_test_value(test_value: Any, prop_type: DataType, operator: str) -> str:
"""Formats the test value as a string according to the type of the property.
Parameters:
test_value: The value to format.
prop_type: The OPTIMADE data type of the field.
operator: The operator that will be applied to it.
Returns:
The value formatted as a string to use in an OPTIMADE filter.
"""
if prop_type == DataType.LIST:
if operator in ("HAS ALL", "HAS ANY"):
_vals = sorted(set(test_value))
if isinstance(test_value[0], str):
_vals = [f'"{val}"' for val in _vals]
else:
_vals = [f"{val}" for val in _vals]
_test_value = ",".join(_vals)
elif operator == "LENGTH":
_test_value = f"{len(test_value)}"
else:
if isinstance(test_value[0], str):
_test_value = f'"{test_value[0]}"'
else:
_test_value = test_value[0]
elif prop_type in (DataType.STRING, DataType.TIMESTAMP):
_test_value = f'"{test_value}"'
else:
_test_value = test_value
return _test_value
def _construct_single_property_filters(
self,
prop: str,
prop_type: DataType,
sortable: bool,
endp: str,
chosen_entry: dict[str, Any],
query_optional: bool,
) -> tuple[Optional[bool], str]:
"""This method constructs appropriate queries using all operators
for a certain field and applies some tests:
- inclusive operators return compatible entries, e.g. `>=` always returns
at least the results of `=`.
- exclusive operators never return contradictory entries, e.g.
`nsites=1` never returns the same entries as `nsites!=1`, modulo
pagination.
Parameters:
prop: The property name.
prop_type: The property type.
sortable: Whether the implementation has indicated that the field is sortable.
endp: The corresponding entry endpoint.
chosen_entry: A JSON respresentation of the chosen entry that will be used to
construct the filters.
query_optional: Whether to treat query success as optional.
Returns:
Boolean indicating success (`True`) or failure/irrelevance
(`None`) and the string summary of the test case.
"""
if prop == "id":
test_value = chosen_entry.get("id")
else:
test_value = chosen_entry.get("attributes", {}).get(prop, "_missing")
if test_value in ("_missing", None):
support = CONF.entry_schemas[endp].get(prop, {}).get("support")
queryable = CONF.entry_schemas[endp].get(prop, {}).get("queryable")
submsg = "had no value" if test_value == "_missing" else "had `None` value"
msg = (
f"Chosen entry {submsg} for {prop!r} with support level {support} and queryability {queryable}, "
f"so cannot construct test queries. This field should potentially be removed from the `/info/{endp}` endpoint response."
)
# None values are allowed for OPTIONAL and SHOULD, so we can just skip
if support in (
SupportLevel.OPTIONAL,
SupportLevel.SHOULD,
):
self._log.info(msg)
return None, msg
# Otherwise, None values are not allowed for MUST's, and entire missing fields are not allowed
raise ResponseError(msg)
using_fallback = False
if prop_type == DataType.LIST:
if not test_value:
test_value = CONF.enum_fallback_values.get(endp, {}).get(prop)
using_fallback = True
if not test_value:
msg = f"Not testing filters on field {prop} of type {prop_type} as no test value was found to use in filter."
self._log.warning(msg)
return None, msg
if isinstance(test_value[0], dict) or isinstance(test_value[0], list):
msg = f"Not testing filters on field {prop} of type {prop_type} with nested dictionary/list test value."
self._log.warning(msg)
return None, msg
# Try to infer if the test value is a float from its string representation
# and decide whether to do inclusive/exclusive query tests
try:
float(test_value[0])
msg = f"Not testing filters on field {prop} of type {prop_type} containing float values."
self._log.warning(msg)
return None, msg
except ValueError:
pass
if prop_type in (DataType.DICTIONARY,):
msg = f"Not testing queries on field {prop} of type {prop_type}."
self._log.warning(msg)
return None, msg
num_data_returned = {}
inclusive_operators = CONF.inclusive_operators[prop_type]
exclusive_operators = CONF.exclusive_operators[prop_type]
field_specific_support_overrides = CONF.field_specific_overrides.get(prop, {})
for operator in inclusive_operators | exclusive_operators:
# Need to pre-format list and string test values for the query
_test_value = self._format_test_value(test_value, prop_type, operator)
query_optional = (
query_optional
or operator
in field_specific_support_overrides.get(SupportLevel.OPTIONAL, [])
)
query = f"{prop} {operator} {_test_value}"
request = f"{endp}?filter={query}"
response, message = self._get_endpoint(
request,
multistage=True,
optional=query_optional,
expected_status_code=(200, 501),
)
if response is None or response.status_code != 200:
if query_optional:
return (
None,
"Optional query {query!r} raised the error: {message}.",
)
raise ResponseError(
f"Unable to perform mandatory query {query!r}, which raised the error: {message}"
)
response = response.json()
if "meta" not in response or "more_data_available" not in response["meta"]:
raise ResponseError(
f"Required field `meta->more_data_available` missing from response for {request}."
)
if not response["meta"]["more_data_available"] and "data" in response:
num_data_returned[operator] = len(response["data"])
else:
num_data_returned[operator] = response["meta"].get("data_returned")
if prop in CONF.unique_properties and operator == "=":
if num_data_returned["="] is not None and num_data_returned["="] == 0:
raise ResponseError(
f"Unable to filter field 'id' for equality, no data was returned for {query}."
)
if num_data_returned["="] is not None and num_data_returned["="] > 1:
raise ResponseError(
f"Filter for an individual 'id' returned {num_data_returned['=']} results, when only 1 was expected."
)
num_response = num_data_returned[operator]
excluded = operator in exclusive_operators
# if we have all results on this page, check that the blessed ID is in the response
if excluded and (
chosen_entry.get("id", "")
in {entry.get("id") for entry in response["data"]}
):
raise ResponseError(
f"Entry {chosen_entry['id']} with value {prop!r}: {test_value} was not excluded by {query!r}"
)
# check that at least the archetypal structure was returned, unless we are using a fallback value
if not excluded and not using_fallback:
if (
num_data_returned[operator] is not None
and num_data_returned[operator] < 1
):
raise ResponseError(
f"Supposedly inclusive query {query!r} did not include original entry ID {chosen_entry['id']!r} "
f"(with field {prop!r} = {test_value}) potentially indicating a problem with filtering on this field."
)
# check that the filter returned no entries that had a null or missing value for the filtered property
if any(
entry.get("attributes", {}).get(prop, entry.get(prop, None)) is None
for entry in response.get("data", [])
):
raise ResponseError(
f"Filter {query!r} on field {prop!r} returned entries that had null or missing values for the field."
)
# Numeric and string comparisons must work both ways...
if prop_type in (
DataType.STRING,
DataType.INTEGER,
DataType.FLOAT,
DataType.TIMESTAMP,
) and operator not in (
"CONTAINS",
"STARTS",
"STARTS WITH",
"ENDS",
"ENDS WITH",
):
reversed_operator = operator.replace("<", ">")
if "<" in operator:
reversed_operator = operator.replace("<", ">")
elif ">" in operator:
reversed_operator = operator.replace(">", "<")
# Don't try to reverse string comparison as it is ill-defined
if prop_type == DataType.STRING and any(
comp in operator for comp in ("<", ">")
):
continue
reversed_query = f"{_test_value} {reversed_operator} {prop}"
reversed_request = f"{endp}?filter={reversed_query}"
reversed_response, message = self._get_endpoint(
reversed_request,
multistage=True,
optional=query_optional,
expected_status_code=(200, 501),
)
if not reversed_response:
if query_optional:
return (
None,
"Optional query {reversed_query!r} raised the error: {message}.",
)
raise ResponseError(
f"Unable to perform mandatory query {reversed_query!r}, which raised the error: {message}"
)
reversed_response = reversed_response.json()
if (
"meta" not in reversed_response
or "more_data_available" not in reversed_response["meta"]
):
raise ResponseError(
f"Required field `meta->more_data_available` missing from response for {request}."
)
if not reversed_response["meta"]["more_data_available"]:
num_reversed_response = len(reversed_response["data"])
else:
num_reversed_response = reversed_response["meta"].get(
"data_returned"
)
if num_response is not None and num_reversed_response is not None:
if reversed_response["meta"].get("data_returned") != response[
"meta"
].get("data_returned"):
raise ResponseError(
f"Query {query} did not work both ways around: {reversed_query}, "
"returning a different number of results each time (as reported by `meta->data_returned`)"
)
# check that the filter returned no entries that had a null or missing value for the filtered property
if any(
entry.get("attributes", {}).get(prop, entry.get(prop, None)) is None
for entry in reversed_response.get("data", [])
):
raise ResponseError(
f"Filter {reversed_query!r} on field {prop!r} returned entries that had null or missing values for the field."
)
return True, f"{prop} passed filter tests"
def _test_info_or_links_endpoint(
self, request_str: str
) -> Union[Literal[False], dict]:
"""Requests an info or links endpoint and attempts to deserialize
the response.
Parameters:
request_str: The request to make, e.g. "links".
Returns:
`False` if the info response failed deserialization,
otherwise returns the deserialized object.
"""
response, _ = self._get_endpoint(request_str)
if response:
deserialized, _ = self._deserialize_response(
response,
self._response_classes[request_str],
request=request_str,
)
if deserialized:
return deserialized.model_dump()
return False
def _test_single_entry_endpoint(self, endp: str) -> None:
"""Requests and deserializes a single entry endpoint with the
appropriate model.
Parameters:
request_str: The single entry request to make, e.g. "structures/id_1".
"""
response_cls_name = endp + "/"
if response_cls_name in self._response_classes:
response_cls = self._response_classes[response_cls_name]
else:
self._log.warning(
"Deserializing single entry response %s with generic response rather than defined endpoint.",
endp,
)
response_cls = ValidatorEntryResponseOne
response_fields = set()
if endp in CONF.entry_schemas:
response_fields = (
set(CONF.entry_schemas[endp].keys())
- CONF.top_level_non_attribute_fields
)
if endp in self._test_id_by_type:
test_id = self._test_id_by_type[endp]
request_str = f"{endp}/{test_id}"
if response_fields:
request_str += f"?response_fields={','.join(response_fields)}"
response, _ = self._get_endpoint(request_str)
if response:
self._test_meta_schema_reporting(response, request_str, optional=True)
self._deserialize_response(response, response_cls, request=request_str)
def _test_multi_entry_endpoint(self, endp: str) -> None:
"""Requests and deserializes a multi-entry endpoint with the
appropriate model.
Parameters:
request_str: The multi-entry request to make, e.g.,
"structures?filter=nsites<10"
"""
if endp in self._response_classes:
response_cls = self._response_classes[endp]
else:
self._log.warning(
"Deserializing multi entry response from %s with generic response rather than defined endpoint.",
endp,
)
response_cls = ValidatorEntryResponseMany
response_fields = set()
if endp in CONF.entry_schemas:
response_fields = (
set(CONF.entry_schemas[endp].keys())
- CONF.top_level_non_attribute_fields
)
request_str = f"{endp}?page_limit={self.page_limit}"
if response_fields:
request_str += f'&response_fields={",".join(response_fields)}'
response, _ = self._get_endpoint(request_str)
self._test_if_data_empty(response, request_str, optional=True)
self._test_meta_schema_reporting(response, request_str, optional=True)
self._test_page_limit(response)
deserialized, _ = self._deserialize_response(
response, response_cls, request=request_str
)
self._get_single_id_from_multi_entry_endpoint(deserialized, request=request_str)
if deserialized:
self._test_data_available_matches_data_returned(
deserialized, request=request_str
)
@test_case
def _test_data_available_matches_data_returned(
self, deserialized: Any
) -> tuple[Optional[bool], str]:
"""In the case where no query is requested, `data_available`
must equal `data_returned` in the meta response, which is tested
here.
Parameters:
deserialized: The deserialized response to a multi-entry
endpoint.
Returns:
`True` if successful, with a string summary.
"""
if (
deserialized.meta.data_available is None
or deserialized.meta.data_returned is None
):
return (
None,
"`meta->data_available` and/or `meta->data_returned` were not provided.",
)
if deserialized.meta.data_available != deserialized.meta.data_returned:
raise ResponseError(
f"No query was performed, but `data_returned` != `data_available` {deserialized.meta.data_returned} vs {deserialized.meta.data_available}."
)
return (
True,
"Meta response contained correct values for data_available and data_returned.",
)
def _test_versions_endpoint(self):
"""Requests and validate responses for the versions endpoint,
which MUST exist for unversioned base URLs and MUST NOT exist
for versioned base URLs.
"""
# First, check that there is a versions endpoint in the appropriate place:
# If passed a versioned URL, then strip that version from
# the URL before looking for `/versions`.
_old_base_url = self.base_url
if re.match(VERSIONS_REGEXP, self.base_url_parsed.path) is not None:
self.client.base_url = "/".join(self.client.base_url.split("/")[:-1])
self.base_url = self.client.base_url
response, _ = self._get_endpoint(
CONF.versions_endpoint, expected_status_code=200
)
if response:
self._test_versions_endpoint_content(
response, request=CONF.versions_endpoint
)
# If passed a versioned URL, first reset the URL of the client to the
# versioned one, then test that this versioned URL does NOT host a versions endpoint
if re.match(VERSIONS_REGEXP, self.base_url_parsed.path) is not None:
self.client.base_url = _old_base_url
self.base_url = _old_base_url
self._get_endpoint(CONF.versions_endpoint, expected_status_code=404)
@test_case
def _test_versions_endpoint_content(
self, response: requests.Response
) -> tuple[requests.Response, str]:
"""Checks that the response from the versions endpoint complies
with the specification and that its 'Content-Type' header complies with
[RFC 4180](https://tools.ietf.org/html/rfc4180.html).
Parameters:
response: The HTTP response from the versions endpoint.
Raises:
ResponseError: If any content checks fail.
Returns:
The successful HTTP response or `None`, and a summary string.
"""
text_content = response.text.strip().split("\n")
if text_content[0] != "version":
raise ResponseError(
f"First line of `/{CONF.versions_endpoint}` response must be 'version', not {text_content[0]!r}"
)
if len(text_content) <= 1:
raise ResponseError(
f"No version numbers found in `/{CONF.versions_endpoint}` response, only {text_content}"
)
for version in text_content[1:]:
try:
int(version)
except ValueError:
raise ResponseError(
f"Version numbers reported by `/{CONF.versions_endpoint}` must be integers specifying the major version, not {text_content}."
)
_content_type = response.headers.get("content-type")
if not _content_type:
raise ResponseError(
"Missing 'Content-Type' in response header from `/versions`."
)
content_type = [_.replace(" ", "") for _ in _content_type.split(";")]
self._test_versions_headers(
content_type,
("text/csv", "text/plain"),
optional=True,
request=CONF.versions_endpoint,
)
self._test_versions_headers(
content_type,
"header=present",
optional=True,
request=CONF.versions_endpoint,
)
return response, "`/versions` endpoint responded correctly."
@test_case
def _test_versions_headers(
self,
content_type: dict[str, Any],
expected_parameter: Union[str, list[str]],
) -> tuple[dict[str, Any], str]:
"""Tests that the `Content-Type` field of the `/versions` header contains
the passed parameter.
Arguments:
content_type: The 'Content-Type' field from the response of the `/versions` endpoint.
expected_paramter: A substring or list of substrings that are expected in
the Content-Type of the response. If multiple strings are passed, they will
be treated as possible alternatives to one another.
Raises:
ResponseError: If the expected 'Content-Type' parameter is missing.
Returns:
The HTTP response headers and a summary string.
"""
if isinstance(expected_parameter, str):
expected_parameter = [expected_parameter]
if not any(param in content_type for param in expected_parameter):
raise ResponseError(
f"Incorrect 'Content-Type' header {';'.join(content_type)!r}.\n"
f"Missing at least one expected parameter(s): {expected_parameter!r}"
)
return (
content_type,
f"`/versions` response had one of the expected Content-Type parameters {expected_parameter}.",
)
def _test_as_type(self) -> None:
"""Tests that the base URL of the validator (i.e. with no
additional path added) validates with the model selected.
"""
response, _ = self._get_endpoint("")
if response:
self._log.debug("Deserialzing response as type %s", self.as_type_cls)
self._deserialize_response(response, self.as_type_cls)
def _test_bad_version_returns_553(self) -> None:
"""Tests that a garbage version number responds with a 553
error code.
"""
expected_status_code = [553]
if re.match(VERSIONS_REGEXP, self.base_url_parsed.path) is not None:
expected_status_code = [404, 400]
self._get_endpoint(
"v123123/info", expected_status_code=expected_status_code, optional=True
)
@test_case
def _test_if_data_empty(
self,
response: requests.models.Response,
request_str: str,
):
"""Tests whether an endpoint responds a entries under `data`."""
try:
if not response.json().get("data", []):
raise ResponseError(
f"Query {request_str} did not respond with any entries under `data`. This may not consitute an error, but should be checked."
)
except json.JSONDecodeError:
raise ResponseError(
f"Unable to test presence of `data` for query {request_str}: could not decode response as JSON.\n{str(response.content)}"
)
return (
True,
f"Query {request_str} successfully returned some entries.",
)
@test_case
def _test_meta_schema_reporting(
self,
response: requests.models.Response,
request_str: str,
):
"""Tests that the endpoint responds with a `meta->schema`."""
try:
if not response.json().get("meta", {}).get("schema"):
raise ResponseError(
f"Query {request_str} did not report a schema in `meta->schema` field."
)
except json.JSONDecodeError:
raise ResponseError(
f"Unable to test presence of `meta->schema`: could not decode response as JSON.\n{str(response.content)}"
)
return (
True,
f"Query {request_str} successfully reported a schema in `meta->schema`.",
)
@test_case
def _test_page_limit(
self,
response: requests.models.Response,
check_next_link: int = 5,
previous_links: Optional[set[str]] = None,
) -> tuple[Optional[bool], str]:
"""Test that a multi-entry endpoint obeys the page limit by
following pagination links up to a depth of `check_next_link`.
Parameters:
response: The response to test for page limit
compliance.
check_next_link: Maximum recursion depth for following
pagination links.
previous_links: A set of previous links that will be used
to check that the `next` link is actually new.
Returns:
`True` if the test was successful and `None` if not, with a
string summary.
"""
if previous_links is None:
previous_links = set()
try:
response_json = response.json()
except (AttributeError, json.JSONDecodeError):
raise ResponseError("Unable to test endpoint `page_limit` parameter.")
try:
num_entries = len(response_json["data"])
except (KeyError, TypeError):
raise ResponseError(
"Response under `data` field was missing or had wrong type."
)
if num_entries > self.page_limit:
raise ResponseError(
f"Endpoint did not obey page limit: {num_entries} entries vs {self.page_limit} limit"
)
try:
more_data_available = response_json["meta"]["more_data_available"]
except KeyError:
raise ResponseError("Field `meta->more_data_available` was missing.")
if more_data_available and check_next_link:
try:
next_link = response_json["links"]["next"]
if isinstance(next_link, dict):
next_link = next_link["href"]
if not next_link:
raise ResponseError(
"Endpoint suggested more data was available but provided no valid links->next link."
)
except KeyError:
raise ResponseError(
"Endpoint suggested more data was available but provided no valid links->next link."
)
if next_link in previous_links:
raise ResponseError(
f"The next link {next_link} has been provided already for a previous page."
)
previous_links.add(next_link)
if not isinstance(next_link, str):
raise ResponseError(
f"Unable to parse links->next {next_link!r} as a link."
)
self._log.debug("Following pagination link to %r.", next_link)
next_response, _ = self._get_endpoint(next_link)
if not next_response:
raise ResponseError(
f"Error when testing pagination: the response from `links->next` {next_link!r} failed the previous test."
)
check_next_link = check_next_link - 1
self._test_page_limit(
next_response,
check_next_link=check_next_link,
multistage=bool(check_next_link),
previous_links=previous_links,
)
return (
True,
f"Endpoint obeyed page limit of {self.page_limit} by returning {num_entries} entries.",
)
@test_case
def _get_single_id_from_multi_entry_endpoint(self, deserialized):
"""Scrape an ID from the multi-entry endpoint to use as query
for single entry endpoint.
"""
if deserialized and deserialized.data:
self._test_id_by_type[deserialized.data[0].type] = deserialized.data[0].id
self._log.debug(
"Set type %s test ID to %s",
deserialized.data[0].type,
deserialized.data[0].id,
)
else:
return (
None,
"No entries found under endpoint to scrape ID from. "
"This may be caused by previous errors, if e.g. the endpoint failed deserialization.",
)
return (
self._test_id_by_type[deserialized.data[0].type],
f"successfully scraped test ID from {deserialized.data[0].type} endpoint",
)
@test_case
def _deserialize_response(
self,
response: requests.models.Response,
response_cls: Any,
request: Optional[str] = None,
) -> tuple[Any, str]:
"""Try to create the appropriate pydantic model from the response.
Parameters:
response: The response to try to deserialize.
response_cls: The class to use for deserialization.
request: Optional string that will be displayed as the attempted
request in the validator output.
Returns:
The deserialized object (or `None` if unsuccessful) and a
human-readable summary
"""
if not response:
raise ResponseError("Request failed")
try:
json_response = response.json()
except json.JSONDecodeError:
raise ResponseError(
f"Unable to decode response as JSON. Response: {response}"
)
self._log.debug(
f"Deserializing {json.dumps(json_response, indent=2)} as model {response_cls}"
)
return (
response_cls(**json_response),
f"deserialized correctly as object of type {response_cls}",
)
@test_case
def _get_available_endpoints(
self, base_info: Union[Any, dict[str, Any]]
) -> tuple[Optional[list[str]], str]:
"""Tries to get `entry_types_by_format` from base info response
even if it could not be deserialized.
Parameters:
base_info: Either the unvalidated JSON representation of the
base info, or the deserialized object.
Returns:
The list of JSON entry endpoints (or `None` if unavailable)
and a string summary.
"""
available_json_entry_endpoints = []
for _ in [0]:
try:
available_json_entry_endpoints = base_info["data"]["attributes"][
"entry_types_by_format"
]["json"]
break
except (KeyError, TypeError):
raise ResponseError(
"Unable to get entry_types_by_format from unserializable base info response {}.".format(
base_info
)
)
else:
raise ResponseError(
"Unable to find any JSON entry types in entry_types_by_format"
)
if self.index and available_json_entry_endpoints != []:
raise ResponseError(
"No entry endpoint are allowed for an Index meta-database"
)
for non_entry_endpoint in CONF.non_entry_endpoints:
if non_entry_endpoint in available_json_entry_endpoints:
raise ResponseError(
f'Illegal entry "{non_entry_endpoint}" was found in entry_types_by_format"'
)
# Filter out custom extension endpoints that are not covered in the specification
available_json_entry_endpoints = [
endp
for endp in available_json_entry_endpoints
if endp in CONF.entry_endpoints
]
return (
available_json_entry_endpoints,
"successfully found available entry types in baseinfo",
)
@test_case
def _get_endpoint(
self, request_str: str, expected_status_code: Union[list[int], int] = 200
) -> tuple[Optional[requests.Response], str]:
"""Gets the response from the endpoint specified by `request_str`.
function is wrapped by the `test_case` decorator
Parameters:
request_str: The request to make to the client.
expected_status_code: If the request responds with a different
status code to this one, raise a ResponseError.
Returns:
The response to the request (if successful) or `None`, plus
a string summary.
"""
request_str = request_str.replace("\n", "")
response = self.client.get(request_str)
if isinstance(expected_status_code, int):
expected_status_code = [expected_status_code]
message = f"received expected response: {response}."
if response.status_code != 200:
message = f"Request to '{request_str}' returned HTTP status code {response.status_code}."
message += "\nAdditional details from implementation:"
try:
for error in response.json().get("errors", []):
message += f'\n {error.get("title", "N/A")}: {error.get("detail", "N/A")} ({error.get("source", {}).get("pointer", "N/A")})'
except json.JSONDecodeError:
message += f"\n Could not parse response as JSON. Content type was {response.headers.get('content-type')!r}."
if response.status_code not in expected_status_code:
raise ResponseError(message)
return response, message
|