From 2a91b31d12b12de1517ec2f971c1bf7c01f3b96d Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 12 May 2026 10:34:06 +0800 Subject: [PATCH] fix: treat local rate limit queueing separately --- .../httpapi/core_flow_integration_test.go | 8 +-- apps/api/internal/runner/limits.go | 5 ++ apps/api/internal/runner/retry_decision.go | 4 +- .../internal/runner/retry_decision_test.go | 19 ++++++ apps/api/internal/runner/service.go | 60 ++++++++++++------- apps/api/internal/store/rate_limits.go | 36 +++++++++++ apps/web/src/pages/WorkspacePage.tsx | 1 + 7 files changed, 107 insertions(+), 26 deletions(-) diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go index 61ad82d..3b4047b 100644 --- a/apps/api/internal/httpapi/core_flow_integration_test.go +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -780,15 +780,15 @@ WHERE reference_type = 'gateway_task' if asyncRateLimitDetail.Status != "queued" { t.Fatalf("async rate-limited task should return to queued state, got %+v", asyncRateLimitDetail) } - if len(asyncRateLimitDetail.Attempts) == 0 || asyncRateLimitDetail.Attempts[0].ErrorCode != "rate_limit" { - t.Fatalf("async rate-limited task should record a rate_limit attempt before requeue: %+v", asyncRateLimitDetail) + if len(asyncRateLimitDetail.Attempts) != 0 { + t.Fatalf("async rate-limited task should wait in queue without recording a failed attempt: %+v", asyncRateLimitDetail) } asyncRateLimitCompleted := waitForTaskStatus(t, server.URL, apiKeyResponse.Secret, asyncRateLimitTask.TaskID, []string{"succeeded"}, time.Duration(rateLimitWindowSeconds+3)*time.Second) if asyncRateLimitCompleted.Status != "succeeded" { t.Fatalf("async rate-limited task should be pulled from queue after the limit window resets, got %+v", asyncRateLimitCompleted) } - if len(asyncRateLimitCompleted.Attempts) < 2 || asyncRateLimitCompleted.Attempts[len(asyncRateLimitCompleted.Attempts)-1].Status != "succeeded" { - t.Fatalf("async rate-limited task should create a new successful attempt after requeue: %+v", asyncRateLimitCompleted) + if len(asyncRateLimitCompleted.Attempts) != 1 || asyncRateLimitCompleted.Attempts[0].Status != "succeeded" { + t.Fatalf("async rate-limited task should record only the real provider attempt after requeue: %+v", asyncRateLimitCompleted) } videoRouteModel := "video-route-smoke-" + suffixText diff --git a/apps/api/internal/runner/limits.go b/apps/api/internal/runner/limits.go index 94f7f21..f7ddfc1 100644 --- a/apps/api/internal/runner/limits.go +++ b/apps/api/internal/runner/limits.go @@ -45,6 +45,11 @@ func localRateLimitRetryAfter(err error) time.Duration { return store.RateLimitRetryAfter(err) } +func isLocalRateLimitError(err error) bool { + var limitErr *localRateLimitError + return errors.As(err, &limitErr) +} + func (s *Service) rateLimitReservations(ctx context.Context, user *auth.User, candidate store.RuntimeModelCandidate, body map[string]any) []store.RateLimitReservation { out := make([]store.RateLimitReservation, 0) out = append(out, reservationsFromPolicy("platform_model", candidate.PlatformModelID, effectiveRateLimitPolicy(candidate), body)...) diff --git a/apps/api/internal/runner/retry_decision.go b/apps/api/internal/runner/retry_decision.go index b9ac960..7d15356 100644 --- a/apps/api/internal/runner/retry_decision.go +++ b/apps/api/internal/runner/retry_decision.go @@ -98,8 +98,8 @@ func failoverDecisionForCandidate(runnerPolicy store.RunnerPolicy, candidate sto if cooldownSeconds <= 0 { cooldownSeconds = 300 } - if errors.Is(err, store.ErrRateLimited) && store.RateLimitRetryable(err) { - return failoverDecision{Retry: true, Action: "next", Reason: "local_rate_limit_try_next_candidate", CooldownSeconds: cooldownSeconds, Match: policyRuleMatch{Source: "gateway_rate_limits", Policy: "rateLimitPolicy", Rule: "localCapacity", Value: "exceeded"}, Info: info} + if errors.Is(err, store.ErrRateLimited) { + return failoverDecision{Retry: false, Action: "queue", Reason: "local_rate_limit_wait_queue", Match: policyRuleMatch{Source: "gateway_rate_limits", Policy: "rateLimitPolicy", Rule: "localCapacity", Value: "exceeded"}, Info: info} } if match, ok := failoverAllowMatchWithSources(runnerPolicy.FailoverPolicy, overridePolicy, info); ok { return failoverDecision{Retry: true, Action: action, Reason: "failover_allow_policy", CooldownSeconds: cooldownSeconds, Match: match, Info: info} diff --git a/apps/api/internal/runner/retry_decision_test.go b/apps/api/internal/runner/retry_decision_test.go index ee1aca8..413a713 100644 --- a/apps/api/internal/runner/retry_decision_test.go +++ b/apps/api/internal/runner/retry_decision_test.go @@ -93,6 +93,25 @@ func TestFailoverPolicyAllowsModelOverride(t *testing.T) { } } +func TestLocalRateLimitWaitsInQueueWithoutRetryOrFailover(t *testing.T) { + err := &localRateLimitError{ + clientErr: &clients.ClientError{Code: "rate_limit", Message: "local capacity exceeded", Retryable: true}, + cause: &store.RateLimitExceededError{Metric: "concurrent", Retryable: true}, + } + retryDecision := retryDecisionForCandidate(store.RuntimeModelCandidate{}, err) + if retryDecision.Retry || retryDecision.Reason != "local_rate_limit_wait_queue" { + t.Fatalf("local rate limit should not be same-client retry, got %+v", retryDecision) + } + + failoverDecision := failoverDecisionForCandidate(store.RunnerPolicy{ + Status: "active", + FailoverPolicy: map[string]any{"enabled": true, "allowCategories": []any{"rate_limit"}}, + }, store.RuntimeModelCandidate{}, err) + if failoverDecision.Retry || failoverDecision.Action != "queue" || failoverDecision.Reason != "local_rate_limit_wait_queue" { + t.Fatalf("local rate limit should wait in queue without failover, got %+v", failoverDecision) + } +} + func TestProviderAuthErrorsFailOverInsteadOfHardStop(t *testing.T) { runnerPolicy := store.RunnerPolicy{ Status: "active", diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index dffba70..55dde51 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -121,6 +121,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy) attemptNo := task.AttemptCount var lastErr error +candidatesLoop: for index, candidate := range candidates { if index >= maxPlatforms { break @@ -128,9 +129,10 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut clientAttempts := clientAttemptsForCandidate(candidate) var candidateErr error for clientAttempt := 1; clientAttempt <= clientAttempts; clientAttempt++ { - attemptNo++ - response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta) + nextAttemptNo := attemptNo + 1 + response, err := s.runCandidate(ctx, task, user, body, candidate, nextAttemptNo, onDelta) if err == nil { + attemptNo = nextAttemptNo billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate)) record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate)) record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics) @@ -174,6 +176,19 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } return Result{Task: finished, Output: response.Result}, nil } + if isLocalRateLimitError(err) { + lastErr = err + candidateErr = err + if task.AsyncMode && store.RateLimitRetryable(err) { + queued, delay, queueErr := s.requeueRateLimitedTask(ctx, task, err) + if queueErr != nil { + return Result{}, queueErr + } + return Result{Task: queued, Output: queued.Result}, &TaskQueuedError{Delay: delay} + } + break candidatesLoop + } + attemptNo = nextAttemptNo lastErr = err candidateErr = err retryDecision := retryDecisionForCandidate(candidate, err) @@ -297,9 +312,21 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) { simulated := isSimulation(task, candidate) - if err := s.emit(ctx, task.ID, "task.attempt.started", "running", "submitting", 0.25, "client attempt started", map[string]any{"attempt": attemptNo, "clientId": candidate.ClientID}, simulated); err != nil { - return clients.Response{}, fmt.Errorf("emit attempt started: %w", err) + reservations := s.rateLimitReservations(ctx, user, candidate, body) + limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, "", reservations) + if err != nil { + retryable := store.RateLimitRetryable(err) + clientErr := &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: retryable} + return clients.Response{}, &localRateLimitError{clientErr: clientErr, cause: err, retryAfter: localRateLimitRetryAfter(err)} } + rateReservationsFinalized := false + defer func() { + if !rateReservationsFinalized { + _ = s.store.ReleaseRateLimitReservations(context.WithoutCancel(ctx), limitResult.Reservations, "attempt_failed") + } + }() + defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs) + attemptID, err := s.store.CreateTaskAttempt(ctx, store.CreateTaskAttemptInput{ TaskID: task.ID, AttemptNo: attemptNo, @@ -315,28 +342,21 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user if err != nil { return clients.Response{}, fmt.Errorf("create task attempt: %w", err) } - reservations := s.rateLimitReservations(ctx, user, candidate, body) - limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, attemptID, reservations) - if err != nil { - retryable := store.RateLimitRetryable(err) - clientErr := &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: retryable} + if err := s.store.AttachRateLimitResultToAttempt(ctx, attemptID, limitResult); err != nil { + clientErr := &clients.ClientError{Code: "runtime_error", Message: err.Error(), Retryable: false} _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ AttemptID: attemptID, Status: "failed", - Retryable: retryable, - Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": retryable, "retryAfterMs": localRateLimitRetryAfter(err).Milliseconds(), "trace": []any{failureTraceEntry(clientErr, retryable)}}), - ErrorCode: "rate_limit", + Retryable: false, + Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(clientErr, false)}}), + ErrorCode: clients.ErrorCode(clientErr), ErrorMessage: err.Error(), }) - return clients.Response{}, &localRateLimitError{clientErr: clientErr, cause: err, retryAfter: localRateLimitRetryAfter(err)} + return clients.Response{}, fmt.Errorf("attach rate limit reservations to attempt: %w", err) + } + if err := s.emit(ctx, task.ID, "task.attempt.started", "running", "submitting", 0.25, "client attempt started", map[string]any{"attempt": attemptNo, "clientId": candidate.ClientID}, simulated); err != nil { + return clients.Response{}, fmt.Errorf("emit attempt started: %w", err) } - rateReservationsFinalized := false - defer func() { - if !rateReservationsFinalized { - _ = s.store.ReleaseRateLimitReservations(context.WithoutCancel(ctx), limitResult.Reservations, "attempt_failed") - } - }() - defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs) if err := s.store.RecordClientAssignment(ctx, candidate); err != nil { return clients.Response{}, fmt.Errorf("record client assignment: %w", err) diff --git a/apps/api/internal/store/rate_limits.go b/apps/api/internal/store/rate_limits.go index d00e9a7..e564e61 100644 --- a/apps/api/internal/store/rate_limits.go +++ b/apps/api/internal/store/rate_limits.go @@ -230,6 +230,42 @@ WHERE id = $1::uuid AND released_at IS NULL`, leaseID); err != nil && !errors.Is return nil } +func (s *Store) AttachRateLimitResultToAttempt(ctx context.Context, attemptID string, result RateLimitResult) error { + if attemptID == "" || (len(result.Reservations) == 0 && len(result.LeaseIDs) == 0) { + return nil + } + tx, err := s.pool.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + for _, reservation := range result.Reservations { + if reservation.ReservationID == "" { + continue + } + if _, err := tx.Exec(ctx, ` +UPDATE gateway_rate_limit_reservations +SET attempt_id = $2::uuid, + updated_at = now() +WHERE id = $1::uuid`, reservation.ReservationID, attemptID); err != nil { + return err + } + } + for _, leaseID := range result.LeaseIDs { + if leaseID == "" { + continue + } + if _, err := tx.Exec(ctx, ` +UPDATE gateway_concurrency_leases +SET attempt_id = $2::uuid +WHERE id = $1::uuid`, leaseID, attemptID); err != nil { + return err + } + } + return tx.Commit(ctx) +} + func (s *Store) RecoverInterruptedRuntimeState(ctx context.Context) (RuntimeRecoveryResult, error) { tx, err := s.pool.Begin(ctx) if err != nil { diff --git a/apps/web/src/pages/WorkspacePage.tsx b/apps/web/src/pages/WorkspacePage.tsx index 2f510eb..750202b 100644 --- a/apps/web/src/pages/WorkspacePage.tsx +++ b/apps/web/src/pages/WorkspacePage.tsx @@ -985,6 +985,7 @@ function taskAttemptTraceReasonLabel(reason: string) { client_retryable: '客户端标记可重试', client_non_retryable: '客户端标记不可重试', same_client_max_attempts: '达到本平台最大尝试次数', + local_rate_limit_wait_queue: '本地限流排队等待', failover_time_budget_exceeded: '超过全局切换时间预算', runner_policy_disabled: '全局调度策略停用', hard_stop_policy: '命中硬拒绝规则',