fix: treat local rate limit queueing separately
This commit is contained in:
parent
ddfd4f9035
commit
2a91b31d12
@ -780,15 +780,15 @@ WHERE reference_type = 'gateway_task'
|
||||
if asyncRateLimitDetail.Status != "queued" {
|
||||
t.Fatalf("async rate-limited task should return to queued state, got %+v", asyncRateLimitDetail)
|
||||
}
|
||||
if len(asyncRateLimitDetail.Attempts) == 0 || asyncRateLimitDetail.Attempts[0].ErrorCode != "rate_limit" {
|
||||
t.Fatalf("async rate-limited task should record a rate_limit attempt before requeue: %+v", asyncRateLimitDetail)
|
||||
if len(asyncRateLimitDetail.Attempts) != 0 {
|
||||
t.Fatalf("async rate-limited task should wait in queue without recording a failed attempt: %+v", asyncRateLimitDetail)
|
||||
}
|
||||
asyncRateLimitCompleted := waitForTaskStatus(t, server.URL, apiKeyResponse.Secret, asyncRateLimitTask.TaskID, []string{"succeeded"}, time.Duration(rateLimitWindowSeconds+3)*time.Second)
|
||||
if asyncRateLimitCompleted.Status != "succeeded" {
|
||||
t.Fatalf("async rate-limited task should be pulled from queue after the limit window resets, got %+v", asyncRateLimitCompleted)
|
||||
}
|
||||
if len(asyncRateLimitCompleted.Attempts) < 2 || asyncRateLimitCompleted.Attempts[len(asyncRateLimitCompleted.Attempts)-1].Status != "succeeded" {
|
||||
t.Fatalf("async rate-limited task should create a new successful attempt after requeue: %+v", asyncRateLimitCompleted)
|
||||
if len(asyncRateLimitCompleted.Attempts) != 1 || asyncRateLimitCompleted.Attempts[0].Status != "succeeded" {
|
||||
t.Fatalf("async rate-limited task should record only the real provider attempt after requeue: %+v", asyncRateLimitCompleted)
|
||||
}
|
||||
|
||||
videoRouteModel := "video-route-smoke-" + suffixText
|
||||
|
||||
@ -45,6 +45,11 @@ func localRateLimitRetryAfter(err error) time.Duration {
|
||||
return store.RateLimitRetryAfter(err)
|
||||
}
|
||||
|
||||
func isLocalRateLimitError(err error) bool {
|
||||
var limitErr *localRateLimitError
|
||||
return errors.As(err, &limitErr)
|
||||
}
|
||||
|
||||
func (s *Service) rateLimitReservations(ctx context.Context, user *auth.User, candidate store.RuntimeModelCandidate, body map[string]any) []store.RateLimitReservation {
|
||||
out := make([]store.RateLimitReservation, 0)
|
||||
out = append(out, reservationsFromPolicy("platform_model", candidate.PlatformModelID, effectiveRateLimitPolicy(candidate), body)...)
|
||||
|
||||
@ -98,8 +98,8 @@ func failoverDecisionForCandidate(runnerPolicy store.RunnerPolicy, candidate sto
|
||||
if cooldownSeconds <= 0 {
|
||||
cooldownSeconds = 300
|
||||
}
|
||||
if errors.Is(err, store.ErrRateLimited) && store.RateLimitRetryable(err) {
|
||||
return failoverDecision{Retry: true, Action: "next", Reason: "local_rate_limit_try_next_candidate", CooldownSeconds: cooldownSeconds, Match: policyRuleMatch{Source: "gateway_rate_limits", Policy: "rateLimitPolicy", Rule: "localCapacity", Value: "exceeded"}, Info: info}
|
||||
if errors.Is(err, store.ErrRateLimited) {
|
||||
return failoverDecision{Retry: false, Action: "queue", Reason: "local_rate_limit_wait_queue", Match: policyRuleMatch{Source: "gateway_rate_limits", Policy: "rateLimitPolicy", Rule: "localCapacity", Value: "exceeded"}, Info: info}
|
||||
}
|
||||
if match, ok := failoverAllowMatchWithSources(runnerPolicy.FailoverPolicy, overridePolicy, info); ok {
|
||||
return failoverDecision{Retry: true, Action: action, Reason: "failover_allow_policy", CooldownSeconds: cooldownSeconds, Match: match, Info: info}
|
||||
|
||||
@ -93,6 +93,25 @@ func TestFailoverPolicyAllowsModelOverride(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestLocalRateLimitWaitsInQueueWithoutRetryOrFailover(t *testing.T) {
|
||||
err := &localRateLimitError{
|
||||
clientErr: &clients.ClientError{Code: "rate_limit", Message: "local capacity exceeded", Retryable: true},
|
||||
cause: &store.RateLimitExceededError{Metric: "concurrent", Retryable: true},
|
||||
}
|
||||
retryDecision := retryDecisionForCandidate(store.RuntimeModelCandidate{}, err)
|
||||
if retryDecision.Retry || retryDecision.Reason != "local_rate_limit_wait_queue" {
|
||||
t.Fatalf("local rate limit should not be same-client retry, got %+v", retryDecision)
|
||||
}
|
||||
|
||||
failoverDecision := failoverDecisionForCandidate(store.RunnerPolicy{
|
||||
Status: "active",
|
||||
FailoverPolicy: map[string]any{"enabled": true, "allowCategories": []any{"rate_limit"}},
|
||||
}, store.RuntimeModelCandidate{}, err)
|
||||
if failoverDecision.Retry || failoverDecision.Action != "queue" || failoverDecision.Reason != "local_rate_limit_wait_queue" {
|
||||
t.Fatalf("local rate limit should wait in queue without failover, got %+v", failoverDecision)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProviderAuthErrorsFailOverInsteadOfHardStop(t *testing.T) {
|
||||
runnerPolicy := store.RunnerPolicy{
|
||||
Status: "active",
|
||||
|
||||
@ -121,6 +121,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy)
|
||||
attemptNo := task.AttemptCount
|
||||
var lastErr error
|
||||
candidatesLoop:
|
||||
for index, candidate := range candidates {
|
||||
if index >= maxPlatforms {
|
||||
break
|
||||
@ -128,9 +129,10 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
clientAttempts := clientAttemptsForCandidate(candidate)
|
||||
var candidateErr error
|
||||
for clientAttempt := 1; clientAttempt <= clientAttempts; clientAttempt++ {
|
||||
attemptNo++
|
||||
response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta)
|
||||
nextAttemptNo := attemptNo + 1
|
||||
response, err := s.runCandidate(ctx, task, user, body, candidate, nextAttemptNo, onDelta)
|
||||
if err == nil {
|
||||
attemptNo = nextAttemptNo
|
||||
billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate))
|
||||
record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate))
|
||||
record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics)
|
||||
@ -174,6 +176,19 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
}
|
||||
return Result{Task: finished, Output: response.Result}, nil
|
||||
}
|
||||
if isLocalRateLimitError(err) {
|
||||
lastErr = err
|
||||
candidateErr = err
|
||||
if task.AsyncMode && store.RateLimitRetryable(err) {
|
||||
queued, delay, queueErr := s.requeueRateLimitedTask(ctx, task, err)
|
||||
if queueErr != nil {
|
||||
return Result{}, queueErr
|
||||
}
|
||||
return Result{Task: queued, Output: queued.Result}, &TaskQueuedError{Delay: delay}
|
||||
}
|
||||
break candidatesLoop
|
||||
}
|
||||
attemptNo = nextAttemptNo
|
||||
lastErr = err
|
||||
candidateErr = err
|
||||
retryDecision := retryDecisionForCandidate(candidate, err)
|
||||
@ -297,9 +312,21 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
|
||||
func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) {
|
||||
simulated := isSimulation(task, candidate)
|
||||
if err := s.emit(ctx, task.ID, "task.attempt.started", "running", "submitting", 0.25, "client attempt started", map[string]any{"attempt": attemptNo, "clientId": candidate.ClientID}, simulated); err != nil {
|
||||
return clients.Response{}, fmt.Errorf("emit attempt started: %w", err)
|
||||
reservations := s.rateLimitReservations(ctx, user, candidate, body)
|
||||
limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, "", reservations)
|
||||
if err != nil {
|
||||
retryable := store.RateLimitRetryable(err)
|
||||
clientErr := &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: retryable}
|
||||
return clients.Response{}, &localRateLimitError{clientErr: clientErr, cause: err, retryAfter: localRateLimitRetryAfter(err)}
|
||||
}
|
||||
rateReservationsFinalized := false
|
||||
defer func() {
|
||||
if !rateReservationsFinalized {
|
||||
_ = s.store.ReleaseRateLimitReservations(context.WithoutCancel(ctx), limitResult.Reservations, "attempt_failed")
|
||||
}
|
||||
}()
|
||||
defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs)
|
||||
|
||||
attemptID, err := s.store.CreateTaskAttempt(ctx, store.CreateTaskAttemptInput{
|
||||
TaskID: task.ID,
|
||||
AttemptNo: attemptNo,
|
||||
@ -315,28 +342,21 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
if err != nil {
|
||||
return clients.Response{}, fmt.Errorf("create task attempt: %w", err)
|
||||
}
|
||||
reservations := s.rateLimitReservations(ctx, user, candidate, body)
|
||||
limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, attemptID, reservations)
|
||||
if err != nil {
|
||||
retryable := store.RateLimitRetryable(err)
|
||||
clientErr := &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: retryable}
|
||||
if err := s.store.AttachRateLimitResultToAttempt(ctx, attemptID, limitResult); err != nil {
|
||||
clientErr := &clients.ClientError{Code: "runtime_error", Message: err.Error(), Retryable: false}
|
||||
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
Retryable: retryable,
|
||||
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": retryable, "retryAfterMs": localRateLimitRetryAfter(err).Milliseconds(), "trace": []any{failureTraceEntry(clientErr, retryable)}}),
|
||||
ErrorCode: "rate_limit",
|
||||
Retryable: false,
|
||||
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(clientErr, false)}}),
|
||||
ErrorCode: clients.ErrorCode(clientErr),
|
||||
ErrorMessage: err.Error(),
|
||||
})
|
||||
return clients.Response{}, &localRateLimitError{clientErr: clientErr, cause: err, retryAfter: localRateLimitRetryAfter(err)}
|
||||
return clients.Response{}, fmt.Errorf("attach rate limit reservations to attempt: %w", err)
|
||||
}
|
||||
if err := s.emit(ctx, task.ID, "task.attempt.started", "running", "submitting", 0.25, "client attempt started", map[string]any{"attempt": attemptNo, "clientId": candidate.ClientID}, simulated); err != nil {
|
||||
return clients.Response{}, fmt.Errorf("emit attempt started: %w", err)
|
||||
}
|
||||
rateReservationsFinalized := false
|
||||
defer func() {
|
||||
if !rateReservationsFinalized {
|
||||
_ = s.store.ReleaseRateLimitReservations(context.WithoutCancel(ctx), limitResult.Reservations, "attempt_failed")
|
||||
}
|
||||
}()
|
||||
defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs)
|
||||
|
||||
if err := s.store.RecordClientAssignment(ctx, candidate); err != nil {
|
||||
return clients.Response{}, fmt.Errorf("record client assignment: %w", err)
|
||||
|
||||
@ -230,6 +230,42 @@ WHERE id = $1::uuid AND released_at IS NULL`, leaseID); err != nil && !errors.Is
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) AttachRateLimitResultToAttempt(ctx context.Context, attemptID string, result RateLimitResult) error {
|
||||
if attemptID == "" || (len(result.Reservations) == 0 && len(result.LeaseIDs) == 0) {
|
||||
return nil
|
||||
}
|
||||
tx, err := s.pool.Begin(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
for _, reservation := range result.Reservations {
|
||||
if reservation.ReservationID == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := tx.Exec(ctx, `
|
||||
UPDATE gateway_rate_limit_reservations
|
||||
SET attempt_id = $2::uuid,
|
||||
updated_at = now()
|
||||
WHERE id = $1::uuid`, reservation.ReservationID, attemptID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, leaseID := range result.LeaseIDs {
|
||||
if leaseID == "" {
|
||||
continue
|
||||
}
|
||||
if _, err := tx.Exec(ctx, `
|
||||
UPDATE gateway_concurrency_leases
|
||||
SET attempt_id = $2::uuid
|
||||
WHERE id = $1::uuid`, leaseID, attemptID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
|
||||
func (s *Store) RecoverInterruptedRuntimeState(ctx context.Context) (RuntimeRecoveryResult, error) {
|
||||
tx, err := s.pool.Begin(ctx)
|
||||
if err != nil {
|
||||
|
||||
@ -985,6 +985,7 @@ function taskAttemptTraceReasonLabel(reason: string) {
|
||||
client_retryable: '客户端标记可重试',
|
||||
client_non_retryable: '客户端标记不可重试',
|
||||
same_client_max_attempts: '达到本平台最大尝试次数',
|
||||
local_rate_limit_wait_queue: '本地限流排队等待',
|
||||
failover_time_budget_exceeded: '超过全局切换时间预算',
|
||||
runner_policy_disabled: '全局调度策略停用',
|
||||
hard_stop_policy: '命中硬拒绝规则',
|
||||
|
||||
Loading…
Reference in New Issue
Block a user