-
-
Notifications
You must be signed in to change notification settings - Fork 436
Expand file tree
/
Copy pathjsonschema.py
More file actions
4378 lines (3958 loc) · 192 KB
/
jsonschema.py
File metadata and controls
4378 lines (3958 loc) · 192 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""JSON Schema parser implementation.
Handles parsing of JSON Schema, JSON, YAML, Dict, and CSV inputs to generate
Python data models. Supports draft-04 through draft-2020-12 schemas.
"""
from __future__ import annotations
import enum as _enum
import importlib
import json
import re
from collections import defaultdict
from collections.abc import Iterable
from contextlib import contextmanager, suppress
from functools import cached_property, lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Literal, Optional, Union
from urllib.parse import ParseResult, unquote
from warnings import warn
from pydantic import (
ConfigDict,
Field,
field_validator,
model_validator,
)
from typing_extensions import Unpack
from datamodel_code_generator import (
AllOfClassHierarchy,
AllOfMergeMode,
InvalidClassNameError,
JsonSchemaVersion,
ReadOnlyWriteOnlyModelType,
SchemaParseError,
VersionMode,
YamlValue,
load_data,
load_data_from_path,
snooper_to_methods,
)
from datamodel_code_generator.format import (
DatetimeClassType,
)
from datamodel_code_generator.imports import IMPORT_ANY, Import
from datamodel_code_generator.model import DataModel, DataModelFieldBase
from datamodel_code_generator.model.base import UNDEFINED, get_module_name, sanitize_module_name
from datamodel_code_generator.model.dataclass import DataClass
from datamodel_code_generator.model.enum import (
SPECIALIZED_ENUM_TYPE_MATCH,
Enum,
StrEnum,
)
from datamodel_code_generator.model.pydantic_v2.dataclass import DataClass as PydanticV2DataClass
from datamodel_code_generator.parser import DefaultPutDict, LiteralType
from datamodel_code_generator.parser.base import (
SPECIAL_PATH_FORMAT,
Parser,
Source,
escape_characters,
get_special_path,
title_to_class_name,
)
from datamodel_code_generator.reference import SPECIAL_PATH_MARKER, ModelType, Reference, is_url
from datamodel_code_generator.types import (
ANY,
DataType,
EmptyDataType,
Types,
UnionIntFloat,
extract_qualified_names,
get_subscript_args,
get_type_base_name,
)
from datamodel_code_generator.util import BaseModel
if TYPE_CHECKING:
from collections.abc import Callable, Generator, Iterable, Iterator
from datamodel_code_generator._types import JSONSchemaParserConfigDict
from datamodel_code_generator.config import JSONSchemaParserConfig
from datamodel_code_generator.parser.schema_version import JsonSchemaFeatures
def unescape_json_pointer_segment(segment: str) -> str:
    """Decode a single JSON pointer segment (RFC 6901 plus percent-encoding).

    Order matters: "~1" must be decoded to "/" before "~0" is decoded to "~",
    so that the escaped sequence "~01" round-trips to the literal "~1".
    """
    decoded = segment.replace("~1", "/").replace("~0", "~")
    return unquote(decoded)
def get_model_by_path(
    schema: dict[str, YamlValue] | list[YamlValue], keys: list[str] | list[int]
) -> dict[str, YamlValue]:
    """Walk ``keys`` (a JSON-pointer-like path) into ``schema``.

    Each string segment is pointer-unescaped before lookup. Dict lookups that
    miss fall back to an empty dict; list lookups index by int. The final
    value must itself be a dict (pointers into scalars/arrays are rejected).
    """
    if not keys:
        if isinstance(schema, dict):
            return schema
        msg = f"Does not support json pointer to array. schema={schema}, key={keys}"  # pragma: no cover
        raise NotImplementedError(msg)  # pragma: no cover
    head = keys[0]
    if isinstance(head, str):  # pragma: no branch
        head = unescape_json_pointer_segment(head)
    if isinstance(schema, dict):
        # Missing dict keys resolve to an empty schema rather than raising.
        value = schema.get(str(head), {})
    else:
        value = schema[int(head)]
    if len(keys) == 1:
        if isinstance(value, dict):
            return value
        msg = f"Does not support json pointer to array. schema={schema}, key={keys}"  # pragma: no cover
        raise NotImplementedError(msg)  # pragma: no cover
    if isinstance(value, (dict, list)):
        # Recurse into the remaining path segments.
        return get_model_by_path(value, keys[1:])
    msg = f"Cannot traverse non-container value. schema={schema}, key={keys}"  # pragma: no cover
    raise NotImplementedError(msg)  # pragma: no cover
# Maps a JSON Schema "type" to a {format -> Types} table; the "default" entry
# is used when no format is given (or the format is unknown).
# TODO: This dictionary contains formats valid only for OpenAPI and not for
# jsonschema and vice versa. They should be separated.
json_schema_data_formats: dict[str, dict[str, Types]] = {
    "integer": {
        "int32": Types.int32,
        "int64": Types.int64,
        "default": Types.integer,
        "date-time": Types.date_time,
        "unix-time": Types.int64,
        "unixtime": Types.int64,
    },
    "number": {
        "float": Types.float,
        "double": Types.double,
        "decimal": Types.decimal,
        "date-time": Types.date_time,
        "time": Types.time,
        "time-delta": Types.timedelta,
        "default": Types.number,
        "unixtime": Types.int64,
    },
    "string": {
        "default": Types.string,
        "byte": Types.byte,  # base64 encoded string
        "binary": Types.binary,
        "date": Types.date,
        "date-time": Types.date_time,
        "timestamp with time zone": Types.date_time,  # PostgreSQL format
        "date-time-local": Types.date_time_local,
        "duration": Types.timedelta,
        "time": Types.time,
        "time-local": Types.time_local,
        "password": Types.password,
        "path": Types.path,
        "email": Types.email,
        "idn-email": Types.email,
        "uuid": Types.uuid,
        "uuid1": Types.uuid1,
        "uuid2": Types.uuid2,
        "uuid3": Types.uuid3,
        "uuid4": Types.uuid4,
        "uuid5": Types.uuid5,
        "uri": Types.uri,
        "uri-reference": Types.string,
        "hostname": Types.hostname,
        "ipv4": Types.ipv4,
        "ipv4-network": Types.ipv4_network,
        "ipv6": Types.ipv6,
        "ipv6-network": Types.ipv6_network,
        "decimal": Types.decimal,
        "integer": Types.integer,
        "unixtime": Types.int64,
        "ulid": Types.ulid,
    },
    "boolean": {"default": Types.boolean},
    "object": {"default": Types.object},
    "null": {"default": Types.null},
    "array": {"default": Types.array},
}
class JSONReference(_enum.Enum):
    """Define types of JSON references."""

    # Fragment-only pointer ("#...") into the current document.
    LOCAL = "LOCAL"
    # Reference to another file on disk.
    REMOTE = "REMOTE"
    # Reference to a URL (http(s), etc.).
    URL = "URL"
class Discriminator(BaseModel):
    """Represent OpenAPI discriminator object.

    This is an OpenAPI-specific concept for supporting polymorphism.
    It identifies which schema applies based on a property value.
    Kept in jsonschema.py to avoid circular imports with openapi.py.
    """

    # Name of the property whose value selects the concrete schema.
    propertyName: str  # noqa: N815
    # Optional map from property value to schema name or $ref.
    mapping: Optional[dict[str, str]] = None  # noqa: UP045
class JsonSchemaObject(BaseModel):
    """Represent a JSON Schema object with validation and parsing capabilities."""

    if not TYPE_CHECKING:  # pragma: no branch

        @classmethod
        def get_fields(cls) -> dict[str, Any]:
            """Get fields for Pydantic v2 models."""
            return cls.model_fields

    # Keywords whose presence means the schema carries validation constraints.
    __constraint_fields__: set[str] = {  # noqa: RUF012
        "exclusiveMinimum",
        "minimum",
        "exclusiveMaximum",
        "maximum",
        "multipleOf",
        "minItems",
        "maxItems",
        "minLength",
        "maxLength",
        "pattern",
        "uniqueItems",
    }
    # Alias key under which collected extras are stored on (de)serialization.
    __extra_key__: str = SPECIAL_PATH_FORMAT.format("extras")
    # Keywords that never affect the generated model's structure.
    __metadata_only_fields__: set[str] = {  # noqa: RUF012
        "title",
        "description",
        "id",
        "$id",
        "$schema",
        "$comment",
        "examples",
        "example",
        "x_enum_varnames",
        "x_enum_field_as_literal",
        "definitions",
        "$defs",
        "default",
        "readOnly",
        "writeOnly",
        "deprecated",
        "$recursiveRef",
        "recursiveRef",
        "$recursiveAnchor",
        "recursiveAnchor",
        "$dynamicRef",
        "dynamicRef",
        "$dynamicAnchor",
        "dynamicAnchor",
    }
    # Extras keys that DO affect the schema structure (unlike plain metadata).
    __schema_affecting_extras__: set[str] = {  # noqa: RUF012
        "const",
    }

    @model_validator(mode="before")
    def validate_exclusive_maximum_and_exclusive_minimum(cls, values: Any) -> Any:  # noqa: N805
        """Validate and convert boolean exclusive maximum and minimum to numeric values.

        Draft-04 encodes exclusivity as booleans alongside maximum/minimum;
        later drafts use numeric exclusiveMaximum/exclusiveMinimum directly.
        """
        if not isinstance(values, dict):
            return values
        exclusive_maximum: float | bool | None = values.get("exclusiveMaximum")
        exclusive_minimum: float | bool | None = values.get("exclusiveMinimum")
        if exclusive_maximum is True:
            # Move the numeric bound into the exclusive keyword (draft-04 style).
            values["exclusiveMaximum"] = values["maximum"]
            del values["maximum"]
        elif exclusive_maximum is False:
            del values["exclusiveMaximum"]
        if exclusive_minimum is True:
            values["exclusiveMinimum"] = values["minimum"]
            del values["minimum"]
        elif exclusive_minimum is False:
            del values["exclusiveMinimum"]
        return values

    @field_validator("ref")
    def validate_ref(cls, value: Any) -> Any:  # noqa: N805
        """Validate and normalize $ref values."""
        if isinstance(value, str) and "#" in value:
            if value.endswith("#/"):
                # Trailing "#/" means the document root; keep just "#".
                return value[:-1]
            if "#/" in value or value[0] == "#" or value[-1] == "#":
                return value
            # Bare "#" fragment separator: normalize to "#/" pointer form.
            return value.replace("#", "#/")
        return value

    @field_validator("required", mode="before")
    def validate_required(cls, value: Any) -> Any:  # noqa: N805
        """Validate and normalize required field values."""
        if value is None:
            return []
        if isinstance(value, list):  # pragma: no branch # noqa: PLR1702
            # Filter to only include valid strings, excluding invalid objects
            required_fields: list[str] = []
            for item in value:
                if isinstance(item, str):
                    required_fields.append(item)
                # In some cases, the required field can include "anyOf", "oneOf", or "allOf" as a dict (#2297)
                elif isinstance(item, dict):  # pragma: no branch
                    for key, val in item.items():
                        if isinstance(val, list):  # pragma: no branch
                            # If 'anyOf' or "oneOf" is present, we won't include it in required fields
                            if key in {"anyOf", "oneOf"}:
                                continue
                            if key == "allOf":  # pragma: no branch
                                # If 'allOf' is present, we include them as required fields
                                required_fields.extend(sub_item for sub_item in val if isinstance(sub_item, str))
            value = required_fields
        return value

    @field_validator("type", mode="before")
    def validate_null_type(cls, value: Any) -> Any:  # noqa: N805
        """Validate and convert unquoted null type to string "null"."""
        # TODO[openapi]: This should be supported only for OpenAPI 3.1+
        # See: https://github.com/koxudaxi/datamodel-code-generator/issues/2477#issuecomment-3192480591
        if value is None:
            value = "null"
        if isinstance(value, list) and None in value:
            value = [v if v is not None else "null" for v in value]
        return value

    items: Optional[Union[list[JsonSchemaObject], JsonSchemaObject, bool]] = None  # noqa: UP007, UP045
    prefixItems: Optional[list[JsonSchemaObject]] = None  # noqa: N815, UP045
    uniqueItems: Optional[bool] = None  # noqa: N815, UP045
    type: Optional[Union[str, list[str]]] = None  # noqa: UP007, UP045
    format: Optional[str] = None  # noqa: UP045
    pattern: Optional[str] = None  # noqa: UP045
    minLength: Optional[int] = None  # noqa: N815,UP045
    maxLength: Optional[int] = None  # noqa: N815,UP045
    minimum: Optional[UnionIntFloat] = None  # noqa: UP045
    maximum: Optional[UnionIntFloat] = None  # noqa: UP045
    minItems: Optional[int] = None  # noqa: N815,UP045
    maxItems: Optional[int] = None  # noqa: N815,UP045
    multipleOf: Optional[float] = None  # noqa: N815, UP045
    exclusiveMaximum: Optional[Union[float, bool]] = None  # noqa: N815, UP007, UP045
    exclusiveMinimum: Optional[Union[float, bool]] = None  # noqa: N815, UP007, UP045
    additionalProperties: Optional[Union[JsonSchemaObject, bool]] = None  # noqa: N815, UP007, UP045
    unevaluatedProperties: Optional[Union[JsonSchemaObject, bool]] = None  # noqa: N815, UP007, UP045
    patternProperties: Optional[dict[str, Union[JsonSchemaObject, bool]]] = None  # noqa: N815, UP007, UP045
    propertyNames: Optional[JsonSchemaObject] = None  # noqa: N815, UP045
    oneOf: list[JsonSchemaObject] = Field(default_factory=list)  # noqa: N815
    anyOf: list[JsonSchemaObject] = Field(default_factory=list)  # noqa: N815
    allOf: list[JsonSchemaObject] = Field(default_factory=list)  # noqa: N815
    enum: list[Any] = Field(default_factory=list)
    writeOnly: Optional[bool] = None  # noqa: N815, UP045
    readOnly: Optional[bool] = None  # noqa: N815, UP045
    properties: Optional[dict[str, Union[JsonSchemaObject, bool]]] = None  # noqa: UP007, UP045
    required: list[str] = Field(default_factory=list)
    ref: Optional[str] = Field(default=None, alias="$ref")  # noqa: UP045
    recursiveRef: Optional[str] = Field(default=None, alias="$recursiveRef")  # noqa: N815, UP045
    recursiveAnchor: Optional[bool] = Field(default=None, alias="$recursiveAnchor")  # noqa: N815, UP045
    dynamicRef: Optional[str] = Field(default=None, alias="$dynamicRef")  # noqa: N815, UP045
    dynamicAnchor: Optional[str] = Field(default=None, alias="$dynamicAnchor")  # noqa: N815, UP045
    nullable: Optional[bool] = None  # noqa: UP045
    x_enum_varnames: list[str] = Field(default_factory=list, alias="x-enum-varnames")
    x_enum_names: list[str] = Field(default_factory=list, alias="x-enumNames")
    x_enum_field_as_literal: Optional[bool] = Field(default=None, alias="x-enum-field-as-literal")  # noqa: UP045
    description: Optional[str] = None  # noqa: UP045
    title: Optional[str] = None  # noqa: UP045
    example: Any = None
    examples: Any = None
    default: Any = None
    id: Optional[str] = Field(default=None, alias="$id")  # noqa: UP045
    custom_type_path: Optional[str] = Field(default=None, alias="customTypePath")  # noqa: UP045
    custom_base_path: str | list[str] | None = Field(default=None, alias="customBasePath")
    extras: dict[str, Any] = Field(alias=__extra_key__, default_factory=dict)
    discriminator: Optional[Union[Discriminator, str]] = None  # noqa: UP007, UP045
    model_config = ConfigDict(  # ty: ignore
        arbitrary_types_allowed=True,
        ignored_types=(cached_property,),
    )

    def __init__(self, **data: Any) -> None:
        """Initialize JsonSchemaObject with extra fields handling."""
        super().__init__(**data)
        # Restore extras from alias key (for dict -> parse_obj round-trip)
        alias_extras = data.get(self.__extra_key__, {})
        # Collect custom keys from raw data
        raw_extras = {k: v for k, v in data.items() if k not in EXCLUDE_FIELD_KEYS}
        # Merge: raw_extras takes precedence (original data is the source of truth)
        self.extras = {**alias_extras, **raw_extras}
        if "const" in alias_extras:  # pragma: no cover
            self.extras["const"] = alias_extras["const"]
        # Support x-propertyNames extension for OpenAPI 3.0
        if "x-propertyNames" in self.extras and self.propertyNames is None:
            x_prop_names = self.extras.pop("x-propertyNames")
            if isinstance(x_prop_names, dict):
                self.propertyNames = JsonSchemaObject.model_validate(x_prop_names)

    @cached_property
    def is_object(self) -> bool:
        """Check if the schema represents an object type."""
        return self.properties is not None or (
            self.type == "object" and not self.allOf and not self.oneOf and not self.anyOf and not self.ref
        )

    @cached_property
    def is_array(self) -> bool:
        """Check if the schema represents an array type."""
        return self.items is not None or self.prefixItems is not None or self.type == "array"

    @cached_property
    def ref_object_name(self) -> str:  # pragma: no cover
        """Extract the object name from the reference path."""
        return (self.ref or "").rsplit("/", 1)[-1]

    @field_validator("items", mode="before")
    def validate_items(cls, values: Any) -> Any:  # noqa: N805
        """Validate items field, converting empty dicts to None."""
        # this condition expects empty dict
        return values or None

    @cached_property
    def has_default(self) -> bool:
        """Check if the schema has a default value or default factory."""
        return "default" in self.model_fields_set or "default_factory" in self.extras

    @cached_property
    def has_constraint(self) -> bool:
        """Check if the schema has any constraint fields set."""
        return bool(self.__constraint_fields__ & self.model_fields_set)

    @cached_property
    def ref_type(self) -> JSONReference | None:
        """Get the reference type (LOCAL, REMOTE, or URL)."""
        if self.ref:
            return get_ref_type(self.ref)
        return None  # pragma: no cover

    @cached_property
    def type_has_null(self) -> bool:
        """Check if the type list or oneOf/anyOf contains null."""
        if isinstance(self.type, list) and "null" in self.type:
            return True
        for item in self.oneOf + self.anyOf:
            if item.type == "null":
                return True
            if isinstance(item.type, list) and "null" in item.type:
                return True
        return False

    @cached_property
    def has_multiple_types(self) -> bool:
        """Check if the type is a list with multiple non-null types."""
        if not isinstance(self.type, list):
            return False
        non_null_types = [t for t in self.type if t != "null"]
        return len(non_null_types) > 1

    @cached_property
    def has_ref_with_schema_keywords(self) -> bool:
        """Check if schema has $ref combined with schema-affecting keywords.

        Metadata-only keywords (title, description, etc.) are excluded
        as they don't affect the schema structure. OpenAPI/JSON Schema
        extension fields (x-*) are also excluded as they are vendor
        extensions and don't affect the core schema structure.
        """
        if not self.ref:
            return False
        other_fields = self.model_fields_set - {"ref"}
        schema_affecting_fields = other_fields - self.__metadata_only_fields__ - {"extras"}
        if self.extras:
            schema_affecting_extras = {k for k in self.extras if k in self.__schema_affecting_extras__}
            if schema_affecting_extras:
                schema_affecting_fields |= {"extras"}
        return bool(schema_affecting_fields)

    @cached_property
    def is_ref_with_nullable_only(self) -> bool:
        """Check if schema has $ref with only nullable: true (no other schema-affecting keywords).

        This is used to avoid creating duplicate models when a $ref is combined
        with nullable: true. In such cases, the reference should be used directly
        with Optional type annotation instead of merging schemas.
        """
        if not self.ref or self.nullable is not True:
            return False
        other_fields = self.model_fields_set - {"ref", "nullable"} - self.__metadata_only_fields__ - {"extras"}
        if other_fields:
            return False
        if self.extras:
            schema_affecting_extras = {k for k in self.extras if k in self.__schema_affecting_extras__}
            if schema_affecting_extras:
                return False
        return True
@lru_cache
def get_ref_type(ref: str) -> JSONReference:
    """Classify a $ref string as LOCAL, URL, or REMOTE.

    Fragment-only references ("#...") are local to the current document;
    anything that parses as a URL is remote-by-URL; everything else is
    treated as a relative file reference.
    """
    if ref[0] == "#":
        return JSONReference.LOCAL
    return JSONReference.URL if is_url(ref) else JSONReference.REMOTE
def _get_type(
type_: str,
format__: str | None = None,
data_formats: dict[str, dict[str, Types]] | None = None,
) -> Types:
"""Get the appropriate Types enum for a given JSON Schema type and format."""
if data_formats is None: # pragma: no cover
data_formats = json_schema_data_formats
if type_ not in data_formats:
return Types.any
if (type_format := data_formats[type_].get("default" if format__ is None else format__)) is not None:
return type_format
warn(f"format of {format__!r} not understood for {type_!r} - using default", stacklevel=2)
return data_formats[type_]["default"]
# Resolve the self-referential JsonSchemaObject forward references.
JsonSchemaObject.model_rebuild()
# Keys that are kept as field-level extras by default.
DEFAULT_FIELD_KEYS: set[str] = {
    "example",
    "examples",
    "description",
    "discriminator",
    "title",
    "const",
    "default_factory",
    "deprecated",
}
# Keys excluded from extras collection for plain JSON Schema input.
EXCLUDE_FIELD_KEYS_IN_JSON_SCHEMA: set[str] = {
    "readOnly",
    "writeOnly",
}
# Everything that is a declared model field (minus the defaults above),
# plus the raw "$"-prefixed aliases and the internal extras alias key,
# is excluded when collecting custom extras in JsonSchemaObject.__init__.
EXCLUDE_FIELD_KEYS = (
    set(JsonSchemaObject.get_fields())  # ty: ignore
    - DEFAULT_FIELD_KEYS
    - EXCLUDE_FIELD_KEYS_IN_JSON_SCHEMA
) | {
    "$id",
    "$ref",
    "$recursiveRef",
    "$recursiveAnchor",
    "$dynamicRef",
    "$dynamicAnchor",
    JsonSchemaObject.__extra_key__,
}
@snooper_to_methods()  # noqa: PLR0904
class JsonSchemaParser(Parser["JSONSchemaParserConfig", "JsonSchemaFeatures"]):
    """Parser for JSON Schema, JSON, YAML, Dict, and CSV formats."""

    # Pointer prefixes where reusable definitions live (overridden by OpenAPI).
    SCHEMA_PATHS: ClassVar[list[str]] = ["#/definitions", "#/$defs"]
    SCHEMA_OBJECT_TYPE: ClassVar[type[JsonSchemaObject]] = JsonSchemaObject
    # JSON Schema type -> Python type names accepted for x-python-type overrides.
    COMPATIBLE_PYTHON_TYPES: ClassVar[dict[str, frozenset[str]]] = {
        "string": frozenset({"str", "String"}),
        "integer": frozenset({"int", "Integer"}),
        "number": frozenset({"float", "int", "Number"}),
        "boolean": frozenset({"bool", "Boolean"}),
        "array": frozenset({
            "list",
            "List",
            "set",
            "Set",
            "frozenset",
            "FrozenSet",
            "Sequence",
            "MutableSequence",
            "tuple",
            "Tuple",
            "AbstractSet",
            "MutableSet",
        }),
        "object": frozenset({"dict", "Dict", "Mapping", "MutableMapping", "TypedDict"}),
    }
    # Import needed when an x-python-type names one of these Python types.
    PYTHON_TYPE_IMPORTS: ClassVar[dict[str, Import]] = {
        # collections.abc
        "Callable": Import.from_full_path("collections.abc.Callable"),
        "Iterable": Import.from_full_path("collections.abc.Iterable"),
        "Iterator": Import.from_full_path("collections.abc.Iterator"),
        "Generator": Import.from_full_path("collections.abc.Generator"),
        "Awaitable": Import.from_full_path("collections.abc.Awaitable"),
        "Coroutine": Import.from_full_path("collections.abc.Coroutine"),
        "AsyncIterable": Import.from_full_path("collections.abc.AsyncIterable"),
        "AsyncIterator": Import.from_full_path("collections.abc.AsyncIterator"),
        "AsyncGenerator": Import.from_full_path("collections.abc.AsyncGenerator"),
        "Mapping": Import.from_full_path("collections.abc.Mapping"),
        "MutableMapping": Import.from_full_path("collections.abc.MutableMapping"),
        "Sequence": Import.from_full_path("collections.abc.Sequence"),
        "MutableSequence": Import.from_full_path("collections.abc.MutableSequence"),
        "Set": Import.from_full_path("collections.abc.Set"),
        "MutableSet": Import.from_full_path("collections.abc.MutableSet"),
        "Collection": Import.from_full_path("collections.abc.Collection"),
        "Reversible": Import.from_full_path("collections.abc.Reversible"),
        # collections
        "defaultdict": Import.from_full_path("collections.defaultdict"),
        "OrderedDict": Import.from_full_path("collections.OrderedDict"),
        "Counter": Import.from_full_path("collections.Counter"),
        "deque": Import.from_full_path("collections.deque"),
        "ChainMap": Import.from_full_path("collections.ChainMap"),
        # re
        "Pattern": Import.from_full_path("re.Pattern"),
        "Match": Import.from_full_path("re.Match"),
        # typing
        "Any": Import.from_full_path("typing.Any"),
        "Type": Import.from_full_path("typing.Type"),
        "Union": Import.from_full_path("typing.Union"),
        "Optional": Import.from_full_path("typing.Optional"),
        "Literal": Import.from_full_path("typing.Literal"),
        "Final": Import.from_full_path("typing.Final"),
        "ClassVar": Import.from_full_path("typing.ClassVar"),
        "Annotated": Import.from_full_path("typing.Annotated"),
        "TypeVar": Import.from_full_path("typing.TypeVar"),
        "TypeAlias": Import.from_full_path("typing.TypeAlias"),
        "Never": Import.from_full_path("typing.Never"),
        "NoReturn": Import.from_full_path("typing.NoReturn"),
        "Self": Import.from_full_path("typing.Self"),
        "LiteralString": Import.from_full_path("typing.LiteralString"),
        "TypeGuard": Import.from_full_path("typing.TypeGuard"),
        # pathlib
        "Path": Import.from_full_path("pathlib.Path"),
        "PurePath": Import.from_full_path("pathlib.PurePath"),
        # decimal
        "Decimal": Import.from_full_path("decimal.Decimal"),
        # uuid
        "UUID": Import.from_full_path("uuid.UUID"),
        # datetime
        "datetime": Import.from_full_path("datetime.datetime"),
        "date": Import.from_full_path("datetime.date"),
        "time": Import.from_full_path("datetime.time"),
        "timedelta": Import.from_full_path("datetime.timedelta"),
        # enum
        "Enum": Import.from_full_path("enum.Enum"),
        "IntEnum": Import.from_full_path("enum.IntEnum"),
        "StrEnum": Import.from_full_path("enum.StrEnum"),
        "Flag": Import.from_full_path("enum.Flag"),
        "IntFlag": Import.from_full_path("enum.IntFlag"),
        "BaseModel": Import.from_full_path("pydantic.BaseModel"),
    }
    # Types that require x-python-type override regardless of schema type
    PYTHON_TYPE_OVERRIDE_ALWAYS: ClassVar[frozenset[str]] = frozenset({
        "Callable",
        "Type",
        # collections types that have no JSON Schema equivalent
        "defaultdict",
        "OrderedDict",
        "Counter",
        "deque",
        "ChainMap",
    })
    _config_class_name: ClassVar[str] = "JSONSchemaParserConfig"
    def __init__(
        self,
        source: str | Path | list[Path] | ParseResult,
        *,
        config: JSONSchemaParserConfig | None = None,
        **options: Unpack[JSONSchemaParserConfigDict],
    ) -> None:
        """Initialize the JSON Schema parser with configuration options.

        Args:
            source: Schema input — a raw string, one or more file paths, or a URL.
            config: Pre-built parser configuration; mutually exclusive with options.
            **options: Individual configuration overrides applied when config is None.
        """
        # Default to timezone-aware datetimes unless the caller chose a class.
        if config is None and options.get("target_datetime_class") is None:
            options["target_datetime_class"] = DatetimeClassType.Awaredatetime
        super().__init__(source=source, config=config, **options)
        self.remote_object_cache: DefaultPutDict[str, dict[str, YamlValue]] = DefaultPutDict()
        self.raw_obj: dict[str, YamlValue] = {}
        self._root_id: Optional[str] = None  # noqa: UP045
        self._root_id_base_path: Optional[str] = None  # noqa: UP045
        # Normalize external ref mapping paths to absolute for reliable matching
        raw_mapping = self.config.external_ref_mapping
        self._external_ref_mapping: dict[str, str] = {}
        if raw_mapping:
            for file_path, python_package in raw_mapping.items():
                if is_url(file_path):
                    self._external_ref_mapping[file_path] = python_package
                else:
                    abs_path = str((self.base_path / file_path).resolve())
                    self._external_ref_mapping[abs_path] = python_package
        self.reserved_refs: defaultdict[tuple[str, ...], set[str]] = defaultdict(set)
        self._dynamic_anchor_index: dict[tuple[str, ...], dict[str, str]] = {}
        self._recursive_anchor_index: dict[tuple[str, ...], list[str]] = {}
        # Extras keys collected onto generated fields (defaults + user-configured).
        self.field_keys: set[str] = {
            *DEFAULT_FIELD_KEYS,
            *self.field_extra_keys,
            *self.field_extra_keys_without_x_prefix,
        }
        self._circular_ref_cache: dict[str, bool] = {}
        # When the target model type supports extra keys, extras key names must
        # be sanitized into valid field names; otherwise they pass through as-is.
        if self.data_model_field_type.can_have_extra_keys:
            self.get_field_extra_key: Callable[[str], str] = (
                lambda key: self.model_resolver.get_valid_field_name_and_alias(
                    key, model_type=self.field_name_model_type
                )[0]
            )
        else:
            self.get_field_extra_key = lambda key: key
def get_field_extras(self, obj: JsonSchemaObject) -> dict[str, Any]:
"""Extract extra field metadata from a JSON Schema object."""
if self.field_include_all_keys:
extras = {
self.get_field_extra_key(k.lstrip("x-") if k in self.field_extra_keys_without_x_prefix else k): v
for k, v in obj.extras.items()
}
else:
extras = {
self.get_field_extra_key(k.lstrip("x-") if k in self.field_extra_keys_without_x_prefix else k): v
for k, v in obj.extras.items()
if k in self.field_keys
}
if self.default_field_extras:
extras.update(self.default_field_extras)
return extras
    @cached_property
    def _data_formats(self) -> dict[str, dict[str, Types]]:
        """Get data format mappings for this parser type.

        Returns all formats for backward compatibility.
        OpenAPI-specific formats will be separated in Strict mode (future).
        """
        # Module-level table shared by all JSON Schema parsers; subclasses
        # (e.g. OpenAPI) may override this property with their own table.
        return json_schema_data_formats
def _get_type_with_mappings(self, type_: str, format_: str | None = None) -> Types:
"""Get the Types enum for a given type and format, applying custom type mappings.
Custom mappings from --type-mappings are checked first, then falls back to
the parser's data format mappings.
"""
data_formats = self._data_formats
if self.type_mappings and format_ is not None and (type_, format_) in self.type_mappings:
target_format = self.type_mappings[type_, format_]
for type_formats in data_formats.values():
if target_format in type_formats:
return type_formats[target_format]
if target_format in data_formats:
return data_formats[target_format]["default"]
return _get_type(type_, format_, data_formats)
@cached_property
def schema_paths(self) -> list[tuple[str, list[str]]]:
"""Get schema paths for definitions and defs.
For JsonSchema, uses schema_features.definitions_key to determine
the primary path, with fallback to the alternative in Lenient mode.
OpenAPI subclass uses its own SCHEMA_PATHS (#/components/schemas).
"""
# OpenAPI and other subclasses use their own SCHEMA_PATHS
if self.SCHEMA_PATHS != ["#/definitions", "#/$defs"]:
return [(s, s.lstrip("#/").split("/")) for s in self.SCHEMA_PATHS]
# JsonSchema: use definitions_key from schema_features
primary_key = self.schema_features.definitions_key
primary_path = f"#/{primary_key}"
fallback_key = "$defs" if primary_key == "definitions" else "definitions"
fallback_path = f"#/{fallback_key}"
# Strict mode: only use version-specific path
if self.config.schema_version_mode == VersionMode.Strict:
return [(str(primary_path), [str(primary_key)])]
# Lenient mode (default): check both paths, primary first
return [
(str(primary_path), [str(primary_key)]),
(str(fallback_path), [str(fallback_key)]),
]
    @cached_property
    def schema_features(self) -> JsonSchemaFeatures:
        """Get schema features based on config or detected version.

        An explicit jsonschema_version in the config wins; otherwise the
        version is auto-detected from the raw schema's $schema declaration.
        """
        # Imported lazily to avoid a circular import at module load time.
        from datamodel_code_generator.parser.schema_version import (  # noqa: PLC0415
            JsonSchemaFeatures,
            detect_jsonschema_version,
        )

        config_version = getattr(self.config, "jsonschema_version", None)
        if config_version is not None and config_version != JsonSchemaVersion.Auto:
            return JsonSchemaFeatures.from_version(config_version)
        version = detect_jsonschema_version(self.raw_obj) if self.raw_obj else JsonSchemaVersion.Auto
        return JsonSchemaFeatures.from_version(version)
    @property
    def root_id(self) -> str | None:
        """Get the root $id from the model resolver."""
        return self.model_resolver.root_id

    @root_id.setter
    def root_id(self, value: str | None) -> None:
        """Set the root $id in the model resolver."""
        self.model_resolver.set_root_id(value)
def should_parse_enum_as_literal(
    self,
    obj: JsonSchemaObject,
    property_name: str | None = None,
    property_obj: JsonSchemaObject | None = None,
) -> bool:
    """Decide whether an enum is rendered as a Literal type instead of an Enum.

    Resolution order (highest precedence first):
    1. ``x-enum-field-as-literal`` on the property schema (or ``obj``)
    2. ``enum_field_as_literal_map`` keyed by field name
    3. The global ``enum_field_as_literal`` setting
    """
    target = obj if property_obj is None else property_obj
    override = target.x_enum_field_as_literal
    if override is not None:
        return override
    mapping = self.enum_field_as_literal_map
    if property_name and mapping and property_name in mapping:
        return mapping[property_name] == "literal"
    setting = self.enum_field_as_literal
    if setting == LiteralType.All:
        return True
    if setting == LiteralType.One:
        return len(obj.enum) == 1
    return False
@classmethod
def _extract_const_enum_from_combined( # noqa: PLR0912
cls, items: list[JsonSchemaObject], parent_type: str | list[str] | None
) -> tuple[list[Any], list[str], str | None, bool] | None:
"""Extract enum values from oneOf/anyOf const pattern."""
enum_values: list[Any] = []
varnames: list[str] = []
nullable = False
inferred_type: str | None = None
for item in items:
if item.type == "null" and "const" not in item.extras:
nullable = True
continue
if "const" not in item.extras:
return None
if item.ref or item.properties or item.oneOf or item.anyOf or item.allOf:
return None
const_value = item.extras["const"]
enum_values.append(const_value)
if item.title:
varnames.append(item.title)
if inferred_type is None and const_value is not None:
match const_value:
case str():
inferred_type = "string"
case bool(): # bool must come before int (bool is subclass of int)
inferred_type = "boolean"
case int():
inferred_type = "integer"
case float():
inferred_type = "number"
if not enum_values: # pragma: no cover
return None
final_type: str | None
match parent_type:
case str():
final_type = parent_type
case list():
non_null_types = [t for t in parent_type if t != "null"]
final_type = non_null_types[0] if non_null_types else inferred_type
if "null" in parent_type:
nullable = True
case _:
final_type = inferred_type
return (enum_values, varnames, final_type, nullable)
def _create_synthetic_enum_obj(
self,
original: JsonSchemaObject,
enum_values: list[Any],
varnames: list[str],
enum_type: str | None,
nullable: bool, # noqa: FBT001
) -> JsonSchemaObject:
"""Create a synthetic JsonSchemaObject for enum parsing."""
final_enum = [*enum_values, None] if nullable else enum_values
final_varnames = varnames if len(varnames) == len(enum_values) else []
return self.SCHEMA_OBJECT_TYPE(
type=enum_type,
enum=final_enum,
title=original.title,
description=original.description,
**({"x-enum-varnames": final_varnames} | ({"default": original.default} if original.has_default else {})),
)
def is_constraints_field(self, obj: JsonSchemaObject) -> bool:
    """Decide whether constraint keywords should be emitted for this field.

    Arrays always carry constraints; otherwise constraints apply only when
    enabled globally and the schema is a plain scalar (no $ref, composition,
    object, or — unless enum constraints are ignored — enum).
    """
    if obj.is_array:
        return obj.is_array
    if not self.field_constraints:
        return self.field_constraints
    composite = (
        obj.ref
        or obj.anyOf
        or obj.oneOf
        or obj.allOf
        or obj.is_object
        or (obj.enum and not self.ignore_enum_constraints)
    )
    return not composite
def _is_fixed_length_tuple(self, obj: JsonSchemaObject) -> bool:
"""Check if an array field represents a fixed-length tuple."""
if obj.prefixItems is not None and obj.items in {None, False}:
return obj.minItems == obj.maxItems == len(obj.prefixItems)
if self.use_tuple_for_fixed_items and isinstance(obj.items, list) and obj.prefixItems is None:
return obj.minItems == obj.maxItems == len(obj.items)
return False
def _resolve_field_flag(self, obj: JsonSchemaObject, flag: Literal["readOnly", "writeOnly"]) -> bool:
"""Resolve a field flag (readOnly/writeOnly) from direct value, $ref, and compositions."""
if getattr(obj, flag) is True:
return True
if (
self.read_only_write_only_model_type
and obj.ref
and self._resolve_field_flag(self._load_ref_schema_object(obj.ref), flag)
):
return True
return any(self._resolve_field_flag(sub, flag) for sub in obj.allOf + obj.anyOf + obj.oneOf)
def _collect_all_fields_for_request_response(
    self,
    fields: list[DataModelFieldBase],
    base_classes: list[Reference] | None,
) -> list[DataModelFieldBase]:
    """Collect all fields including those from base classes for Request/Response models.

    Order: parent → child, with child fields overriding parent fields of the
    same name (the last field collected for a given original name wins).

    Args:
        fields: Fields declared directly on the model under construction.
        base_classes: Resolved references to parent models, or ``None``.

    Returns:
        Deep copies of the merged, name-deduplicated fields.
    """
    all_fields: list[DataModelFieldBase] = []
    # $ref paths already expanded; shared with iter_from_schema so diamond
    # inheritance and repeated refs are walked only once.
    visited: set[str] = set()
    def iter_from_schema(obj: JsonSchemaObject, path: list[str]) -> Iterable[DataModelFieldBase]:
        # Yield the schema's own properties, then recurse into allOf parts
        # (following $refs; inline property groups are parsed directly).
        module_name = get_module_name(path[-1] if path else "", None, treat_dot_as_module=self.treat_dot_as_module)
        if obj.properties:
            yield from self.parse_object_fields(obj, path, module_name)
        for item in obj.allOf:
            if item.ref:
                if item.ref in visited: # pragma: no cover
                    continue
                visited.add(item.ref)
                yield from iter_from_schema(self._load_ref_schema_object(item.ref), path)
            elif item.properties:
                yield from self.parse_object_fields(item, path, module_name)
    # Parent fields first: already-built models expose iter_all_fields;
    # otherwise re-parse the referenced schema from scratch.
    for base_ref in base_classes or []:
        if isinstance(base_ref.source, DataModel):
            all_fields.extend(base_ref.source.iter_all_fields(visited))
        elif base_ref.path not in visited: # pragma: no cover
            visited.add(base_ref.path)
            all_fields.extend(iter_from_schema(self._load_ref_schema_object(base_ref.path), []))
    # Own (child) fields last so the dedup pass below lets them override
    # same-named parent fields.
    all_fields.extend(fields)
    deduplicated: dict[str, DataModelFieldBase] = {}
    for field in all_fields:
        key = field.original_name or field.name
        if key: # pragma: no cover
            deduplicated[key] = field.copy_deep()
    return list(deduplicated.values())
def _should_generate_separate_models(
self,
fields: list[DataModelFieldBase],
base_classes: list[Reference] | None,
) -> bool:
"""Determine if Request/Response models should be generated."""
if self.read_only_write_only_model_type is None:
return False
all_fields = self._collect_all_fields_for_request_response(fields, base_classes)
return any(field.read_only or field.write_only for field in all_fields)
def _should_generate_base_model(self, *, generates_separate_models: bool = False) -> bool:
"""Determine if Base model should be generated."""
if getattr(self, "_force_base_model_generation", False):
return True
if self.read_only_write_only_model_type is None:
return True
if self.read_only_write_only_model_type == ReadOnlyWriteOnlyModelType.All:
return True
return not generates_separate_models
def _ref_schema_generates_variant(self, ref_path: str, suffix: str) -> bool:
"""Check if a referenced schema will generate a specific variant (Request or Response).