From 2685450f3ee9b2598407d2420da9e00a56e6fd24 Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 13 May 2026 08:37:39 +0800 Subject: [PATCH] Restrict priority demotion to same-model peers --- .../httpapi/core_flow_integration_test.go | 40 +++++++++++++++++ apps/api/internal/runner/runtime_policy.go | 4 +- apps/api/internal/runner/service.go | 4 +- apps/api/internal/store/runtime_policies.go | 44 +++++++++++++++---- .../src/pages/admin/RuntimePoliciesPanel.tsx | 4 +- 5 files changed, 82 insertions(+), 14 deletions(-) diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go index 48659b9..5ee452c 100644 --- a/apps/api/internal/httpapi/core_flow_integration_test.go +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -944,6 +944,26 @@ WHERE reference_type = 'gateway_task' "credentials": map[string]any{"mode": "simulation"}, "priority": 20, }, http.StatusCreated, &successPlatform) + var unrelatedPriorityPlatform struct { + ID string `json:"id"` + } + doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms", loginResponse.AccessToken, map[string]any{ + "provider": "openai", + "platformKey": "openai-unrelated-priority-" + suffixText, + "name": "OpenAI Unrelated Priority", + "baseUrl": "https://api.openai.com/v1", + "authType": "bearer", + "credentials": map[string]any{"mode": "simulation"}, + "priority": 900, + }, http.StatusCreated, &unrelatedPriorityPlatform) + var unrelatedPriorityPlatformModel map[string]any + doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms/"+unrelatedPriorityPlatform.ID+"/models", loginResponse.AccessToken, map[string]any{ + "canonicalModelKey": "openai:gpt-4o-mini", + "modelName": "priority-demote-unrelated-" + suffixText, + "modelAlias": "priority-demote-unrelated-" + suffixText, + "modelType": []string{"text_generate"}, + "displayName": "Unrelated Priority", + }, http.StatusCreated, &unrelatedPriorityPlatformModel) for _, platformID := range []string{failedPlatform.ID, successPlatform.ID} { var platformModel map[string]any doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms/"+platformID+"/models", loginResponse.AccessToken, map[string]any{ @@ -1000,6 +1020,26 @@ WHERE reference_type = 'gateway_task' if summary, ok := failoverDetail.Metrics["attempts"].([]any); !ok || len(summary) != 3 { t.Fatalf("task metrics should keep attempt-chain summary, got %+v", failoverDetail.Metrics) } + var demotedDynamicPriority int + var successEffectivePriority int + var unrelatedEffectivePriority int + if err := testPool.QueryRow(ctx, ` +SELECT COALESCE(failed.dynamic_priority, -1), + COALESCE(success.dynamic_priority, success.priority), + COALESCE(unrelated.dynamic_priority, unrelated.priority) +FROM integration_platforms failed +JOIN integration_platforms success ON success.id = $2::uuid +JOIN integration_platforms unrelated ON unrelated.id = $3::uuid +WHERE failed.id = $1::uuid`, failedPlatform.ID, successPlatform.ID, unrelatedPriorityPlatform.ID).Scan(&demotedDynamicPriority, &successEffectivePriority, &unrelatedEffectivePriority); err != nil { + t.Fatalf("read priority demotion state: %v", err) + } + expectedDemotedPriority := successEffectivePriority + 1 + if unrelatedEffectivePriority <= expectedDemotedPriority { + t.Fatalf("unrelated priority fixture should sit beyond same-model peers: unrelated=%d expected=%d", unrelatedEffectivePriority, expectedDemotedPriority) + } + if demotedDynamicPriority != expectedDemotedPriority { + t.Fatalf("priority demotion should use only same-model peer clients, got dynamic_priority=%d want %d", demotedDynamicPriority, expectedDemotedPriority) + } var degradePolicySet struct { ID string `json:"id"` diff --git a/apps/api/internal/runner/runtime_policy.go b/apps/api/internal/runner/runtime_policy.go index 0fb2d59..f9ca501 100644 --- a/apps/api/internal/runner/runtime_policy.go +++ b/apps/api/internal/runner/runtime_policy.go @@ -65,7 +65,7 @@ func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candid } } -func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, attemptNo int, runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, cause error, simulated bool) { +func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, attemptNo int, runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, requestedModel string, cause error, simulated bool) { if errors.Is(cause, store.ErrRateLimited) { return } @@ -73,7 +73,7 @@ func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, if !decision.Demote { return } - if dynamicPriority, err := s.store.DemoteCandidatePlatformPriority(ctx, candidate.PlatformID); err == nil { + if dynamicPriority, err := s.store.DemoteCandidatePlatformPriority(ctx, candidate.PlatformID, candidate.PlatformModelID, requestedModel, candidate.ModelType); err == nil { s.recordAttemptTrace(ctx, taskID, attemptNo, priorityDemoteTraceEntry(decision, candidate.PlatformID, candidate.PlatformModelID, dynamicPriority)) _ = s.emit(ctx, taskID, "task.policy.priority_demoted", "running", "priority_demote", 0.52, "candidate platform priority demoted by runner policy", addPolicyTracePayload(map[string]any{ "platformId": candidate.PlatformID, diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index 01cf582..ade0144 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -258,7 +258,7 @@ candidatesLoop: } if candidateErr == nil || index+1 >= len(candidates) || index+1 >= maxPlatforms { if candidateErr != nil { - s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, candidateErr, isSimulation(task, candidate)) + s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, task.Model, candidateErr, isSimulation(task, candidate)) decision := failoverDecisionForCandidate(runnerPolicy, candidate, candidateErr) if decision.Retry { decision.Retry = false @@ -274,7 +274,7 @@ candidatesLoop: } break } - s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, candidateErr, isSimulation(task, candidate)) + s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, task.Model, candidateErr, isSimulation(task, candidate)) if failoverTimeBudgetExceeded(executeStartedAt, maxFailoverDuration) { elapsedSeconds := int(time.Since(executeStartedAt).Seconds()) maxDurationSeconds := int(maxFailoverDuration.Seconds()) diff --git a/apps/api/internal/store/runtime_policies.go b/apps/api/internal/store/runtime_policies.go index ddd0c7a..6ff7ceb 100644 --- a/apps/api/internal/store/runtime_policies.go +++ b/apps/api/internal/store/runtime_policies.go @@ -159,21 +159,49 @@ WHERE id = $1::uuid`, platformModelID, cooldownSeconds) return err } -func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string) (int, error) { - if strings.TrimSpace(platformID) == "" { +func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string, platformModelID string, requestedModel string, modelType string) (int, error) { + if strings.TrimSpace(platformID) == "" || strings.TrimSpace(platformModelID) == "" || strings.TrimSpace(requestedModel) == "" { return 0, nil } var dynamicPriority int err := s.pool.QueryRow(ctx, ` + WITH current_model AS ( + SELECT id, platform_id + FROM platform_models + WHERE id = $2::uuid + AND platform_id = $1::uuid + ), + peer_priority AS ( + SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority)) AS max_priority + FROM current_model current + JOIN platform_models peer_model ON peer_model.platform_id <> current.platform_id + JOIN integration_platforms peer ON peer.id = peer_model.platform_id + LEFT JOIN base_model_catalog peer_base ON peer_base.id = peer_model.base_model_id + WHERE peer.status = 'enabled' + AND peer.deleted_at IS NULL + AND (peer.cooldown_until IS NULL OR peer.cooldown_until <= now()) + AND peer_model.enabled = true + AND (peer_model.cooldown_until IS NULL OR peer_model.cooldown_until <= now()) + AND (NULLIF($4::text, '') IS NULL OR peer_model.model_type @> jsonb_build_array($4::text)) + AND ( + (COALESCE(peer_model.model_alias, '') <> '' AND peer_model.model_alias = $3::text) + OR ( + COALESCE(peer_model.model_alias, '') = '' + AND ( + peer_model.model_name = $3::text + OR peer_base.canonical_model_key = $3::text + OR peer_base.provider_model_name = $3::text + ) + ) + ) + ) UPDATE integration_platforms target - SET dynamic_priority = COALESCE(( - SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority)) - FROM integration_platforms peer - WHERE peer.deleted_at IS NULL - ), target.priority) + 1, + SET dynamic_priority = COALESCE((SELECT max_priority FROM peer_priority), target.priority) + 1, updated_at = now() WHERE target.id = $1::uuid - RETURNING dynamic_priority`, platformID).Scan(&dynamicPriority) + AND target.deleted_at IS NULL + AND EXISTS (SELECT 1 FROM current_model) + RETURNING dynamic_priority`, platformID, platformModelID, requestedModel, modelType).Scan(&dynamicPriority) return dynamicPriority, err } diff --git a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx index 48dcb4d..e950835 100644 --- a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx +++ b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx @@ -349,7 +349,7 @@ function RunnerPolicyEditor(props: { { value: 'priorityDemote' as const, title: '优先级降级', - description: '命中后将失败平台的动态优先级调整到当前最后', + description: '命中后将失败平台的动态优先级调整到当前模型队尾', enabled: props.form.priorityDemoteEnabled, icon: , }, @@ -448,7 +448,7 @@ function RunnerPolicyEditor(props: { {activeStrategy === 'priorityDemote' && (
patch({ priorityDemoteEnabled: checked })} /> - 命中降级规则后,失败平台会自动调整到当前所有平台的优先级队尾。 + 命中降级规则后,失败平台会自动调整到提供当前模型的其它客户端优先级队尾。 patch({ priorityDemoteCategories: value })} /> patch({ priorityDemoteCodes: value })} /> patch({ priorityDemoteStatusCodes: value })} />