diff --git a/apps/api/docs/swagger.json b/apps/api/docs/swagger.json index adf0469..36237fa 100644 --- a/apps/api/docs/swagger.json +++ b/apps/api/docs/swagger.json @@ -11157,6 +11157,10 @@ "type": "object", "additionalProperties": {} }, + "singleSourcePolicy": { + "type": "object", + "additionalProperties": {} + }, "status": { "type": "string" }, @@ -11193,6 +11197,10 @@ "type": "object", "additionalProperties": {} }, + "singleSourcePolicy": { + "type": "object", + "additionalProperties": {} + }, "status": { "type": "string" } diff --git a/apps/api/docs/swagger.yaml b/apps/api/docs/swagger.yaml index 709ee8a..4a1dbc1 100644 --- a/apps/api/docs/swagger.yaml +++ b/apps/api/docs/swagger.yaml @@ -2108,6 +2108,9 @@ definitions: priorityDemotePolicy: additionalProperties: {} type: object + singleSourcePolicy: + additionalProperties: {} + type: object status: type: string updatedAt: @@ -2133,6 +2136,9 @@ definitions: priorityDemotePolicy: additionalProperties: {} type: object + singleSourcePolicy: + additionalProperties: {} + type: object status: type: string type: object diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go index c9e9fce..1981e56 100644 --- a/apps/api/internal/httpapi/core_flow_integration_test.go +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -1282,6 +1282,41 @@ WHERE m.platform_id = $1::uuid if err := testPool.QueryRow(ctx, `SELECT status FROM integration_platforms WHERE id = $1::uuid`, invalidKeyPlatform.ID).Scan(&invalidKeyPlatformStatus); err != nil { t.Fatalf("read invalid key platform status: %v", err) } + if invalidKeyPlatformStatus != "enabled" { + t.Fatalf("single source protection should keep the only source enabled, got %q", invalidKeyPlatformStatus) + } + var validKeyPlatform struct { + ID string `json:"id"` + } + doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms", loginResponse.AccessToken, map[string]any{ + "provider": "openai", + "platformKey": "openai-auto-disable-success-" + suffixText, + "name": "OpenAI Auto Disable Success", + "baseUrl": "https://api.openai.com/v1", + "authType": "bearer", + "credentials": map[string]any{"mode": "simulation"}, + "priority": 60, + }, http.StatusCreated, &validKeyPlatform) + doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms/"+validKeyPlatform.ID+"/models", loginResponse.AccessToken, map[string]any{ + "canonicalModelKey": "openai:gpt-4o-mini", + "modelName": autoDisableModel, + "modelAlias": autoDisableModel, + "modelType": []string{"text_generate"}, + "displayName": "Auto Disable Smoke Success", + }, http.StatusCreated, nil) + doAPIV1ChatCompletionAndLoadTask(t, ctx, testPool, server.URL, apiKeyResponse.Secret, map[string]any{ + "model": autoDisableModel, + "runMode": "simulation", + "simulation": true, + "simulationDurationMs": 5, + "messages": []map[string]any{{"role": "user", "content": "disable with fallback please"}}, + }, "auto-disable-with-fallback-chat-"+suffixText, http.StatusOK, nil, &autoDisableTask.Task) + if autoDisableTask.Task.Status != "succeeded" { + t.Fatalf("auto disable task should fail over after disabling the bad source: %+v", autoDisableTask.Task) + } + if err := testPool.QueryRow(ctx, `SELECT status FROM integration_platforms WHERE id = $1::uuid`, invalidKeyPlatform.ID).Scan(&invalidKeyPlatformStatus); err != nil { + t.Fatalf("read invalid key platform status after fallback: %v", err) + } if invalidKeyPlatformStatus != "disabled" { t.Fatalf("auto disable policy should disable platform, got %q", invalidKeyPlatformStatus) } diff --git a/apps/api/internal/runner/retry_decision_test.go b/apps/api/internal/runner/retry_decision_test.go index 22db362..0a2e493 100644 --- a/apps/api/internal/runner/retry_decision_test.go +++ b/apps/api/internal/runner/retry_decision_test.go @@ -37,6 +37,40 @@ func TestFailoverBudgetDefaults(t *testing.T) { } } +func TestSingleSourceProtectionDefaultsOn(t *testing.T) { + runnerPolicy := store.RunnerPolicy{Status: "active"} + + if !singleSourceProtectionActive([]store.RuntimeModelCandidate{{PlatformID: "only-source"}}, 99, runnerPolicy) { + t.Fatal("single source protection should default to enabled") + } +} + +func TestSingleSourceProtectionCanBeDisabled(t *testing.T) { + runnerPolicy := store.RunnerPolicy{ + Status: "active", + SingleSourcePolicy: map[string]any{"enabled": false}, + } + + if singleSourceProtectionActive([]store.RuntimeModelCandidate{{PlatformID: "only-source"}}, 99, runnerPolicy) { + t.Fatal("single source protection should respect the global toggle") + } +} + +func TestSingleSourceProtectionUsesEffectivePlatformBudget(t *testing.T) { + candidates := []store.RuntimeModelCandidate{ + {PlatformID: "first-source"}, + {PlatformID: "second-source"}, + } + runnerPolicy := store.RunnerPolicy{Status: "active"} + + if !singleSourceProtectionActive(candidates, 1, runnerPolicy) { + t.Fatal("maxPlatforms=1 should make this execution a single-source run") + } + if singleSourceProtectionActive(candidates, 2, runnerPolicy) { + t.Fatal("multiple effective sources should allow disable and cooldown actions") + } +} + func TestFailoverTimeBudgetExceeded(t *testing.T) { if !failoverTimeBudgetExceeded(time.Now().Add(-601*time.Second), 10*time.Minute) { t.Fatal("failover time budget should stop retries after the configured duration") diff --git a/apps/api/internal/runner/runtime_policy.go b/apps/api/internal/runner/runtime_policy.go index f9ca501..a80375c 100644 --- a/apps/api/internal/runner/runtime_policy.go +++ b/apps/api/internal/runner/runtime_policy.go @@ -9,7 +9,7 @@ import ( "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) -func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, cause error, simulated bool) { +func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, cause error, simulated bool, singleSourceProtected bool) { code := clients.ErrorCode(cause) message := "" if cause != nil { @@ -18,7 +18,9 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri autoDisablePolicy := effectiveRuntimePolicy(candidate.AutoDisablePolicy, candidate.RuntimePolicyOverride, "autoDisablePolicy") if failurePolicyMatches(autoDisablePolicy, code, message) && intFromPolicy(autoDisablePolicy, "threshold") <= 1 { - if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil { + if singleSourceProtected { + s.emitSingleSourceProtected(ctx, taskID, candidate, "auto_disable", code, simulated) + } else if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil { _ = s.emit(ctx, taskID, "task.policy.auto_disabled", "running", "auto_disable", 0.48, "candidate platform disabled by failure policy", map[string]any{ "platformId": candidate.PlatformID, "platformModelId": candidate.PlatformModelID, @@ -30,7 +32,9 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri degradePolicy := effectiveRuntimePolicy(candidate.DegradePolicy, candidate.RuntimePolicyOverride, "degradePolicy") if failurePolicyMatches(degradePolicy, code, message) { cooldownSeconds := intFromPolicy(degradePolicy, "cooldownSeconds") - if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, cooldownSeconds); err == nil { + if singleSourceProtected { + s.emitSingleSourceProtected(ctx, taskID, candidate, "degrade_cooldown", code, simulated) + } else if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, cooldownSeconds); err == nil { _ = s.emit(ctx, taskID, "task.policy.degraded", "running", "degrade", 0.5, "candidate model cooled down by failure policy", map[string]any{ "platformId": candidate.PlatformID, "platformModelId": candidate.PlatformModelID, @@ -41,9 +45,13 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri } } -func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, decision failoverDecision, simulated bool) { +func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, decision failoverDecision, simulated bool, singleSourceProtected bool) { switch decision.Action { case "disable_and_next": + if singleSourceProtected { + s.emitSingleSourceProtected(ctx, taskID, candidate, decision.Action, decision.Info.Code, simulated) + return + } if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil { _ = s.emit(ctx, taskID, "task.policy.failover_disabled", "running", "failover", 0.51, "candidate platform disabled by runner failover policy", addPolicyTracePayload(map[string]any{ "platformId": candidate.PlatformID, @@ -53,6 +61,10 @@ func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candid }, decision.Match, decision.Info), simulated) } case "cooldown_and_next": + if singleSourceProtected { + s.emitSingleSourceProtected(ctx, taskID, candidate, decision.Action, decision.Info.Code, simulated) + return + } if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, decision.CooldownSeconds); err == nil { _ = s.emit(ctx, taskID, "task.policy.failover_cooled_down", "running", "failover", 0.51, "candidate model cooled down by runner failover policy", addPolicyTracePayload(map[string]any{ "platformId": candidate.PlatformID, @@ -65,6 +77,15 @@ func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candid } } +func (s *Service) emitSingleSourceProtected(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, action string, code string, simulated bool) { + _ = s.emit(ctx, taskID, "task.policy.single_source_protected", "running", "policy_guard", 0.5, "single source protected from disable or cooldown", map[string]any{ + "platformId": candidate.PlatformID, + "platformModelId": candidate.PlatformModelID, + "action": action, + "code": code, + }, simulated) +} + func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, attemptNo int, runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, requestedModel string, cause error, simulated bool) { if errors.Is(cause, store.ErrRateLimited) { return diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index d446560..ebe281f 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -240,6 +240,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } maxPlatforms := maxPlatformsForCandidates(candidates, runnerPolicy) maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy) + singleSourceProtected := singleSourceProtectionActive(candidates, maxPlatforms, runnerPolicy) var lastErr error var lastCandidate store.RuntimeModelCandidate var lastPreprocessing *parameterPreprocessingLog @@ -275,7 +276,7 @@ candidatesLoop: break candidatesLoop } candidateBody := preprocessing.Body - response, err := s.runCandidate(ctx, task, user, candidateBody, preprocessing.Log, candidate, nextAttemptNo, onDelta) + response, err := s.runCandidate(ctx, task, user, candidateBody, preprocessing.Log, candidate, nextAttemptNo, onDelta, singleSourceProtected) if err == nil { attemptNo = nextAttemptNo billings := s.billings(ctx, user, task.Kind, candidateBody, candidate, response, isSimulation(task, candidate)) @@ -445,7 +446,7 @@ candidatesLoop: if !decision.Retry { break } - s.applyFailoverAction(ctx, task.ID, candidate, decision, isSimulation(task, candidate)) + s.applyFailoverAction(ctx, task.ID, candidate, decision, isSimulation(task, candidate), singleSourceProtected) if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.55, "retrying next client", addPolicyTracePayload(map[string]any{ "attempt": attemptNo, "action": decision.Action, @@ -486,7 +487,7 @@ candidatesLoop: return Result{Task: failed, Output: failed.Result}, lastErr } -func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, preprocessing parameterPreprocessingLog, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) { +func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, preprocessing parameterPreprocessingLog, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta, singleSourceProtected bool) (clients.Response, error) { simulated := isSimulation(task, candidate) baseAttemptMetrics := mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), parameterPreprocessingMetrics(preprocessing)) reservations := s.rateLimitReservations(ctx, user, candidate, body) @@ -638,7 +639,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user ErrorMessage: err.Error(), }) _ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "statusCode": clients.ErrorResponseMetadata(err).StatusCode, "metrics": metrics}, simulated) - s.applyCandidateFailurePolicies(ctx, task.ID, candidate, err, simulated) + s.applyCandidateFailurePolicies(ctx, task.ID, candidate, err, simulated, singleSourceProtected) return clients.Response{}, err } uploadedResult, err := s.uploadGeneratedAssets(ctx, task.ID, task.Kind, response.Result) @@ -1099,6 +1100,42 @@ func maxPlatformsForCandidates(candidates []store.RuntimeModelCandidate, runnerP return maxPlatforms } +func singleSourceProtectionActive(candidates []store.RuntimeModelCandidate, maxPlatforms int, runnerPolicy store.RunnerPolicy) bool { + if !runnerSingleSourceProtectionEnabled(runnerPolicy) { + return false + } + return runtimeCandidateSourceCount(candidates, maxPlatforms) <= 1 +} + +func runnerSingleSourceProtectionEnabled(runnerPolicy store.RunnerPolicy) bool { + if enabled, ok := runnerPolicy.SingleSourcePolicy["enabled"].(bool); ok { + return enabled + } + return true +} + +func runtimeCandidateSourceCount(candidates []store.RuntimeModelCandidate, maxPlatforms int) int { + limit := len(candidates) + if maxPlatforms > 0 && maxPlatforms < limit { + limit = maxPlatforms + } + sources := make(map[string]bool, limit) + for index := 0; index < limit; index++ { + key := strings.TrimSpace(candidates[index].PlatformID) + if key == "" { + key = strings.TrimSpace(candidates[index].PlatformModelID) + } + if key == "" { + key = strings.TrimSpace(candidates[index].ClientID) + } + if key == "" { + key = strconv.Itoa(index) + } + sources[key] = true + } + return len(sources) +} + func maxFailoverDurationForCandidates(candidates []store.RuntimeModelCandidate, runnerPolicy store.RunnerPolicy) time.Duration { seconds := intFromPolicy(runnerPolicy.FailoverPolicy, "maxDurationSeconds") if seconds <= 0 { diff --git a/apps/api/internal/store/postgres.go b/apps/api/internal/store/postgres.go index 0d8a61c..1b13eaa 100644 --- a/apps/api/internal/store/postgres.go +++ b/apps/api/internal/store/postgres.go @@ -277,6 +277,7 @@ type RunnerPolicy struct { FailoverPolicy map[string]any `json:"failoverPolicy,omitempty"` HardStopPolicy map[string]any `json:"hardStopPolicy,omitempty"` PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy,omitempty"` + SingleSourcePolicy map[string]any `json:"singleSourcePolicy,omitempty"` Metadata map[string]any `json:"metadata,omitempty"` Status string `json:"status"` CreatedAt time.Time `json:"createdAt"` diff --git a/apps/api/internal/store/runner_policies.go b/apps/api/internal/store/runner_policies.go index 2ea4fca..57e73fd 100644 --- a/apps/api/internal/store/runner_policies.go +++ b/apps/api/internal/store/runner_policies.go @@ -11,7 +11,7 @@ import ( const runnerPolicyColumns = ` id::text, policy_key, name, COALESCE(description, ''), failover_policy, -hard_stop_policy, priority_demote_policy, metadata, status, created_at, updated_at` +hard_stop_policy, priority_demote_policy, single_source_policy, metadata, status, created_at, updated_at` type RunnerPolicyInput struct { PolicyKey string `json:"policyKey"` @@ -20,6 +20,7 @@ type RunnerPolicyInput struct { FailoverPolicy map[string]any `json:"failoverPolicy"` HardStopPolicy map[string]any `json:"hardStopPolicy"` PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy"` + SingleSourcePolicy map[string]any `json:"singleSourcePolicy"` Metadata map[string]any `json:"metadata"` Status string `json:"status"` } @@ -50,23 +51,25 @@ func (s *Store) UpsertDefaultRunnerPolicy(ctx context.Context, input RunnerPolic failoverPolicy, _ := json.Marshal(emptyObjectIfNil(input.FailoverPolicy)) hardStopPolicy, _ := json.Marshal(emptyObjectIfNil(input.HardStopPolicy)) priorityDemotePolicy, _ := json.Marshal(emptyObjectIfNil(input.PriorityDemotePolicy)) + singleSourcePolicy, _ := json.Marshal(emptyObjectIfNil(input.SingleSourcePolicy)) metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata)) return scanRunnerPolicy(s.pool.QueryRow(ctx, ` INSERT INTO gateway_runner_policies ( - policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status + policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, single_source_policy, metadata, status ) -VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8) +VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8, $9) ON CONFLICT (policy_key) DO UPDATE SET name = EXCLUDED.name, description = EXCLUDED.description, failover_policy = EXCLUDED.failover_policy, hard_stop_policy = EXCLUDED.hard_stop_policy, priority_demote_policy = EXCLUDED.priority_demote_policy, + single_source_policy = EXCLUDED.single_source_policy, metadata = EXCLUDED.metadata, status = EXCLUDED.status, updated_at = now() RETURNING `+runnerPolicyColumns, - input.PolicyKey, input.Name, input.Description, failoverPolicy, hardStopPolicy, priorityDemotePolicy, metadata, input.Status, + input.PolicyKey, input.Name, input.Description, failoverPolicy, hardStopPolicy, priorityDemotePolicy, singleSourcePolicy, metadata, input.Status, )) } @@ -75,6 +78,7 @@ func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) { var failoverPolicy []byte var hardStopPolicy []byte var priorityDemotePolicy []byte + var singleSourcePolicy []byte var metadata []byte if err := scanner.Scan( &item.ID, @@ -84,6 +88,7 @@ func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) { &failoverPolicy, &hardStopPolicy, &priorityDemotePolicy, + &singleSourcePolicy, &metadata, &item.Status, &item.CreatedAt, @@ -94,6 +99,7 @@ func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) { item.FailoverPolicy = decodeObject(failoverPolicy) item.HardStopPolicy = decodeObject(hardStopPolicy) item.PriorityDemotePolicy = decodeObject(priorityDemotePolicy) + item.SingleSourcePolicy = decodeObject(singleSourcePolicy) item.Metadata = decodeObject(metadata) return item, nil } @@ -124,6 +130,7 @@ func defaultRunnerPolicy() RunnerPolicy { FailoverPolicy: defaultRunnerFailoverPolicy(), HardStopPolicy: defaultRunnerHardStopPolicy(), PriorityDemotePolicy: defaultRunnerPriorityDemotePolicy(), + SingleSourcePolicy: defaultRunnerSingleSourcePolicy(), Metadata: map[string]any{"source": "code-default"}, Status: "active", CreatedAt: now, @@ -141,6 +148,12 @@ func defaultRunnerPriorityDemotePolicy() map[string]any { } } +func defaultRunnerSingleSourcePolicy() map[string]any { + return map[string]any{ + "enabled": true, + } +} + func defaultRunnerFailoverPolicy() map[string]any { return map[string]any{ "enabled": true, diff --git a/apps/api/migrations/0026_runner_policies.sql b/apps/api/migrations/0026_runner_policies.sql index 3be3623..4f1a009 100644 --- a/apps/api/migrations/0026_runner_policies.sql +++ b/apps/api/migrations/0026_runner_policies.sql @@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS gateway_runner_policies ( failover_policy jsonb NOT NULL DEFAULT '{}'::jsonb, hard_stop_policy jsonb NOT NULL DEFAULT '{}'::jsonb, priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + single_source_policy jsonb NOT NULL DEFAULT '{"enabled":true}'::jsonb, metadata jsonb NOT NULL DEFAULT '{}'::jsonb, status text NOT NULL DEFAULT 'active', created_at timestamptz NOT NULL DEFAULT now(), @@ -15,8 +16,11 @@ CREATE TABLE IF NOT EXISTS gateway_runner_policies ( ALTER TABLE IF EXISTS gateway_runner_policies ADD COLUMN IF NOT EXISTS priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb; +ALTER TABLE IF EXISTS gateway_runner_policies + ADD COLUMN IF NOT EXISTS single_source_policy jsonb NOT NULL DEFAULT '{"enabled":true}'::jsonb; + INSERT INTO gateway_runner_policies ( - policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status + policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, single_source_policy, metadata, status ) VALUES ( 'default-runner-v1', @@ -54,6 +58,7 @@ VALUES ( "statusCodes": [408, 429, 500, 502, 503, 504], "keywords": ["timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "429", "5xx"] }'::jsonb, + '{"enabled": true}'::jsonb, '{"seed":"0026_runner_policies"}'::jsonb, 'active' ) @@ -63,6 +68,7 @@ SET name = EXCLUDED.name, failover_policy = gateway_runner_policies.failover_policy || EXCLUDED.failover_policy, hard_stop_policy = gateway_runner_policies.hard_stop_policy || EXCLUDED.hard_stop_policy, priority_demote_policy = gateway_runner_policies.priority_demote_policy || EXCLUDED.priority_demote_policy, + single_source_policy = gateway_runner_policies.single_source_policy || EXCLUDED.single_source_policy, metadata = gateway_runner_policies.metadata || EXCLUDED.metadata, updated_at = now(); diff --git a/apps/api/migrations/0050_runner_single_source_policy.sql b/apps/api/migrations/0050_runner_single_source_policy.sql new file mode 100644 index 0000000..00bd0bc --- /dev/null +++ b/apps/api/migrations/0050_runner_single_source_policy.sql @@ -0,0 +1,9 @@ +ALTER TABLE IF EXISTS gateway_runner_policies + ADD COLUMN IF NOT EXISTS single_source_policy jsonb NOT NULL DEFAULT '{"enabled":true}'::jsonb; + +UPDATE gateway_runner_policies +SET single_source_policy = COALESCE(single_source_policy, '{}'::jsonb) || '{"enabled":true}'::jsonb, + metadata = metadata || jsonb_build_object('singleSourcePolicy', '0050_runner_single_source_policy'), + updated_at = now() +WHERE COALESCE(single_source_policy, '{}'::jsonb) = '{}'::jsonb + OR NOT (COALESCE(single_source_policy, '{}'::jsonb) ? 'enabled'); diff --git a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx index 3724bc4..7fca483 100644 --- a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx +++ b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx @@ -6,7 +6,7 @@ import { Badge, Button, Card, CardContent, CardHeader, CardTitle, ConfirmDialog, import type { LoadState } from '../../types'; type RuntimePanelTab = 'model' | 'runner'; -type RunnerPolicyStrategy = 'failover' | 'hardStop' | 'priorityDemote'; +type RunnerPolicyStrategy = 'failover' | 'hardStop' | 'priorityDemote' | 'singleSource'; type RuntimePolicyForm = { policyKey: string; @@ -54,6 +54,7 @@ type RunnerPolicyForm = { priorityDemoteCodes: string[]; priorityDemoteStatusCodes: string[]; priorityDemoteKeywords: string[]; + singleSourceProtectionEnabled: boolean; metadataJson: string; status: string; }; @@ -353,6 +354,13 @@ function RunnerPolicyEditor(props: { enabled: props.form.priorityDemoteEnabled, icon: , }, + { + value: 'singleSource' as const, + title: '单一源保护', + description: '只有一个可用源时跳过自动禁用和冷却', + enabled: props.form.singleSourceProtectionEnabled, + icon: , + }, ]; const activeDefinition = strategyDefinitions.find((item) => item.value === activeStrategy) ?? strategyDefinitions[0]; @@ -455,6 +463,17 @@ function RunnerPolicyEditor(props: { patch({ priorityDemoteKeywords: value })} /> )} + + {activeStrategy === 'singleSource' && ( +
+ patch({ singleSourceProtectionEnabled: checked })} + /> + 本次调度只有一个可用源时,命中自动禁用、运行策略冷却或故障切换冷却动作都只记录保护事件,不修改平台或模型状态。 +
+ )} @@ -554,6 +573,7 @@ function runnerPolicyToForm(policy: GatewayRunnerPolicy | null): RunnerPolicyFor const failover = readObject(policy?.failoverPolicy); const hardStop = readObject(policy?.hardStopPolicy); const priorityDemote = readObject(policy?.priorityDemotePolicy); + const singleSource = readObject(policy?.singleSourcePolicy); return { name: policy?.name ?? '默认全局调度策略', description: policy?.description ?? '控制多个候选平台之间的故障切换;模型运行策略只可覆盖 failoverPolicy,不能覆盖 hardStopPolicy。', @@ -579,6 +599,7 @@ function runnerPolicyToForm(policy: GatewayRunnerPolicy | null): RunnerPolicyFor priorityDemoteCodes: tagsFromValue(priorityDemote.codes ?? ['network', 'timeout', 'stream_read_error', 'rate_limit', 'server_error', 'overloaded']), priorityDemoteStatusCodes: tagsFromValue(priorityDemote.statusCodes ?? [408, 429, 500, 502, 503, 504]), priorityDemoteKeywords: tagsFromValue(priorityDemote.keywords ?? ['timeout', 'network', 'rate_limit', 'overloaded', 'temporarily_unavailable', 'server_error', '429', '5xx']), + singleSourceProtectionEnabled: readBool(singleSource.enabled, true), metadataJson: JSON.stringify(policy?.metadata ?? {}, null, 2), status: policy?.status ?? 'active', }; @@ -617,6 +638,9 @@ function runnerFormToPayload(form: RunnerPolicyForm): GatewayRunnerPolicyUpsertR statusCodes: parseNumberTags(form.priorityDemoteStatusCodes), keywords: cleanTags(form.priorityDemoteKeywords), }, + singleSourcePolicy: { + enabled: form.singleSourceProtectionEnabled, + }, metadata: parseJson(form.metadataJson), status: form.status.trim() || 'active', }; diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index d85714b..7f2c2b8 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -282,6 +282,7 @@ export interface GatewayRunnerPolicy { failoverPolicy?: Record; hardStopPolicy?: Record; priorityDemotePolicy?: Record; + singleSourcePolicy?: Record; metadata?: Record; status: 'active' | 'disabled' | string; createdAt: string; @@ -295,6 +296,7 @@ export interface GatewayRunnerPolicyUpsertRequest { failoverPolicy?: Record; hardStopPolicy?: Record; priorityDemotePolicy?: Record; + singleSourcePolicy?: Record; metadata?: Record; status?: 'active' | 'disabled' | string; }