ComfyUI/comfy/api/schemas/validation.py
doctorpangloss 1b2ea61345 Improved API support
- Run comfyui workflows directly inside other python applications using
   EmbeddedComfyClient.
 - Optional telemetry in prompts and models using anonymity preserving
   Plausible self-hosted or hosted.
 - Better OpenAPI schema
 - Basic support for distributed ComfyUI backends. Limitations: no
   progress reporting, no easy way to start your own distributed
   backend, requires RabbitMQ as a message broker.
2024-02-07 14:20:21 -08:00

1446 lines
53 KiB
Python

# coding: utf-8
"""
comfyui
No description provided (generated by Openapi JSON Schema Generator https://github.com/openapi-json-schema-tools/openapi-json-schema-generator) # noqa: E501
The version of the OpenAPI document: 0.0.1
Generated by: https://github.com/openapi-json-schema-tools/openapi-json-schema-generator
"""
from __future__ import annotations
import collections
import dataclasses
import decimal
import re
import sys
import types
import typing
import uuid
import typing_extensions
from comfy.api import exceptions
from comfy.api.configurations import schema_configuration
from . import format, original_immutabledict
immutabledict = original_immutabledict.immutabledict
@dataclasses.dataclass
class ValidationMetadata:
    """
    Context that is carried along while a payload is validated against OpenApi schemas
    """
    # location of the value under validation, as keys/indices starting at the payload root
    path_to_item: typing.Tuple[typing.Union[str, int], ...]
    # configuration object read by the validators (e.g. for disabled keywords)
    configuration: schema_configuration.SchemaConfiguration
    # classes that were already validated, grouped by the path they were validated at
    validated_path_to_schemas: typing.Mapping[
        typing.Tuple[typing.Union[str, int], ...],
        typing.Mapping[type, None]
    ] = dataclasses.field(default_factory=dict)
    # classes that count as already validated no matter which path is checked
    seen_classes: typing.FrozenSet[type] = frozenset()

    def validation_ran_earlier(self, cls: type) -> bool:
        """
        Return True if cls is recorded as validated at path_to_item or is in seen_classes
        """
        schemas_at_path: typing.Union[typing.Mapping[type, None], None] = (
            self.validated_path_to_schemas.get(self.path_to_item)
        )
        validated_at_this_path = bool(schemas_at_path) and cls in schemas_at_path
        return validated_at_this_path or cls in self.seen_classes
def _raise_validation_error_message(value, constraint_msg, constraint_value, path_to_item, additional_txt=""):
    """
    Raise ApiValueError explaining which constraint the value violated and at which path

    Args:
        value: the offending value
        constraint_msg: text describing the constraint
        constraint_value: the constraint value that was violated
        path_to_item: location of the value in the payload
        additional_txt: optional text inserted after the constraint value
    """
    message = (
        f"Invalid value `{value}`, {constraint_msg} `{constraint_value}`"
        f"{additional_txt} at {path_to_item}"
    )
    raise exceptions.ApiValueError(message)
class SchemaValidator:
    # instance attributes that are never dispatched to a keyword validator
    __excluded_cls_properties = {
        '__module__',
        '__dict__',
        '__weakref__',
        '__doc__',
        '__annotations__',
        'default',  # excluded because it has no impact on validation
        'type_to_output_cls',  # used to pluck the output class for instantiation
    }

    @classmethod
    def _validate(
        cls,
        arg,
        validation_metadata: ValidationMetadata,
    ) -> PathToSchemasType:
        """
        SchemaValidator validate
        Runs every enabled json schema keyword stored on an instance of cls against arg
        and collects the classes that validated, keyed by their path, in path_to_schemas.
        Errors raised by the keyword validators propagate to the caller.
        """
        schema_vars = vars(cls())
        json_schema_data = {}
        for name, keyword_value in schema_vars.items():
            if name in cls.__excluded_cls_properties:
                continue
            if name in validation_metadata.configuration.disabled_json_schema_python_keywords:
                continue
            json_schema_data[name] = keyword_value

        path_to_schemas: PathToSchemasType = {}

        # results that several keywords share are computed once, before dispatching
        contains_path_to_schemas = []
        if 'contains' in schema_vars:
            contains_path_to_schemas = _get_contains_path_to_schemas(
                arg,
                schema_vars['contains'],
                validation_metadata,
                path_to_schemas
            )
        if_path_to_schemas = None
        if 'if_' in schema_vars:
            if_path_to_schemas = _get_if_path_to_schemas(
                arg,
                schema_vars['if_'],
                validation_metadata,
            )
        validated_pattern_properties: typing.Optional[PathToSchemasType] = None
        if 'pattern_properties' in schema_vars:
            validated_pattern_properties = _get_validated_pattern_properties(
                arg,
                schema_vars['pattern_properties'],
                cls,
                validation_metadata
            )
        prefix_items_length = len(schema_vars['prefix_items']) if 'prefix_items' in schema_vars else 0
        schema_format: typing.Optional[str] = schema_vars.get('format', None)

        # keywords whose validator receives (keyword value, companion data) instead of the bare value
        keyword_companions = {
            'contains': contains_path_to_schemas,
            'min_contains': contains_path_to_schemas,
            'max_contains': contains_path_to_schemas,
            'items': prefix_items_length,
            'unevaluated_items': path_to_schemas,
            'unevaluated_properties': path_to_schemas,
            'types': schema_format,
            'pattern_properties': validated_pattern_properties,
            'additional_properties': validated_pattern_properties,
            'if_': if_path_to_schemas,
            'then': if_path_to_schemas,
            'else_': if_path_to_schemas,
        }
        for keyword, val in json_schema_data.items():
            used_val: typing.Any
            if keyword in keyword_companions:
                used_val = (val, keyword_companions[keyword])
            else:
                used_val = val
            validator = json_schema_keyword_to_validator[keyword]
            other_path_to_schemas = validator(
                arg,
                used_val,
                cls,
                validation_metadata,
            )
            if other_path_to_schemas:
                update(path_to_schemas, other_path_to_schemas)

        # record the python type of arg and this schema class at the current path
        schemas_at_path = path_to_schemas.setdefault(validation_metadata.path_to_item, dict())
        schemas_at_path[type(arg)] = None
        schemas_at_path[cls] = None
        return path_to_schemas
# Maps a location in the payload (tuple of keys/indices) to the classes that
# were validated at that location. The inner dict is used like a set: only
# its keys carry information, every value is None.
PathToSchemasType = typing.Dict[
typing.Tuple[typing.Union[str, int], ...],
typing.Dict[
typing.Union[
typing.Type[SchemaValidator],
typing.Type[str],
typing.Type[int],
typing.Type[float],
typing.Type[bool],
typing.Type[None],
typing.Type[immutabledict],
typing.Type[tuple]
],
None
]
]
def _get_class(
    item_cls: typing.Union[types.FunctionType, staticmethod, typing.Type[SchemaValidator]],
    local_namespace: typing.Optional[dict] = None
) -> typing.Type[SchemaValidator]:
    """
    Resolve the different ways a schema can be referenced into the class itself

    Args:
        item_cls: a class, a subscripted generic alias, a function or staticmethod
            that returns the class when called, or a typing.ForwardRef
        local_namespace: namespace used to evaluate a typing.ForwardRef

    Returns:
        the resolved class

    Raises:
        ValueError: if item_cls is none of the supported kinds, or a generic
            alias has no origin class
    """
    if isinstance(item_cls, typing._GenericAlias):  # type: ignore
        # petstore_api.schemas.StrSchema[~U] -> petstore_api.schemas.StrSchema
        origin_cls = typing.get_origin(item_cls)
        if origin_cls is None:
            raise ValueError('origin class must not be None')
        return origin_cls
    elif isinstance(item_cls, types.FunctionType):
        # referenced schema
        return item_cls()
    elif isinstance(item_cls, staticmethod):
        # referenced schema
        return item_cls.__func__()
    elif isinstance(item_cls, type):
        return item_cls
    elif isinstance(item_cls, typing.ForwardRef):
        if sys.version_info < (3, 9):
            return item_cls._evaluate(None, local_namespace)
        # recursive_guard is passed by keyword: newer Python releases make it
        # keyword-only and use the third positional slot for type_params
        return item_cls._evaluate(None, local_namespace, recursive_guard=set())
    raise ValueError('invalid class value passed in')
def update(d: dict, u: dict):
    """
    Adds u to d, in place

    Both arguments map a key to an inner dict. A key that is missing from d
    receives the inner dict of u by reference (no copy is made); a key that
    already exists has its inner dict updated with the one from u.

    Args:
        d: the mapping that is modified
        u: the mapping whose entries are merged in; may be empty or None

    Returns:
        d, in every case
    """
    if not u:
        return d
    for key, inner in u.items():
        if key in d:
            d[key].update(inner)
        else:
            d[key] = inner
    return d
def add_deeper_validated_schemas(validation_metadata: ValidationMetadata, path_to_schemas: dict):
    """
    Copy the already validated entries at or below the current path into path_to_schemas

    This is called if validation_ran_earlier and current and deeper locations need to be added
    """
    prefix = validation_metadata.path_to_item
    prefix_length = len(prefix)
    at_or_below_current_path = {
        path: schemas
        for path, schemas in validation_metadata.validated_path_to_schemas.items()
        if len(path) >= prefix_length and path[:prefix_length] == prefix
    }
    update(path_to_schemas, at_or_below_current_path)
def __get_valid_classes_phrase(input_classes):
    """Returns a phrase listing the allowed classes, sorted by class name"""
    names = [
        klass.__name__
        for klass in sorted(input_classes, key=lambda klass: klass.__name__)
    ]
    if len(names) != 1:
        return "is one of [{0}]".format(", ".join(names))
    return "is {0}".format(names[0])
def __type_error_message(
    var_value=None, var_name=None, valid_classes=None, key_type=None
):
    """
    Build the message used when a value has a type that is not allowed

    Keyword Args:
        var_value (any): the variable which has the type error
        var_name (str): the name of the variable which has the type error
            (accepted but not used in the message)
        valid_classes (tuple): the accepted classes for the value
        key_type (bool): True if the value is a key in a dict
                         False if it is a value in a dict or an item in a tuple
    """
    key_or_value = "key" if key_type else "value"
    return "Invalid type. Required {0} type {1} and passed type was {2}".format(
        key_or_value,
        __get_valid_classes_phrase(valid_classes),
        type(var_value).__name__,
    )
def __get_type_error(var_value, path_to_item, valid_classes, key_type=False):
    """
    Build (but do not raise) the ApiTypeError for a value whose type is not allowed

    The last element of path_to_item is passed on as the variable name.
    """
    message = __type_error_message(
        var_value=var_value,
        var_name=path_to_item[-1],
        valid_classes=valid_classes,
        key_type=key_type,
    )
    error_details = dict(
        path_to_item=path_to_item,
        valid_classes=valid_classes,
        key_type=key_type,
    )
    return exceptions.ApiTypeError(message, **error_details)
@dataclasses.dataclass(frozen=True)
class PatternInfo:
    """
    A regular expression together with the optional flags it is searched with
    """
    # the regular expression source
    pattern: str
    # flags handed to re.search; None is treated as "no flags" by validate_pattern
    flags: typing.Optional[re.RegexFlag] = None
def validate_types(
    arg: typing.Any,
    allowed_types_format: typing.Tuple[typing.Set[typing.Type], typing.Optional[str]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that the exact type of arg is one of the allowed types

    allowed_types_format is (allowed types, format). For int and float values
    (bool excluded) with format 'int', the value must also be integral.

    Raises:
        ApiTypeError: type(arg) is not in the allowed types
        ApiValueError: format is 'int' and the number has a fractional part
    """
    allowed_types = allowed_types_format[0]
    if type(arg) not in allowed_types:
        raise __get_type_error(
            arg,
            validation_metadata.path_to_item,
            allowed_types,
            key_type=False,
        )
    is_plain_number = isinstance(arg, (int, float)) and not isinstance(arg, bool)
    if not is_plain_number:
        return None
    type_format = allowed_types_format[1]
    if type_format == 'int' and arg != int(arg):
        # there is a json schema test where 1.0 validates as an integer
        raise exceptions.ApiValueError(
            "Invalid non-integer value '{}' for type {} at {}".format(
                arg, type_format, validation_metadata.path_to_item
            )
        )
    return None
def validate_enum(
    arg: typing.Any,
    enum_value_to_name: typing.Dict[typing.Any, str],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that arg is one of the enum values (the keys of enum_value_to_name)

    Raises:
        ApiValueError: arg is not an allowed value
    """
    if arg in enum_value_to_name:
        return None
    raise exceptions.ApiValueError("Invalid value {} passed in to {}, allowed_values={}".format(arg, cls, enum_value_to_name.keys()))
def validate_unique_items(
    arg: typing.Any,
    unique_items_value: bool,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that a tuple holds no duplicates when unique_items_value is true

    Values that are not tuples are ignored. Duplicates are detected by
    comparing the tuple length with the length of set(arg).

    Raises:
        ApiValueError: duplicate items were found
    """
    must_check = unique_items_value and isinstance(arg, tuple)
    if must_check and len(set(arg)) != len(arg):
        _raise_validation_error_message(
            value=arg,
            constraint_msg="duplicate items were found, and the tuple must not contain duplicates because",
            constraint_value='unique_items==True',
            path_to_item=validation_metadata.path_to_item
        )
    return None
def validate_min_items(
    arg: typing.Any,
    min_items: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that a tuple has at least min_items items; other types are ignored

    Raises:
        ApiValueError: the tuple is too short
    """
    has_too_few_items = isinstance(arg, tuple) and len(arg) < min_items
    if has_too_few_items:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="number of items must be greater than or equal to",
            constraint_value=min_items,
            path_to_item=validation_metadata.path_to_item
        )
    return None
def validate_max_items(
    arg: typing.Any,
    max_items: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that a tuple has at most max_items items; other types are ignored

    Raises:
        ApiValueError: the tuple is too long
    """
    if isinstance(arg, tuple):
        item_count = len(arg)
        if item_count > max_items:
            _raise_validation_error_message(
                value=arg,
                constraint_msg="number of items must be less than or equal to",
                constraint_value=max_items,
                path_to_item=validation_metadata.path_to_item
            )
    return None
def validate_min_properties(
    arg: typing.Any,
    min_properties: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an immutabledict has at least min_properties entries; other types are ignored

    Raises:
        ApiValueError: there are too few properties
    """
    has_too_few_properties = isinstance(arg, immutabledict) and len(arg) < min_properties
    if has_too_few_properties:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="number of properties must be greater than or equal to",
            constraint_value=min_properties,
            path_to_item=validation_metadata.path_to_item
        )
    return None
def validate_max_properties(
    arg: typing.Any,
    max_properties: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an immutabledict has at most max_properties entries; other types are ignored

    Raises:
        ApiValueError: there are too many properties
    """
    if isinstance(arg, immutabledict):
        property_count = len(arg)
        if property_count > max_properties:
            _raise_validation_error_message(
                value=arg,
                constraint_msg="number of properties must be less than or equal to",
                constraint_value=max_properties,
                path_to_item=validation_metadata.path_to_item
            )
    return None
def validate_min_length(
    arg: typing.Any,
    min_length: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that a str has at least min_length characters; other types are ignored

    Raises:
        ApiValueError: the string is too short
    """
    is_too_short = isinstance(arg, str) and len(arg) < min_length
    if is_too_short:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="length must be greater than or equal to",
            constraint_value=min_length,
            path_to_item=validation_metadata.path_to_item
        )
    return None
def validate_max_length(
    arg: typing.Any,
    max_length: int,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that a str has at most max_length characters; other types are ignored

    Raises:
        ApiValueError: the string is too long
    """
    if isinstance(arg, str):
        length = len(arg)
        if length > max_length:
            _raise_validation_error_message(
                value=arg,
                constraint_msg="length must be less than or equal to",
                constraint_value=max_length,
                path_to_item=validation_metadata.path_to_item
            )
    return None
def validate_inclusive_minimum(
    arg: typing.Any,
    inclusive_minimum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an int or float is >= inclusive_minimum; other types are ignored

    Raises:
        ApiValueError: the number is below the minimum
    """
    is_below_minimum = isinstance(arg, (int, float)) and arg < inclusive_minimum
    if is_below_minimum:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="must be a value greater than or equal to",
            constraint_value=inclusive_minimum,
            path_to_item=validation_metadata.path_to_item
        )
    return None
def validate_exclusive_minimum(
    arg: typing.Any,
    exclusive_minimum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an int or float is > exclusive_minimum; other types are ignored

    Raises:
        ApiValueError: the number is less than or equal to the bound
    """
    if isinstance(arg, (int, float)):
        if arg <= exclusive_minimum:
            _raise_validation_error_message(
                value=arg,
                constraint_msg="must be a value greater than",
                constraint_value=exclusive_minimum,
                path_to_item=validation_metadata.path_to_item
            )
    return None
def validate_inclusive_maximum(
    arg: typing.Any,
    inclusive_maximum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an int or float is <= inclusive_maximum; other types are ignored

    Raises:
        ApiValueError: the number is above the maximum
    """
    is_above_maximum = isinstance(arg, (int, float)) and arg > inclusive_maximum
    if is_above_maximum:
        _raise_validation_error_message(
            value=arg,
            constraint_msg="must be a value less than or equal to",
            constraint_value=inclusive_maximum,
            path_to_item=validation_metadata.path_to_item
        )
    return None
def validate_exclusive_maximum(
    arg: typing.Any,
    exclusive_maximum: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an int or float is < exclusive_maximum; other types are ignored

    Raises:
        ApiValueError: the number is greater than or equal to the bound
    """
    if isinstance(arg, (int, float)):
        if arg >= exclusive_maximum:
            _raise_validation_error_message(
                value=arg,
                constraint_msg="must be a value less than",
                constraint_value=exclusive_maximum,
                path_to_item=validation_metadata.path_to_item
            )
    return None
def validate_multiple_of(
    arg: typing.Any,
    multiple_of: typing.Union[int, float],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an int or float is a multiple of multiple_of; other types are ignored

    The check divides float(arg) by multiple_of and tests whether the quotient
    is integral, so it is only as exact as floating point arithmetic.

    Raises:
        ApiValueError: the quotient is not an integer
    """
    if isinstance(arg, (int, float)):
        quotient = float(arg) / multiple_of
        # Note 'multipleOf' will be as good as the floating point arithmetic.
        if not quotient.is_integer():
            _raise_validation_error_message(
                value=arg,
                constraint_msg="value must be a multiple of",
                constraint_value=multiple_of,
                path_to_item=validation_metadata.path_to_item
            )
    return None
def validate_pattern(
    arg: typing.Any,
    pattern_info: PatternInfo,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that a str contains a match for the pattern (re.search); other types are ignored

    Raises:
        ApiValueError: no match was found; the flags are mentioned in the
            message only when they are non-zero
    """
    if not isinstance(arg, str):
        return None
    flags = 0 if pattern_info.flags is None else pattern_info.flags
    if re.search(pattern_info.pattern, arg, flags=flags):
        return None
    # Don't print the regex flags if the flags are not
    # specified in the OAS document.
    flags_txt = " with flags=`{}`".format(flags) if flags != 0 else ""
    _raise_validation_error_message(
        value=arg,
        constraint_msg="must match regular expression",
        constraint_value=pattern_info.pattern,
        path_to_item=validation_metadata.path_to_item,
        additional_txt=flags_txt
    )
    return None
# inclusive bounds of signed 32 bit and 64 bit integers
__int32_inclusive_minimum = -(2 ** 31)
__int32_inclusive_maximum = 2 ** 31 - 1
__int64_inclusive_minimum = -(2 ** 63)
__int64_inclusive_maximum = 2 ** 63 - 1
# inclusive bounds used for the 'float' and 'double' formats
__float_inclusive_maximum = 3.4028234663852886e+38
__float_inclusive_minimum = -__float_inclusive_maximum
__double_inclusive_maximum = 1.7976931348623157E+308
__double_inclusive_minimum = -__double_inclusive_maximum
def __validate_numeric_format(
    arg: typing.Union[int, float],
    format_value: str,
    validation_metadata: ValidationMetadata
) -> None:
    """
    Check a number against a numeric format

    Formats starting with 'int' require an integral value; 'int32' and 'int64'
    additionally require the value to be inside the matching signed range.
    'float' and 'double' require the value to be inside the matching range.
    Any other format is accepted without checks.

    Args:
        arg: the number to check
        format_value: the format name from the schema
        validation_metadata: supplies path_to_item for error messages

    Raises:
        ApiValueError: the value is not integral or is out of range
    """
    if format_value[:3] == 'int':
        # there is a json schema test where 1.0 validates as an integer
        if arg != int(arg):
            # report the schema format name; `format` is the imported module here
            raise exceptions.ApiValueError(
                "Invalid non-integer value '{}' for type {} at {}".format(
                    arg, format_value, validation_metadata.path_to_item
                )
            )
        if format_value == 'int32':
            if not __int32_inclusive_minimum <= arg <= __int32_inclusive_maximum:
                raise exceptions.ApiValueError(
                    "Invalid value '{}' for type int32 at {}".format(arg, validation_metadata.path_to_item)
                )
        elif format_value == 'int64':
            if not __int64_inclusive_minimum <= arg <= __int64_inclusive_maximum:
                raise exceptions.ApiValueError(
                    "Invalid value '{}' for type int64 at {}".format(arg, validation_metadata.path_to_item)
                )
        return None
    if format_value == 'float':
        if not __float_inclusive_minimum <= arg <= __float_inclusive_maximum:
            raise exceptions.ApiValueError(
                "Invalid value '{}' for type float at {}".format(arg, validation_metadata.path_to_item)
            )
    elif format_value == 'double':
        if not __double_inclusive_minimum <= arg <= __double_inclusive_maximum:
            raise exceptions.ApiValueError(
                "Invalid value '{}' for type double at {}".format(arg, validation_metadata.path_to_item)
            )
    return None
def __validate_string_format(
    arg: str,
    format_value: str,
    validation_metadata: ValidationMetadata
) -> None:
    """
    Check a string against the 'uuid', 'number', 'date' or 'date-time' format

    Each supported format is checked by parsing the string with the matching
    parser. Any other format is accepted without checks.

    Raises:
        ApiValueError: the string could not be parsed for the given format
    """
    if format_value == 'uuid':
        try:
            uuid.UUID(arg)
        except ValueError:
            raise exceptions.ApiValueError(
                "Invalid value '{}' for type UUID at {}".format(arg, validation_metadata.path_to_item)
            )
        return None
    if format_value == 'number':
        try:
            decimal.Decimal(arg)
        except decimal.InvalidOperation:
            raise exceptions.ApiValueError(
                "Value cannot be converted to a decimal. "
                "Invalid value '{}' for type decimal at {}".format(arg, validation_metadata.path_to_item)
            )
        return None
    if format_value == 'date':
        try:
            format.DEFAULT_ISOPARSER.parse_isodate_str(arg)
        except ValueError:
            raise exceptions.ApiValueError(
                "Value does not conform to the required ISO-8601 date format. "
                "Invalid value '{}' for type date at {}".format(arg, validation_metadata.path_to_item)
            )
        return None
    if format_value == 'date-time':
        try:
            format.DEFAULT_ISOPARSER.parse_isodatetime(arg)
        except ValueError:
            raise exceptions.ApiValueError(
                "Value does not conform to the required ISO-8601 datetime format. "
                "Invalid value '{}' for type datetime at {}".format(arg, validation_metadata.path_to_item)
            )
        return None
    return None
def validate_format(
    arg: typing.Union[str, int, float],
    format_value: str,
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check arg against format_value using the numeric or the string format checker

    Formats work for strings + numbers; values of any other type are ignored.
    """
    if isinstance(arg, (int, float)):
        checker = __validate_numeric_format
    elif isinstance(arg, str):
        checker = __validate_string_format
    else:
        return None
    return checker(arg, format_value, validation_metadata)
def validate_required(
    arg: typing.Any,
    required: typing.Set[str],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Check that an immutabledict contains every required key; other types are ignored

    Raises:
        ApiTypeError: one or more required keys are missing; the message lists
            them in sorted order
    """
    if not isinstance(arg, immutabledict):
        return None
    missing_required_arguments = sorted(required - arg.keys())
    if not missing_required_arguments:
        return None
    missing_count = len(missing_required_arguments)
    raise exceptions.ApiTypeError(
        "{} is missing {} required argument{}: {}".format(
            cls.__name__,
            missing_count,
            "s" if missing_count > 1 else "",
            missing_required_arguments
        )
    )
def validate_items(
    arg: typing.Any,
    item_cls_prefix_items_length: typing.Tuple[typing.Type[SchemaValidator], int],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate the items of a tuple that come after the prefix items

    item_cls_prefix_items_length is (item schema, number of prefix items).
    Values that are not tuples are ignored and None is returned.

    Returns:
        the classes validated for each checked item, keyed by path
    """
    if not isinstance(arg, tuple):
        return None
    item_cls = _get_class(item_cls_prefix_items_length[0])
    first_checked_index = item_cls_prefix_items_length[1]
    path_to_schemas: PathToSchemasType = {}
    for index in range(first_checked_index, len(arg)):
        item_validation_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (index,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if item_validation_metadata.validation_ran_earlier(item_cls):
            # reuse the earlier results instead of validating the item again
            add_deeper_validated_schemas(item_validation_metadata, path_to_schemas)
        else:
            item_path_to_schemas = item_cls._validate(
                arg[index], validation_metadata=item_validation_metadata)
            update(path_to_schemas, item_path_to_schemas)
    return path_to_schemas
def validate_properties(
    arg: typing.Any,
    properties: typing.Mapping[str, typing.Type[SchemaValidator]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate every entry of an immutabledict whose key is declared in properties

    Values that are not immutabledicts are ignored and None is returned.
    Schema references are resolved with the namespace of the module that
    defines cls.

    Returns:
        the classes validated for each checked property, keyed by path
    """
    if not isinstance(arg, immutabledict):
        return None
    path_to_schemas: PathToSchemasType = {}
    module_namespace = vars(sys.modules[cls.__module__])
    for property_name, value in arg.items():
        if property_name not in properties:
            continue
        schema = _get_class(properties[property_name], module_namespace)
        arg_validation_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (property_name,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if arg_validation_metadata.validation_ran_earlier(schema):
            # reuse the earlier results instead of validating the value again
            add_deeper_validated_schemas(arg_validation_metadata, path_to_schemas)
        else:
            property_path_to_schemas = schema._validate(value, validation_metadata=arg_validation_metadata)
            update(path_to_schemas, property_path_to_schemas)
    return path_to_schemas
def validate_additional_properties(
    arg: typing.Any,
    additional_properties_cls_val_pprops: typing.Tuple[
        typing.Type[SchemaValidator],
        typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate the entries of an immutabledict that are not declared in properties

    additional_properties_cls_val_pprops is (additional properties schema,
    paths already validated by pattern properties). Entries whose path is in
    the second element are skipped. Values that are not immutabledicts are
    ignored and None is returned.

    Returns:
        the classes validated for each checked entry, keyed by path
    """
    if not isinstance(arg, immutabledict):
        return None
    schema = _get_class(additional_properties_cls_val_pprops[0])
    path_to_schemas: PathToSchemasType = {}
    declared_properties = getattr(cls(), 'properties', {})
    validated_pattern_properties = additional_properties_cls_val_pprops[1]
    for property_name, value in arg.items():
        if property_name in declared_properties:
            continue
        path_to_item = validation_metadata.path_to_item + (property_name,)
        if validated_pattern_properties and path_to_item in validated_pattern_properties:
            continue
        arg_validation_metadata = ValidationMetadata(
            path_to_item=path_to_item,
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if arg_validation_metadata.validation_ran_earlier(schema):
            # reuse the earlier results instead of validating the value again
            add_deeper_validated_schemas(arg_validation_metadata, path_to_schemas)
        else:
            entry_path_to_schemas = schema._validate(value, validation_metadata=arg_validation_metadata)
            update(path_to_schemas, entry_path_to_schemas)
    return path_to_schemas
def validate_one_of(
    arg: typing.Any,
    classes: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Require that exactly one of the oneOf schemas accepts arg

    Returns:
        the path_to_schemas collected while checking the schemas

    Raises:
        ApiValueError: no schema matched, or more than one schema matched
    """
    matched_classes = []
    path_to_schemas: PathToSchemasType = collections.defaultdict(dict)
    for candidate in classes:
        schema = _get_class(candidate)
        # a schema already recorded at this path counts as a match;
        # cls itself is optimistically assumed to pass validation,
        # _validate is not invoked on it because that is recursive
        if schema in path_to_schemas[validation_metadata.path_to_item] or schema is cls:
            matched_classes.append(schema)
            continue
        if validation_metadata.validation_ran_earlier(schema):
            matched_classes.append(schema)
            add_deeper_validated_schemas(validation_metadata, path_to_schemas)
            continue
        try:
            path_to_schemas = schema._validate(arg, validation_metadata=validation_metadata)
        except (exceptions.ApiValueError, exceptions.ApiTypeError):
            # silence exceptions because the code needs to accumulate matched_classes
            continue
        matched_classes.append(schema)
    match_count = len(matched_classes)
    if match_count == 0:
        raise exceptions.ApiValueError(
            "Invalid inputs given to generate an instance of {}. None "
            "of the oneOf schemas matched the input data.".format(cls)
        )
    if match_count > 1:
        raise exceptions.ApiValueError(
            "Invalid inputs given to generate an instance of {}. Multiple "
            "oneOf schemas {} matched the inputs, but a max of one is allowed.".format(cls, matched_classes)
        )
    # exactly one class matches
    return path_to_schemas
def validate_any_of(
    arg: typing.Any,
    classes: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Require that at least one of the anyOf schemas accepts arg

    Schema references are resolved with the namespace of the module that
    defines cls.

    Returns:
        the merged path_to_schemas of every schema that matched

    Raises:
        ApiValueError: no schema matched
    """
    matched_classes = []
    path_to_schemas: PathToSchemasType = collections.defaultdict(dict)
    module_namespace = vars(sys.modules[cls.__module__])
    for candidate in classes:
        schema = _get_class(candidate, module_namespace)
        if schema is cls:
            # optimistically assume that cls schema will pass validation
            # do not invoke _validate on it because that is recursive
            matched_classes.append(schema)
            continue
        if validation_metadata.validation_ran_earlier(schema):
            matched_classes.append(schema)
            add_deeper_validated_schemas(validation_metadata, path_to_schemas)
            continue
        try:
            other_path_to_schemas = schema._validate(arg, validation_metadata=validation_metadata)
        except (exceptions.ApiValueError, exceptions.ApiTypeError):
            # silence exceptions because the code needs to accumulate matched_classes
            continue
        matched_classes.append(schema)
        update(path_to_schemas, other_path_to_schemas)
    if len(matched_classes) == 0:
        raise exceptions.ApiValueError(
            "Invalid inputs given to generate an instance of {}. None "
            "of the anyOf schemas matched the input data.".format(cls)
        )
    return path_to_schemas
def validate_all_of(
    arg: typing.Any,
    classes: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Require that every allOf schema accepts arg

    Errors raised by any schema propagate to the caller.

    Returns:
        the merged path_to_schemas of all schemas
    """
    path_to_schemas: PathToSchemasType = collections.defaultdict(dict)
    for candidate in classes:
        schema = _get_class(candidate)
        if schema is cls:
            # optimistically assume that cls schema will pass validation
            # do not invoke _validate on it because that is recursive
            continue
        if validation_metadata.validation_ran_earlier(schema):
            add_deeper_validated_schemas(validation_metadata, path_to_schemas)
        else:
            schema_path_to_schemas = schema._validate(arg, validation_metadata=validation_metadata)
            update(path_to_schemas, schema_path_to_schemas)
    return path_to_schemas
def validate_not(
    arg: typing.Any,
    not_cls: typing.Type[SchemaValidator],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Require that arg is rejected by the not schema

    Raises:
        ApiValueError: the not schema was validated earlier, or it accepts arg
    """
    not_schema = _get_class(not_cls)
    not_exception = exceptions.ApiValueError(
        "Invalid value '{}' was passed in to {}. Value is invalid because it is disallowed by {}".format(
            arg,
            cls.__name__,
            not_schema.__name__,
        )
    )
    if validation_metadata.validation_ran_earlier(not_schema):
        raise not_exception
    try:
        accepted_path_to_schemas = not_schema._validate(arg, validation_metadata=validation_metadata)
    except (exceptions.ApiValueError, exceptions.ApiTypeError):
        # the not schema rejected arg, which is the passing case
        accepted_path_to_schemas = None
    if accepted_path_to_schemas:
        raise not_exception
    return None
def __ensure_discriminator_value_present(
    disc_property_name: str,
    validation_metadata: ValidationMetadata,
    arg
):
    """
    Raise ApiValueError when arg does not contain the discriminator property
    """
    if disc_property_name in arg:
        return
    # The input data does not contain the discriminator property
    raise exceptions.ApiValueError(
        "Cannot deserialize input data due to missing discriminator. "
        "The discriminator property '{}' is missing at path: {}".format(disc_property_name, validation_metadata.path_to_item)
    )
def __get_discriminated_class(cls, disc_property_name: str, disc_payload_value: str):
    """
    Used in schemas with discriminators

    Looks up the class mapped to disc_payload_value in the discriminator of an
    instance of cls. If it is not found there, the all_of, one_of and any_of
    schemas of cls are searched recursively, in that order.

    Returns:
        the discriminated class, or None when there is no discriminator, the
        property name is not part of it, or no mapping was found
    """
    cls_schema = cls()
    if not hasattr(cls_schema, 'discriminator'):
        return None
    disc = cls_schema.discriminator
    if disc_property_name not in disc:
        return None
    discriminated_cls = disc[disc_property_name].get(disc_payload_value)
    if discriminated_cls is not None:
        return discriminated_cls
    # TODO stop traveling if a cycle is hit
    for composition_keyword in ('all_of', 'one_of', 'any_of'):
        for composed_cls in getattr(cls_schema, composition_keyword, ()):
            found_cls = __get_discriminated_class(
                composed_cls, disc_property_name=disc_property_name, disc_payload_value=disc_payload_value)
            if found_cls is not None:
                return found_cls
    return None
def validate_discriminator(
    arg: typing.Any,
    discriminator: typing.Mapping[str, typing.Mapping[str, typing.Type[SchemaValidator]]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate an immutabledict against the class selected by its discriminator value

    The first key of discriminator is the discriminator property name.
    Values that are not immutabledicts are ignored and None is returned.

    Returns:
        None when the discriminated class is cls itself, otherwise the
        path_to_schemas of the discriminated class

    Raises:
        ApiValueError: the discriminator property is missing from arg or its
            value maps to no class
    """
    if not isinstance(arg, immutabledict):
        return None
    disc_prop_name = list(discriminator)[0]
    __ensure_discriminator_value_present(disc_prop_name, validation_metadata, arg)
    discriminated_cls = __get_discriminated_class(
        cls, disc_property_name=disc_prop_name, disc_payload_value=arg[disc_prop_name]
    )
    if discriminated_cls is None:
        raise exceptions.ApiValueError(
            "Invalid discriminator value was passed in to {}.{} Only the values {} are allowed at {}".format(
                cls.__name__,
                disc_prop_name,
                list(discriminator[disc_prop_name].keys()),
                validation_metadata.path_to_item + (disc_prop_name,)
            )
        )
    if discriminated_cls is cls:
        # Optimistically assume that cls will pass validation
        # If the code invoked _validate on cls it would infinitely recurse
        return None
    if validation_metadata.validation_ran_earlier(discriminated_cls):
        earlier_path_to_schemas: PathToSchemasType = {}
        add_deeper_validated_schemas(validation_metadata, earlier_path_to_schemas)
        return earlier_path_to_schemas
    # cls is added to seen_classes before handing over to the discriminated class
    metadata_with_cls_seen = ValidationMetadata(
        path_to_item=validation_metadata.path_to_item,
        configuration=validation_metadata.configuration,
        seen_classes=validation_metadata.seen_classes | frozenset({cls}),
        validated_path_to_schemas=validation_metadata.validated_path_to_schemas
    )
    return discriminated_cls._validate(arg, validation_metadata=metadata_with_cls_seen)
def _get_if_path_to_schemas(
    arg: typing.Any,
    if_cls: typing.Type[SchemaValidator],
    validation_metadata: ValidationMetadata,
) -> PathToSchemasType:
    """
    Validate arg against the if schema without letting a failure escape

    Returns:
        the path_to_schemas of the if schema when it accepts arg, or an empty
        dict when validation raised an OpenApiException
    """
    resolved_if_cls = _get_class(if_cls)
    these_path_to_schemas: PathToSchemasType = {}
    try:
        if_result = resolved_if_cls._validate(
            arg, validation_metadata=validation_metadata)
        update(these_path_to_schemas, if_result)
    except exceptions.OpenApiException:
        # the if schema rejected arg: report that as an empty result
        pass
    return these_path_to_schemas
def validate_if(
    arg: typing.Any,
    if_cls_if_path_to_schemas: typing.Tuple[
        typing.Type[SchemaValidator], typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Check only that the precomputed if result is present; never adds schemas

    Raises:
        OpenApiException: the second tuple element (if_path_to_schemas) is None
    """
    if if_cls_if_path_to_schemas[1] is None:
        raise exceptions.OpenApiException('Invalid type for if_path_to_schemas')
    # if is false use case
    #     if_path_to_schemas == {}
    #     no need to add any data to path_to_schemas
    # if true, then true -> true for whole schema
    #     so validate_then will add if_path_to_schemas data to path_to_schemas
    return None
def validate_then(
    arg: typing.Any,
    then_cls_if_path_to_schemas: typing.Tuple[
        typing.Type[SchemaValidator], typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Apply the ``then`` schema when the ``if`` schema validated.

    then_cls_if_path_to_schemas is ``(then schema, if result)`` where the
    ``if`` result is None when there is no ``if``, an empty mapping when the
    ``if`` failed, and the schemas collected by the ``if`` otherwise.

    Returns:
        None when ``then`` does not apply; otherwise the ``if`` results merged
        with the results of validating arg against the ``then`` schema.

    Raises:
        Whatever ``then_cls._validate`` raises; a failing ``then`` fails the
        whole schema, so the exception propagates unchanged.
    """
    if_path_to_schemas = then_cls_if_path_to_schemas[1]
    # None -> there is no ``if``; empty -> the ``if`` was false.
    # In both cases ``then`` has nothing to check or add.
    if not if_path_to_schemas:
        return None
    then_cls = _get_class(then_cls_if_path_to_schemas[0])
    # No try/except here: the previous handler only re-raised the exception.
    then_path_to_schemas = then_cls._validate(
        arg, validation_metadata=validation_metadata)
    these_path_to_schemas: PathToSchemasType = {}
    update(these_path_to_schemas, if_path_to_schemas)
    update(these_path_to_schemas, then_path_to_schemas)
    return these_path_to_schemas
def validate_else(
    arg: typing.Any,
    else_cls_if_path_to_schemas: typing.Tuple[
        typing.Type[SchemaValidator], typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Apply the ``else`` schema when the ``if`` schema did not validate.

    else_cls_if_path_to_schemas is ``(else schema, if result)`` where the
    ``if`` result is None when there is no ``if``, an empty mapping when the
    ``if`` failed, and a non-empty mapping when it passed.

    Returns:
        None when ``else`` does not apply (no ``if``, or the ``if`` passed);
        otherwise the results of validating arg against the ``else`` schema.

    Raises:
        Whatever ``else_cls._validate`` raises; a failing ``else`` fails the
        whole schema, so the exception propagates unchanged.
    """
    if_path_to_schemas = else_cls_if_path_to_schemas[1]
    # None -> there is no ``if``; non-empty -> the ``if`` was true.
    # Only the "``if`` was false" case (empty mapping) reaches the else schema.
    if if_path_to_schemas is None or if_path_to_schemas:
        return None
    else_cls = _get_class(else_cls_if_path_to_schemas[0])
    # No try/except here: the previous handler only re-raised the exception.
    else_path_to_schemas = else_cls._validate(
        arg, validation_metadata=validation_metadata)
    these_path_to_schemas: PathToSchemasType = {}
    # if_path_to_schemas is empty at this point; kept for parity with validate_then
    update(these_path_to_schemas, if_path_to_schemas)
    update(these_path_to_schemas, else_path_to_schemas)
    return these_path_to_schemas
def _get_contains_path_to_schemas(
arg: typing.Any,
contains_cls: typing.Type[SchemaValidator],
validation_metadata: ValidationMetadata,
path_to_schemas: PathToSchemasType
) -> typing.List[PathToSchemasType]:
if not isinstance(arg, tuple):
return []
contains_cls = _get_class(contains_cls)
contains_path_to_schemas = []
for i, value in enumerate(arg):
these_path_to_schemas: PathToSchemasType = {}
item_validation_metadata = ValidationMetadata(
path_to_item=validation_metadata.path_to_item+(i,),
configuration=validation_metadata.configuration,
validated_path_to_schemas=validation_metadata.validated_path_to_schemas
)
if item_validation_metadata.validation_ran_earlier(contains_cls):
add_deeper_validated_schemas(item_validation_metadata, these_path_to_schemas)
contains_path_to_schemas.append(these_path_to_schemas)
continue
try:
other_path_to_schemas = contains_cls._validate(
value, validation_metadata=item_validation_metadata)
contains_path_to_schemas.append(other_path_to_schemas)
except exceptions.OpenApiException:
pass
return contains_path_to_schemas
def validate_contains(
    arg: typing.Any,
    contains_cls_path_to_schemas: typing.Tuple[typing.Type[SchemaValidator], typing.List[PathToSchemasType]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``contains`` keyword.

    The second element of contains_cls_path_to_schemas holds one mapping per
    item that already validated against the contains schema.

    Returns:
        None for non-tuple input, otherwise all per-item mappings merged.

    Raises:
        exceptions.ApiValueError: when arg is a tuple and no item matched
    """
    if not isinstance(arg, tuple):
        return None
    per_item_results = contains_cls_path_to_schemas[1]
    if per_item_results:
        merged: PathToSchemasType = {}
        for item_result in per_item_results:
            update(merged, item_result)
        return merged
    raise exceptions.ApiValueError(
        "Validation failed for contains keyword in class={} at path_to_item={}. No "
        "items validated to the contains schema.".format(cls, validation_metadata.path_to_item)
    )
def validate_min_contains(
    arg: typing.Any,
    min_contains_and_contains_path_to_schemas: typing.Tuple[int, typing.List[PathToSchemasType]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``minContains`` keyword.

    min_contains_and_contains_path_to_schemas is ``(minContains, matches)``
    where matches holds one entry per item that validated against the
    contains schema.

    Returns:
        Always None; this keyword only checks a count.

    Raises:
        exceptions.ApiValueError: when arg is a tuple and fewer than
            minContains items matched the contains schema
    """
    if not isinstance(arg, tuple):
        return None
    min_contains = min_contains_and_contains_path_to_schemas[0]
    contains_path_to_schemas = min_contains_and_contains_path_to_schemas[1]
    if len(contains_path_to_schemas) < min_contains:
        # Some items may have matched, just not enough of them, so the message
        # says "too few" rather than "no items".
        raise exceptions.ApiValueError(
            "Validation failed for minContains keyword in class={} at path_to_item={}. Too "
            "few items validated to the contains schema.".format(cls, validation_metadata.path_to_item)
        )
    return None
def validate_max_contains(
    arg: typing.Any,
    max_contains_and_contains_path_to_schemas: typing.Tuple[int, typing.List[PathToSchemasType]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``maxContains`` keyword.

    max_contains_and_contains_path_to_schemas is ``(maxContains, matches)``
    where matches holds one entry per item that validated against the
    contains schema.

    Returns:
        Always None; this keyword only checks a count.

    Raises:
        exceptions.ApiValueError: when arg is a tuple and more than
            maxContains items matched the contains schema
    """
    if isinstance(arg, tuple):
        allowed_matches = max_contains_and_contains_path_to_schemas[0]
        matches = max_contains_and_contains_path_to_schemas[1]
        if len(matches) > allowed_matches:
            raise exceptions.ApiValueError(
                "Validation failed for maxContains keyword in class={} at path_to_item={}. Too "
                "many items validated to the contains schema.".format(cls, validation_metadata.path_to_item)
            )
    return None
def validate_const(
    arg: typing.Any,
    const_value_to_name: typing.Dict[typing.Any, str],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Validator for the ``const`` keyword.

    Passes when arg is one of the keys of const_value_to_name.

    Raises:
        exceptions.ApiValueError: when arg is not an allowed value
    """
    if arg in const_value_to_name:
        return None
    raise exceptions.ApiValueError("Invalid value {} passed in to {}, allowed_values={}".format(arg, cls, const_value_to_name.keys()))
def validate_dependent_required(
    arg: typing.Any,
    dependent_required: typing.Mapping[str, typing.Set[str]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Validator for the ``dependentRequired`` keyword.

    For every key of dependent_required that is present in arg, all keys in
    the associated set must be present in arg too. Input that is not an
    immutabledict is ignored.

    Raises:
        exceptions.ApiValueError: on the first trigger key whose required
            companion keys are not all present
    """
    if not isinstance(arg, immutabledict):
        return None
    for trigger_key, keys_that_must_exist in dependent_required.items():
        if trigger_key in arg:
            missing_keys = keys_that_must_exist - arg.keys()
            if missing_keys:
                raise exceptions.ApiValueError(
                    f"Validation failed for dependentRequired because these_keys={missing_keys} are "
                    f"missing at path_to_item={validation_metadata.path_to_item} in class {cls}"
                )
    return None
def validate_dependent_schemas(
    arg: typing.Any,
    dependent_schemas: typing.Mapping[str, typing.Type[SchemaValidator]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``dependentSchemas`` keyword.

    For every key of dependent_schemas that is present in arg, the whole of
    arg is validated against the associated schema. Input that is not an
    immutabledict is ignored.

    Returns:
        None for non-immutabledict input, otherwise the merged
        path-to-schemas mapping of every dependent schema that applied.
    """
    if not isinstance(arg, immutabledict):
        return None
    namespace = vars(sys.modules[cls.__module__])
    collected: PathToSchemasType = {}
    for trigger_key, schema_ref in dependent_schemas.items():
        if trigger_key not in arg:
            continue
        dependent_cls = _get_class(schema_ref, namespace)
        # the dependent schema applies to arg itself, so the metadata of the
        # current location is reused as is
        if validation_metadata.validation_ran_earlier(dependent_cls):
            add_deeper_validated_schemas(validation_metadata, collected)
        else:
            update(
                collected,
                dependent_cls._validate(arg, validation_metadata=validation_metadata),
            )
    return collected
def validate_property_names(
    arg: typing.Any,
    property_names_schema: typing.Type[SchemaValidator],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> None:
    """
    Validator for the ``propertyNames`` keyword.

    Every key of arg is validated against property_names_schema; the results
    of those validations are discarded. Input that is not an immutabledict is
    ignored.

    Raises:
        Whatever ``property_names_schema._validate`` raises for a key.
    """
    if not isinstance(arg, immutabledict):
        return None
    names_cls = _get_class(
        property_names_schema, vars(sys.modules[cls.__module__]))
    for name in arg.keys():
        name_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (name,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        # only the pass/fail outcome matters here
        names_cls._validate(name, validation_metadata=name_metadata)
    return None
def _get_validated_pattern_properties(
    arg: typing.Any,
    pattern_properties: typing.Mapping[PatternInfo, typing.Type[SchemaValidator]],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validate every property of arg against each ``patternProperties`` schema
    whose pattern matches the property name.

    Returns:
        None when arg is not an immutabledict, otherwise the merged
        path-to-schemas mapping of all property validations.

    Raises:
        Whatever ``schema._validate`` raises for a property value.
    """
    if not isinstance(arg, immutabledict):
        return None
    path_to_schemas: PathToSchemasType = {}
    module_namespace = vars(sys.modules[cls.__module__])
    for property_name, property_value in arg.items():
        property_validation_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (property_name,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        for pattern_info, schema in pattern_properties.items():
            flags = pattern_info.flags if pattern_info.flags is not None else 0
            if not re.search(pattern_info.pattern, property_name, flags=flags):
                continue
            schema = _get_class(schema, module_namespace)
            # The schema applies to the property value, so "was it validated
            # already?" must be asked of the property's own location. Asking
            # the parent's metadata could skip validation of the value just
            # because the same schema is recorded for the enclosing object.
            if property_validation_metadata.validation_ran_earlier(schema):
                add_deeper_validated_schemas(property_validation_metadata, path_to_schemas)
                continue
            other_path_to_schemas = schema._validate(
                property_value, validation_metadata=property_validation_metadata)
            update(path_to_schemas, other_path_to_schemas)
    return path_to_schemas
def validate_pattern_properties(
    arg: typing.Any,
    pattern_properties_validation_results: typing.Tuple[
        typing.Mapping[PatternInfo, typing.Type[SchemaValidator]],
        typing.Optional[PathToSchemasType]
    ],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``patternProperties`` keyword.

    No validation happens here: the second element of
    pattern_properties_validation_results is handed back unchanged for
    immutabledict input, and None is returned for anything else.
    """
    if isinstance(arg, immutabledict):
        return pattern_properties_validation_results[1]
    return None
def validate_prefix_items(
    arg: typing.Any,
    prefix_items: typing.Tuple[typing.Type[SchemaValidator], ...],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``prefixItems`` keyword.

    The i-th item of arg is validated against the i-th schema of
    prefix_items; items beyond the last prefix schema are not checked here.

    Returns:
        None when arg is not a tuple, otherwise the merged path-to-schemas
        mapping of the validated items.

    Raises:
        Whatever ``schema._validate`` raises for an item.
    """
    if not isinstance(arg, tuple):
        return None
    path_to_schemas: PathToSchemasType = {}
    module_namespace = vars(sys.modules[cls.__module__])
    # zip stops at the shorter input, so surplus items or surplus prefix
    # schemas are ignored
    for i, (prefix_schema, val) in enumerate(zip(prefix_items, arg)):
        schema = _get_class(prefix_schema, module_namespace)
        item_validation_metadata = ValidationMetadata(
            path_to_item=validation_metadata.path_to_item + (i,),
            configuration=validation_metadata.configuration,
            validated_path_to_schemas=validation_metadata.validated_path_to_schemas
        )
        if item_validation_metadata.validation_ran_earlier(schema):
            # collect from the item's own location, the same one the check
            # above was made for, not from the enclosing array's location
            add_deeper_validated_schemas(item_validation_metadata, path_to_schemas)
            continue
        other_path_to_schemas = schema._validate(
            val, validation_metadata=item_validation_metadata)
        update(path_to_schemas, other_path_to_schemas)
    return path_to_schemas
def validate_unevaluated_items(
    arg: typing.Any,
    unevaluated_items_validated_path_to_schemas: typing.Tuple[typing.Type[SchemaValidator], PathToSchemasType],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``unevaluatedItems`` keyword.

    unevaluated_items_validated_path_to_schemas is ``(schema, evaluated)``.
    Every item of arg whose path is not a key of ``evaluated`` is validated
    against ``schema``.

    Returns:
        None when arg is not a tuple, otherwise the merged path-to-schemas
        mapping of the items validated here.

    Raises:
        Whatever ``schema._validate`` raises for an item.
    """
    if not isinstance(arg, tuple):
        return None
    namespace = vars(sys.modules[cls.__module__])
    item_cls = _get_class(unevaluated_items_validated_path_to_schemas[0], namespace)
    already_evaluated = unevaluated_items_validated_path_to_schemas[1]
    collected: PathToSchemasType = {}
    for index, item in enumerate(arg):
        item_path = validation_metadata.path_to_item + (index,)
        if item_path not in already_evaluated:
            item_metadata = ValidationMetadata(
                path_to_item=item_path,
                configuration=validation_metadata.configuration,
                validated_path_to_schemas=validation_metadata.validated_path_to_schemas
            )
            update(
                collected,
                item_cls._validate(item, validation_metadata=item_metadata),
            )
    return collected
def validate_unevaluated_properties(
    arg: typing.Any,
    unevaluated_properties_validated_path_to_schemas: typing.Tuple[typing.Type[SchemaValidator], PathToSchemasType],
    cls: typing.Type,
    validation_metadata: ValidationMetadata,
) -> typing.Optional[PathToSchemasType]:
    """
    Validator for the ``unevaluatedProperties`` keyword.

    unevaluated_properties_validated_path_to_schemas is
    ``(schema, evaluated)``. Every property of arg whose path is not a key of
    ``evaluated`` has its value validated against ``schema``.

    Returns:
        None when arg is not an immutabledict, otherwise the merged
        path-to-schemas mapping of the properties validated here.

    Raises:
        Whatever ``schema._validate`` raises for a property value.
    """
    if not isinstance(arg, immutabledict):
        return None
    namespace = vars(sys.modules[cls.__module__])
    property_cls = _get_class(unevaluated_properties_validated_path_to_schemas[0], namespace)
    already_evaluated = unevaluated_properties_validated_path_to_schemas[1]
    collected: PathToSchemasType = {}
    for name, value in arg.items():
        property_path = validation_metadata.path_to_item + (name,)
        if property_path not in already_evaluated:
            property_metadata = ValidationMetadata(
                path_to_item=property_path,
                configuration=validation_metadata.configuration,
                validated_path_to_schemas=validation_metadata.validated_path_to_schemas
            )
            update(
                collected,
                property_cls._validate(value, validation_metadata=property_metadata),
            )
    return collected
# Call signature shared by the keyword validators in this module:
# (arg, value associated with the keyword, cls, validation_metadata).
# A validator returns a path-to-schemas mapping, or None when it has nothing to add.
validator_type = typing.Callable[[typing.Any, typing.Any, type, ValidationMetadata], typing.Optional[PathToSchemasType]]
# Dispatch table from keyword name to the function that validates that keyword.
# NOTE(review): the keys look like attribute names on generated schema classes
# (hence the trailing underscore in 'not_', 'if_' and 'else_') - confirm against
# the code that consumes this mapping, which is not in this part of the file.
# Some validators (if/then/else, contains, min/max contains, pattern properties,
# unevaluated items/properties) take a tuple carrying precomputed results as
# their second argument instead of the bare keyword value.
json_schema_keyword_to_validator: typing.Mapping[str, validator_type] = {
    'types': validate_types,
    'enum_value_to_name': validate_enum,
    'unique_items': validate_unique_items,
    'min_items': validate_min_items,
    'max_items': validate_max_items,
    'min_properties': validate_min_properties,
    'max_properties': validate_max_properties,
    'min_length': validate_min_length,
    'max_length': validate_max_length,
    'inclusive_minimum': validate_inclusive_minimum,
    'exclusive_minimum': validate_exclusive_minimum,
    'inclusive_maximum': validate_inclusive_maximum,
    'exclusive_maximum': validate_exclusive_maximum,
    'multiple_of': validate_multiple_of,
    'pattern': validate_pattern,
    'format': validate_format,
    'required': validate_required,
    'items': validate_items,
    'properties': validate_properties,
    'additional_properties': validate_additional_properties,
    'one_of': validate_one_of,
    'any_of': validate_any_of,
    'all_of': validate_all_of,
    'not_': validate_not,
    'discriminator': validate_discriminator,
    'contains': validate_contains,
    'min_contains': validate_min_contains,
    'max_contains': validate_max_contains,
    'const_value_to_name': validate_const,
    'dependent_required': validate_dependent_required,
    'dependent_schemas': validate_dependent_schemas,
    'property_names': validate_property_names,
    'pattern_properties': validate_pattern_properties,
    'prefix_items': validate_prefix_items,
    'unevaluated_items': validate_unevaluated_items,
    'unevaluated_properties': validate_unevaluated_properties,
    'if_': validate_if,
    'then': validate_then,
    'else_': validate_else
}