Traces now include the arguments for executing a node, wherever it makes sense to do so.

This commit is contained in:
doctorpangloss 2024-05-13 15:48:16 -07:00
parent d11aed87ba
commit 78e340e2d8
4 changed files with 40 additions and 9 deletions

View File

@@ -66,7 +66,35 @@ def get_input_data(inputs, class_def, unique_id, outputs=None, prompt=None, extr
    return input_data_all

@tracer.start_as_current_span("Execute Node")
def map_node_over_list(obj, input_data_all: typing.Dict[str, typing.Any], func: str, allow_interrupt=False):
span = get_current_span()
class_type = obj.__class__.__name__
span.set_attribute("class_type", class_type)
if input_data_all is not None:
for kwarg_name, kwarg_value in input_data_all.items():
if isinstance(kwarg_value, str) or isinstance(kwarg_value, bool) or isinstance(kwarg_value, int) or isinstance(kwarg_value, float):
span.set_attribute(f"input_data_all.{kwarg_name}", kwarg_value)
else:
try:
items_to_display = []
if hasattr(kwarg_value, "shape"):
# if the object has a shape attribute (likely a NumPy array or similar), get up to the first ten elements
flat_values = kwarg_value.flatten() if hasattr(kwarg_value, "flatten") else kwarg_value
items_to_display = [flat_values[i] for i in range(min(10, flat_values.size))]
elif hasattr(kwarg_value, "__getitem__") and hasattr(kwarg_value, "__len__"):
# If the object is indexable and has a length, get the first ten items
items_to_display = [kwarg_value[i] for i in range(min(10, len(kwarg_value)))]
filtered_items = [
item for item in items_to_display if isinstance(item, (str, bool, int, float))
]
if filtered_items:
span.set_attribute(f"input_data_all.{kwarg_name}", filtered_items)
except TypeError:
pass
    # check if node wants the lists
    input_is_list = False
    if hasattr(obj, "INPUT_IS_LIST"):

View File

@@ -15,6 +15,7 @@ import warnings
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter, SpanExporter
@@ -74,6 +75,7 @@ def _create_tracer():
    patch_spanbuilder_set_channel()
    AioPikaInstrumentor().instrument()
    AioHttpServerInstrumentor().instrument()
    RequestsInstrumentor().instrument()
    return trace.get_tracer(args.otel_service_name)

View File

@@ -25,25 +25,25 @@ class ModelManageable(Protocol):
        ...

    def is_clone(self, other: torch.nn.Module) -> bool:
        ...

    def clone_has_same_weights(self, clone: torch.nn.Module) -> bool:
        ...

    def model_size(self) -> int:
        ...

    def model_patches_to(self, arg: torch.device | torch.dtype):
        ...

    def model_dtype(self) -> torch.dtype:
        ...

    def patch_model_lowvram(self, device_to: torch.device, lowvram_model_memory: int) -> torch.nn.Module:
        ...

    def patch_model(self, device_to: torch.device, patch_weights: bool) -> torch.nn.Module:
        ...

    def unpatch_model(self, offload_device: torch.device, unpatch_weights: Optional[bool] = False) -> torch.nn.Module:
        ...

View File

@@ -43,5 +43,6 @@ opentelemetry-propagator-jaeger
opentelemetry-instrumentation
opentelemetry-util-http
opentelemetry-instrumentation-aio-pika
opentelemetry-instrumentation-requests
opentelemetry-semantic-conventions
wrapt>=1.16.0