增加单一源保护全局策略
This commit is contained in:
parent
bffd4ecb98
commit
b860ef37e8
@ -11157,6 +11157,10 @@
|
||||
"type": "object",
|
||||
"additionalProperties": {}
|
||||
},
|
||||
"singleSourcePolicy": {
|
||||
"type": "object",
|
||||
"additionalProperties": {}
|
||||
},
|
||||
"status": {
|
||||
"type": "string"
|
||||
},
|
||||
@ -11193,6 +11197,10 @@
|
||||
"type": "object",
|
||||
"additionalProperties": {}
|
||||
},
|
||||
"singleSourcePolicy": {
|
||||
"type": "object",
|
||||
"additionalProperties": {}
|
||||
},
|
||||
"status": {
|
||||
"type": "string"
|
||||
}
|
||||
|
||||
@ -2108,6 +2108,9 @@ definitions:
|
||||
priorityDemotePolicy:
|
||||
additionalProperties: {}
|
||||
type: object
|
||||
singleSourcePolicy:
|
||||
additionalProperties: {}
|
||||
type: object
|
||||
status:
|
||||
type: string
|
||||
updatedAt:
|
||||
@ -2133,6 +2136,9 @@ definitions:
|
||||
priorityDemotePolicy:
|
||||
additionalProperties: {}
|
||||
type: object
|
||||
singleSourcePolicy:
|
||||
additionalProperties: {}
|
||||
type: object
|
||||
status:
|
||||
type: string
|
||||
type: object
|
||||
|
||||
@ -1282,6 +1282,41 @@ WHERE m.platform_id = $1::uuid
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM integration_platforms WHERE id = $1::uuid`, invalidKeyPlatform.ID).Scan(&invalidKeyPlatformStatus); err != nil {
|
||||
t.Fatalf("read invalid key platform status: %v", err)
|
||||
}
|
||||
if invalidKeyPlatformStatus != "enabled" {
|
||||
t.Fatalf("single source protection should keep the only source enabled, got %q", invalidKeyPlatformStatus)
|
||||
}
|
||||
var validKeyPlatform struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms", loginResponse.AccessToken, map[string]any{
|
||||
"provider": "openai",
|
||||
"platformKey": "openai-auto-disable-success-" + suffixText,
|
||||
"name": "OpenAI Auto Disable Success",
|
||||
"baseUrl": "https://api.openai.com/v1",
|
||||
"authType": "bearer",
|
||||
"credentials": map[string]any{"mode": "simulation"},
|
||||
"priority": 60,
|
||||
}, http.StatusCreated, &validKeyPlatform)
|
||||
doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms/"+validKeyPlatform.ID+"/models", loginResponse.AccessToken, map[string]any{
|
||||
"canonicalModelKey": "openai:gpt-4o-mini",
|
||||
"modelName": autoDisableModel,
|
||||
"modelAlias": autoDisableModel,
|
||||
"modelType": []string{"text_generate"},
|
||||
"displayName": "Auto Disable Smoke Success",
|
||||
}, http.StatusCreated, nil)
|
||||
doAPIV1ChatCompletionAndLoadTask(t, ctx, testPool, server.URL, apiKeyResponse.Secret, map[string]any{
|
||||
"model": autoDisableModel,
|
||||
"runMode": "simulation",
|
||||
"simulation": true,
|
||||
"simulationDurationMs": 5,
|
||||
"messages": []map[string]any{{"role": "user", "content": "disable with fallback please"}},
|
||||
}, "auto-disable-with-fallback-chat-"+suffixText, http.StatusOK, nil, &autoDisableTask.Task)
|
||||
if autoDisableTask.Task.Status != "succeeded" {
|
||||
t.Fatalf("auto disable task should fail over after disabling the bad source: %+v", autoDisableTask.Task)
|
||||
}
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM integration_platforms WHERE id = $1::uuid`, invalidKeyPlatform.ID).Scan(&invalidKeyPlatformStatus); err != nil {
|
||||
t.Fatalf("read invalid key platform status after fallback: %v", err)
|
||||
}
|
||||
if invalidKeyPlatformStatus != "disabled" {
|
||||
t.Fatalf("auto disable policy should disable platform, got %q", invalidKeyPlatformStatus)
|
||||
}
|
||||
|
||||
@ -37,6 +37,40 @@ func TestFailoverBudgetDefaults(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleSourceProtectionDefaultsOn(t *testing.T) {
|
||||
runnerPolicy := store.RunnerPolicy{Status: "active"}
|
||||
|
||||
if !singleSourceProtectionActive([]store.RuntimeModelCandidate{{PlatformID: "only-source"}}, 99, runnerPolicy) {
|
||||
t.Fatal("single source protection should default to enabled")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleSourceProtectionCanBeDisabled(t *testing.T) {
|
||||
runnerPolicy := store.RunnerPolicy{
|
||||
Status: "active",
|
||||
SingleSourcePolicy: map[string]any{"enabled": false},
|
||||
}
|
||||
|
||||
if singleSourceProtectionActive([]store.RuntimeModelCandidate{{PlatformID: "only-source"}}, 99, runnerPolicy) {
|
||||
t.Fatal("single source protection should respect the global toggle")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleSourceProtectionUsesEffectivePlatformBudget(t *testing.T) {
|
||||
candidates := []store.RuntimeModelCandidate{
|
||||
{PlatformID: "first-source"},
|
||||
{PlatformID: "second-source"},
|
||||
}
|
||||
runnerPolicy := store.RunnerPolicy{Status: "active"}
|
||||
|
||||
if !singleSourceProtectionActive(candidates, 1, runnerPolicy) {
|
||||
t.Fatal("maxPlatforms=1 should make this execution a single-source run")
|
||||
}
|
||||
if singleSourceProtectionActive(candidates, 2, runnerPolicy) {
|
||||
t.Fatal("multiple effective sources should allow disable and cooldown actions")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFailoverTimeBudgetExceeded(t *testing.T) {
|
||||
if !failoverTimeBudgetExceeded(time.Now().Add(-601*time.Second), 10*time.Minute) {
|
||||
t.Fatal("failover time budget should stop retries after the configured duration")
|
||||
|
||||
@ -9,7 +9,7 @@ import (
|
||||
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
|
||||
)
|
||||
|
||||
func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, cause error, simulated bool) {
|
||||
func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, cause error, simulated bool, singleSourceProtected bool) {
|
||||
code := clients.ErrorCode(cause)
|
||||
message := ""
|
||||
if cause != nil {
|
||||
@ -18,7 +18,9 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri
|
||||
|
||||
autoDisablePolicy := effectiveRuntimePolicy(candidate.AutoDisablePolicy, candidate.RuntimePolicyOverride, "autoDisablePolicy")
|
||||
if failurePolicyMatches(autoDisablePolicy, code, message) && intFromPolicy(autoDisablePolicy, "threshold") <= 1 {
|
||||
if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil {
|
||||
if singleSourceProtected {
|
||||
s.emitSingleSourceProtected(ctx, taskID, candidate, "auto_disable", code, simulated)
|
||||
} else if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil {
|
||||
_ = s.emit(ctx, taskID, "task.policy.auto_disabled", "running", "auto_disable", 0.48, "candidate platform disabled by failure policy", map[string]any{
|
||||
"platformId": candidate.PlatformID,
|
||||
"platformModelId": candidate.PlatformModelID,
|
||||
@ -30,7 +32,9 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri
|
||||
degradePolicy := effectiveRuntimePolicy(candidate.DegradePolicy, candidate.RuntimePolicyOverride, "degradePolicy")
|
||||
if failurePolicyMatches(degradePolicy, code, message) {
|
||||
cooldownSeconds := intFromPolicy(degradePolicy, "cooldownSeconds")
|
||||
if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, cooldownSeconds); err == nil {
|
||||
if singleSourceProtected {
|
||||
s.emitSingleSourceProtected(ctx, taskID, candidate, "degrade_cooldown", code, simulated)
|
||||
} else if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, cooldownSeconds); err == nil {
|
||||
_ = s.emit(ctx, taskID, "task.policy.degraded", "running", "degrade", 0.5, "candidate model cooled down by failure policy", map[string]any{
|
||||
"platformId": candidate.PlatformID,
|
||||
"platformModelId": candidate.PlatformModelID,
|
||||
@ -41,9 +45,13 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, decision failoverDecision, simulated bool) {
|
||||
func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, decision failoverDecision, simulated bool, singleSourceProtected bool) {
|
||||
switch decision.Action {
|
||||
case "disable_and_next":
|
||||
if singleSourceProtected {
|
||||
s.emitSingleSourceProtected(ctx, taskID, candidate, decision.Action, decision.Info.Code, simulated)
|
||||
return
|
||||
}
|
||||
if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil {
|
||||
_ = s.emit(ctx, taskID, "task.policy.failover_disabled", "running", "failover", 0.51, "candidate platform disabled by runner failover policy", addPolicyTracePayload(map[string]any{
|
||||
"platformId": candidate.PlatformID,
|
||||
@ -53,6 +61,10 @@ func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candid
|
||||
}, decision.Match, decision.Info), simulated)
|
||||
}
|
||||
case "cooldown_and_next":
|
||||
if singleSourceProtected {
|
||||
s.emitSingleSourceProtected(ctx, taskID, candidate, decision.Action, decision.Info.Code, simulated)
|
||||
return
|
||||
}
|
||||
if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, decision.CooldownSeconds); err == nil {
|
||||
_ = s.emit(ctx, taskID, "task.policy.failover_cooled_down", "running", "failover", 0.51, "candidate model cooled down by runner failover policy", addPolicyTracePayload(map[string]any{
|
||||
"platformId": candidate.PlatformID,
|
||||
@ -65,6 +77,15 @@ func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candid
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) emitSingleSourceProtected(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, action string, code string, simulated bool) {
|
||||
_ = s.emit(ctx, taskID, "task.policy.single_source_protected", "running", "policy_guard", 0.5, "single source protected from disable or cooldown", map[string]any{
|
||||
"platformId": candidate.PlatformID,
|
||||
"platformModelId": candidate.PlatformModelID,
|
||||
"action": action,
|
||||
"code": code,
|
||||
}, simulated)
|
||||
}
|
||||
|
||||
func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, attemptNo int, runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, requestedModel string, cause error, simulated bool) {
|
||||
if errors.Is(cause, store.ErrRateLimited) {
|
||||
return
|
||||
|
||||
@ -240,6 +240,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
}
|
||||
maxPlatforms := maxPlatformsForCandidates(candidates, runnerPolicy)
|
||||
maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy)
|
||||
singleSourceProtected := singleSourceProtectionActive(candidates, maxPlatforms, runnerPolicy)
|
||||
var lastErr error
|
||||
var lastCandidate store.RuntimeModelCandidate
|
||||
var lastPreprocessing *parameterPreprocessingLog
|
||||
@ -275,7 +276,7 @@ candidatesLoop:
|
||||
break candidatesLoop
|
||||
}
|
||||
candidateBody := preprocessing.Body
|
||||
response, err := s.runCandidate(ctx, task, user, candidateBody, preprocessing.Log, candidate, nextAttemptNo, onDelta)
|
||||
response, err := s.runCandidate(ctx, task, user, candidateBody, preprocessing.Log, candidate, nextAttemptNo, onDelta, singleSourceProtected)
|
||||
if err == nil {
|
||||
attemptNo = nextAttemptNo
|
||||
billings := s.billings(ctx, user, task.Kind, candidateBody, candidate, response, isSimulation(task, candidate))
|
||||
@ -445,7 +446,7 @@ candidatesLoop:
|
||||
if !decision.Retry {
|
||||
break
|
||||
}
|
||||
s.applyFailoverAction(ctx, task.ID, candidate, decision, isSimulation(task, candidate))
|
||||
s.applyFailoverAction(ctx, task.ID, candidate, decision, isSimulation(task, candidate), singleSourceProtected)
|
||||
if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.55, "retrying next client", addPolicyTracePayload(map[string]any{
|
||||
"attempt": attemptNo,
|
||||
"action": decision.Action,
|
||||
@ -486,7 +487,7 @@ candidatesLoop:
|
||||
return Result{Task: failed, Output: failed.Result}, lastErr
|
||||
}
|
||||
|
||||
func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, preprocessing parameterPreprocessingLog, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) {
|
||||
func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, preprocessing parameterPreprocessingLog, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta, singleSourceProtected bool) (clients.Response, error) {
|
||||
simulated := isSimulation(task, candidate)
|
||||
baseAttemptMetrics := mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), parameterPreprocessingMetrics(preprocessing))
|
||||
reservations := s.rateLimitReservations(ctx, user, candidate, body)
|
||||
@ -638,7 +639,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
ErrorMessage: err.Error(),
|
||||
})
|
||||
_ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "statusCode": clients.ErrorResponseMetadata(err).StatusCode, "metrics": metrics}, simulated)
|
||||
s.applyCandidateFailurePolicies(ctx, task.ID, candidate, err, simulated)
|
||||
s.applyCandidateFailurePolicies(ctx, task.ID, candidate, err, simulated, singleSourceProtected)
|
||||
return clients.Response{}, err
|
||||
}
|
||||
uploadedResult, err := s.uploadGeneratedAssets(ctx, task.ID, task.Kind, response.Result)
|
||||
@ -1099,6 +1100,42 @@ func maxPlatformsForCandidates(candidates []store.RuntimeModelCandidate, runnerP
|
||||
return maxPlatforms
|
||||
}
|
||||
|
||||
func singleSourceProtectionActive(candidates []store.RuntimeModelCandidate, maxPlatforms int, runnerPolicy store.RunnerPolicy) bool {
|
||||
if !runnerSingleSourceProtectionEnabled(runnerPolicy) {
|
||||
return false
|
||||
}
|
||||
return runtimeCandidateSourceCount(candidates, maxPlatforms) <= 1
|
||||
}
|
||||
|
||||
func runnerSingleSourceProtectionEnabled(runnerPolicy store.RunnerPolicy) bool {
|
||||
if enabled, ok := runnerPolicy.SingleSourcePolicy["enabled"].(bool); ok {
|
||||
return enabled
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func runtimeCandidateSourceCount(candidates []store.RuntimeModelCandidate, maxPlatforms int) int {
|
||||
limit := len(candidates)
|
||||
if maxPlatforms > 0 && maxPlatforms < limit {
|
||||
limit = maxPlatforms
|
||||
}
|
||||
sources := make(map[string]bool, limit)
|
||||
for index := 0; index < limit; index++ {
|
||||
key := strings.TrimSpace(candidates[index].PlatformID)
|
||||
if key == "" {
|
||||
key = strings.TrimSpace(candidates[index].PlatformModelID)
|
||||
}
|
||||
if key == "" {
|
||||
key = strings.TrimSpace(candidates[index].ClientID)
|
||||
}
|
||||
if key == "" {
|
||||
key = strconv.Itoa(index)
|
||||
}
|
||||
sources[key] = true
|
||||
}
|
||||
return len(sources)
|
||||
}
|
||||
|
||||
func maxFailoverDurationForCandidates(candidates []store.RuntimeModelCandidate, runnerPolicy store.RunnerPolicy) time.Duration {
|
||||
seconds := intFromPolicy(runnerPolicy.FailoverPolicy, "maxDurationSeconds")
|
||||
if seconds <= 0 {
|
||||
|
||||
@ -277,6 +277,7 @@ type RunnerPolicy struct {
|
||||
FailoverPolicy map[string]any `json:"failoverPolicy,omitempty"`
|
||||
HardStopPolicy map[string]any `json:"hardStopPolicy,omitempty"`
|
||||
PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy,omitempty"`
|
||||
SingleSourcePolicy map[string]any `json:"singleSourcePolicy,omitempty"`
|
||||
Metadata map[string]any `json:"metadata,omitempty"`
|
||||
Status string `json:"status"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
|
||||
@ -11,7 +11,7 @@ import (
|
||||
|
||||
const runnerPolicyColumns = `
|
||||
id::text, policy_key, name, COALESCE(description, ''), failover_policy,
|
||||
hard_stop_policy, priority_demote_policy, metadata, status, created_at, updated_at`
|
||||
hard_stop_policy, priority_demote_policy, single_source_policy, metadata, status, created_at, updated_at`
|
||||
|
||||
type RunnerPolicyInput struct {
|
||||
PolicyKey string `json:"policyKey"`
|
||||
@ -20,6 +20,7 @@ type RunnerPolicyInput struct {
|
||||
FailoverPolicy map[string]any `json:"failoverPolicy"`
|
||||
HardStopPolicy map[string]any `json:"hardStopPolicy"`
|
||||
PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy"`
|
||||
SingleSourcePolicy map[string]any `json:"singleSourcePolicy"`
|
||||
Metadata map[string]any `json:"metadata"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
@ -50,23 +51,25 @@ func (s *Store) UpsertDefaultRunnerPolicy(ctx context.Context, input RunnerPolic
|
||||
failoverPolicy, _ := json.Marshal(emptyObjectIfNil(input.FailoverPolicy))
|
||||
hardStopPolicy, _ := json.Marshal(emptyObjectIfNil(input.HardStopPolicy))
|
||||
priorityDemotePolicy, _ := json.Marshal(emptyObjectIfNil(input.PriorityDemotePolicy))
|
||||
singleSourcePolicy, _ := json.Marshal(emptyObjectIfNil(input.SingleSourcePolicy))
|
||||
metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata))
|
||||
return scanRunnerPolicy(s.pool.QueryRow(ctx, `
|
||||
INSERT INTO gateway_runner_policies (
|
||||
policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status
|
||||
policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, single_source_policy, metadata, status
|
||||
)
|
||||
VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8)
|
||||
VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8, $9)
|
||||
ON CONFLICT (policy_key) DO UPDATE
|
||||
SET name = EXCLUDED.name,
|
||||
description = EXCLUDED.description,
|
||||
failover_policy = EXCLUDED.failover_policy,
|
||||
hard_stop_policy = EXCLUDED.hard_stop_policy,
|
||||
priority_demote_policy = EXCLUDED.priority_demote_policy,
|
||||
single_source_policy = EXCLUDED.single_source_policy,
|
||||
metadata = EXCLUDED.metadata,
|
||||
status = EXCLUDED.status,
|
||||
updated_at = now()
|
||||
RETURNING `+runnerPolicyColumns,
|
||||
input.PolicyKey, input.Name, input.Description, failoverPolicy, hardStopPolicy, priorityDemotePolicy, metadata, input.Status,
|
||||
input.PolicyKey, input.Name, input.Description, failoverPolicy, hardStopPolicy, priorityDemotePolicy, singleSourcePolicy, metadata, input.Status,
|
||||
))
|
||||
}
|
||||
|
||||
@ -75,6 +78,7 @@ func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) {
|
||||
var failoverPolicy []byte
|
||||
var hardStopPolicy []byte
|
||||
var priorityDemotePolicy []byte
|
||||
var singleSourcePolicy []byte
|
||||
var metadata []byte
|
||||
if err := scanner.Scan(
|
||||
&item.ID,
|
||||
@ -84,6 +88,7 @@ func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) {
|
||||
&failoverPolicy,
|
||||
&hardStopPolicy,
|
||||
&priorityDemotePolicy,
|
||||
&singleSourcePolicy,
|
||||
&metadata,
|
||||
&item.Status,
|
||||
&item.CreatedAt,
|
||||
@ -94,6 +99,7 @@ func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) {
|
||||
item.FailoverPolicy = decodeObject(failoverPolicy)
|
||||
item.HardStopPolicy = decodeObject(hardStopPolicy)
|
||||
item.PriorityDemotePolicy = decodeObject(priorityDemotePolicy)
|
||||
item.SingleSourcePolicy = decodeObject(singleSourcePolicy)
|
||||
item.Metadata = decodeObject(metadata)
|
||||
return item, nil
|
||||
}
|
||||
@ -124,6 +130,7 @@ func defaultRunnerPolicy() RunnerPolicy {
|
||||
FailoverPolicy: defaultRunnerFailoverPolicy(),
|
||||
HardStopPolicy: defaultRunnerHardStopPolicy(),
|
||||
PriorityDemotePolicy: defaultRunnerPriorityDemotePolicy(),
|
||||
SingleSourcePolicy: defaultRunnerSingleSourcePolicy(),
|
||||
Metadata: map[string]any{"source": "code-default"},
|
||||
Status: "active",
|
||||
CreatedAt: now,
|
||||
@ -141,6 +148,12 @@ func defaultRunnerPriorityDemotePolicy() map[string]any {
|
||||
}
|
||||
}
|
||||
|
||||
func defaultRunnerSingleSourcePolicy() map[string]any {
|
||||
return map[string]any{
|
||||
"enabled": true,
|
||||
}
|
||||
}
|
||||
|
||||
func defaultRunnerFailoverPolicy() map[string]any {
|
||||
return map[string]any{
|
||||
"enabled": true,
|
||||
|
||||
@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS gateway_runner_policies (
|
||||
failover_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
hard_stop_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
single_source_policy jsonb NOT NULL DEFAULT '{"enabled":true}'::jsonb,
|
||||
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
status text NOT NULL DEFAULT 'active',
|
||||
created_at timestamptz NOT NULL DEFAULT now(),
|
||||
@ -15,8 +16,11 @@ CREATE TABLE IF NOT EXISTS gateway_runner_policies (
|
||||
ALTER TABLE IF EXISTS gateway_runner_policies
|
||||
ADD COLUMN IF NOT EXISTS priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb;
|
||||
|
||||
ALTER TABLE IF EXISTS gateway_runner_policies
|
||||
ADD COLUMN IF NOT EXISTS single_source_policy jsonb NOT NULL DEFAULT '{"enabled":true}'::jsonb;
|
||||
|
||||
INSERT INTO gateway_runner_policies (
|
||||
policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status
|
||||
policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, single_source_policy, metadata, status
|
||||
)
|
||||
VALUES (
|
||||
'default-runner-v1',
|
||||
@ -54,6 +58,7 @@ VALUES (
|
||||
"statusCodes": [408, 429, 500, 502, 503, 504],
|
||||
"keywords": ["timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "429", "5xx"]
|
||||
}'::jsonb,
|
||||
'{"enabled": true}'::jsonb,
|
||||
'{"seed":"0026_runner_policies"}'::jsonb,
|
||||
'active'
|
||||
)
|
||||
@ -63,6 +68,7 @@ SET name = EXCLUDED.name,
|
||||
failover_policy = gateway_runner_policies.failover_policy || EXCLUDED.failover_policy,
|
||||
hard_stop_policy = gateway_runner_policies.hard_stop_policy || EXCLUDED.hard_stop_policy,
|
||||
priority_demote_policy = gateway_runner_policies.priority_demote_policy || EXCLUDED.priority_demote_policy,
|
||||
single_source_policy = gateway_runner_policies.single_source_policy || EXCLUDED.single_source_policy,
|
||||
metadata = gateway_runner_policies.metadata || EXCLUDED.metadata,
|
||||
updated_at = now();
|
||||
|
||||
|
||||
9
apps/api/migrations/0050_runner_single_source_policy.sql
Normal file
9
apps/api/migrations/0050_runner_single_source_policy.sql
Normal file
@ -0,0 +1,9 @@
|
||||
ALTER TABLE IF EXISTS gateway_runner_policies
|
||||
ADD COLUMN IF NOT EXISTS single_source_policy jsonb NOT NULL DEFAULT '{"enabled":true}'::jsonb;
|
||||
|
||||
UPDATE gateway_runner_policies
|
||||
SET single_source_policy = COALESCE(single_source_policy, '{}'::jsonb) || '{"enabled":true}'::jsonb,
|
||||
metadata = metadata || jsonb_build_object('singleSourcePolicy', '0050_runner_single_source_policy'),
|
||||
updated_at = now()
|
||||
WHERE COALESCE(single_source_policy, '{}'::jsonb) = '{}'::jsonb
|
||||
OR NOT (COALESCE(single_source_policy, '{}'::jsonb) ? 'enabled');
|
||||
@ -6,7 +6,7 @@ import { Badge, Button, Card, CardContent, CardHeader, CardTitle, ConfirmDialog,
|
||||
import type { LoadState } from '../../types';
|
||||
|
||||
type RuntimePanelTab = 'model' | 'runner';
|
||||
type RunnerPolicyStrategy = 'failover' | 'hardStop' | 'priorityDemote';
|
||||
type RunnerPolicyStrategy = 'failover' | 'hardStop' | 'priorityDemote' | 'singleSource';
|
||||
|
||||
type RuntimePolicyForm = {
|
||||
policyKey: string;
|
||||
@ -54,6 +54,7 @@ type RunnerPolicyForm = {
|
||||
priorityDemoteCodes: string[];
|
||||
priorityDemoteStatusCodes: string[];
|
||||
priorityDemoteKeywords: string[];
|
||||
singleSourceProtectionEnabled: boolean;
|
||||
metadataJson: string;
|
||||
status: string;
|
||||
};
|
||||
@ -353,6 +354,13 @@ function RunnerPolicyEditor(props: {
|
||||
enabled: props.form.priorityDemoteEnabled,
|
||||
icon: <Gauge size={15} />,
|
||||
},
|
||||
{
|
||||
value: 'singleSource' as const,
|
||||
title: '单一源保护',
|
||||
description: '只有一个可用源时跳过自动禁用和冷却',
|
||||
enabled: props.form.singleSourceProtectionEnabled,
|
||||
icon: <ShieldCheck size={15} />,
|
||||
},
|
||||
];
|
||||
const activeDefinition = strategyDefinitions.find((item) => item.value === activeStrategy) ?? strategyDefinitions[0];
|
||||
|
||||
@ -455,6 +463,17 @@ function RunnerPolicyEditor(props: {
|
||||
<KeywordField label="降级关键词" value={props.form.priorityDemoteKeywords} onChange={(value) => patch({ priorityDemoteKeywords: value })} />
|
||||
</div>
|
||||
)}
|
||||
|
||||
{activeStrategy === 'singleSource' && (
|
||||
<div className="runtimePolicyRows runnerPolicyDetailRows">
|
||||
<Toggle
|
||||
checked={props.form.singleSourceProtectionEnabled}
|
||||
label="启用单一源保护"
|
||||
onChange={(checked) => patch({ singleSourceProtectionEnabled: checked })}
|
||||
/>
|
||||
<span className="runtimeFieldHint spanTwo">本次调度只有一个可用源时,命中自动禁用、运行策略冷却或故障切换冷却动作都只记录保护事件,不修改平台或模型状态。</span>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@ -554,6 +573,7 @@ function runnerPolicyToForm(policy: GatewayRunnerPolicy | null): RunnerPolicyFor
|
||||
const failover = readObject(policy?.failoverPolicy);
|
||||
const hardStop = readObject(policy?.hardStopPolicy);
|
||||
const priorityDemote = readObject(policy?.priorityDemotePolicy);
|
||||
const singleSource = readObject(policy?.singleSourcePolicy);
|
||||
return {
|
||||
name: policy?.name ?? '默认全局调度策略',
|
||||
description: policy?.description ?? '控制多个候选平台之间的故障切换;模型运行策略只可覆盖 failoverPolicy,不能覆盖 hardStopPolicy。',
|
||||
@ -579,6 +599,7 @@ function runnerPolicyToForm(policy: GatewayRunnerPolicy | null): RunnerPolicyFor
|
||||
priorityDemoteCodes: tagsFromValue(priorityDemote.codes ?? ['network', 'timeout', 'stream_read_error', 'rate_limit', 'server_error', 'overloaded']),
|
||||
priorityDemoteStatusCodes: tagsFromValue(priorityDemote.statusCodes ?? [408, 429, 500, 502, 503, 504]),
|
||||
priorityDemoteKeywords: tagsFromValue(priorityDemote.keywords ?? ['timeout', 'network', 'rate_limit', 'overloaded', 'temporarily_unavailable', 'server_error', '429', '5xx']),
|
||||
singleSourceProtectionEnabled: readBool(singleSource.enabled, true),
|
||||
metadataJson: JSON.stringify(policy?.metadata ?? {}, null, 2),
|
||||
status: policy?.status ?? 'active',
|
||||
};
|
||||
@ -617,6 +638,9 @@ function runnerFormToPayload(form: RunnerPolicyForm): GatewayRunnerPolicyUpsertR
|
||||
statusCodes: parseNumberTags(form.priorityDemoteStatusCodes),
|
||||
keywords: cleanTags(form.priorityDemoteKeywords),
|
||||
},
|
||||
singleSourcePolicy: {
|
||||
enabled: form.singleSourceProtectionEnabled,
|
||||
},
|
||||
metadata: parseJson(form.metadataJson),
|
||||
status: form.status.trim() || 'active',
|
||||
};
|
||||
|
||||
@ -282,6 +282,7 @@ export interface GatewayRunnerPolicy {
|
||||
failoverPolicy?: Record<string, unknown>;
|
||||
hardStopPolicy?: Record<string, unknown>;
|
||||
priorityDemotePolicy?: Record<string, unknown>;
|
||||
singleSourcePolicy?: Record<string, unknown>;
|
||||
metadata?: Record<string, unknown>;
|
||||
status: 'active' | 'disabled' | string;
|
||||
createdAt: string;
|
||||
@ -295,6 +296,7 @@ export interface GatewayRunnerPolicyUpsertRequest {
|
||||
failoverPolicy?: Record<string, unknown>;
|
||||
hardStopPolicy?: Record<string, unknown>;
|
||||
priorityDemotePolicy?: Record<string, unknown>;
|
||||
singleSourcePolicy?: Record<string, unknown>;
|
||||
metadata?: Record<string, unknown>;
|
||||
status?: 'active' | 'disabled' | string;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user