feat: expose retry priority in retry chain
This commit is contained in:
parent
a6f1be8f07
commit
8ee7a7969e
@ -100,22 +100,24 @@ func taskMetrics(task store.GatewayTask, user *auth.User, body map[string]any, c
|
||||
|
||||
func attemptMetrics(candidate store.RuntimeModelCandidate, attemptNo int, simulated bool) map[string]any {
|
||||
metrics := map[string]any{
|
||||
"resolvedModel": candidate.ModelName,
|
||||
"modelName": candidate.ModelName,
|
||||
"modelAlias": candidate.ModelAlias,
|
||||
"providerModel": candidate.ProviderModelName,
|
||||
"canonicalModel": candidate.CanonicalModelKey,
|
||||
"modelType": candidate.ModelType,
|
||||
"provider": candidate.Provider,
|
||||
"platformId": candidate.PlatformID,
|
||||
"platformKey": candidate.PlatformKey,
|
||||
"platformName": candidate.PlatformName,
|
||||
"platformModelId": candidate.PlatformModelID,
|
||||
"clientId": candidate.ClientID,
|
||||
"queueKey": candidate.QueueKey,
|
||||
"loadRatio": candidate.LoadRatio,
|
||||
"loadAvoided": candidate.LoadAvoided,
|
||||
"simulated": simulated,
|
||||
"resolvedModel": candidate.ModelName,
|
||||
"modelName": candidate.ModelName,
|
||||
"modelAlias": candidate.ModelAlias,
|
||||
"providerModel": candidate.ProviderModelName,
|
||||
"canonicalModel": candidate.CanonicalModelKey,
|
||||
"modelType": candidate.ModelType,
|
||||
"provider": candidate.Provider,
|
||||
"platformId": candidate.PlatformID,
|
||||
"platformKey": candidate.PlatformKey,
|
||||
"platformName": candidate.PlatformName,
|
||||
"platformModelId": candidate.PlatformModelID,
|
||||
"clientId": candidate.ClientID,
|
||||
"queueKey": candidate.QueueKey,
|
||||
"platformPriority": candidate.PlatformPriority,
|
||||
"currentPriority": candidate.PlatformPriority,
|
||||
"loadRatio": candidate.LoadRatio,
|
||||
"loadAvoided": candidate.LoadAvoided,
|
||||
"simulated": simulated,
|
||||
}
|
||||
if candidate.LoadLimited {
|
||||
metrics["loadMetrics"] = map[string]any{
|
||||
|
||||
@ -384,7 +384,7 @@ candidatesLoop:
|
||||
retryDecision.Info = failureInfoFromError(err)
|
||||
retryAction = "stop"
|
||||
}
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, retryTraceEntry(retryDecision, retryAction, clientAttempt, clientAttempts))
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, retryTraceEntry(candidate, retryDecision, retryAction, clientAttempt, clientAttempts))
|
||||
if !retryDecision.Retry {
|
||||
break
|
||||
}
|
||||
@ -413,7 +413,7 @@ candidatesLoop:
|
||||
decision.Match = policyRuleMatch{Source: "gateway_runner_policies.failover_policy", Policy: "failoverPolicy", Rule: "maxPlatforms", Value: strconv.Itoa(maxPlatforms)}
|
||||
}
|
||||
}
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision))
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision, candidate))
|
||||
}
|
||||
break
|
||||
}
|
||||
@ -421,7 +421,7 @@ candidatesLoop:
|
||||
if failoverTimeBudgetExceeded(executeStartedAt, maxFailoverDuration) {
|
||||
elapsedSeconds := int(time.Since(executeStartedAt).Seconds())
|
||||
maxDurationSeconds := int(maxFailoverDuration.Seconds())
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTimeBudgetTraceEntry(elapsedSeconds, maxDurationSeconds, failureInfoFromError(candidateErr)))
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTimeBudgetTraceEntry(elapsedSeconds, maxDurationSeconds, failureInfoFromError(candidateErr), candidate))
|
||||
if err := s.emit(ctx, task.ID, "task.failover.stopped", "running", "retry", 0.55, "failover time budget exceeded", map[string]any{
|
||||
"elapsedSeconds": elapsedSeconds,
|
||||
"maxDurationSeconds": maxDurationSeconds,
|
||||
@ -436,7 +436,7 @@ candidatesLoop:
|
||||
if !decision.Retry && hasLoadAvoidanceFallback(candidates, index, maxPlatforms) {
|
||||
decision = loadAvoidanceFallbackDecision(candidateErr)
|
||||
}
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision))
|
||||
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision, candidate))
|
||||
if !decision.Retry {
|
||||
break
|
||||
}
|
||||
|
||||
@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
|
||||
)
|
||||
|
||||
func failureTraceEntry(err error, retryable bool) map[string]any {
|
||||
@ -17,21 +19,23 @@ func failureTraceEntryWithReason(err error, retryable bool, scope string, reason
|
||||
return entry
|
||||
}
|
||||
|
||||
func retryTraceEntry(decision retryDecision, action string, clientAttempt int, maxAttempts int) map[string]any {
|
||||
func retryTraceEntry(candidate store.RuntimeModelCandidate, decision retryDecision, action string, clientAttempt int, maxAttempts int) map[string]any {
|
||||
entry := policyTraceEntry("same_client_retry", "same_client", action, decision.Reason, decision.Match, decision.Info)
|
||||
entry["retry"] = decision.Retry
|
||||
entry["clientAttempt"] = clientAttempt
|
||||
entry["maxAttempts"] = maxAttempts
|
||||
addCandidatePriorityTraceFields(entry, candidate)
|
||||
return entry
|
||||
}
|
||||
|
||||
func failoverTraceEntry(decision failoverDecision) map[string]any {
|
||||
func failoverTraceEntry(decision failoverDecision, candidate store.RuntimeModelCandidate) map[string]any {
|
||||
event := "failover_next"
|
||||
if !decision.Retry {
|
||||
event = "failover_stop"
|
||||
}
|
||||
entry := policyTraceEntry(event, "next_platform", decision.Action, decision.Reason, decision.Match, decision.Info)
|
||||
entry["retry"] = decision.Retry
|
||||
addCandidatePriorityTraceFields(entry, candidate)
|
||||
if decision.CooldownSeconds > 0 {
|
||||
entry["cooldownSeconds"] = decision.CooldownSeconds
|
||||
}
|
||||
@ -49,7 +53,7 @@ func priorityDemoteTraceEntry(decision priorityDemoteDecision, platformID string
|
||||
return entry
|
||||
}
|
||||
|
||||
func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, info failureInfo) map[string]any {
|
||||
func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, info failureInfo, candidate store.RuntimeModelCandidate) map[string]any {
|
||||
entry := policyTraceEntry("failover_stop", "next_platform", "stop", "failover_time_budget_exceeded", policyRuleMatch{
|
||||
Source: "gateway_runner_policies.failover_policy",
|
||||
Policy: "failoverPolicy",
|
||||
@ -58,9 +62,24 @@ func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, in
|
||||
}, info)
|
||||
entry["elapsedSeconds"] = elapsedSeconds
|
||||
entry["maxDurationSeconds"] = maxDurationSeconds
|
||||
addCandidatePriorityTraceFields(entry, candidate)
|
||||
return entry
|
||||
}
|
||||
|
||||
func addCandidatePriorityTraceFields(entry map[string]any, candidate store.RuntimeModelCandidate) {
|
||||
if entry == nil {
|
||||
return
|
||||
}
|
||||
entry["currentPriority"] = candidate.PlatformPriority
|
||||
entry["platformPriority"] = candidate.PlatformPriority
|
||||
if candidate.PlatformID != "" {
|
||||
entry["currentPlatformId"] = candidate.PlatformID
|
||||
}
|
||||
if candidate.PlatformName != "" {
|
||||
entry["currentPlatformName"] = candidate.PlatformName
|
||||
}
|
||||
}
|
||||
|
||||
func policyTraceEntry(event string, scope string, action string, reason string, match policyRuleMatch, info failureInfo) map[string]any {
|
||||
entry := map[string]any{
|
||||
"event": event,
|
||||
|
||||
@ -1027,6 +1027,7 @@ function taskAttemptMeta(attempt: NonNullable<GatewayTask['attempts']>[number])
|
||||
const statusCode = taskAttemptStatusCode(attempt);
|
||||
const values = [
|
||||
attempt.providerModelName || attempt.modelName || attempt.modelAlias,
|
||||
taskAttemptPriorityText(attempt),
|
||||
attempt.requestId ? `RequestID ${attempt.requestId}` : '',
|
||||
statusCode ? `状态码 ${statusCode}` : '',
|
||||
formatDuration(attemptDurationMs(attempt)),
|
||||
@ -1091,8 +1092,10 @@ function taskAttemptTraceText(entry: Record<string, unknown>) {
|
||||
const errorCode = objectString(entry, 'errorCode');
|
||||
const category = objectString(entry, 'category');
|
||||
const policy = taskAttemptTracePolicy(entry);
|
||||
const priority = metadataPriorityText(entry);
|
||||
const values = [
|
||||
taskAttemptTraceEventLabel(event, action),
|
||||
priority,
|
||||
statusCode ? `状态码 ${Math.trunc(statusCode)}` : '',
|
||||
errorCode ? `错误 ${errorCode}` : '',
|
||||
category ? `错误分类 ${category}` : '',
|
||||
@ -1102,6 +1105,15 @@ function taskAttemptTraceText(entry: Record<string, unknown>) {
|
||||
return values.join(' · ');
|
||||
}
|
||||
|
||||
function taskAttemptPriorityText(attempt: NonNullable<GatewayTask['attempts']>[number]) {
|
||||
return metadataPriorityText(attempt.metrics);
|
||||
}
|
||||
|
||||
function metadataPriorityText(metadata: Record<string, unknown> | undefined) {
|
||||
const priority = metadataNumber(metadata, 'currentPriority') ?? metadataNumber(metadata, 'platformPriority');
|
||||
return priority !== null ? `优先级 ${formatCellValue(priority)}` : '';
|
||||
}
|
||||
|
||||
function taskAttemptFailureCategory(attempt: NonNullable<GatewayTask['attempts']>[number]) {
|
||||
const category = firstText(
|
||||
metadataString(attempt.metrics, 'errorCategory'),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user