fix(runner): record failed task attempts

This commit is contained in:
wangbo 2026-05-17 20:50:20 +08:00
parent ae197a742f
commit 90c3315468
4 changed files with 287 additions and 29 deletions

View File

@ -1,6 +1,7 @@
package runner
import (
"errors"
"strings"
"time"
@ -210,6 +211,9 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim
metrics["error"] = err.Error()
metrics["errorCategory"] = info.Category
metrics["retryable"] = retryable
if detail := rateLimitFailureDetail(err); len(detail) > 0 {
metrics["rateLimit"] = detail
}
}
if meta.StatusCode > 0 {
metrics["statusCode"] = meta.StatusCode
@ -226,6 +230,47 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim
return meta.RequestID, metrics, meta.ResponseStartedAt, meta.ResponseFinishedAt, meta.ResponseDurationMS
}
func rateLimitFailureDetail(err error) map[string]any {
var limitErr *store.RateLimitExceededError
if !errors.As(err, &limitErr) {
return nil
}
detail := map[string]any{
"scopeType": limitErr.ScopeType,
"scopeKey": limitErr.ScopeKey,
"scopeName": limitErr.ScopeName,
"metric": limitErr.Metric,
"limit": limitErr.Limit,
"amount": limitErr.Amount,
"current": limitErr.Current,
"used": limitErr.Used,
"reserved": limitErr.Reserved,
"projected": limitErr.Projected,
"windowSeconds": limitErr.WindowSeconds,
"retryable": limitErr.Retryable,
"exceeded": map[string]any{
"metric": limitErr.Metric,
"current": limitErr.Current,
"amount": limitErr.Amount,
"projected": limitErr.Projected,
"limit": limitErr.Limit,
},
}
if limitErr.RetryAfter > 0 {
detail["retryAfterMs"] = limitErr.RetryAfter.Milliseconds()
}
if !limitErr.ResetAt.IsZero() {
detail["resetAt"] = limitErr.ResetAt.UTC().Format(time.RFC3339Nano)
}
if len(limitErr.ScopeMetadata) > 0 {
detail["scopeMetadata"] = limitErr.ScopeMetadata
}
if len(limitErr.Policy) > 0 {
detail["rateLimitPolicy"] = limitErr.Policy
}
return detail
}
func mergeMetrics(values ...map[string]any) map[string]any {
out := map[string]any{}
for _, value := range values {

View File

@ -82,6 +82,17 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
}
}
if err := validateRequest(task.Kind, body); err != nil {
s.recordFailedAttempt(ctx, failedAttemptRecord{
Task: task,
Body: body,
AttemptNo: task.AttemptCount + 1,
Code: "bad_request",
Cause: err,
Simulated: task.RunMode == "simulation",
Scope: "request_validation",
Reason: "request_validation_failed",
ModelType: modelType,
})
failed, finishErr := s.failTask(ctx, task.ID, "bad_request", err.Error(), task.RunMode == "simulation", err)
if finishErr != nil {
return Result{}, finishErr
@ -90,6 +101,17 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
}
candidates, err := s.store.ListModelCandidates(ctx, task.Model, modelType, user)
if err != nil {
s.recordFailedAttempt(ctx, failedAttemptRecord{
Task: task,
Body: body,
AttemptNo: task.AttemptCount + 1,
Code: store.ModelCandidateErrorCode(err),
Cause: err,
Simulated: task.RunMode == "simulation",
Scope: "candidate_selection",
Reason: "candidate_selection_failed",
ModelType: modelType,
})
failed, finishErr := s.failTask(ctx, task.ID, store.ModelCandidateErrorCode(err), err.Error(), task.RunMode == "simulation", err)
if finishErr != nil {
return Result{}, finishErr
@ -98,6 +120,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
}
firstCandidateBody := body
normalizedModelType := modelType
attemptNo := task.AttemptCount
var firstPreprocessing parameterPreprocessingLog
if len(candidates) > 0 {
preprocessing := preprocessRequestWithLog(task.Kind, body, candidates[0])
@ -106,9 +129,20 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
normalizedModelType = candidates[0].ModelType
if preprocessing.Err != nil {
clientErr := parameterPreprocessClientError(preprocessing.Err)
if logErr := s.recordTaskParameterPreprocessing(ctx, task.ID, "", 0, candidates[0], firstPreprocessing); logErr != nil {
return Result{}, logErr
}
attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{
Task: task,
Body: firstCandidateBody,
Candidate: &candidates[0],
AttemptNo: attemptNo + 1,
Code: clients.ErrorCode(clientErr),
Cause: clientErr,
Simulated: task.RunMode == "simulation",
Scope: "parameter_preprocessing",
Reason: "parameter_preprocessing_failed",
ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(firstPreprocessing)},
Preprocessing: &firstPreprocessing,
ModelType: normalizedModelType,
})
failed, finishErr := s.failTask(ctx, task.ID, clients.ErrorCode(clientErr), clientErr.Error(), task.RunMode == "simulation", clientErr, parameterPreprocessingMetrics(firstPreprocessing))
if finishErr != nil {
return Result{}, finishErr
@ -121,9 +155,20 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
estimatedBillings := s.estimatedBillings(ctx, user, task.Kind, firstCandidateBody, candidates[0])
if err := s.ensureWalletBalance(ctx, user, estimatedBillings); err != nil {
if errors.Is(err, store.ErrInsufficientWalletBalance) {
if logErr := s.recordTaskParameterPreprocessing(ctx, task.ID, "", 0, candidates[0], firstPreprocessing); logErr != nil {
return Result{}, logErr
}
attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{
Task: task,
Body: firstCandidateBody,
Candidate: &candidates[0],
AttemptNo: attemptNo + 1,
Code: "insufficient_balance",
Cause: err,
Simulated: task.RunMode == "simulation",
Scope: "wallet_balance",
Reason: "wallet_balance_check_failed",
ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(firstPreprocessing)},
Preprocessing: &firstPreprocessing,
ModelType: normalizedModelType,
})
failed, finishErr := s.failTask(ctx, task.ID, "insufficient_balance", err.Error(), task.RunMode == "simulation", err, parameterPreprocessingMetrics(firstPreprocessing))
if finishErr != nil {
return Result{}, finishErr
@ -143,7 +188,6 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
}
maxPlatforms := maxPlatformsForCandidates(candidates, runnerPolicy)
maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy)
attemptNo := task.AttemptCount
var lastErr error
var lastCandidate store.RuntimeModelCandidate
var lastPreprocessing *parameterPreprocessingLog
@ -162,6 +206,20 @@ candidatesLoop:
lastPreprocessing = &preprocessingLog
if preprocessing.Err != nil {
lastErr = parameterPreprocessClientError(preprocessing.Err)
attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{
Task: task,
Body: preprocessing.Body,
Candidate: &candidate,
AttemptNo: nextAttemptNo,
Code: clients.ErrorCode(lastErr),
Cause: lastErr,
Simulated: isSimulation(task, candidate),
Scope: "parameter_preprocessing",
Reason: "parameter_preprocessing_failed",
ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(preprocessingLog)},
Preprocessing: &preprocessingLog,
ModelType: candidate.ModelType,
})
break candidatesLoop
}
candidateBody := preprocessing.Body
@ -222,6 +280,19 @@ candidatesLoop:
}
return Result{Task: queued, Output: queued.Result}, &TaskQueuedError{Delay: delay}
}
attemptNo = s.recordFailedAttempt(ctx, failedAttemptRecord{
Task: task,
Body: candidateBody,
Candidate: &candidate,
AttemptNo: nextAttemptNo,
Code: clients.ErrorCode(err),
Cause: err,
Simulated: isSimulation(task, candidate),
Scope: "rate_limit",
Reason: "local_rate_limit_blocked",
ExtraMetrics: []map[string]any{parameterPreprocessingMetrics(preprocessing.Log)},
ModelType: candidate.ModelType,
})
break candidatesLoop
}
attemptNo = nextAttemptNo
@ -616,6 +687,110 @@ func (s *Service) failTask(ctx context.Context, taskID string, code string, mess
return failed, nil
}
type failedAttemptRecord struct {
Task store.GatewayTask
Body map[string]any
Candidate *store.RuntimeModelCandidate
AttemptNo int
Code string
Cause error
Simulated bool
Scope string
Reason string
ExtraMetrics []map[string]any
Preprocessing *parameterPreprocessingLog
ModelType string
}
func (s *Service) recordFailedAttempt(ctx context.Context, input failedAttemptRecord) int {
attemptNo := input.AttemptNo
if attemptNo <= 0 {
attemptNo = input.Task.AttemptCount + 1
}
code := firstNonEmptyString(input.Code, clients.ErrorCode(input.Cause))
message := ""
if input.Cause != nil {
message = input.Cause.Error()
}
retryable := clients.IsRetryable(input.Cause)
requestID, failure, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(input.Cause, input.Simulated)
scope := firstNonEmptyString(input.Scope, "pre_provider")
reason := firstNonEmptyString(input.Reason, "pre_provider_failed")
trace := failureTraceEntryWithReason(input.Cause, retryable, scope, reason)
statusCode := clients.ErrorResponseMetadata(input.Cause).StatusCode
category := failureCategory(strings.ToLower(strings.TrimSpace(code)), statusCode, message)
if code != "" {
failure["errorCode"] = code
trace["errorCode"] = code
}
if category != "" {
failure["errorCategory"] = category
trace["category"] = category
}
failure["failureScope"] = scope
failure["failureReason"] = reason
failure["trace"] = []any{trace}
baseMetrics := map[string]any{
"attempt": attemptNo,
"kind": input.Task.Kind,
"runMode": input.Task.RunMode,
"requestedModel": input.Task.Model,
"simulated": input.Simulated,
}
if input.ModelType != "" {
baseMetrics["modelType"] = input.ModelType
}
var platformID, platformModelID, clientID, queueKey string
if input.Candidate != nil {
baseMetrics = attemptMetrics(*input.Candidate, attemptNo, input.Simulated)
baseMetrics["kind"] = input.Task.Kind
baseMetrics["runMode"] = input.Task.RunMode
baseMetrics["requestedModel"] = input.Task.Model
platformID = input.Candidate.PlatformID
platformModelID = input.Candidate.PlatformModelID
clientID = input.Candidate.ClientID
queueKey = input.Candidate.QueueKey
}
metrics := mergeMetrics(append([]map[string]any{baseMetrics, failure}, input.ExtraMetrics...)...)
attemptID, err := s.store.CreateTaskAttempt(ctx, store.CreateTaskAttemptInput{
TaskID: input.Task.ID,
AttemptNo: attemptNo,
PlatformID: platformID,
PlatformModelID: platformModelID,
ClientID: clientID,
QueueKey: queueKey,
Status: "running",
Simulated: input.Simulated,
RequestSnapshot: input.Body,
Metrics: metrics,
})
if err != nil {
s.logger.Warn("record failed task attempt failed", "taskID", input.Task.ID, "attempt", attemptNo, "error", err)
return attemptNo
}
if input.Preprocessing != nil && input.Candidate != nil {
if err := s.recordTaskParameterPreprocessing(ctx, input.Task.ID, attemptID, attemptNo, *input.Candidate, *input.Preprocessing); err != nil {
s.logger.Warn("record failed attempt parameter preprocessing failed", "taskID", input.Task.ID, "attempt", attemptNo, "error", err)
}
}
if err := s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
AttemptID: attemptID,
Status: "failed",
Retryable: retryable,
RequestID: requestID,
Metrics: metrics,
ResponseStartedAt: responseStartedAt,
ResponseFinishedAt: responseFinishedAt,
ResponseDurationMS: responseDurationMS,
ErrorCode: code,
ErrorMessage: message,
}); err != nil {
s.logger.Warn("finish failed task attempt failed", "taskID", input.Task.ID, "attempt", attemptNo, "error", err)
}
return attemptNo
}
func (s *Service) requeueRateLimitedTask(ctx context.Context, task store.GatewayTask, cause error, candidate store.RuntimeModelCandidate) (store.GatewayTask, time.Duration, error) {
delay := localRateLimitRetryAfter(cause)
if delay <= 0 {

View File

@ -7,8 +7,12 @@ import (
)
func failureTraceEntry(err error, retryable bool) map[string]any {
return failureTraceEntryWithReason(err, retryable, "client", "client_call_failed")
}
func failureTraceEntryWithReason(err error, retryable bool, scope string, reason string) map[string]any {
info := failureInfoFromError(err)
entry := policyTraceEntry("failure", "client", "failed", "client_call_failed", policyRuleMatch{}, info)
entry := policyTraceEntry("failure", scope, "failed", reason, policyRuleMatch{}, info)
entry["retryable"] = retryable
return entry
}

View File

@ -971,28 +971,33 @@ function TaskAttemptPopoverContent(props: { task: GatewayTask }) {
const attempts = props.task.attempts ?? [];
return (
<span className="taskRecordAttemptPopover" role="tooltip">
{attempts.map((attempt) => (
<span
key={attempt.id || `${props.task.id}-${attempt.attemptNo}`}
className={`taskRecordAttemptDetail ${attempt.status === 'failed' ? 'failed' : attempt.status === 'succeeded' ? 'succeeded' : ''}`}
>
<span className="taskRecordAttemptDetailHeader">
<strong>#{attempt.attemptNo} {taskAttemptTarget(attempt)}</strong>
<Badge variant={attempt.status === 'succeeded' ? 'success' : attempt.status === 'failed' ? 'destructive' : 'secondary'}>{taskAttemptStatusText(attempt.status)}</Badge>
</span>
<small>{taskAttemptMeta(attempt)}</small>
{attempt.status === 'failed' && <span className="taskRecordAttemptError">{taskAttemptFailureReason(attempt)}</span>}
{taskAttemptTrace(attempt).length > 0 && (
<span className="taskRecordAttemptTrace">
{taskAttemptTrace(attempt).map((entry, index) => (
<span key={`${attempt.id || attempt.attemptNo}-trace-${index}`} className="taskRecordAttemptTraceItem">
{taskAttemptTraceText(entry)}
</span>
))}
{attempts.map((attempt) => {
const trace = taskAttemptTrace(attempt);
const rateLimitText = taskAttemptRateLimitText(attempt);
return (
<span
key={attempt.id || `${props.task.id}-${attempt.attemptNo}`}
className={`taskRecordAttemptDetail ${attempt.status === 'failed' ? 'failed' : attempt.status === 'succeeded' ? 'succeeded' : ''}`}
>
<span className="taskRecordAttemptDetailHeader">
<strong>#{attempt.attemptNo} {taskAttemptTarget(attempt)}</strong>
<Badge variant={attempt.status === 'succeeded' ? 'success' : attempt.status === 'failed' ? 'destructive' : 'secondary'}>{taskAttemptStatusText(attempt.status)}</Badge>
</span>
)}
</span>
))}
<small>{taskAttemptMeta(attempt)}</small>
{attempt.status === 'failed' && <span className="taskRecordAttemptError">{taskAttemptFailureReason(attempt)}</span>}
{(rateLimitText || trace.length > 0) && (
<span className="taskRecordAttemptTrace">
{rateLimitText && <span className="taskRecordAttemptTraceItem">{rateLimitText}</span>}
{trace.map((entry, index) => (
<span key={`${attempt.id || attempt.attemptNo}-trace-${index}`} className="taskRecordAttemptTraceItem">
{taskAttemptTraceText(entry)}
</span>
))}
</span>
)}
</span>
);
})}
</span>
);
}
@ -1055,6 +1060,29 @@ function taskAttemptTrace(attempt: NonNullable<GatewayTask['attempts']>[number])
return raw.filter((item): item is Record<string, unknown> => Boolean(item) && typeof item === 'object' && !Array.isArray(item));
}
function taskAttemptRateLimitText(attempt: NonNullable<GatewayTask['attempts']>[number]) {
const detail = metadataObject(attempt.metrics, 'rateLimit');
if (!Object.keys(detail).length) return '';
const scopeName = objectString(detail, 'scopeName') || objectString(detail, 'scopeKey') || '限流对象';
const metric = objectString(detail, 'metric') || 'rate_limit';
const current = metadataNumber(detail, 'current');
const amount = metadataNumber(detail, 'amount');
const projected = metadataNumber(detail, 'projected');
const limit = metadataNumber(detail, 'limit');
const windowSeconds = metadataNumber(detail, 'windowSeconds');
const retryAfterMs = metadataNumber(detail, 'retryAfterMs');
const values = [
`限流 ${scopeName} · ${metric}`,
current !== null ? `当前 ${formatCellValue(current)}` : '',
amount !== null ? `本次 ${formatCellValue(amount)}` : '',
projected !== null ? `预计 ${formatCellValue(projected)}` : '',
limit !== null ? `限制 ${formatCellValue(limit)}` : '',
windowSeconds !== null ? `窗口 ${Math.trunc(windowSeconds)}` : '',
retryAfterMs !== null ? `${formatDuration(Math.trunc(retryAfterMs))} 后可重试` : '',
].filter(Boolean);
return values.join(' · ');
}
function taskAttemptTraceText(entry: Record<string, unknown>) {
const event = objectString(entry, 'event');
const action = objectString(entry, 'action');
@ -1116,6 +1144,12 @@ function taskAttemptTraceReasonLabel(reason: string) {
client_retryable: '客户端标记可重试',
client_non_retryable: '客户端标记不可重试',
same_client_max_attempts: '达到本平台最大尝试次数',
request_validation_failed: '请求校验失败',
candidate_selection_failed: '候选模型选择失败',
parameter_preprocessing_failed: '参数预处理失败',
wallet_balance_check_failed: '余额校验失败',
local_rate_limit_blocked: '本地限流拦截',
pre_provider_failed: '调用上游前失败',
local_rate_limit_wait_queue: '本地限流排队等待',
failover_time_budget_exceeded: '超过全局切换时间预算',
runner_policy_disabled: '全局调度策略停用',