Unify signature sanitize and hash

This commit is contained in:
xmarre 2026-03-15 07:09:24 +01:00
parent 0b512198e8
commit a6624a9afd

View File

@ -252,6 +252,7 @@ def to_hashable(obj, max_nodes=_MAX_SIGNATURE_CONTAINER_VISITS):
memo = {} memo = {}
active = set() active = set()
snapshots = {}
sort_memo = {} sort_memo = {}
processed = 0 processed = 0
stack = [(obj, False)] stack = [(obj, False)]
@ -263,12 +264,12 @@ def to_hashable(obj, max_nodes=_MAX_SIGNATURE_CONTAINER_VISITS):
return value return value
return memo.get(id(value), Unhashable()) return memo.get(id(value), Unhashable())
def resolve_unordered_values(current, container_tag): def resolve_unordered_values(current_items, container_tag):
"""Resolve a set-like container or fail closed if ordering is ambiguous.""" """Resolve a set-like container or fail closed if ordering is ambiguous."""
try: try:
ordered_items = [ ordered_items = [
(_sanitized_sort_key(item, memo=sort_memo), resolve_value(item)) (_sanitized_sort_key(item, memo=sort_memo), resolve_value(item))
for item in current for item in current_items
] ]
ordered_items.sort(key=lambda item: item[0]) ordered_items.sort(key=lambda item: item[0])
except RuntimeError: except RuntimeError:
@ -300,18 +301,33 @@ def to_hashable(obj, max_nodes=_MAX_SIGNATURE_CONTAINER_VISITS):
active.discard(current_id) active.discard(current_id)
try: try:
if current_type is dict: if current_type is dict:
items = snapshots.pop(current_id, None)
if items is None:
items = list(current.items())
memo[current_id] = ( memo[current_id] = (
"dict", "dict",
tuple((resolve_value(k), resolve_value(v)) for k, v in current.items()), tuple((resolve_value(k), resolve_value(v)) for k, v in items),
) )
elif current_type is list: elif current_type is list:
memo[current_id] = ("list", tuple(resolve_value(item) for item in current)) items = snapshots.pop(current_id, None)
if items is None:
items = list(current)
memo[current_id] = ("list", tuple(resolve_value(item) for item in items))
elif current_type is tuple: elif current_type is tuple:
memo[current_id] = ("tuple", tuple(resolve_value(item) for item in current)) items = snapshots.pop(current_id, None)
if items is None:
items = list(current)
memo[current_id] = ("tuple", tuple(resolve_value(item) for item in items))
elif current_type is set: elif current_type is set:
memo[current_id] = resolve_unordered_values(current, "set") items = snapshots.pop(current_id, None)
if items is None:
items = list(current)
memo[current_id] = resolve_unordered_values(items, "set")
else: else:
memo[current_id] = resolve_unordered_values(current, "frozenset") items = snapshots.pop(current_id, None)
if items is None:
items = list(current)
memo[current_id] = resolve_unordered_values(items, "frozenset")
except RuntimeError: except RuntimeError:
memo[current_id] = Unhashable() memo[current_id] = Unhashable()
continue continue
@ -329,6 +345,7 @@ def to_hashable(obj, max_nodes=_MAX_SIGNATURE_CONTAINER_VISITS):
if current_type is dict: if current_type is dict:
try: try:
items = list(current.items()) items = list(current.items())
snapshots[current_id] = items
except RuntimeError: except RuntimeError:
memo[current_id] = Unhashable() memo[current_id] = Unhashable()
active.discard(current_id) active.discard(current_id)
@ -339,6 +356,7 @@ def to_hashable(obj, max_nodes=_MAX_SIGNATURE_CONTAINER_VISITS):
else: else:
try: try:
items = list(current) items = list(current)
snapshots[current_id] = items
except RuntimeError: except RuntimeError:
memo[current_id] = Unhashable() memo[current_id] = Unhashable()
active.discard(current_id) active.discard(current_id)