diff --git a/apps/api/internal/runner/recording.go b/apps/api/internal/runner/recording.go index 37facb4..4862559 100644 --- a/apps/api/internal/runner/recording.go +++ b/apps/api/internal/runner/recording.go @@ -1,6 +1,7 @@ package runner import ( + "errors" "strings" "time" @@ -210,6 +211,9 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim metrics["error"] = err.Error() metrics["errorCategory"] = info.Category metrics["retryable"] = retryable + if detail := rateLimitFailureDetail(err); len(detail) > 0 { + metrics["rateLimit"] = detail + } } if meta.StatusCode > 0 { metrics["statusCode"] = meta.StatusCode @@ -226,6 +230,47 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim return meta.RequestID, metrics, meta.ResponseStartedAt, meta.ResponseFinishedAt, meta.ResponseDurationMS } +func rateLimitFailureDetail(err error) map[string]any { + var limitErr *store.RateLimitExceededError + if !errors.As(err, &limitErr) { + return nil + } + detail := map[string]any{ + "scopeType": limitErr.ScopeType, + "scopeKey": limitErr.ScopeKey, + "scopeName": limitErr.ScopeName, + "metric": limitErr.Metric, + "limit": limitErr.Limit, + "amount": limitErr.Amount, + "current": limitErr.Current, + "used": limitErr.Used, + "reserved": limitErr.Reserved, + "projected": limitErr.Projected, + "windowSeconds": limitErr.WindowSeconds, + "retryable": limitErr.Retryable, + "exceeded": map[string]any{ + "metric": limitErr.Metric, + "current": limitErr.Current, + "amount": limitErr.Amount, + "projected": limitErr.Projected, + "limit": limitErr.Limit, + }, + } + if limitErr.RetryAfter > 0 { + detail["retryAfterMs"] = limitErr.RetryAfter.Milliseconds() + } + if !limitErr.ResetAt.IsZero() { + detail["resetAt"] = limitErr.ResetAt.UTC().Format(time.RFC3339Nano) + } + if len(limitErr.ScopeMetadata) > 0 { + detail["scopeMetadata"] = limitErr.ScopeMetadata + } + if len(limitErr.Policy) > 0 { + detail["rateLimitPolicy"] = limitErr.Policy + } + return detail +} + func mergeMetrics(values ...map[string]any) map[string]any { out := map[string]any{} for _, value := range values { diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index 8fb2c9e..9bb6b50 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -82,6 +82,17 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } } if err := validateRequest(task.Kind, body); err != nil { + s.recordFailedAttempt(ctx, failedAttemptRecord{ + Task: task, + Body: body, + AttemptNo: task.AttemptCount + 1, + Code: "bad_request", + Cause: err, + Simulated: task.RunMode == "simulation", + Scope: "request_validation", + Reason: "request_validation_failed", + ModelType: modelType, + }) failed, finishErr := s.failTask(ctx, task.ID, "bad_request", err.Error(), task.RunMode == "simulation", err) if finishErr != nil { return Result{}, finishErr @@ -90,6 +101,17 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } candidates, err := s.store.ListModelCandidates(ctx, task.Model, modelType, user) if err != nil { + s.recordFailedAttempt(ctx, failedAttemptRecord{ + Task: task, + Body: body, + AttemptNo: task.AttemptCount + 1, + Code: store.ModelCandidateErrorCode(err), + Cause: err, + Simulated: task.RunMode == "simulation", + Scope: "candidate_selection", + Reason: "candidate_selection_failed", + ModelType: modelType, + }) failed, finishErr := s.failTask(ctx, task.ID, store.ModelCandidateErrorCode(err), err.Error(), task.RunMode == "simulation", err) if finishErr != nil { return Result{}, finishErr @@ -98,6 +120,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } firstCandidateBody := body normalizedModelType := modelType + attemptNo := task.AttemptCount var firstPreprocessing parameterPreprocessingLog if len(candidates) > 0 { preprocessing := preprocessRequestWithLog(task.Kind, body, candidates[0]) @@ -106,9 +129,20 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut normalizedModelType = candidates[0].ModelType if preprocessing.Err != nil { clientErr := parameterPreprocessClientError(preprocessing.Err) - if logErr := s.recordTaskParameterPreprocessing(ctx, task.ID, "", 0, candidates[0], firstPreprocessing); logErr != nil { - return Result{}, logErr - } + attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{ + Task: task, + Body: firstCandidateBody, + Candidate: &candidates[0], + AttemptNo: attemptNo + 1, + Code: clients.ErrorCode(clientErr), + Cause: clientErr, + Simulated: task.RunMode == "simulation", + Scope: "parameter_preprocessing", + Reason: "parameter_preprocessing_failed", + ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(firstPreprocessing)}, + Preprocessing: &firstPreprocessing, + ModelType: normalizedModelType, + }) failed, finishErr := s.failTask(ctx, task.ID, clients.ErrorCode(clientErr), clientErr.Error(), task.RunMode == "simulation", clientErr, parameterPreprocessingMetrics(firstPreprocessing)) if finishErr != nil { return Result{}, finishErr @@ -121,9 +155,20 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut estimatedBillings := s.estimatedBillings(ctx, user, task.Kind, firstCandidateBody, candidates[0]) if err := s.ensureWalletBalance(ctx, user, estimatedBillings); err != nil { if errors.Is(err, store.ErrInsufficientWalletBalance) { - if logErr := s.recordTaskParameterPreprocessing(ctx, task.ID, "", 0, candidates[0], firstPreprocessing); logErr != nil { - return Result{}, logErr - } + attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{ + Task: task, + Body: firstCandidateBody, + Candidate: &candidates[0], + AttemptNo: attemptNo + 1, + Code: "insufficient_balance", + Cause: err, + Simulated: task.RunMode == "simulation", + Scope: "wallet_balance", + Reason: "wallet_balance_check_failed", + ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(firstPreprocessing)}, + Preprocessing: &firstPreprocessing, + ModelType: normalizedModelType, + }) failed, finishErr := s.failTask(ctx, task.ID, "insufficient_balance", err.Error(), task.RunMode == "simulation", err, parameterPreprocessingMetrics(firstPreprocessing)) if finishErr != nil { return Result{}, finishErr @@ -143,7 +188,6 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } maxPlatforms := maxPlatformsForCandidates(candidates, runnerPolicy) maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy) - attemptNo := task.AttemptCount var lastErr error var lastCandidate store.RuntimeModelCandidate var lastPreprocessing *parameterPreprocessingLog @@ -162,6 +206,20 @@ candidatesLoop: lastPreprocessing = &preprocessingLog if preprocessing.Err != nil { lastErr = parameterPreprocessClientError(preprocessing.Err) + attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{ + Task: task, + Body: preprocessing.Body, + Candidate: &candidate, + AttemptNo: nextAttemptNo, + Code: clients.ErrorCode(lastErr), + Cause: lastErr, + Simulated: isSimulation(task, candidate), + Scope: "parameter_preprocessing", + Reason: "parameter_preprocessing_failed", + ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(preprocessingLog)}, + Preprocessing: &preprocessingLog, + ModelType: candidate.ModelType, + }) break candidatesLoop } candidateBody := preprocessing.Body @@ -222,6 +280,19 @@ candidatesLoop: } return Result{Task: queued, Output: queued.Result}, &TaskQueuedError{Delay: delay} } + attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{ + Task: task, + Body: candidateBody, + Candidate: &candidate, + AttemptNo: nextAttemptNo, + Code: clients.ErrorCode(err), + Cause: err, + Simulated: isSimulation(task, candidate), + Scope: "rate_limit", + Reason: "local_rate_limit_blocked", + ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(preprocessing.Log)}, + ModelType: candidate.ModelType, + }) break candidatesLoop } attemptNo = nextAttemptNo @@ -616,6 +687,110 @@ func (s *Service) failTask(ctx context.Context, taskID string, code string, mess return failed, nil } +type failedAttemptRecord struct { + Task store.GatewayTask + Body map[string]any + Candidate *store.RuntimeModelCandidate + AttemptNo int + Code string + Cause error + Simulated bool + Scope string + Reason string + ExtraMetrics []map[string]any + Preprocessing *parameterPreprocessingLog + ModelType string +} + +func (s *Service) recordFailedAttempt(ctx context.Context, input failedAttemptRecord) int { + attemptNo := input.AttemptNo + if attemptNo <= 0 { + attemptNo = input.Task.AttemptCount + 1 + } + code := firstNonEmptyString(input.Code, clients.ErrorCode(input.Cause)) + message := "" + if input.Cause != nil { + message = input.Cause.Error() + } + retryable := clients.IsRetryable(input.Cause) + requestID, failure, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(input.Cause, input.Simulated) + scope := firstNonEmptyString(input.Scope, "pre_provider") + reason := firstNonEmptyString(input.Reason, "pre_provider_failed") + trace := failureTraceEntryWithReason(input.Cause, retryable, scope, reason) + statusCode := clients.ErrorResponseMetadata(input.Cause).StatusCode + category := failureCategory(strings.ToLower(strings.TrimSpace(code)), statusCode, message) + if code != "" { + failure["errorCode"] = code + trace["errorCode"] = code + } + if category != "" { + failure["errorCategory"] = category + trace["category"] = category + } + failure["failureScope"] = scope + failure["failureReason"] = reason + failure["trace"] = []any{trace} + + baseMetrics := map[string]any{ + "attempt": attemptNo, + "kind": input.Task.Kind, + "runMode": input.Task.RunMode, + "requestedModel": input.Task.Model, + "simulated": input.Simulated, + } + if input.ModelType != "" { + baseMetrics["modelType"] = input.ModelType + } + var platformID, platformModelID, clientID, queueKey string + if input.Candidate != nil { + baseMetrics = attemptMetrics(*input.Candidate, attemptNo, input.Simulated) + baseMetrics["kind"] = input.Task.Kind + baseMetrics["runMode"] = input.Task.RunMode + baseMetrics["requestedModel"] = input.Task.Model + platformID = input.Candidate.PlatformID + platformModelID = input.Candidate.PlatformModelID + clientID = input.Candidate.ClientID + queueKey = input.Candidate.QueueKey + } + metrics := mergeMetrics(append([]map[string]any{baseMetrics, failure}, input.ExtraMetrics...)...) + attemptID, err := s.store.CreateTaskAttempt(ctx, store.CreateTaskAttemptInput{ + TaskID: input.Task.ID, + AttemptNo: attemptNo, + PlatformID: platformID, + PlatformModelID: platformModelID, + ClientID: clientID, + QueueKey: queueKey, + Status: "running", + Simulated: input.Simulated, + RequestSnapshot: input.Body, + Metrics: metrics, + }) + if err != nil { + s.logger.Warn("record failed task attempt failed", "taskID", input.Task.ID, "attempt", attemptNo, "error", err) + return attemptNo + } + if input.Preprocessing != nil && input.Candidate != nil { + if err := s.recordTaskParameterPreprocessing(ctx, input.Task.ID, attemptID, attemptNo, *input.Candidate, *input.Preprocessing); err != nil { + s.logger.Warn("record failed attempt parameter preprocessing failed", "taskID", input.Task.ID, "attempt", attemptNo, "error", err) + } + } + if err := s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ + AttemptID: attemptID, + Status: "failed", + Retryable: retryable, + RequestID: requestID, + Metrics: metrics, + ResponseStartedAt: responseStartedAt, + ResponseFinishedAt: responseFinishedAt, + ResponseDurationMS: responseDurationMS, + ErrorCode: code, + ErrorMessage: message, + }); err != nil { + s.logger.Warn("finish failed task attempt failed", "taskID", input.Task.ID, "attempt", attemptNo, "error", err) + } + return attemptNo +} + func (s *Service) requeueRateLimitedTask(ctx context.Context, task store.GatewayTask, cause error, candidate store.RuntimeModelCandidate) (store.GatewayTask, time.Duration, error) { delay := localRateLimitRetryAfter(cause) if delay <= 0 { diff --git a/apps/api/internal/runner/trace.go b/apps/api/internal/runner/trace.go index e0fcff5..3f8dd4c 100644 --- a/apps/api/internal/runner/trace.go +++ b/apps/api/internal/runner/trace.go @@ -7,8 +7,12 @@ import ( ) func failureTraceEntry(err error, retryable bool) map[string]any { + return failureTraceEntryWithReason(err, retryable, "client", "client_call_failed") +} + +func failureTraceEntryWithReason(err error, retryable bool, scope string, reason string) map[string]any { info := failureInfoFromError(err) - entry := policyTraceEntry("failure", "client", "failed", "client_call_failed", policyRuleMatch{}, info) + entry := policyTraceEntry("failure", scope, "failed", reason, policyRuleMatch{}, info) entry["retryable"] = retryable return entry } diff --git a/apps/web/src/pages/WorkspacePage.tsx b/apps/web/src/pages/WorkspacePage.tsx index 6da9ff5..e7e8eef 100644 --- a/apps/web/src/pages/WorkspacePage.tsx +++ b/apps/web/src/pages/WorkspacePage.tsx @@ -971,28 +971,33 @@ function TaskAttemptPopoverContent(props: { task: GatewayTask }) { const attempts = props.task.attempts ?? []; return ( - {attempts.map((attempt) => ( - - - #{attempt.attemptNo} {taskAttemptTarget(attempt)} - {taskAttemptStatusText(attempt.status)} - - {taskAttemptMeta(attempt)} - {attempt.status === 'failed' && {taskAttemptFailureReason(attempt)}} - {taskAttemptTrace(attempt).length > 0 && ( - - {taskAttemptTrace(attempt).map((entry, index) => ( - - {taskAttemptTraceText(entry)} - - ))} + {attempts.map((attempt) => { + const trace = taskAttemptTrace(attempt); + const rateLimitText = taskAttemptRateLimitText(attempt); + return ( + + + #{attempt.attemptNo} {taskAttemptTarget(attempt)} + {taskAttemptStatusText(attempt.status)} - )} - - ))} + {taskAttemptMeta(attempt)} + {attempt.status === 'failed' && {taskAttemptFailureReason(attempt)}} + {(rateLimitText || trace.length > 0) && ( + + {rateLimitText && {rateLimitText}} + {trace.map((entry, index) => ( + + {taskAttemptTraceText(entry)} + + ))} + + )} + + ); + })} ); } @@ -1055,6 +1060,29 @@ function taskAttemptTrace(attempt: NonNullable[number]) return raw.filter((item): item is Record => Boolean(item) && typeof item === 'object' && !Array.isArray(item)); } +function taskAttemptRateLimitText(attempt: NonNullable[number]) { + const detail = metadataObject(attempt.metrics, 'rateLimit'); + if (!Object.keys(detail).length) return ''; + const scopeName = objectString(detail, 'scopeName') || objectString(detail, 'scopeKey') || '限流对象'; + const metric = objectString(detail, 'metric') || 'rate_limit'; + const current = metadataNumber(detail, 'current'); + const amount = metadataNumber(detail, 'amount'); + const projected = metadataNumber(detail, 'projected'); + const limit = metadataNumber(detail, 'limit'); + const windowSeconds = metadataNumber(detail, 'windowSeconds'); + const retryAfterMs = metadataNumber(detail, 'retryAfterMs'); + const values = [ + `限流 ${scopeName} · ${metric}`, + current !== null ? `当前 ${formatCellValue(current)}` : '', + amount !== null ? `本次 ${formatCellValue(amount)}` : '', + projected !== null ? `预计 ${formatCellValue(projected)}` : '', + limit !== null ? `限制 ${formatCellValue(limit)}` : '', + windowSeconds !== null ? `窗口 ${Math.trunc(windowSeconds)} 秒` : '', + retryAfterMs !== null ? `约 ${formatDuration(Math.trunc(retryAfterMs))} 后可重试` : '', + ].filter(Boolean); + return values.join(' · '); +} + function taskAttemptTraceText(entry: Record) { const event = objectString(entry, 'event'); const action = objectString(entry, 'action'); @@ -1116,6 +1144,12 @@ function taskAttemptTraceReasonLabel(reason: string) { client_retryable: '客户端标记可重试', client_non_retryable: '客户端标记不可重试', same_client_max_attempts: '达到本平台最大尝试次数', + request_validation_failed: '请求校验失败', + candidate_selection_failed: '候选模型选择失败', + parameter_preprocessing_failed: '参数预处理失败', + wallet_balance_check_failed: '余额校验失败', + local_rate_limit_blocked: '本地限流拦截', + pre_provider_failed: '调用上游前失败', local_rate_limit_wait_queue: '本地限流排队等待', failover_time_budget_exceeded: '超过全局切换时间预算', runner_policy_disabled: '全局调度策略停用',