From 8ee7a7969e7affc4de1768d0e6ac186f61e2200e Mon Sep 17 00:00:00 2001 From: wangbo Date: Sat, 30 May 2026 14:25:29 +0800 Subject: [PATCH] feat: expose retry priority in retry chain --- apps/api/internal/runner/recording.go | 34 ++++++++++++++------------- apps/api/internal/runner/service.go | 8 +++---- apps/api/internal/runner/trace.go | 25 +++++++++++++++++--- apps/web/src/pages/WorkspacePage.tsx | 12 ++++++++++ 4 files changed, 56 insertions(+), 23 deletions(-) diff --git a/apps/api/internal/runner/recording.go b/apps/api/internal/runner/recording.go index 6c0d5fa..d23dc28 100644 --- a/apps/api/internal/runner/recording.go +++ b/apps/api/internal/runner/recording.go @@ -100,22 +100,24 @@ func taskMetrics(task store.GatewayTask, user *auth.User, body map[string]any, c func attemptMetrics(candidate store.RuntimeModelCandidate, attemptNo int, simulated bool) map[string]any { metrics := map[string]any{ - "resolvedModel": candidate.ModelName, - "modelName": candidate.ModelName, - "modelAlias": candidate.ModelAlias, - "providerModel": candidate.ProviderModelName, - "canonicalModel": candidate.CanonicalModelKey, - "modelType": candidate.ModelType, - "provider": candidate.Provider, - "platformId": candidate.PlatformID, - "platformKey": candidate.PlatformKey, - "platformName": candidate.PlatformName, - "platformModelId": candidate.PlatformModelID, - "clientId": candidate.ClientID, - "queueKey": candidate.QueueKey, - "loadRatio": candidate.LoadRatio, - "loadAvoided": candidate.LoadAvoided, - "simulated": simulated, + "resolvedModel": candidate.ModelName, + "modelName": candidate.ModelName, + "modelAlias": candidate.ModelAlias, + "providerModel": candidate.ProviderModelName, + "canonicalModel": candidate.CanonicalModelKey, + "modelType": candidate.ModelType, + "provider": candidate.Provider, + "platformId": candidate.PlatformID, + "platformKey": candidate.PlatformKey, + "platformName": candidate.PlatformName, + "platformModelId": candidate.PlatformModelID, + "clientId": candidate.ClientID, + "queueKey": candidate.QueueKey, + "platformPriority": candidate.PlatformPriority, + "currentPriority": candidate.PlatformPriority, + "loadRatio": candidate.LoadRatio, + "loadAvoided": candidate.LoadAvoided, + "simulated": simulated, } if candidate.LoadLimited { metrics["loadMetrics"] = map[string]any{ diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index 13e6b75..f4607a3 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -384,7 +384,7 @@ candidatesLoop: retryDecision.Info = failureInfoFromError(err) retryAction = "stop" } - s.recordAttemptTrace(ctx, task.ID, attemptNo, retryTraceEntry(retryDecision, retryAction, clientAttempt, clientAttempts)) + s.recordAttemptTrace(ctx, task.ID, attemptNo, retryTraceEntry(candidate, retryDecision, retryAction, clientAttempt, clientAttempts)) if !retryDecision.Retry { break } @@ -413,7 +413,7 @@ candidatesLoop: decision.Match = policyRuleMatch{Source: "gateway_runner_policies.failover_policy", Policy: "failoverPolicy", Rule: "maxPlatforms", Value: strconv.Itoa(maxPlatforms)} } } - s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision)) + s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision, candidate)) } break } @@ -421,7 +421,7 @@ candidatesLoop: if failoverTimeBudgetExceeded(executeStartedAt, maxFailoverDuration) { elapsedSeconds := int(time.Since(executeStartedAt).Seconds()) maxDurationSeconds := int(maxFailoverDuration.Seconds()) - s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTimeBudgetTraceEntry(elapsedSeconds, maxDurationSeconds, failureInfoFromError(candidateErr))) + s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTimeBudgetTraceEntry(elapsedSeconds, maxDurationSeconds, failureInfoFromError(candidateErr), candidate)) if err := s.emit(ctx, task.ID, "task.failover.stopped", "running", "retry", 0.55, "failover time budget exceeded", map[string]any{ "elapsedSeconds": elapsedSeconds, "maxDurationSeconds": maxDurationSeconds, @@ -436,7 +436,7 @@ candidatesLoop: if !decision.Retry && hasLoadAvoidanceFallback(candidates, index, maxPlatforms) { decision = loadAvoidanceFallbackDecision(candidateErr) } - s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision)) + s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision, candidate)) if !decision.Retry { break } diff --git a/apps/api/internal/runner/trace.go b/apps/api/internal/runner/trace.go index 3f8dd4c..83f4de5 100644 --- a/apps/api/internal/runner/trace.go +++ b/apps/api/internal/runner/trace.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "time" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) func failureTraceEntry(err error, retryable bool) map[string]any { @@ -17,21 +19,23 @@ func failureTraceEntryWithReason(err error, retryable bool, scope string, reason return entry } -func retryTraceEntry(decision retryDecision, action string, clientAttempt int, maxAttempts int) map[string]any { +func retryTraceEntry(candidate store.RuntimeModelCandidate, decision retryDecision, action string, clientAttempt int, maxAttempts int) map[string]any { entry := policyTraceEntry("same_client_retry", "same_client", action, decision.Reason, decision.Match, decision.Info) entry["retry"] = decision.Retry entry["clientAttempt"] = clientAttempt entry["maxAttempts"] = maxAttempts + addCandidatePriorityTraceFields(entry, candidate) return entry } -func failoverTraceEntry(decision failoverDecision) map[string]any { +func failoverTraceEntry(decision failoverDecision, candidate store.RuntimeModelCandidate) map[string]any { event := "failover_next" if !decision.Retry { event = "failover_stop" } entry := policyTraceEntry(event, "next_platform", decision.Action, decision.Reason, decision.Match, decision.Info) entry["retry"] = decision.Retry + addCandidatePriorityTraceFields(entry, candidate) if decision.CooldownSeconds > 0 { entry["cooldownSeconds"] = decision.CooldownSeconds } @@ -49,7 +53,7 @@ func priorityDemoteTraceEntry(decision priorityDemoteDecision, platformID string return entry } -func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, info failureInfo) map[string]any { +func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, info failureInfo, candidate store.RuntimeModelCandidate) map[string]any { entry := policyTraceEntry("failover_stop", "next_platform", "stop", "failover_time_budget_exceeded", policyRuleMatch{ Source: "gateway_runner_policies.failover_policy", Policy: "failoverPolicy", @@ -58,9 +62,24 @@ func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, in }, info) entry["elapsedSeconds"] = elapsedSeconds entry["maxDurationSeconds"] = maxDurationSeconds + addCandidatePriorityTraceFields(entry, candidate) return entry } +func addCandidatePriorityTraceFields(entry map[string]any, candidate store.RuntimeModelCandidate) { + if entry == nil { + return + } + entry["currentPriority"] = candidate.PlatformPriority + entry["platformPriority"] = candidate.PlatformPriority + if candidate.PlatformID != "" { + entry["currentPlatformId"] = candidate.PlatformID + } + if candidate.PlatformName != "" { + entry["currentPlatformName"] = candidate.PlatformName + } +} + func policyTraceEntry(event string, scope string, action string, reason string, match policyRuleMatch, info failureInfo) map[string]any { entry := map[string]any{ "event": event, diff --git a/apps/web/src/pages/WorkspacePage.tsx b/apps/web/src/pages/WorkspacePage.tsx index c7ff2c8..2e779e1 100644 --- a/apps/web/src/pages/WorkspacePage.tsx +++ b/apps/web/src/pages/WorkspacePage.tsx @@ -1027,6 +1027,7 @@ function taskAttemptMeta(attempt: NonNullable[number]) const statusCode = taskAttemptStatusCode(attempt); const values = [ attempt.providerModelName || attempt.modelName || attempt.modelAlias, + taskAttemptPriorityText(attempt), attempt.requestId ? `RequestID ${attempt.requestId}` : '', statusCode ? `状态码 ${statusCode}` : '', formatDuration(attemptDurationMs(attempt)), @@ -1091,8 +1092,10 @@ function taskAttemptTraceText(entry: Record) { const errorCode = objectString(entry, 'errorCode'); const category = objectString(entry, 'category'); const policy = taskAttemptTracePolicy(entry); + const priority = metadataPriorityText(entry); const values = [ taskAttemptTraceEventLabel(event, action), + priority, statusCode ? `状态码 ${Math.trunc(statusCode)}` : '', errorCode ? `错误 ${errorCode}` : '', category ? `错误分类 ${category}` : '', @@ -1102,6 +1105,15 @@ function taskAttemptTraceText(entry: Record) { return values.join(' · '); } +function taskAttemptPriorityText(attempt: NonNullable[number]) { + return metadataPriorityText(attempt.metrics); +} + +function metadataPriorityText(metadata: Record | undefined) { + const priority = metadataNumber(metadata, 'currentPriority') ?? metadataNumber(metadata, 'platformPriority'); + return priority !== null ? `优先级 ${formatCellValue(priority)}` : ''; +} + function taskAttemptFailureCategory(attempt: NonNullable[number]) { const category = firstText( metadataString(attempt.metrics, 'errorCategory'),