mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-01-11 23:00:51 +08:00
- Run comfyui workflows directly inside other python applications using EmbeddedComfyClient. - Optional telemetry in prompts and models using anonymity preserving Plausible self-hosted or hosted. - Better OpenAPI schema - Basic support for distributed ComfyUI backends. Limitations: no progress reporting, no easy way to start your own distributed backend, requires RabbitMQ as a message broker.
1446 lines
53 KiB
Python
1446 lines
53 KiB
Python
# coding: utf-8
|
|
|
|
"""
|
|
comfyui
|
|
No description provided (generated by Openapi JSON Schema Generator https://github.com/openapi-json-schema-tools/openapi-json-schema-generator) # noqa: E501
|
|
The version of the OpenAPI document: 0.0.1
|
|
Generated by: https://github.com/openapi-json-schema-tools/openapi-json-schema-generator
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import collections
|
|
import dataclasses
|
|
import decimal
|
|
import re
|
|
import sys
|
|
import types
|
|
import typing
|
|
import uuid
|
|
|
|
import typing_extensions
|
|
|
|
from comfy.api import exceptions
|
|
from comfy.api.configurations import schema_configuration
|
|
|
|
from . import format, original_immutabledict
|
|
|
|
immutabledict = original_immutabledict.immutabledict
|
|
|
|
|
|
@dataclasses.dataclass
class ValidationMetadata:
    """
    Bookkeeping carried through one validation pass over an OpenApi Schema payload.

    It records where in the payload validation currently is, the configuration
    in effect, and which classes were already validated at which paths.
    """
    # location of the value currently being validated, as a tuple of keys/indices
    path_to_item: typing.Tuple[typing.Union[str, int], ...]
    # schema configuration consulted by the validators
    configuration: schema_configuration.SchemaConfiguration
    # path -> mapping whose keys are the classes already validated at that path
    validated_path_to_schemas: typing.Mapping[
        typing.Tuple[typing.Union[str, int], ...],
        typing.Mapping[type, None]
    ] = dataclasses.field(default_factory=dict)
    # classes that are treated as validated regardless of path
    seen_classes: typing.FrozenSet[type] = frozenset()

    def validation_ran_earlier(self, cls: type) -> bool:
        """
        Return True when cls was already validated at the current path or is in seen_classes.
        """
        schemas_at_path: typing.Union[typing.Mapping[type, None], None] = (
            self.validated_path_to_schemas.get(self.path_to_item)
        )
        validated_here = bool(schemas_at_path) and cls in schemas_at_path
        return validated_here or cls in self.seen_classes
|
|
|
|
def _raise_validation_error_message(value, constraint_msg, constraint_value, path_to_item, additional_txt=""):
    """
    Build the standard constraint-violation message and raise it as ApiValueError.

    Args:
        value: the offending value
        constraint_msg: text describing the constraint that was violated
        constraint_value: the value of the constraint (e.g. the limit)
        path_to_item: location of the offending value in the payload
        additional_txt: optional text inserted right after the constraint value

    Raises:
        exceptions.ApiValueError: always
    """
    template = "Invalid value `{value}`, {constraint_msg} `{constraint_value}`{additional_txt} at {path_to_item}"
    message = template.format(
        value=value,
        constraint_msg=constraint_msg,
        constraint_value=constraint_value,
        additional_txt=additional_txt,
        path_to_item=path_to_item,
    )
    raise exceptions.ApiValueError(message)
|
|
|
|
|
|
class SchemaValidator:
    """
    Base class providing ``_validate``, which runs every json schema keyword
    validator that applies to the attributes of a schema instance.
    """

    # attribute names that are never dispatched to a keyword validator
    __excluded_cls_properties = {
        '__module__',
        '__dict__',
        '__weakref__',
        '__doc__',
        '__annotations__',
        'default',  # excluded because it has no impact on validation
        'type_to_output_cls',  # used to pluck the output class for instantiation
    }

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ) -> PathToSchemasType:
        """
        SchemaValidator validate
        All keyword validation except for type checking was done in calling stack frames
        If those validations passed, the validated classes are collected in path_to_schemas

        Args:
            arg: the payload value to validate
            validation_metadata: path, configuration and earlier validation results

        Returns:
            a mapping of path -> {class: None} covering this location (and any
            deeper locations reported by the keyword validators); the entry for
            the current path always contains type(arg) and cls
        """
        schema_vars = vars(cls())
        # keep only the attributes that are enabled json schema keywords; the
        # configuration is consulted lazily, only for non-excluded names
        json_schema_data = {
            name: setting
            for name, setting in schema_vars.items()
            if not (
                name in cls.__excluded_cls_properties
                or name in validation_metadata.configuration.disabled_json_schema_python_keywords
            )
        }

        path_to_schemas: PathToSchemasType = {}

        # results that several keywords share are computed once, up front, in
        # this fixed order: contains, if_, pattern_properties, prefix_items
        contains_path_to_schemas = []
        if 'contains' in schema_vars:
            contains_path_to_schemas = _get_contains_path_to_schemas(
                arg,
                schema_vars['contains'],
                validation_metadata,
                path_to_schemas
            )

        if_path_to_schemas = None
        if 'if_' in schema_vars:
            if_path_to_schemas = _get_if_path_to_schemas(
                arg,
                schema_vars['if_'],
                validation_metadata,
            )

        validated_pattern_properties: typing.Optional[PathToSchemasType] = None
        if 'pattern_properties' in schema_vars:
            validated_pattern_properties = _get_validated_pattern_properties(
                arg,
                schema_vars['pattern_properties'],
                cls,
                validation_metadata
            )

        prefix_items_length = 0
        if 'prefix_items' in schema_vars:
            prefix_items_length = len(schema_vars['prefix_items'])

        # keywords whose validator receives (value, companion) instead of value;
        # path_to_schemas is passed by reference so it reflects earlier keywords
        format_name: typing.Optional[str] = schema_vars.get('format', None)
        companion_by_keyword: typing.Dict[str, typing.Any] = {
            'contains': contains_path_to_schemas,
            'min_contains': contains_path_to_schemas,
            'max_contains': contains_path_to_schemas,
            'items': prefix_items_length,
            'unevaluated_items': path_to_schemas,
            'unevaluated_properties': path_to_schemas,
            'types': format_name,
            'pattern_properties': validated_pattern_properties,
            'additional_properties': validated_pattern_properties,
            'if_': if_path_to_schemas,
            'then': if_path_to_schemas,
            'else_': if_path_to_schemas,
        }

        for keyword, val in json_schema_data.items():
            used_val: typing.Any
            if keyword in companion_by_keyword:
                used_val = (val, companion_by_keyword[keyword])
            else:
                used_val = val
            validator = json_schema_keyword_to_validator[keyword]
            other_path_to_schemas = validator(
                arg,
                used_val,
                cls,
                validation_metadata,
            )
            if other_path_to_schemas:
                update(path_to_schemas, other_path_to_schemas)

        # record the payload's own type and this schema class for the current path
        schemas_here = path_to_schemas.setdefault(validation_metadata.path_to_item, dict())
        schemas_here[type(arg)] = None
        schemas_here[cls] = None
        return path_to_schemas
|
|
|
|
# Type of the validation result: payload path -> {class: None}.
# The inner dict is keyed by the classes recorded for that path (schema classes
# and the payload's python type); its values are always None.
PathToSchemasType = typing.Dict[
    typing.Tuple[typing.Union[str, int], ...],  # path to the item
    typing.Dict[
        typing.Union[
            typing.Type[SchemaValidator],
            typing.Type[str],
            typing.Type[int],
            typing.Type[float],
            typing.Type[bool],
            typing.Type[None],
            typing.Type[immutabledict],
            typing.Type[tuple]
        ],
        None
    ]
]
|
|
|
|
def _get_class(
    item_cls: typing.Union[types.FunctionType, staticmethod, typing.Type[SchemaValidator]],
    local_namespace: typing.Optional[dict] = None
) -> typing.Type[SchemaValidator]:
    """
    Resolve the different ways a schema can be referenced into the schema class.

    Args:
        item_cls: a class, a subscripted generic alias, a function or
            staticmethod that returns the class when called, or a
            typing.ForwardRef naming the class
        local_namespace: namespace used to evaluate a ForwardRef

    Returns:
        the resolved class

    Raises:
        ValueError: if item_cls is none of the supported kinds, or a generic
            alias has no origin
    """
    if isinstance(item_cls, typing._GenericAlias):  # type: ignore
        # petstore_api.schemas.StrSchema[~U] -> petstore_api.schemas.StrSchema
        origin_cls = typing.get_origin(item_cls)
        if origin_cls is None:
            raise ValueError('origin class must not be None')
        return origin_cls
    elif isinstance(item_cls, types.FunctionType):
        # referenced schema
        return item_cls()
    elif isinstance(item_cls, staticmethod):
        # referenced schema
        return item_cls.__func__()
    elif isinstance(item_cls, type):
        return item_cls
    elif isinstance(item_cls, typing.ForwardRef):
        if sys.version_info < (3, 9):
            return item_cls._evaluate(None, local_namespace)
        # recursive_guard must be passed by keyword: newer interpreters insert
        # a type_params parameter before it and make recursive_guard
        # keyword-only, so a third positional argument would be misread
        if sys.version_info >= (3, 13):
            # passing type_params explicitly avoids the deprecation warning
            return item_cls._evaluate(None, local_namespace, type_params=(), recursive_guard=set())
        return item_cls._evaluate(None, local_namespace, recursive_guard=set())
    raise ValueError('invalid class value passed in')
|
|
|
|
|
|
def update(d: dict, u: dict):
    """
    Adds u to d
    Where each dict is collections.defaultdict(dict)

    Keys missing from d receive u's inner dict itself (no copy is made); for
    keys present in both, d's inner dict is updated in place with u's.

    Args:
        d: the mapping that is mutated
        u: the mapping whose entries are merged in; may be empty or None

    Returns:
        d, in every case (callers may ignore it)
    """
    if not u:
        return d
    for k, v in u.items():
        if k not in d:
            d[k] = v
        else:
            d[k].update(v)
    # return d on this path too, so the result does not depend on u being empty
    return d
|
|
|
|
|
|
def add_deeper_validated_schemas(validation_metadata: ValidationMetadata, path_to_schemas: dict):
    """
    Copy earlier validation results for the current path and everything below it
    into path_to_schemas.

    This is called if validation_ran_earlier and current and deeper locations
    need to be added.
    """
    prefix = validation_metadata.path_to_item
    prefix_length = len(prefix)
    # keep every earlier-validated path that starts with the current path
    at_or_below_current = {
        path: schemas
        for path, schemas in validation_metadata.validated_path_to_schemas.items()
        if len(path) >= prefix_length and path[:prefix_length] == prefix
    }
    update(path_to_schemas, at_or_below_current)
|
|
|
|
|
|
def __get_valid_classes_phrase(input_classes):
    """Returns a string phrase describing what types are allowed"""
    # names are listed alphabetically so the phrase does not depend on input order
    names = sorted(klass.__name__ for klass in input_classes)
    if len(names) == 1:
        return "is {0}".format(names[0])
    return "is one of [{0}]".format(", ".join(names))
|
|
|
|
|
|
def __type_error_message(
    var_value=None, var_name=None, valid_classes=None, key_type=None
):
    """
    Compose the text used for a type error.

    Keyword Args:
        var_value (any): the variable which has the type_error
        var_name (str): the name of the variable which has the type error
            (accepted but not used in the message)
        valid_classes (tuple): the accepted classes for current_item's
                                  value
        key_type (bool): False if our value is a value in a dict
                         True if it is a key in a dict
                         False if our item is an item in a tuple
    """
    key_or_value = "key" if key_type else "value"
    allowed_phrase = __get_valid_classes_phrase(valid_classes)
    passed_type_name = type(var_value).__name__
    return "Invalid type. Required {0} type {1} and " "passed type was {2}".format(
        key_or_value,
        allowed_phrase,
        passed_type_name,
    )
|
|
|
|
|
|
def __get_type_error(var_value, path_to_item, valid_classes, key_type=False):
    """
    Create (but do not raise) the ApiTypeError describing a value of the wrong type.

    Args:
        var_value: the value that has the wrong type
        path_to_item: location of the value; its last element is used as the variable name
        valid_classes: the classes that would have been accepted
        key_type: True when the value is a dict key

    Returns:
        exceptions.ApiTypeError: the exception instance for the caller to raise
    """
    message = __type_error_message(
        var_name=path_to_item[-1],
        var_value=var_value,
        valid_classes=valid_classes,
        key_type=key_type,
    )
    error_details = dict(
        path_to_item=path_to_item,
        valid_classes=valid_classes,
        key_type=key_type,
    )
    return exceptions.ApiTypeError(message, **error_details)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
class PatternInfo:
    """
    Immutable pairing of a regular expression with its optional flags.
    """
    # the regular expression source text
    pattern: str
    # regex flags; None means no flags were given
    flags: typing.Optional[re.RegexFlag] = None
|
|
|
|
|
|
def validate_types(
    arg: typing.Any,
    allowed_types_format: typing.Tuple[typing.Set[typing.Type], typing.Optional[str]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that the exact type of arg is allowed and, for format 'int', that a
    numeric arg holds a whole number.

    Args:
        arg: the value to check
        allowed_types_format: (allowed python types, schema format or None)
        cls: the schema class being validated (unused here)
        validation_metadata: supplies path_to_item for error messages

    Raises:
        exceptions.ApiTypeError: type(arg) is not one of the allowed types
        exceptions.ApiValueError: format is 'int' and arg is a float that is
            not a whole number (this includes inf and nan)
    """
    allowed_types = allowed_types_format[0]
    if type(arg) not in allowed_types:
        raise __get_type_error(
            arg,
            validation_metadata.path_to_item,
            allowed_types,
            key_type=False,
        )
    # the integer check only applies to real numbers; bool is excluded explicitly
    if isinstance(arg, bool) or not isinstance(arg, (int, float)):
        return None
    format_name = allowed_types_format[1]
    # there is a json schema test where 1.0 validates as an integer
    # float.is_integer is used instead of comparing with int(arg) because
    # int() raises OverflowError/ValueError for inf/nan
    if format_name == 'int' and isinstance(arg, float) and not arg.is_integer():
        raise exceptions.ApiValueError(
            "Invalid non-integer value '{}' for type {} at {}".format(
                arg, format_name, validation_metadata.path_to_item
            )
        )
    return None
|
|
|
|
|
|
def validate_enum(
    arg: typing.Any,
    enum_value_to_name: typing.Dict[typing.Any, str],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Accept arg only if it is one of the keys of enum_value_to_name.

    Raises:
        exceptions.ApiValueError: arg is not an allowed value
    """
    if arg in enum_value_to_name:
        return None
    raise exceptions.ApiValueError("Invalid value {} passed in to {}, allowed_values={}".format(arg, cls, enum_value_to_name.keys()))
|
|
|
|
|
|
def validate_unique_items(
    arg: typing.Any,
    unique_items_value: bool,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    When uniqueness is required and arg is a tuple, reject duplicated items.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message when
            the tuple contains duplicates
    """
    applies = unique_items_value and isinstance(arg, tuple)
    if not applies:
        return None
    # a set drops duplicates, so equal sizes mean every item is distinct
    if len(set(arg)) == len(arg):
        return None
    _raise_validation_error_message(
        value=arg,
        constraint_msg="duplicate items were found, and the tuple must not contain duplicates because",
        constraint_value='unique_items==True',
        path_to_item=validation_metadata.path_to_item
    )
|
|
|
|
|
|
def validate_min_items(
    arg: typing.Any,
    min_items: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require a tuple arg to hold at least min_items items; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    too_short = isinstance(arg, tuple) and len(arg) < min_items
    if too_short:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="number of items must be greater than or equal to",
            constraint_value=min_items,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_max_items(
    arg: typing.Any,
    max_items: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require a tuple arg to hold at most max_items items; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    too_long = isinstance(arg, tuple) and len(arg) > max_items
    if too_long:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="number of items must be less than or equal to",
            constraint_value=max_items,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_min_properties(
    arg: typing.Any,
    min_properties: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require an immutabledict arg to hold at least min_properties entries;
    other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    too_few = isinstance(arg, immutabledict) and len(arg) < min_properties
    if too_few:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="number of properties must be greater than or equal to",
            constraint_value=min_properties,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_max_properties(
    arg: typing.Any,
    max_properties: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require an immutabledict arg to hold at most max_properties entries;
    other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    too_many = isinstance(arg, immutabledict) and len(arg) > max_properties
    if too_many:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="number of properties must be less than or equal to",
            constraint_value=max_properties,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_min_length(
    arg: typing.Any,
    min_length: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require a str arg to have at least min_length characters; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    too_short = isinstance(arg, str) and len(arg) < min_length
    if too_short:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="length must be greater than or equal to",
            constraint_value=min_length,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_max_length(
    arg: typing.Any,
    max_length: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require a str arg to have at most max_length characters; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    too_long = isinstance(arg, str) and len(arg) > max_length
    if too_long:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="length must be less than or equal to",
            constraint_value=max_length,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_inclusive_minimum(
    arg: typing.Any,
    inclusive_minimum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require an int/float arg to be >= inclusive_minimum; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    below_limit = isinstance(arg, (int, float)) and arg < inclusive_minimum
    if below_limit:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="must be a value greater than or equal to",
            constraint_value=inclusive_minimum,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_exclusive_minimum(
    arg: typing.Any,
    exclusive_minimum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require an int/float arg to be > exclusive_minimum; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    at_or_below_limit = isinstance(arg, (int, float)) and arg <= exclusive_minimum
    if at_or_below_limit:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="must be a value greater than",
            constraint_value=exclusive_minimum,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_inclusive_maximum(
    arg: typing.Any,
    inclusive_maximum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require an int/float arg to be <= inclusive_maximum; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    above_limit = isinstance(arg, (int, float)) and arg > inclusive_maximum
    if above_limit:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="must be a value less than or equal to",
            constraint_value=inclusive_maximum,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_exclusive_maximum(
    arg: typing.Any,
    exclusive_maximum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require an int/float arg to be < exclusive_maximum; other types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    at_or_above_limit = isinstance(arg, (int, float)) and arg >= exclusive_maximum
    if at_or_above_limit:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="must be a value less than",
            constraint_value=exclusive_maximum,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
def validate_multiple_of(
    arg: typing.Any,
    multiple_of: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require an int/float arg to be a whole multiple of multiple_of; other
    types are ignored.

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    if not isinstance(arg, (int, float)):
        return None
    # Note 'multipleOf' will be as good as the floating point arithmetic.
    quotient = float(arg) / multiple_of
    if not quotient.is_integer():
        _raise_validation_error_message(
            value=arg,
            constraint_msg="value must be a multiple of",
            constraint_value=multiple_of,
            path_to_item=validation_metadata.path_to_item
        )
    return None
|
|
|
|
|
|
def validate_pattern(
    arg: typing.Any,
    pattern_info: PatternInfo,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require a str arg to contain a match (re.search) for the pattern; other
    types are ignored.

    Args:
        arg: the value to check
        pattern_info: supplies .pattern and .flags (None means no flags)
        cls: the schema class being validated (unused here)
        validation_metadata: supplies path_to_item for error messages

    Raises:
        exceptions.ApiValueError: via _raise_validation_error_message
    """
    if not isinstance(arg, str):
        return None
    flags = 0 if pattern_info.flags is None else pattern_info.flags
    if re.search(pattern_info.pattern, arg, flags=flags):
        return None
    # Don't print the regex flags if the flags are not
    # specified in the OAS document.
    flags_txt = " with flags=`{}`".format(flags) if flags != 0 else ""
    _raise_validation_error_message(
        value=arg,
        constraint_msg="must match regular expression",
        constraint_value=pattern_info.pattern,
        path_to_item=validation_metadata.path_to_item,
        additional_txt=flags_txt
    )
    return None
|
|
|
|
|
|
# Inclusive bounds for the numeric formats int32, int64, float and double.
# signed 32-bit and 64-bit integer ranges
__int32_inclusive_minimum = -(2 ** 31)
__int32_inclusive_maximum = 2 ** 31 - 1
__int64_inclusive_minimum = -(2 ** 63)
__int64_inclusive_maximum = 2 ** 63 - 1
# largest finite single-precision and double-precision magnitudes
__float_inclusive_maximum = 3.4028234663852886e+38
__float_inclusive_minimum = -__float_inclusive_maximum
__double_inclusive_maximum = 1.7976931348623157E+308
__double_inclusive_minimum = -__double_inclusive_maximum
|
|
|
|
def __validate_numeric_format(
    arg: typing.Union[int, float],
    format_value: str,
    validation_metadata: ValidationMetadata
) -> None:
    """
    Check a number against the schema format.

    Formats starting with 'int' require a whole number. 'int32', 'int64',
    'float' and 'double' additionally require arg to lie within that format's
    inclusive bounds. Any other format is accepted without checks.

    Args:
        arg: the number to check
        format_value: the schema format, e.g. 'int32' or 'double'
        validation_metadata: supplies path_to_item for error messages

    Raises:
        exceptions.ApiValueError: arg is not a whole number for an int format
            (this includes inf and nan), or arg is outside the format's bounds
    """
    if format_value[:3] == 'int':
        # there is a json schema test where 1.0 validates as an integer
        # float.is_integer is used instead of comparing with int(arg) because
        # int() raises OverflowError/ValueError for inf/nan
        if isinstance(arg, float) and not arg.is_integer():
            raise exceptions.ApiValueError(
                "Invalid non-integer value '{}' for type {} at {}".format(
                    # report the schema format string, not the imported
                    # `format` module
                    arg, format_value, validation_metadata.path_to_item
                )
            )
        if format_value == 'int32':
            lower, upper = __int32_inclusive_minimum, __int32_inclusive_maximum
        elif format_value == 'int64':
            lower, upper = __int64_inclusive_minimum, __int64_inclusive_maximum
        else:
            # other int-like formats carry no range restriction
            return None
    elif format_value == 'float':
        lower, upper = __float_inclusive_minimum, __float_inclusive_maximum
    elif format_value == 'double':
        lower, upper = __double_inclusive_minimum, __double_inclusive_maximum
    else:
        return None

    if not lower <= arg <= upper:
        raise exceptions.ApiValueError(
            "Invalid value '{}' for type {} at {}".format(
                arg, format_value, validation_metadata.path_to_item
            )
        )
    return None
|
|
|
|
|
|
def __validate_string_format(
    arg: str,
    format_value: str,
    validation_metadata: ValidationMetadata
) -> None:
    """
    Check a string against the schema format by attempting to parse it.

    Handled formats are 'uuid', 'number' (decimal), 'date' and 'date-time';
    any other format is accepted without checks.

    Raises:
        exceptions.ApiValueError: arg cannot be parsed as the given format
    """
    if format_value == 'uuid':
        try:
            uuid.UUID(arg)
        except ValueError:
            raise exceptions.ApiValueError(
                "Invalid value '{}' for type UUID at {}".format(arg, validation_metadata.path_to_item)
            )
        return None

    if format_value == 'number':
        try:
            decimal.Decimal(arg)
        except decimal.InvalidOperation:
            raise exceptions.ApiValueError(
                "Value cannot be converted to a decimal. "
                "Invalid value '{}' for type decimal at {}".format(arg, validation_metadata.path_to_item)
            )
        return None

    if format_value == 'date':
        try:
            format.DEFAULT_ISOPARSER.parse_isodate_str(arg)
        except ValueError:
            raise exceptions.ApiValueError(
                "Value does not conform to the required ISO-8601 date format. "
                "Invalid value '{}' for type date at {}".format(arg, validation_metadata.path_to_item)
            )
        return None

    if format_value == 'date-time':
        try:
            format.DEFAULT_ISOPARSER.parse_isodatetime(arg)
        except ValueError:
            raise exceptions.ApiValueError(
                "Value does not conform to the required ISO-8601 datetime format. "
                "Invalid value '{}' for type datetime at {}".format(arg, validation_metadata.path_to_item)
            )
        return None

    return None
|
|
|
|
|
|
def validate_format(
    arg: typing.Union[str, int, float],
    format_value: str,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Dispatch format validation by the python type of arg.

    formats work for strings + numbers; any other type is accepted unchecked.

    Raises:
        exceptions.ApiValueError: from the numeric or string format check
    """
    if isinstance(arg, (int, float)):
        format_check = __validate_numeric_format
    elif isinstance(arg, str):
        format_check = __validate_string_format
    else:
        return None
    return format_check(
        arg,
        format_value,
        validation_metadata
    )
|
|
|
|
|
|
def validate_required(
    arg: typing.Any,
    required: typing.Set[str],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require every name in required to be a key of an immutabledict arg; other
    types are ignored.

    Raises:
        exceptions.ApiTypeError: one or more required keys are missing; the
            message lists them in sorted order
    """
    if not isinstance(arg, immutabledict):
        return None
    missing_required_arguments = sorted(required - arg.keys())
    if not missing_required_arguments:
        return None
    missing_count = len(missing_required_arguments)
    raise exceptions.ApiTypeError(
        "{} is missing {} required argument{}: {}".format(
            cls.__name__,
            missing_count,
            "s" if missing_count > 1 else "",
            missing_required_arguments
        )
    )
|
|
|
|
|
|
def validate_items(
    arg: typing.Any,
    item_cls_prefix_items_length: typing.Tuple[typing.Type[SchemaValidator], int],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate the items of a tuple arg against the items schema, starting at
    index prefix_items_length (earlier indices are skipped here).

    Args:
        arg: the value to check; non-tuples are ignored
        item_cls_prefix_items_length: (items schema reference, number of leading items to skip)
        cls: the schema class being validated (unused here)
        validation_metadata: path, configuration and earlier validation results

    Returns:
        None for non-tuples, otherwise the path -> classes mapping collected
        from the validated items
    """
    if not isinstance(arg, tuple):
        return None
    item_schema = _get_class(item_cls_prefix_items_length[0])
    first_index = item_cls_prefix_items_length[1]
    collected: PathToSchemasType = {}
    for index in range(first_index, len(arg)):
        item_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (index,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if item_metadata.validation_ran_earlier(item_schema):
            # reuse the earlier results instead of validating again
            add_deeper_validated_schemas(item_metadata, collected)
        else:
            item_result = item_schema._validate(
                arg[index], validation_metadata=item_metadata)
            update(collected, item_result)
    return collected
|
|
|
|
|
|
def validate_properties(
    arg: typing.Any,
    properties: typing.Mapping[str, typing.Type[SchemaValidator]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate each entry of an immutabledict arg whose key is declared in
    properties against that property's schema.

    Args:
        arg: the value to check; anything but an immutabledict is ignored
        properties: property name -> schema reference
        cls: the schema class being validated; its module's namespace is
            handed to _get_class
        validation_metadata: path, configuration and earlier validation results

    Returns:
        None when arg is not an immutabledict, otherwise the path -> classes
        mapping collected from the validated properties
    """
    if not isinstance(arg, immutabledict):
        return None
    collected: PathToSchemasType = {}
    module_namespace = vars(sys.modules[cls.__module__])
    for property_name, value in arg.items():
        if property_name not in properties:
            continue
        schema = _get_class(properties[property_name], module_namespace)
        property_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (property_name,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if property_metadata.validation_ran_earlier(schema):
            # reuse the earlier results instead of validating again
            add_deeper_validated_schemas(property_metadata, collected)
        else:
            property_result = schema._validate(value, validation_metadata=property_metadata)
            update(collected, property_result)
    return collected
|
|
|
|
|
|
def validate_additional_properties(
    arg: typing.Any,
    additional_properties_cls_val_pprops: typing.Tuple[
        typing.Type[SchemaValidator],
        typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate the entries of an immutabledict arg that are not declared in the
    schema's properties against the additional-properties schema.

    Entries whose path appears in the validated pattern properties are skipped.

    Args:
        arg: the value to check; anything but an immutabledict is ignored
        additional_properties_cls_val_pprops: (additional-properties schema
            reference, validated pattern properties or None)
        cls: the schema class being validated; instantiated to read its
            'properties' attribute when it has one
        validation_metadata: path, configuration and earlier validation results

    Returns:
        None when arg is not an immutabledict, otherwise the path -> classes
        mapping collected from the validated additional properties
    """
    if not isinstance(arg, immutabledict):
        return None
    schema = _get_class(additional_properties_cls_val_pprops[0])
    collected: PathToSchemasType = {}
    declared_properties = getattr(cls(), 'properties', {})
    validated_pattern_properties = additional_properties_cls_val_pprops[1]
    for property_name, value in arg.items():
        if property_name in declared_properties:
            continue
        path_to_item = validation_metadata.path_to_item + (property_name,)
        if validated_pattern_properties and path_to_item in validated_pattern_properties:
            continue
        property_metadata = ValidationMetadata(
            path_to_item=path_to_item,
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if property_metadata.validation_ran_earlier(schema):
            # reuse the earlier results instead of validating again
            add_deeper_validated_schemas(property_metadata, collected)
        else:
            property_result = schema._validate(value, validation_metadata=property_metadata)
            update(collected, property_result)
    return collected
|
|
|
|
|
|
def validate_one_of(
    arg: typing.Any,
    classes: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Require arg to validate against exactly one of the given schemas.

    Args:
        arg: the value to check
        classes: references to the candidate schemas
        cls: the schema class being validated
        validation_metadata: path, configuration and earlier validation results

    Returns:
        the path -> classes mapping associated with the single matching schema

    Raises:
        exceptions.ApiValueError: no schema matched, or more than one matched
    """
    matching_schemas = []
    path_to_schemas: PathToSchemasType = collections.defaultdict(dict)
    for candidate in classes:
        schema = _get_class(candidate)
        # already recorded for this path, or the schema is cls itself:
        # optimistically assume that cls schema will pass validation
        # do not invoke _validate on it because that is recursive
        if schema in path_to_schemas[validation_metadata.path_to_item] or schema is cls:
            matching_schemas.append(schema)
            continue
        if validation_metadata.validation_ran_earlier(schema):
            matching_schemas.append(schema)
            add_deeper_validated_schemas(validation_metadata, path_to_schemas)
            continue
        try:
            path_to_schemas = schema._validate(arg, validation_metadata=validation_metadata)
        except (exceptions.ApiValueError, exceptions.ApiTypeError):
            # silence exceptions because the code needs to accumulate oneof_classes
            continue
        matching_schemas.append(schema)

    match_count = len(matching_schemas)
    if match_count == 0:
        raise exceptions.ApiValueError(
            "Invalid inputs given to generate an instance of {}. None "
            "of the oneOf schemas matched the input data.".format(cls)
        )
    if match_count > 1:
        raise exceptions.ApiValueError(
            "Invalid inputs given to generate an instance of {}. Multiple "
            "oneOf schemas {} matched the inputs, but a max of one is allowed.".format(cls, matching_schemas)
        )
    # exactly one class matches
    return path_to_schemas
|
|
|
|
|
|
def validate_any_of(
    arg: typing.Any,
    classes: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Require arg to validate against at least one of the given schemas.

    Args:
        arg: the value to check
        classes: references to the candidate schemas
        cls: the schema class being validated; its module's namespace is
            handed to _get_class
        validation_metadata: path, configuration and earlier validation results

    Returns:
        the merged path -> classes mapping of every schema that matched

    Raises:
        exceptions.ApiValueError: none of the schemas matched
    """
    matching_schemas = []
    merged: PathToSchemasType = collections.defaultdict(dict)
    module_namespace = vars(sys.modules[cls.__module__])
    for candidate in classes:
        schema = _get_class(candidate, module_namespace)
        if schema is cls:
            # optimistically assume that cls schema will pass validation
            # do not invoke _validate on it because that is recursive
            matching_schemas.append(schema)
            continue
        if validation_metadata.validation_ran_earlier(schema):
            matching_schemas.append(schema)
            add_deeper_validated_schemas(validation_metadata, merged)
            continue
        try:
            schema_result = schema._validate(arg, validation_metadata=validation_metadata)
        except (exceptions.ApiValueError, exceptions.ApiTypeError):
            # silence exceptions because the code needs to accumulate anyof_classes
            continue
        matching_schemas.append(schema)
        update(merged, schema_result)

    if matching_schemas:
        return merged
    raise exceptions.ApiValueError(
        "Invalid inputs given to generate an instance of {}. None "
        "of the anyOf schemas matched the input data.".format(cls)
    )
|
|
|
|
|
|
def validate_all_of(
    arg: typing.Any,
    classes: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Require arg to validate against every one of the given schemas.

    Validation errors raised by a schema are not caught here.

    Args:
        arg: the value to check
        classes: references to the schemas that must all match
        cls: the schema class being validated
        validation_metadata: path, configuration and earlier validation results

    Returns:
        the merged path -> classes mapping of all the schemas
    """
    merged: PathToSchemasType = collections.defaultdict(dict)
    for candidate in classes:
        schema = _get_class(candidate)
        if schema is cls:
            # optimistically assume that cls schema will pass validation
            # do not invoke _validate on it because that is recursive
            continue
        if validation_metadata.validation_ran_earlier(schema):
            add_deeper_validated_schemas(validation_metadata, merged)
        else:
            schema_result = schema._validate(arg, validation_metadata=validation_metadata)
            update(merged, schema_result)
    return merged
|
|
|
|
|
|
def validate_not(
    arg: typing.Any,
    not_cls: typing.Type[SchemaValidator],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require arg to fail validation against the not-schema.

    Args:
        arg: the value to check
        not_cls: reference to the schema that arg must not match
        cls: the schema class being validated (used in the error message)
        validation_metadata: path, configuration and earlier validation results

    Raises:
        exceptions.ApiValueError: arg validates against the not-schema, or
            that schema was already validated earlier for this path
    """
    not_schema = _get_class(not_cls)
    # built up front so both failure paths raise the same exception
    not_exception = exceptions.ApiValueError(
        "Invalid value '{}' was passed in to {}. Value is invalid because it is disallowed by {}".format(
            arg,
            cls.__name__,
            not_schema.__name__,
        )
    )
    if validation_metadata.validation_ran_earlier(not_schema):
        raise not_exception

    try:
        not_schema_result = not_schema._validate(arg, validation_metadata=validation_metadata)
    except (exceptions.ApiValueError, exceptions.ApiTypeError):
        # failing the not-schema is the success case
        return None
    if not_schema_result:
        raise not_exception
    return None
|
|
|
|
|
|
def __ensure_discriminator_value_present(
    disc_property_name: str,
    validation_metadata: ValidationMetadata,
    arg
):
    """
    Raise unless the discriminator property is present in arg.

    Raises:
        exceptions.ApiValueError: disc_property_name is not in arg
    """
    if disc_property_name in arg:
        return None
    # The input data does not contain the discriminator property
    raise exceptions.ApiValueError(
        "Cannot deserialize input data due to missing discriminator. "
        "The discriminator property '{}' is missing at path: {}".format(disc_property_name, validation_metadata.path_to_item)
    )
|
|
|
|
|
|
def __get_discriminated_class(cls, disc_property_name: str, disc_payload_value: str, _visited=None):
    """
    Used in schemas with discriminators

    Looks up the class mapped to disc_payload_value in the discriminator of
    cls. If cls has no mapping for that value, the search continues
    depth-first through the classes listed in all_of, then one_of, then
    any_of.

    Args:
        cls: schema class to inspect; it is instantiated with no arguments
        disc_property_name: name of the discriminator property
        disc_payload_value: value of that property in the payload
        _visited: internal bookkeeping for the recursion; callers should not
            pass it

    Returns:
        The discriminated class, or None when no mapping is found.
    """
    if _visited is None:
        _visited = set()
    # Composed schemas may reference each other in a cycle. A class that was
    # already inspected cannot yield a new answer, so skip it rather than
    # recursing without end.
    if id(cls) in _visited:
        return None
    _visited.add(id(cls))

    cls_schema = cls()
    if not hasattr(cls_schema, 'discriminator'):
        return None
    disc = cls_schema.discriminator
    if disc_property_name not in disc:
        return None
    discriminated_cls = disc[disc_property_name].get(disc_payload_value)
    if discriminated_cls is not None:
        return discriminated_cls

    # Search order is significant: all_of first, then one_of, then any_of.
    for composition_keyword in ('all_of', 'one_of', 'any_of'):
        for composed_cls in getattr(cls_schema, composition_keyword, ()):
            discriminated_cls = __get_discriminated_class(
                composed_cls,
                disc_property_name=disc_property_name,
                disc_payload_value=disc_payload_value,
                _visited=_visited,
            )
            if discriminated_cls is not None:
                return discriminated_cls
    return None
|
|
|
|
|
|
def validate_discriminator(
    arg: typing.Any,
    discriminator: typing.Mapping[str, typing.Mapping[str, typing.Type[SchemaValidator]]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate arg against the class selected by its discriminator property.

    Only immutabledict payloads are checked; any other type returns None.

    Returns:
        None when arg is not an immutabledict or the discriminated class is
        cls itself; otherwise the path-to-schemas mapping for the
        discriminated class.

    Raises:
        exceptions.ApiValueError: if the discriminator property is missing
            from arg or its value maps to no class.
    """
    if not isinstance(arg, immutabledict):
        return None

    # Only the first key of the discriminator mapping is consulted.
    disc_prop_name = list(discriminator.keys())[0]
    __ensure_discriminator_value_present(disc_prop_name, validation_metadata, arg)
    target_cls = __get_discriminated_class(
        cls, disc_property_name=disc_prop_name, disc_payload_value=arg[disc_prop_name]
    )
    if target_cls is None:
        raise exceptions.ApiValueError(
            "Invalid discriminator value was passed in to {}.{} Only the values {} are allowed at {}".format(
                cls.__name__,
                disc_prop_name,
                list(discriminator[disc_prop_name].keys()),
                validation_metadata.path_to_item + (disc_prop_name,)
            )
        )

    if target_cls is cls:
        # Optimistically assume that cls will pass validation.
        # If the code invoked _validate on cls it would infinitely recurse.
        return None

    if validation_metadata.validation_ran_earlier(target_cls):
        previously_validated: PathToSchemasType = {}
        add_deeper_validated_schemas(validation_metadata, previously_validated)
        return previously_validated

    # Same location and configuration, but with cls recorded in seen_classes.
    return target_cls._validate(
        arg,
        validation_metadata=ValidationMetadata(
            path_to_item=validation_metadata.path_to_item,
            configuration=validation_metadata.configuration,
            seen_classes=validation_metadata.seen_classes | frozenset({cls}),
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        ),
    )
|
|
|
|
|
|
def _get_if_path_to_schemas(
    arg: typing.Any,
    if_cls: typing.Type[SchemaValidator],
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Evaluate the `if` schema against arg without letting a failure propagate.

    Returns:
        The path-to-schemas data produced by the `if` schema, or an empty
        dict when validation raised exceptions.OpenApiException.
    """
    if_schema = _get_class(if_cls)
    collected: PathToSchemasType = {}
    try:
        update(collected, if_schema._validate(arg, validation_metadata=validation_metadata))
    except exceptions.OpenApiException:
        # A failing `if` is reported through the empty result, not an error.
        pass
    return collected
|
|
|
|
|
|
def validate_if(
    arg: typing.Any,
    if_cls_if_path_to_schemas: typing.Tuple[
        typing.Type[SchemaValidator], typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator entry for the `if` keyword; it never contributes schema data.

    Returns:
        Always None.

    Raises:
        exceptions.OpenApiException: if the precomputed `if` result (second
            tuple element) is None.
    """
    if if_cls_if_path_to_schemas[1] is None:
        raise exceptions.OpenApiException('Invalid type for if_path_to_schemas')
    # if is false use case
    #     if_path_to_schemas == {}
    #     no need to add any data to path_to_schemas
    #
    # if true, then true -> true for whole schema
    #     so validate_then will add if_path_to_schemas data to path_to_schemas
    return None
|
|
|
|
|
|
def validate_then(
    arg: typing.Any,
    then_cls_if_path_to_schemas: typing.Tuple[
        typing.Type[SchemaValidator], typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Apply the `then` schema when the `if` schema matched.

    Returns:
        None when there is no `if` (its result is None) or the `if` was
        false (its result is empty). Otherwise a new mapping holding the
        `if` data followed by the `then` data.

    Any exception raised while validating the `then` schema propagates to
    the caller (the "then False" case).
    """
    if_path_to_schemas = then_cls_if_path_to_schemas[1]
    # None -> there is no `if`; {} -> `if` was false. Neither triggers `then`.
    if not if_path_to_schemas:
        return None

    then_schema = _get_class(then_cls_if_path_to_schemas[0])
    then_path_to_schemas = then_schema._validate(
        arg, validation_metadata=validation_metadata)
    merged: PathToSchemasType = {}
    update(merged, if_path_to_schemas)
    update(merged, then_path_to_schemas)
    return merged
|
|
|
|
|
|
def validate_else(
    arg: typing.Any,
    else_cls_if_path_to_schemas: typing.Tuple[
        typing.Type[SchemaValidator], typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Apply the `else` schema when the `if` schema did not match.

    Returns:
        None when there is no `if` (its result is None) or the `if` was
        true (its result is non-empty). Otherwise a new mapping holding the
        `if` data followed by the `else` data.

    Any exception raised while validating the `else` schema propagates to
    the caller (the "else False" case).
    """
    if_path_to_schemas = else_cls_if_path_to_schemas[1]
    # None -> there is no `if`; non-empty -> `if` was true, so skip `else`.
    if if_path_to_schemas is None or if_path_to_schemas:
        return None

    # if is false use case: if_path_to_schemas == {}
    else_schema = _get_class(else_cls_if_path_to_schemas[0])
    else_path_to_schemas = else_schema._validate(
        arg, validation_metadata=validation_metadata)
    merged: PathToSchemasType = {}
    update(merged, if_path_to_schemas)
    update(merged, else_path_to_schemas)
    return merged
|
|
|
|
|
|
def _get_contains_path_to_schemas(
    arg: typing.Any,
    contains_cls: typing.Type[SchemaValidator],
    validation_metadata: ValidationMetadata,
    path_to_schemas: PathToSchemasType
) -> typing.List[PathToSchemasType]:
    """
    Collect one path-to-schemas mapping per item of arg that satisfies the
    `contains` schema.

    Items that raise exceptions.OpenApiException during validation are
    skipped. Non-tuple input yields an empty list. The path_to_schemas
    parameter is not read by this function.
    """
    if not isinstance(arg, tuple):
        return []

    contains_schema = _get_class(contains_cls)
    matches = []
    for index, item in enumerate(arg):
        item_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item+(index,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if item_metadata.validation_ran_earlier(contains_schema):
            # Already validated at this item path: reuse the recorded data.
            earlier: PathToSchemasType = {}
            add_deeper_validated_schemas(item_metadata, earlier)
            matches.append(earlier)
            continue
        try:
            item_path_to_schemas = contains_schema._validate(
                item, validation_metadata=item_metadata)
        except exceptions.OpenApiException:
            # This item does not match; it simply is not counted.
            continue
        matches.append(item_path_to_schemas)
    return matches
|
|
|
|
|
|
def validate_contains(
    arg: typing.Any,
    contains_cls_path_to_schemas: typing.Tuple[typing.Type[SchemaValidator], typing.List[PathToSchemasType]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Enforce the `contains` keyword using precomputed per-item results.

    Returns:
        None for non-tuple input; otherwise the merge of every per-item
        mapping in the second tuple element.

    Raises:
        exceptions.ApiValueError: if that list of per-item mappings is empty.
    """
    if not isinstance(arg, tuple):
        return None

    matches = contains_cls_path_to_schemas[1]
    if not matches:
        raise exceptions.ApiValueError(
            "Validation failed for contains keyword in class={} at path_to_item={}. No "
            "items validated to the contains schema.".format(cls, validation_metadata.path_to_item)
        )

    merged: PathToSchemasType = {}
    for match in matches:
        update(merged, match)
    return merged
|
|
|
|
|
|
def validate_min_contains(
    arg: typing.Any,
    min_contains_and_contains_path_to_schemas: typing.Tuple[int, typing.List[PathToSchemasType]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Enforce the `minContains` keyword.

    Args:
        arg: the payload; only tuples are checked
        min_contains_and_contains_path_to_schemas: the minContains limit and
            the list of per-item results for items that matched `contains`
        cls: schema class, used in the error message
        validation_metadata: supplies path_to_item for the error message

    Returns:
        Always None.

    Raises:
        exceptions.ApiValueError: if fewer than minContains items matched.
    """
    if not isinstance(arg, tuple):
        return None
    min_contains = min_contains_and_contains_path_to_schemas[0]
    contains_path_to_schemas = min_contains_and_contains_path_to_schemas[1]
    if len(contains_path_to_schemas) < min_contains:
        # Some items may have matched, so the message says "too few"
        # rather than "no items".
        raise exceptions.ApiValueError(
            "Validation failed for minContains keyword in class={} at path_to_item={}. Too "
            "few items validated to the contains schema.".format(cls, validation_metadata.path_to_item)
        )
    return None
|
|
|
|
|
|
def validate_max_contains(
    arg: typing.Any,
    max_contains_and_contains_path_to_schemas: typing.Tuple[int, typing.List[PathToSchemasType]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Enforce the `maxContains` keyword.

    The second tuple element is the list of per-item results for items that
    matched `contains`; its length is compared with the limit in the first
    element. Non-tuple input is not checked.

    Returns:
        Always None.

    Raises:
        exceptions.ApiValueError: if more than maxContains items matched.
    """
    if not isinstance(arg, tuple):
        return None

    max_contains = max_contains_and_contains_path_to_schemas[0]
    match_count = len(max_contains_and_contains_path_to_schemas[1])
    if match_count <= max_contains:
        return None
    raise exceptions.ApiValueError(
        "Validation failed for maxContains keyword in class={} at path_to_item={}. Too "
        "many items validated to the contains schema.".format(cls, validation_metadata.path_to_item)
    )
|
|
|
|
|
|
def validate_const(
    arg: typing.Any,
    const_value_to_name: typing.Dict[typing.Any, str],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Enforce the `const` keyword: arg must be a key of const_value_to_name.

    Raises:
        exceptions.ApiValueError: if arg is not among the allowed values.
    """
    if arg in const_value_to_name:
        return None
    raise exceptions.ApiValueError("Invalid value {} passed in to {}, allowed_values={}".format(arg, cls, const_value_to_name.keys()))
|
|
|
|
|
|
def validate_dependent_required(
    arg: typing.Any,
    dependent_required: typing.Mapping[str, typing.Set[str]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Enforce the `dependentRequired` keyword.

    For every key of dependent_required that is present in arg, all keys in
    the associated set must be present in arg too. Only immutabledict
    payloads are checked.

    Raises:
        exceptions.ApiValueError: on the first trigger key whose required
            companion keys are not all present.
    """
    if not isinstance(arg, immutabledict):
        return None

    for trigger_key, required_keys in dependent_required.items():
        if trigger_key in arg:
            missing_keys = required_keys - arg.keys()
            if missing_keys:
                raise exceptions.ApiValueError(
                    f"Validation failed for dependentRequired because these_keys={missing_keys} are "
                    f"missing at path_to_item={validation_metadata.path_to_item} in class {cls}"
                )
    return None
|
|
|
|
|
|
def validate_dependent_schemas(
    arg: typing.Any,
    dependent_schemas: typing.Mapping[str, typing.Type[SchemaValidator]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Enforce the `dependentSchemas` keyword.

    For every key of dependent_schemas that is present in arg, the whole of
    arg is validated against the associated schema. Only immutabledict
    payloads are checked.

    Returns:
        None for non-immutabledict input, otherwise the merged
        path-to-schemas data of every schema that applied.
    """
    if not isinstance(arg, immutabledict):
        return None

    collected: PathToSchemasType = {}
    # Schema references are resolved against the module that defines cls.
    module_namespace = vars(sys.modules[cls.__module__])
    for trigger_key, schema_ref in dependent_schemas.items():
        if trigger_key not in arg:
            continue
        dependent_schema = _get_class(schema_ref, module_namespace)
        if validation_metadata.validation_ran_earlier(dependent_schema):
            add_deeper_validated_schemas(validation_metadata, collected)
        else:
            update(collected, dependent_schema._validate(arg, validation_metadata=validation_metadata))
    return collected
|
|
|
|
|
|
def validate_property_names(
    arg: typing.Any,
    property_names_schema: typing.Type[SchemaValidator],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Enforce the `propertyNames` keyword: every key of arg is validated
    against property_names_schema.

    Only immutabledict payloads are checked. The results of the per-key
    validations are discarded; a failing key surfaces as whatever exception
    the schema's _validate raises.
    """
    if not isinstance(arg, immutabledict):
        return None

    # Schema references are resolved against the module that defines cls.
    name_schema = _get_class(property_names_schema, vars(sys.modules[cls.__module__]))
    for key in arg:
        name_schema._validate(
            key,
            validation_metadata=ValidationMetadata(
                path_to_item=validation_metadata.path_to_item + (key,),
                configuration=validation_metadata.configuration,
                validated_path_to_schemas=validation_metadata.validated_path_to_schemas
            ),
        )
    return None
|
|
|
|
|
|
def _get_validated_pattern_properties(
    arg: typing.Any,
    pattern_properties: typing.Mapping[PatternInfo, typing.Type[SchemaValidator]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate each property of arg against every `patternProperties` schema
    whose pattern matches the property name.

    Args:
        arg: the payload; only immutabledict instances are checked
        pattern_properties: mapping of pattern info (pattern plus optional
            regex flags) to the schema for matching property names
        cls: schema class; its module namespace is used to resolve schemas
        validation_metadata: metadata for the object that holds the
            properties

    Returns:
        None for non-immutabledict input, otherwise the merged
        path-to-schemas data of all validations performed.
    """
    if not isinstance(arg, immutabledict):
        return None
    path_to_schemas: PathToSchemasType = {}
    module_namespace = vars(sys.modules[cls.__module__])
    for property_name, property_value in arg.items():
        path_to_item = validation_metadata.path_to_item + (property_name,)
        property_validation_metadata = ValidationMetadata(
            path_to_item=path_to_item,
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        for pattern_info, schema in pattern_properties.items():
            flags = pattern_info.flags if pattern_info.flags is not None else 0
            if not re.search(pattern_info.pattern, property_name, flags=flags):
                continue
            schema = _get_class(schema, module_namespace)
            # The schema is applied to the property value, so "already
            # validated" has to be looked up at the property's path. Checking
            # the enclosing object's metadata could skip validating the
            # property whenever the object itself was validated against the
            # same schema.
            if property_validation_metadata.validation_ran_earlier(schema):
                # NOTE(review): the enclosing object's metadata is still passed
                # here, as before; confirm whether the property's metadata is
                # the intended argument.
                add_deeper_validated_schemas(validation_metadata, path_to_schemas)
                continue
            other_path_to_schemas = schema._validate(property_value, validation_metadata=property_validation_metadata)
            update(path_to_schemas, other_path_to_schemas)
    return path_to_schemas
|
|
|
|
|
|
def validate_pattern_properties(
    arg: typing.Any,
    pattern_properties_validation_results: typing.Tuple[
        typing.Mapping[PatternInfo, typing.Type[SchemaValidator]],
        typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator entry for the `patternProperties` keyword.

    No validation happens here: for immutabledict payloads the precomputed
    result (second tuple element) is returned as is, and for any other
    payload type the result is None.
    """
    if isinstance(arg, immutabledict):
        return pattern_properties_validation_results[1]
    return None
|
|
|
|
|
|
def validate_prefix_items(
    arg: typing.Any,
    prefix_items: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Enforce the `prefixItems` keyword: the i-th item of arg is validated
    against the i-th schema of prefix_items.

    Items beyond the end of prefix_items are not checked, and neither is
    non-tuple input.

    Returns:
        None for non-tuple input, otherwise the merged path-to-schemas data
        of the positional validations.
    """
    if not isinstance(arg, tuple):
        return None

    collected: PathToSchemasType = {}
    # Schema references are resolved against the module that defines cls.
    module_namespace = vars(sys.modules[cls.__module__])
    # zip stops at the shorter sequence, so only positions that have both an
    # item and a prefix schema are visited.
    for index, (item, schema_ref) in enumerate(zip(arg, prefix_items)):
        item_schema = _get_class(schema_ref, module_namespace)
        item_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (index,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if item_metadata.validation_ran_earlier(item_schema):
            # NOTE(review): the container's metadata (not the item's) is passed
            # here, unlike _get_contains_path_to_schemas; confirm this is intended.
            add_deeper_validated_schemas(validation_metadata, collected)
            continue
        update(collected, item_schema._validate(item, validation_metadata=item_metadata))
    return collected
|
|
|
|
|
|
def validate_unevaluated_items(
    arg: typing.Any,
    unevaluated_items_validated_path_to_schemas: typing.Tuple[typing.Type[SchemaValidator], PathToSchemasType],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Enforce the `unevaluatedItems` keyword.

    Every item of arg whose path is absent from the supplied mapping of
    validated paths (second tuple element) is validated against the
    unevaluatedItems schema (first tuple element). Non-tuple input is not
    checked.

    Returns:
        None for non-tuple input, otherwise the merged path-to-schemas data
        of the items validated here.
    """
    if not isinstance(arg, tuple):
        return None

    collected: PathToSchemasType = {}
    # Schema references are resolved against the module that defines cls.
    module_namespace = vars(sys.modules[cls.__module__])
    item_schema = _get_class(unevaluated_items_validated_path_to_schemas[0], module_namespace)
    evaluated_paths = unevaluated_items_validated_path_to_schemas[1]
    for index, item in enumerate(arg):
        item_path = validation_metadata.path_to_item + (index,)
        if item_path in evaluated_paths:
            # Already covered by another keyword.
            continue
        item_metadata = ValidationMetadata(
            path_to_item=item_path,
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        update(collected, item_schema._validate(item, validation_metadata=item_metadata))
    return collected
|
|
|
|
|
|
def validate_unevaluated_properties(
    arg: typing.Any,
    unevaluated_properties_validated_path_to_schemas: typing.Tuple[typing.Type[SchemaValidator], PathToSchemasType],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Enforce the `unevaluatedProperties` keyword.

    Every property of arg whose path is absent from the supplied mapping of
    validated paths (second tuple element) is validated against the
    unevaluatedProperties schema (first tuple element). Only immutabledict
    payloads are checked.

    Returns:
        None for non-immutabledict input, otherwise the merged
        path-to-schemas data of the properties validated here.
    """
    if not isinstance(arg, immutabledict):
        return None

    collected: PathToSchemasType = {}
    # Schema references are resolved against the module that defines cls.
    module_namespace = vars(sys.modules[cls.__module__])
    property_schema = _get_class(unevaluated_properties_validated_path_to_schemas[0], module_namespace)
    evaluated_paths = unevaluated_properties_validated_path_to_schemas[1]
    for property_name, property_value in arg.items():
        property_path = validation_metadata.path_to_item + (property_name,)
        if property_path in evaluated_paths:
            # Already covered by another keyword.
            continue
        property_metadata = ValidationMetadata(
            path_to_item=property_path,
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        update(collected, property_schema._validate(property_value, validation_metadata=property_metadata))
    return collected
|
|
|
|
|
|
# Call signature shared by the keyword validators in this module:
# (arg, keyword value, schema class, validation metadata) -> optional path-to-schemas data.
validator_type = typing.Callable[[typing.Any, typing.Any, type, ValidationMetadata], typing.Optional[PathToSchemasType]]
# Lookup table from a schema keyword name to the function that validates it.
# Keywords that clash with Python names carry a trailing underscore
# ('not_', 'if_', 'else_').
json_schema_keyword_to_validator: typing.Mapping[str, validator_type] = {
    # type and enumeration
    'types': validate_types,
    'enum_value_to_name': validate_enum,
    # size and range constraints
    'unique_items': validate_unique_items,
    'min_items': validate_min_items,
    'max_items': validate_max_items,
    'min_properties': validate_min_properties,
    'max_properties': validate_max_properties,
    'min_length': validate_min_length,
    'max_length': validate_max_length,
    'inclusive_minimum': validate_inclusive_minimum,
    'exclusive_minimum': validate_exclusive_minimum,
    'inclusive_maximum': validate_inclusive_maximum,
    'exclusive_maximum': validate_exclusive_maximum,
    'multiple_of': validate_multiple_of,
    'pattern': validate_pattern,
    'format': validate_format,
    # object and array structure
    'required': validate_required,
    'items': validate_items,
    'properties': validate_properties,
    'additional_properties': validate_additional_properties,
    # schema composition
    'one_of': validate_one_of,
    'any_of': validate_any_of,
    'all_of': validate_all_of,
    'not_': validate_not,
    'discriminator': validate_discriminator,
    # contains family
    'contains': validate_contains,
    'min_contains': validate_min_contains,
    'max_contains': validate_max_contains,
    'const_value_to_name': validate_const,
    # dependencies and property-name rules
    'dependent_required': validate_dependent_required,
    'dependent_schemas': validate_dependent_schemas,
    'property_names': validate_property_names,
    'pattern_properties': validate_pattern_properties,
    'prefix_items': validate_prefix_items,
    # unevaluated locations
    'unevaluated_items': validate_unevaluated_items,
    'unevaluated_properties': validate_unevaluated_properties,
    # conditional application
    'if_': validate_if,
    'then': validate_then,
    'else_': validate_else
}