feat: add runner failover policies and traces

This commit is contained in:
wangbo 2026-05-12 02:16:42 +08:00
parent be31923e74
commit 05632172d0
26 changed files with 2033 additions and 140 deletions

View File

@ -19,6 +19,31 @@ func (s *Server) listRuntimePolicySets(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{"items": items})
}
func (s *Server) getRunnerPolicy(w http.ResponseWriter, r *http.Request) {
item, err := s.store.GetActiveRunnerPolicy(r.Context())
if err != nil {
s.logger.Error("get runner policy failed", "error", err)
writeError(w, http.StatusInternalServerError, "get runner policy failed")
return
}
writeJSON(w, http.StatusOK, item)
}
func (s *Server) updateRunnerPolicy(w http.ResponseWriter, r *http.Request) {
var input store.RunnerPolicyInput
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
item, err := s.store.UpsertDefaultRunnerPolicy(r.Context(), input)
if err != nil {
s.logger.Error("update runner policy failed", "error", err)
writeError(w, http.StatusInternalServerError, "update runner policy failed")
return
}
writeJSON(w, http.StatusOK, item)
}
func (s *Server) createRuntimePolicySet(w http.ResponseWriter, r *http.Request) {
var input store.RuntimePolicySetInput
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {

View File

@ -90,6 +90,8 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han
mux.Handle("POST /api/admin/runtime/policy-sets", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createRuntimePolicySet)))
mux.Handle("PATCH /api/admin/runtime/policy-sets/{policySetID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateRuntimePolicySet)))
mux.Handle("DELETE /api/admin/runtime/policy-sets/{policySetID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.deleteRuntimePolicySet)))
mux.Handle("GET /api/admin/runtime/runner-policy", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.getRunnerPolicy)))
mux.Handle("PATCH /api/admin/runtime/runner-policy", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateRunnerPolicy)))
mux.Handle("GET /api/admin/config/network-proxy", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.getNetworkProxyConfig)))
mux.Handle("GET /api/admin/platforms", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listPlatforms)))
mux.Handle("POST /api/admin/platforms", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createPlatform)))

View File

@ -182,9 +182,12 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim
metrics := map[string]any{
"simulated": simulated,
}
retryable := clients.IsRetryable(err)
if err != nil {
info := failureInfoFromError(err)
metrics["error"] = err.Error()
metrics["retryable"] = clients.IsRetryable(err)
metrics["errorCategory"] = info.Category
metrics["retryable"] = retryable
}
if meta.StatusCode > 0 {
metrics["statusCode"] = meta.StatusCode
@ -195,6 +198,9 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim
if meta.ResponseDurationMS > 0 {
metrics["responseDurationMs"] = meta.ResponseDurationMS
}
if err != nil {
metrics["trace"] = []any{failureTraceEntry(err, retryable)}
}
return meta.RequestID, metrics, meta.ResponseStartedAt, meta.ResponseFinishedAt, meta.ResponseDurationMS
}
@ -227,12 +233,16 @@ func summarizeAttempts(attempts []store.TaskAttempt) []map[string]any {
"requestId": attempt.RequestID,
"retryable": attempt.Retryable,
"simulated": attempt.Simulated,
"statusCode": attempt.StatusCode,
"errorCode": attempt.ErrorCode,
"errorMessage": attempt.ErrorMessage,
"responseDurationMs": attempt.ResponseDurationMS,
"startedAt": attempt.StartedAt,
"finishedAt": attempt.FinishedAt,
}
if trace, ok := attempt.Metrics["trace"]; ok {
item["trace"] = trace
}
items = append(items, item)
}
return items

View File

@ -0,0 +1,446 @@
package runner
import (
"fmt"
"strings"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/clients"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
)
type failureInfo struct {
Code string
Message string
Status int
Category string
Target string
}
type policyRuleMatch struct {
Source string
Policy string
Rule string
Value string
}
type retryDecision struct {
Retry bool
Reason string
Match policyRuleMatch
Info failureInfo
}
type failoverDecision struct {
Retry bool
Action string
Reason string
CooldownSeconds int
Match policyRuleMatch
Info failureInfo
}
type priorityDemoteDecision struct {
Demote bool
Reason string
Step int
Match policyRuleMatch
Info failureInfo
}
func shouldRetrySameClient(candidate store.RuntimeModelCandidate, err error) bool {
return retryDecisionForCandidate(candidate, err).Retry
}
func retryDecisionForCandidate(candidate store.RuntimeModelCandidate, err error) retryDecision {
policy := effectiveRetryPolicy(candidate)
info := failureInfoFromError(err)
if !boolFromPolicy(policy, "enabled", true) {
return retryDecision{Retry: false, Reason: "retry_disabled", Match: policyRuleMatch{Source: "model_runtime_policy_sets.retry_policy", Policy: "retryPolicy", Rule: "enabled", Value: "false"}, Info: info}
}
if match, ok := retryPolicyDenyMatch(policy, info, "model_runtime_policy_sets.retry_policy", "retryPolicy"); ok {
return retryDecision{Retry: false, Reason: "retry_deny_policy", Match: match, Info: info}
}
if match, ok := retryPolicyAllowMatch(policy, info, "model_runtime_policy_sets.retry_policy", "retryPolicy"); ok {
return retryDecision{Retry: true, Reason: "retry_allow_policy", Match: match, Info: info}
}
if clients.IsRetryable(err) {
return retryDecision{Retry: true, Reason: "client_retryable", Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "true"}, Info: info}
}
return retryDecision{Retry: false, Reason: "client_non_retryable", Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "false"}, Info: info}
}
func failoverDecisionForCandidate(runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, err error) failoverDecision {
info := failureInfoFromError(err)
if strings.TrimSpace(runnerPolicy.Status) != "" && runnerPolicy.Status != "active" {
return failoverDecision{Retry: false, Action: "stop", Reason: "runner_policy_disabled", Match: policyRuleMatch{Source: "gateway_runner_policies", Policy: "runnerPolicy", Rule: "status", Value: runnerPolicy.Status}, Info: info}
}
if match, ok := hardStopPolicyMatch(runnerPolicy.HardStopPolicy, info); ok {
return failoverDecision{Retry: false, Action: "stop", Reason: "hard_stop_policy", Match: match, Info: info}
}
overridePolicy := failoverOverridePolicy(candidate.RuntimePolicyOverride)
policy := effectiveFailoverPolicy(runnerPolicy.FailoverPolicy, candidate.RuntimePolicyOverride)
if !boolFromPolicy(policy, "enabled", true) {
source := "gateway_runner_policies.failover_policy"
if _, ok := overridePolicy["enabled"]; ok {
source = "runtime_policy_override.failoverPolicy"
}
return failoverDecision{Retry: false, Action: "stop", Reason: "failover_disabled", Match: policyRuleMatch{Source: source, Policy: "failoverPolicy", Rule: "enabled", Value: "false"}, Info: info}
}
if match, ok := failoverDenyMatchWithSources(runnerPolicy.FailoverPolicy, overridePolicy, info); ok {
return failoverDecision{Retry: false, Action: "stop", Reason: "failover_deny_policy", Match: match, Info: info}
}
action := failoverAction(policy, info)
cooldownSeconds := intFromPolicy(policy, "cooldownSeconds")
if cooldownSeconds <= 0 {
cooldownSeconds = 300
}
if match, ok := failoverAllowMatchWithSources(runnerPolicy.FailoverPolicy, overridePolicy, info); ok {
return failoverDecision{Retry: true, Action: action, Reason: "failover_allow_policy", CooldownSeconds: cooldownSeconds, Match: match, Info: info}
}
if clients.IsRetryable(err) {
return failoverDecision{Retry: true, Action: action, Reason: "client_retryable", CooldownSeconds: cooldownSeconds, Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "true"}, Info: info}
}
return failoverDecision{Retry: false, Action: "stop", Reason: "client_non_retryable", Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "false"}, Info: info}
}
func shouldDemoteCandidatePriority(runnerPolicy store.RunnerPolicy, err error) (bool, int) {
decision := priorityDemoteDecisionForCandidate(runnerPolicy, err)
return decision.Demote, decision.Step
}
func priorityDemoteDecisionForCandidate(runnerPolicy store.RunnerPolicy, err error) priorityDemoteDecision {
info := failureInfoFromError(err)
if strings.TrimSpace(runnerPolicy.Status) != "" && runnerPolicy.Status != "active" {
return priorityDemoteDecision{Demote: false, Reason: "runner_policy_disabled", Info: info}
}
if hardStopPolicyMatches(runnerPolicy.HardStopPolicy, info) {
return priorityDemoteDecision{Demote: false, Reason: "hard_stop_policy", Info: info}
}
policy := runnerPolicy.PriorityDemotePolicy
if !boolFromPolicy(policy, "enabled", false) {
return priorityDemoteDecision{Demote: false, Reason: "priority_demote_disabled", Info: info}
}
if match, ok := priorityDemotePolicyMatch(policy, info); ok {
step := intFromPolicy(policy, "demoteStep")
if step <= 0 {
step = 100
}
return priorityDemoteDecision{Demote: true, Reason: "priority_demote_policy", Step: step, Match: match, Info: info}
}
return priorityDemoteDecision{Demote: false, Reason: "priority_demote_no_match", Info: info}
}
func effectiveFailoverPolicy(base map[string]any, override map[string]any) map[string]any {
policy := base
if nested := failoverOverridePolicy(override); len(nested) > 0 {
policy = mergeMap(policy, nested)
}
return policy
}
func failoverOverridePolicy(override map[string]any) map[string]any {
if nested, ok := override["failoverPolicy"].(map[string]any); ok {
return nested
}
return nil
}
func failureInfoFromError(err error) failureInfo {
code := strings.ToLower(strings.TrimSpace(clients.ErrorCode(err)))
message := ""
if err != nil {
message = err.Error()
}
status := clients.ErrorResponseMetadata(err).StatusCode
category := failureCategory(code, status, message)
target := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%s %s %d %s", code, category, status, message)))
return failureInfo{
Code: code,
Message: message,
Status: status,
Category: category,
Target: target,
}
}
func failureCategory(code string, status int, message string) string {
target := strings.ToLower(code + " " + message)
switch {
case code == "insufficient_balance":
return "insufficient_balance"
case code == "rate_limit" || status == 429:
return "rate_limit"
case code == "network":
return "network"
case code == "timeout" || status == 408 || strings.Contains(target, "timeout"):
return "timeout"
case code == "stream_read_error":
return "stream_error"
case code == "overloaded" || strings.Contains(target, "overloaded"):
return "provider_overloaded"
case status >= 500 || code == "server_error":
return "provider_5xx"
case code == "permission_denied":
return "user_permission"
case code == "auth_failed" || code == "invalid_api_key" || code == "missing_credentials" || status == 401 || status == 403 || providerAuthMessage(target):
return "auth_error"
case strings.Contains(code, "unsupported"):
return "unsupported_model"
case status == 400 || code == "bad_request" || code == "invalid_request" || code == "invalid_parameter" || code == "missing_required":
return "request_error"
case status > 400 && status < 500:
return "request_error"
default:
return "client_error"
}
}
func providerAuthMessage(target string) bool {
return strings.Contains(target, "api key") ||
strings.Contains(target, "apikey") ||
strings.Contains(target, "unauthorized") ||
strings.Contains(target, "authentication") ||
strings.Contains(target, "auth failed") ||
strings.Contains(target, "credential")
}
func hardStopPolicyMatches(policy map[string]any, info failureInfo) bool {
_, ok := hardStopPolicyMatch(policy, info)
return ok
}
func hardStopPolicyMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) {
if !boolFromPolicy(policy, "enabled", true) {
return policyRuleMatch{}, false
}
return firstPolicyMatch(policy, info, "gateway_runner_policies.hard_stop_policy", "hardStopPolicy", []policyMatchSpec{
{Key: "categories", Value: info.Category, Kind: "string"},
{Key: "codes", Value: info.Code, Kind: "string"},
{Key: "statusCodes", IntValue: info.Status, Kind: "int"},
{Key: "keywords", Value: info.Target, Kind: "keyword"},
})
}
func retryPolicyDenyMatches(policy map[string]any, info failureInfo) bool {
_, ok := retryPolicyDenyMatch(policy, info, "", "")
return ok
}
func retryPolicyDenyMatch(policy map[string]any, info failureInfo, source string, policyName string) (policyRuleMatch, bool) {
return firstPolicyMatch(policy, info, firstNonEmptyString(source, "effective_retry_policy"), firstNonEmptyString(policyName, "retryPolicy"), []policyMatchSpec{
{Key: "denyCategories", Value: info.Category, Kind: "string"},
{Key: "denyCodes", Value: info.Code, Kind: "string"},
{Key: "denyStatusCodes", IntValue: info.Status, Kind: "int"},
{Key: "denyKeywords", Value: info.Target, Kind: "keyword"},
})
}
func retryPolicyAllowMatches(policy map[string]any, info failureInfo) bool {
_, ok := retryPolicyAllowMatch(policy, info, "", "")
return ok
}
func retryPolicyAllowMatch(policy map[string]any, info failureInfo, source string, policyName string) (policyRuleMatch, bool) {
return firstPolicyMatch(policy, info, firstNonEmptyString(source, "effective_retry_policy"), firstNonEmptyString(policyName, "retryPolicy"), []policyMatchSpec{
{Key: "allowCategories", Value: info.Category, Kind: "string"},
{Key: "allowCodes", Value: info.Code, Kind: "string"},
{Key: "allowStatusCodes", IntValue: info.Status, Kind: "int"},
{Key: "allowKeywords", Value: info.Target, Kind: "keyword"},
})
}
func failoverDenyMatches(policy map[string]any, info failureInfo) bool {
_, ok := failoverDenyMatch(policy, info)
return ok
}
func failoverDenyMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) {
return retryPolicyDenyMatch(policy, info, "gateway_runner_policies.failover_policy", "failoverPolicy")
}
func failoverDenyMatchWithSources(base map[string]any, override map[string]any, info failureInfo) (policyRuleMatch, bool) {
return retryPolicyMatchWithSources(base, override, "gateway_runner_policies.failover_policy", "runtime_policy_override.failoverPolicy", "failoverPolicy", []policyMatchSpec{
{Key: "denyCategories", Value: info.Category, Kind: "string"},
{Key: "denyCodes", Value: info.Code, Kind: "string"},
{Key: "denyStatusCodes", IntValue: info.Status, Kind: "int"},
{Key: "denyKeywords", Value: info.Target, Kind: "keyword"},
})
}
func failoverAllowMatches(policy map[string]any, info failureInfo) bool {
_, ok := failoverAllowMatch(policy, info)
return ok
}
func failoverAllowMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) {
return retryPolicyAllowMatch(policy, info, "gateway_runner_policies.failover_policy", "failoverPolicy")
}
func failoverAllowMatchWithSources(base map[string]any, override map[string]any, info failureInfo) (policyRuleMatch, bool) {
return retryPolicyMatchWithSources(base, override, "gateway_runner_policies.failover_policy", "runtime_policy_override.failoverPolicy", "failoverPolicy", []policyMatchSpec{
{Key: "allowCategories", Value: info.Category, Kind: "string"},
{Key: "allowCodes", Value: info.Code, Kind: "string"},
{Key: "allowStatusCodes", IntValue: info.Status, Kind: "int"},
{Key: "allowKeywords", Value: info.Target, Kind: "keyword"},
})
}
func priorityDemotePolicyMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) {
return firstPolicyMatch(policy, info, "gateway_runner_policies.priority_demote_policy", "priorityDemotePolicy", []policyMatchSpec{
{Key: "categories", Value: info.Category, Kind: "string"},
{Key: "codes", Value: info.Code, Kind: "string"},
{Key: "statusCodes", IntValue: info.Status, Kind: "int"},
{Key: "keywords", Value: info.Target, Kind: "keyword"},
})
}
func failoverAction(policy map[string]any, info failureInfo) string {
actions, _ := policy["actions"].(map[string]any)
if action := stringFromAny(actions[info.Category]); action != "" {
return action
}
return "next"
}
func boolFromPolicy(policy map[string]any, key string, fallback bool) bool {
value, ok := policy[key].(bool)
if !ok {
return fallback
}
return value
}
type policyMatchSpec struct {
Key string
Kind string
Value string
IntValue int
}
func firstPolicyMatch(policy map[string]any, info failureInfo, source string, policyName string, specs []policyMatchSpec) (policyRuleMatch, bool) {
for _, spec := range specs {
switch spec.Kind {
case "string":
if value, ok := matchingStringListValue(policy, spec.Key, spec.Value); ok {
return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true
}
case "int":
if value, ok := matchingIntListValue(policy, spec.Key, spec.IntValue); ok {
return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: fmt.Sprintf("%d", value)}, true
}
case "keyword":
if value, ok := matchingKeywordValue(policy, spec.Key, spec.Value); ok {
return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true
}
}
}
return policyRuleMatch{}, false
}
func retryPolicyMatchWithSources(base map[string]any, override map[string]any, baseSource string, overrideSource string, policyName string, specs []policyMatchSpec) (policyRuleMatch, bool) {
for _, spec := range specs {
if _, ok := override[spec.Key]; ok {
if match, matched := policyMatchSpecValue(override, spec, overrideSource, policyName); matched {
return match, true
}
continue
}
if match, matched := policyMatchSpecValue(base, spec, baseSource, policyName); matched {
return match, true
}
}
return policyRuleMatch{}, false
}
func policyMatchSpecValue(policy map[string]any, spec policyMatchSpec, source string, policyName string) (policyRuleMatch, bool) {
switch spec.Kind {
case "string":
if value, ok := matchingStringListValue(policy, spec.Key, spec.Value); ok {
return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true
}
case "int":
if value, ok := matchingIntListValue(policy, spec.Key, spec.IntValue); ok {
return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: fmt.Sprintf("%d", value)}, true
}
case "keyword":
if value, ok := matchingKeywordValue(policy, spec.Key, spec.Value); ok {
return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true
}
}
return policyRuleMatch{}, false
}
func stringListContains(policy map[string]any, key string, value string) bool {
_, ok := matchingStringListValue(policy, key, value)
return ok
}
func matchingStringListValue(policy map[string]any, key string, value string) (string, bool) {
value = strings.ToLower(strings.TrimSpace(value))
if value == "" {
return "", false
}
for _, item := range stringListFromPolicy(policy, key) {
item = strings.TrimSpace(item)
if strings.ToLower(item) == value {
return item, true
}
}
return "", false
}
func keywordListMatches(policy map[string]any, key string, target string) bool {
_, ok := matchingKeywordValue(policy, key, target)
return ok
}
func matchingKeywordValue(policy map[string]any, key string, target string) (string, bool) {
target = strings.ToLower(strings.TrimSpace(target))
if target == "" {
return "", false
}
for _, keyword := range stringListFromPolicy(policy, key) {
keyword = strings.TrimSpace(keyword)
if keyword != "" && strings.Contains(target, strings.ToLower(keyword)) {
return keyword, true
}
}
return "", false
}
func intListContains(policy map[string]any, key string, value int) bool {
_, ok := matchingIntListValue(policy, key, value)
return ok
}
func matchingIntListValue(policy map[string]any, key string, value int) (int, bool) {
if value == 0 {
return 0, false
}
for _, item := range intListFromPolicy(policy, key) {
if item == value {
return item, true
}
}
return 0, false
}
func intListFromPolicy(policy map[string]any, key string) []int {
raw, ok := policy[key].([]any)
if !ok {
if typed, ok := policy[key].([]int); ok {
return typed
}
return nil
}
out := make([]int, 0, len(raw))
for _, item := range raw {
switch typed := item.(type) {
case int:
out = append(out, typed)
case float64:
out = append(out, int(typed))
}
}
return out
}

View File

@ -0,0 +1,152 @@
package runner
import (
"testing"
"time"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/clients"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
)
func TestShouldRetrySameClientUsesRuntimeRetryPolicyKeywords(t *testing.T) {
candidate := store.RuntimeModelCandidate{
ModelRetryPolicy: map[string]any{
"enabled": true,
"allowKeywords": []any{"temporary vendor blip"},
"denyKeywords": []any{"bad request"},
},
}
if shouldRetrySameClient(candidate, &clients.ClientError{Code: "bad_request", Message: "bad request timeout", Retryable: true}) {
t.Fatal("deny keywords should block same-client retry even when the client marks the error retryable")
}
if !shouldRetrySameClient(candidate, &clients.ClientError{Code: "custom_error", Message: "temporary vendor blip", Retryable: false}) {
t.Fatal("allow keywords should allow same-client retry when the client does not mark the error retryable")
}
}
func TestFailoverBudgetDefaults(t *testing.T) {
candidates := make([]store.RuntimeModelCandidate, 150)
runnerPolicy := store.RunnerPolicy{Status: "active", FailoverPolicy: map[string]any{"enabled": true}}
if got := maxPlatformsForCandidates(candidates, runnerPolicy); got != 99 {
t.Fatalf("default max platform budget should be 99, got %d", got)
}
if got := maxFailoverDurationForCandidates(candidates, runnerPolicy); got != 10*time.Minute {
t.Fatalf("default max failover duration should be 10 minutes, got %s", got)
}
}
func TestFailoverTimeBudgetExceeded(t *testing.T) {
if !failoverTimeBudgetExceeded(time.Now().Add(-601*time.Second), 10*time.Minute) {
t.Fatal("failover time budget should stop retries after the configured duration")
}
if failoverTimeBudgetExceeded(time.Now().Add(-590*time.Second), 10*time.Minute) {
t.Fatal("failover time budget should allow retries before the configured duration")
}
}
func TestFailoverHardStopBeatsModelOverride(t *testing.T) {
runnerPolicy := store.RunnerPolicy{
Status: "active",
FailoverPolicy: map[string]any{
"enabled": true,
"allowCategories": []any{"request_error"},
},
HardStopPolicy: map[string]any{
"enabled": true,
"categories": []any{"request_error"},
},
}
candidate := store.RuntimeModelCandidate{
RuntimePolicyOverride: map[string]any{
"failoverPolicy": map[string]any{
"enabled": true,
"allowCategories": []any{"request_error"},
},
},
}
decision := failoverDecisionForCandidate(runnerPolicy, candidate, &clients.ClientError{Code: "bad_request", StatusCode: 400, Retryable: true})
if decision.Retry || decision.Reason != "hard_stop_policy" {
t.Fatalf("hard stop should block model-level failover override, got %+v", decision)
}
}
func TestFailoverPolicyAllowsModelOverride(t *testing.T) {
runnerPolicy := store.RunnerPolicy{
Status: "active",
FailoverPolicy: map[string]any{"enabled": true},
HardStopPolicy: map[string]any{"enabled": true, "categories": []any{"request_error"}},
}
candidate := store.RuntimeModelCandidate{
RuntimePolicyOverride: map[string]any{
"failoverPolicy": map[string]any{
"allowKeywords": []any{"temporary upstream outage"},
},
},
}
decision := failoverDecisionForCandidate(runnerPolicy, candidate, &clients.ClientError{Code: "custom_error", Message: "temporary upstream outage", Retryable: false})
if !decision.Retry || decision.Reason != "failover_allow_policy" {
t.Fatalf("model failoverPolicy override should allow cross-platform failover, got %+v", decision)
}
}
func TestProviderAuthErrorsFailOverInsteadOfHardStop(t *testing.T) {
runnerPolicy := store.RunnerPolicy{
Status: "active",
FailoverPolicy: map[string]any{
"enabled": true,
"allowCategories": []any{"auth_error"},
"allowCodes": []any{"auth_failed", "invalid_api_key", "missing_credentials"},
"allowStatusCodes": []any{
401,
403,
},
"actions": map[string]any{"auth_error": "disable_and_next"},
},
HardStopPolicy: map[string]any{
"enabled": true,
"categories": []any{"request_error", "unsupported_model", "user_permission", "insufficient_balance"},
"codes": []any{"bad_request", "invalid_request", "invalid_parameter", "missing_required", "permission_denied"},
"statusCodes": []any{},
"keywords": []any{"invalid_parameter", "missing required", "bad request", "insufficient balance"},
},
}
decision := failoverDecisionForCandidate(runnerPolicy, store.RuntimeModelCandidate{}, &clients.ClientError{Code: "auth_failed", StatusCode: 401, Retryable: false})
if !decision.Retry || decision.Action != "disable_and_next" || decision.Reason != "failover_allow_policy" {
t.Fatalf("provider auth failures should switch platform, got %+v", decision)
}
decision = failoverDecisionForCandidate(runnerPolicy, store.RuntimeModelCandidate{}, &clients.ClientError{Code: "http_400", Message: "invalid api key", StatusCode: 400, Retryable: false})
if !decision.Retry || decision.Info.Category != "auth_error" {
t.Fatalf("provider auth-looking 400 should switch platform, got %+v", decision)
}
}
func TestPriorityDemotePolicyIsKeywordGatedAndHardStopSafe(t *testing.T) {
runnerPolicy := store.RunnerPolicy{
Status: "active",
HardStopPolicy: map[string]any{
"enabled": true,
"categories": []any{"request_error"},
},
PriorityDemotePolicy: map[string]any{
"enabled": true,
"demoteStep": 25,
"keywords": []any{"rate_limit"},
},
}
shouldDemote, step := shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "rate_limit", Message: "rate_limit from upstream", Retryable: true})
if !shouldDemote || step != 25 {
t.Fatalf("priority demotion should be enabled only by matched policy, got shouldDemote=%v step=%d", shouldDemote, step)
}
shouldDemote, _ = shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "bad_request", StatusCode: 400, Retryable: true})
if shouldDemote {
t.Fatal("priority demotion should not run for hard-stop request errors")
}
}

View File

@ -40,6 +40,46 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri
}
}
func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, decision failoverDecision, simulated bool) {
switch decision.Action {
case "disable_and_next":
if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil {
_ = s.emit(ctx, taskID, "task.policy.failover_disabled", "running", "failover", 0.51, "candidate platform disabled by runner failover policy", addPolicyTracePayload(map[string]any{
"platformId": candidate.PlatformID,
"platformModelId": candidate.PlatformModelID,
"action": decision.Action,
"reason": decision.Reason,
}, decision.Match, decision.Info), simulated)
}
case "cooldown_and_next":
if err := s.store.CooldownCandidatePlatform(ctx, candidate.PlatformID, decision.CooldownSeconds); err == nil {
_ = s.emit(ctx, taskID, "task.policy.failover_cooled_down", "running", "failover", 0.51, "candidate platform cooled down by runner failover policy", addPolicyTracePayload(map[string]any{
"platformId": candidate.PlatformID,
"platformModelId": candidate.PlatformModelID,
"cooldownSeconds": decision.CooldownSeconds,
"action": decision.Action,
"reason": decision.Reason,
}, decision.Match, decision.Info), simulated)
}
}
}
func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, attemptNo int, runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, cause error, simulated bool) {
decision := priorityDemoteDecisionForCandidate(runnerPolicy, cause)
if !decision.Demote {
return
}
if err := s.store.DemoteCandidatePlatformPriority(ctx, candidate.PlatformID, decision.Step); err == nil {
s.recordAttemptTrace(ctx, taskID, attemptNo, priorityDemoteTraceEntry(decision, candidate.PlatformID, candidate.PlatformModelID))
_ = s.emit(ctx, taskID, "task.policy.priority_demoted", "running", "priority_demote", 0.52, "candidate platform priority demoted by runner policy", addPolicyTracePayload(map[string]any{
"platformId": candidate.PlatformID,
"platformModelId": candidate.PlatformModelID,
"demoteStep": decision.Step,
"code": clients.ErrorCode(cause),
}, decision.Match, decision.Info), simulated)
}
}
func effectiveRuntimePolicy(base map[string]any, override map[string]any, key string) map[string]any {
policy := base
if nested, ok := override[key].(map[string]any); ok && len(nested) > 0 {

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"log/slog"
"strconv"
"strings"
"time"
@ -51,6 +52,7 @@ func (s *Service) ExecuteStream(ctx context.Context, task store.GatewayTask, use
}
func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *auth.User, onDelta clients.StreamDelta) (Result, error) {
executeStartedAt := time.Now()
body := normalizeRequest(task.Kind, task.Request)
modelType := modelTypeFromKind(task.Kind, body)
if err := validateRequest(task.Kind, body); err != nil {
@ -88,64 +90,159 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
return Result{}, err
}
maxAttempts := maxAttemptsForCandidates(candidates)
runnerPolicy, err := s.store.GetActiveRunnerPolicy(ctx)
if err != nil {
return Result{}, err
}
maxPlatforms := maxPlatformsForCandidates(candidates, runnerPolicy)
maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy)
attemptNo := 0
var lastErr error
for index, candidate := range candidates {
if index >= maxAttempts {
if index >= maxPlatforms {
break
}
attemptNo := index + 1
response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta)
if err == nil {
billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate))
record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate))
record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics)
finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{
TaskID: task.ID,
Result: response.Result,
Billings: billings,
RequestID: record.RequestID,
ResolvedModel: record.ResolvedModel,
Usage: record.Usage,
Metrics: record.Metrics,
BillingSummary: record.BillingSummary,
FinalChargeAmount: record.FinalChargeAmount,
ResponseStartedAt: record.ResponseStartedAt,
ResponseFinishedAt: record.ResponseFinishedAt,
ResponseDurationMS: record.ResponseDurationMS,
})
if finishErr != nil {
return Result{}, finishErr
}
if settleErr := s.store.SettleTaskBilling(ctx, finished); settleErr != nil {
return Result{}, settleErr
}
if finished.FinalChargeAmount > 0 {
if err := s.emit(ctx, task.ID, "task.billing.settled", "succeeded", "billing", 0.98, "task billing settled", map[string]any{
"amount": finished.FinalChargeAmount,
"currency": stringFromAny(record.BillingSummary["currency"]),
clientAttempts := clientAttemptsForCandidate(candidate)
var candidateErr error
for clientAttempt := 1; clientAttempt <= clientAttempts; clientAttempt++ {
attemptNo++
response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta)
if err == nil {
billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate))
record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate))
record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics)
finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{
TaskID: task.ID,
Result: response.Result,
Billings: billings,
RequestID: record.RequestID,
ResolvedModel: record.ResolvedModel,
Usage: record.Usage,
Metrics: record.Metrics,
BillingSummary: record.BillingSummary,
FinalChargeAmount: record.FinalChargeAmount,
ResponseStartedAt: record.ResponseStartedAt,
ResponseFinishedAt: record.ResponseFinishedAt,
ResponseDurationMS: record.ResponseDurationMS,
})
if finishErr != nil {
return Result{}, finishErr
}
if settleErr := s.store.SettleTaskBilling(ctx, finished); settleErr != nil {
return Result{}, settleErr
}
if finished.FinalChargeAmount > 0 {
if err := s.emit(ctx, task.ID, "task.billing.settled", "succeeded", "billing", 0.98, "task billing settled", map[string]any{
"amount": finished.FinalChargeAmount,
"currency": stringFromAny(record.BillingSummary["currency"]),
}, isSimulation(task, candidate)); err != nil {
return Result{}, err
}
}
if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{
"result": response.Result,
"billings": billings,
"usage": record.Usage,
"metrics": record.Metrics,
"billingSummary": record.BillingSummary,
"requestId": record.RequestID,
}, isSimulation(task, candidate)); err != nil {
return Result{}, err
}
return Result{Task: finished, Output: response.Result}, nil
}
if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{
"result": response.Result,
"billings": billings,
"usage": record.Usage,
"metrics": record.Metrics,
"billingSummary": record.BillingSummary,
"requestId": record.RequestID,
lastErr = err
candidateErr = err
retryDecision := retryDecisionForCandidate(candidate, err)
retryAction := "retry"
if !retryDecision.Retry {
retryAction = "stop"
}
if clientAttempt >= clientAttempts {
retryDecision.Retry = false
retryDecision.Reason = "same_client_max_attempts"
retryDecision.Match = policyRuleMatch{
Source: "model_runtime_policy_sets.retry_policy",
Policy: "retryPolicy",
Rule: "maxAttempts",
Value: strconv.Itoa(clientAttempts),
}
retryDecision.Info = failureInfoFromError(err)
retryAction = "stop"
}
if failoverTimeBudgetExceeded(executeStartedAt, maxFailoverDuration) {
retryDecision.Retry = false
retryDecision.Reason = "failover_time_budget_exceeded"
retryDecision.Match = policyRuleMatch{
Source: "gateway_runner_policies.failover_policy",
Policy: "failoverPolicy",
Rule: "maxDurationSeconds",
Value: strconv.Itoa(int(maxFailoverDuration.Seconds())),
}
retryDecision.Info = failureInfoFromError(err)
retryAction = "stop"
}
s.recordAttemptTrace(ctx, task.ID, attemptNo, retryTraceEntry(retryDecision, retryAction, clientAttempt, clientAttempts))
if !retryDecision.Retry {
break
}
if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.45, "retrying same client", addPolicyTracePayload(map[string]any{
"attempt": attemptNo,
"clientAttempt": clientAttempt,
"clientId": candidate.ClientID,
"error": err.Error(),
"reason": retryDecision.Reason,
"scope": "same_client",
}, retryDecision.Match, retryDecision.Info), isSimulation(task, candidate)); err != nil {
return Result{}, err
}
}
if candidateErr == nil || index+1 >= len(candidates) || index+1 >= maxPlatforms {
if candidateErr != nil {
s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, candidateErr, isSimulation(task, candidate))
decision := failoverDecisionForCandidate(runnerPolicy, candidate, candidateErr)
if decision.Retry {
decision.Retry = false
decision.Action = "stop"
decision.Reason = "no_next_platform"
decision.Match = policyRuleMatch{Source: "runner_candidates", Policy: "candidateSelection", Rule: "candidateCount", Value: strconv.Itoa(len(candidates))}
if index+1 >= maxPlatforms {
decision.Reason = "max_platforms_reached"
decision.Match = policyRuleMatch{Source: "gateway_runner_policies.failover_policy", Policy: "failoverPolicy", Rule: "maxPlatforms", Value: strconv.Itoa(maxPlatforms)}
}
}
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision))
}
break
}
s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, candidateErr, isSimulation(task, candidate))
if failoverTimeBudgetExceeded(executeStartedAt, maxFailoverDuration) {
elapsedSeconds := int(time.Since(executeStartedAt).Seconds())
maxDurationSeconds := int(maxFailoverDuration.Seconds())
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTimeBudgetTraceEntry(elapsedSeconds, maxDurationSeconds, failureInfoFromError(candidateErr)))
if err := s.emit(ctx, task.ID, "task.failover.stopped", "running", "retry", 0.55, "failover time budget exceeded", map[string]any{
"elapsedSeconds": elapsedSeconds,
"maxDurationSeconds": maxDurationSeconds,
"scope": "next_platform",
"statusCode": clients.ErrorResponseMetadata(candidateErr).StatusCode,
}, isSimulation(task, candidate)); err != nil {
return Result{}, err
}
return Result{Task: finished, Output: response.Result}, nil
}
lastErr = err
retryable := clients.IsRetryable(err)
if !retryable || !retryEnabled(candidate) || attemptNo >= maxAttempts {
break
}
if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.55, "retrying next client", map[string]any{"attempt": attemptNo, "error": err.Error()}, isSimulation(task, candidate)); err != nil {
decision := failoverDecisionForCandidate(runnerPolicy, candidate, candidateErr)
s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision))
if !decision.Retry {
break
}
s.applyFailoverAction(ctx, task.ID, candidate, decision, isSimulation(task, candidate))
if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.55, "retrying next client", addPolicyTracePayload(map[string]any{
"attempt": attemptNo,
"action": decision.Action,
"error": candidateErr.Error(),
"reason": decision.Reason,
"scope": "next_platform",
}, decision.Match, decision.Info), isSimulation(task, candidate)); err != nil {
return Result{}, err
}
}
@ -184,15 +281,16 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
reservations := s.rateLimitReservations(ctx, user, candidate, body)
limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, reservations)
if err != nil {
clientErr := &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: false}
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
AttemptID: attemptID,
Status: "failed",
Retryable: false,
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false}),
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(clientErr, false)}}),
ErrorCode: "rate_limit",
ErrorMessage: err.Error(),
})
return clients.Response{}, &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: false}
return clients.Response{}, clientErr
}
defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs)
@ -207,7 +305,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
AttemptID: attemptID,
Status: "failed",
Retryable: false,
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false}),
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(err, false)}}),
ErrorCode: clients.ErrorCode(err),
ErrorMessage: err.Error(),
})
@ -266,7 +364,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
ErrorCode: clients.ErrorCode(err),
ErrorMessage: err.Error(),
})
_ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "metrics": metrics}, simulated)
_ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "statusCode": clients.ErrorResponseMetadata(err).StatusCode, "metrics": metrics}, simulated)
s.applyCandidateFailurePolicies(ctx, task.ID, candidate, err, simulated)
return clients.Response{}, err
}
@ -275,6 +373,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
metrics := mergeMetrics(taskMetrics(task, user, body, candidate, response, simulated), map[string]any{
"error": err.Error(),
"retryable": clients.IsRetryable(err),
"trace": []any{failureTraceEntry(err, clients.IsRetryable(err))},
})
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
AttemptID: attemptID,
@ -429,20 +528,54 @@ func retryEnabled(candidate store.RuntimeModelCandidate) bool {
return true
}
func maxAttemptsForCandidates(candidates []store.RuntimeModelCandidate) int {
func clientAttemptsForCandidate(candidate store.RuntimeModelCandidate) int {
if !retryEnabled(candidate) {
return 1
}
if value := intFromPolicy(effectiveRetryPolicy(candidate), "maxAttempts"); value > 0 {
return value
}
return 1
}
func maxPlatformsForCandidates(candidates []store.RuntimeModelCandidate, runnerPolicy store.RunnerPolicy) int {
if len(candidates) == 0 {
return 0
}
maxAttempts := len(candidates)
maxPlatforms := len(candidates)
if value := intFromPolicy(runnerPolicy.FailoverPolicy, "maxPlatforms"); value > 0 {
if value < maxPlatforms {
maxPlatforms = value
}
} else if maxPlatforms > 99 {
maxPlatforms = 99
}
for _, candidate := range candidates {
if value := intFromPolicy(effectiveRetryPolicy(candidate), "maxAttempts"); value > 0 && value < maxAttempts {
maxAttempts = value
if value := intFromPolicy(effectiveFailoverPolicy(runnerPolicy.FailoverPolicy, candidate.RuntimePolicyOverride), "maxPlatforms"); value > 0 && value < maxPlatforms {
maxPlatforms = value
}
}
if maxAttempts <= 0 {
if maxPlatforms <= 0 {
return 1
}
return maxAttempts
return maxPlatforms
}
func maxFailoverDurationForCandidates(candidates []store.RuntimeModelCandidate, runnerPolicy store.RunnerPolicy) time.Duration {
seconds := intFromPolicy(runnerPolicy.FailoverPolicy, "maxDurationSeconds")
if seconds <= 0 {
seconds = 600
}
for _, candidate := range candidates {
if value := intFromPolicy(effectiveFailoverPolicy(runnerPolicy.FailoverPolicy, candidate.RuntimePolicyOverride), "maxDurationSeconds"); value > 0 && value < seconds {
seconds = value
}
}
return time.Duration(seconds) * time.Second
}
func failoverTimeBudgetExceeded(start time.Time, maxDuration time.Duration) bool {
return maxDuration > 0 && time.Since(start) >= maxDuration
}
func normalizeRequest(kind string, body map[string]any) map[string]any {

View File

@ -0,0 +1,128 @@
package runner
import (
"context"
"fmt"
"time"
)
func failureTraceEntry(err error, retryable bool) map[string]any {
info := failureInfoFromError(err)
entry := policyTraceEntry("failure", "client", "failed", "client_call_failed", policyRuleMatch{}, info)
entry["retryable"] = retryable
return entry
}
func retryTraceEntry(decision retryDecision, action string, clientAttempt int, maxAttempts int) map[string]any {
entry := policyTraceEntry("same_client_retry", "same_client", action, decision.Reason, decision.Match, decision.Info)
entry["retry"] = decision.Retry
entry["clientAttempt"] = clientAttempt
entry["maxAttempts"] = maxAttempts
return entry
}
func failoverTraceEntry(decision failoverDecision) map[string]any {
event := "failover_next"
if !decision.Retry {
event = "failover_stop"
}
entry := policyTraceEntry(event, "next_platform", decision.Action, decision.Reason, decision.Match, decision.Info)
entry["retry"] = decision.Retry
if decision.CooldownSeconds > 0 {
entry["cooldownSeconds"] = decision.CooldownSeconds
}
return entry
}
func priorityDemoteTraceEntry(decision priorityDemoteDecision, platformID string, platformModelID string) map[string]any {
entry := policyTraceEntry("priority_demoted", "priority_demote", "demote", decision.Reason, decision.Match, decision.Info)
entry["demote"] = decision.Demote
entry["demoteStep"] = decision.Step
entry["platformId"] = platformID
entry["platformModelId"] = platformModelID
return entry
}
func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, info failureInfo) map[string]any {
entry := policyTraceEntry("failover_stop", "next_platform", "stop", "failover_time_budget_exceeded", policyRuleMatch{
Source: "gateway_runner_policies.failover_policy",
Policy: "failoverPolicy",
Rule: "maxDurationSeconds",
Value: fmt.Sprintf("%d", maxDurationSeconds),
}, info)
entry["elapsedSeconds"] = elapsedSeconds
entry["maxDurationSeconds"] = maxDurationSeconds
return entry
}
func policyTraceEntry(event string, scope string, action string, reason string, match policyRuleMatch, info failureInfo) map[string]any {
entry := map[string]any{
"event": event,
"scope": scope,
"action": action,
"reason": reason,
"createdAt": time.Now().Format(time.RFC3339Nano),
}
if info.Code != "" {
entry["errorCode"] = info.Code
}
if info.Message != "" {
entry["message"] = info.Message
}
if info.Status > 0 {
entry["statusCode"] = info.Status
}
if info.Category != "" {
entry["category"] = info.Category
}
if match.Source != "" {
entry["policySource"] = match.Source
}
if match.Policy != "" {
entry["policy"] = match.Policy
}
if match.Rule != "" {
entry["policyRule"] = match.Rule
}
if match.Value != "" {
entry["matchedValue"] = match.Value
}
return entry
}
func addPolicyTracePayload(payload map[string]any, match policyRuleMatch, info failureInfo) map[string]any {
if payload == nil {
payload = map[string]any{}
}
if info.Code != "" {
payload["errorCode"] = info.Code
}
if info.Status > 0 {
payload["statusCode"] = info.Status
}
if info.Category != "" {
payload["category"] = info.Category
}
if match.Source != "" {
payload["policySource"] = match.Source
}
if match.Policy != "" {
payload["policy"] = match.Policy
}
if match.Rule != "" {
payload["policyRule"] = match.Rule
}
if match.Value != "" {
payload["matchedValue"] = match.Value
}
return payload
}
func (s *Service) recordAttemptTrace(ctx context.Context, taskID string, attemptNo int, entry map[string]any) {
if attemptNo <= 0 || len(entry) == 0 {
return
}
if err := s.store.AppendTaskAttemptTrace(ctx, taskID, attemptNo, entry); err != nil {
s.logger.Warn("append task attempt trace failed", "taskID", taskID, "attempt", attemptNo, "error", err)
}
}

View File

@ -229,6 +229,20 @@ type RuntimePolicySet struct {
UpdatedAt time.Time `json:"updatedAt"`
}
type RunnerPolicy struct {
ID string `json:"id"`
PolicyKey string `json:"policyKey"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
FailoverPolicy map[string]any `json:"failoverPolicy,omitempty"`
HardStopPolicy map[string]any `json:"hardStopPolicy,omitempty"`
PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type PricingRule struct {
ID string `json:"id"`
RuleSetID string `json:"ruleSetId,omitempty"`
@ -446,6 +460,7 @@ type TaskAttempt struct {
Retryable bool `json:"retryable"`
Simulated bool `json:"simulated"`
RequestID string `json:"requestId,omitempty"`
StatusCode int `json:"statusCode,omitempty"`
Usage map[string]any `json:"usage,omitempty"`
Metrics map[string]any `json:"metrics,omitempty"`
RequestSnapshot map[string]any `json:"requestSnapshot,omitempty"`
@ -1796,6 +1811,11 @@ func IsUniqueViolation(err error) bool {
return isUniqueViolation(err)
}
func IsUndefinedDatabaseObject(err error) bool {
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) && (pgErr.Code == "42P01" || pgErr.Code == "42703")
}
func isUniqueViolation(err error) bool {
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) && pgErr.Code == "23505"

View File

@ -0,0 +1,168 @@
package store
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/jackc/pgx/v5"
)
const runnerPolicyColumns = `
id::text, policy_key, name, COALESCE(description, ''), failover_policy,
hard_stop_policy, priority_demote_policy, metadata, status, created_at, updated_at`
type RunnerPolicyInput struct {
PolicyKey string `json:"policyKey"`
Name string `json:"name"`
Description string `json:"description"`
FailoverPolicy map[string]any `json:"failoverPolicy"`
HardStopPolicy map[string]any `json:"hardStopPolicy"`
PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy"`
Metadata map[string]any `json:"metadata"`
Status string `json:"status"`
}
type runnerPolicyScanner interface {
Scan(dest ...any) error
}
func (s *Store) GetActiveRunnerPolicy(ctx context.Context) (RunnerPolicy, error) {
item, err := scanRunnerPolicy(s.pool.QueryRow(ctx, `
SELECT `+runnerPolicyColumns+`
FROM gateway_runner_policies
ORDER BY CASE WHEN policy_key = 'default-runner-v1' THEN 0 ELSE 1 END,
CASE WHEN status = 'active' THEN 0 ELSE 1 END,
updated_at DESC
LIMIT 1`))
if err != nil {
if err == pgx.ErrNoRows || IsUndefinedDatabaseObject(err) {
return defaultRunnerPolicy(), nil
}
return RunnerPolicy{}, err
}
return item, nil
}
func (s *Store) UpsertDefaultRunnerPolicy(ctx context.Context, input RunnerPolicyInput) (RunnerPolicy, error) {
input = normalizeRunnerPolicyInput(input)
failoverPolicy, _ := json.Marshal(emptyObjectIfNil(input.FailoverPolicy))
hardStopPolicy, _ := json.Marshal(emptyObjectIfNil(input.HardStopPolicy))
priorityDemotePolicy, _ := json.Marshal(emptyObjectIfNil(input.PriorityDemotePolicy))
metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata))
return scanRunnerPolicy(s.pool.QueryRow(ctx, `
INSERT INTO gateway_runner_policies (
policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status
)
VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8)
ON CONFLICT (policy_key) DO UPDATE
SET name = EXCLUDED.name,
description = EXCLUDED.description,
failover_policy = EXCLUDED.failover_policy,
hard_stop_policy = EXCLUDED.hard_stop_policy,
priority_demote_policy = EXCLUDED.priority_demote_policy,
metadata = EXCLUDED.metadata,
status = EXCLUDED.status,
updated_at = now()
RETURNING `+runnerPolicyColumns,
input.PolicyKey, input.Name, input.Description, failoverPolicy, hardStopPolicy, priorityDemotePolicy, metadata, input.Status,
))
}
func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) {
var item RunnerPolicy
var failoverPolicy []byte
var hardStopPolicy []byte
var priorityDemotePolicy []byte
var metadata []byte
if err := scanner.Scan(
&item.ID,
&item.PolicyKey,
&item.Name,
&item.Description,
&failoverPolicy,
&hardStopPolicy,
&priorityDemotePolicy,
&metadata,
&item.Status,
&item.CreatedAt,
&item.UpdatedAt,
); err != nil {
return RunnerPolicy{}, err
}
item.FailoverPolicy = decodeObject(failoverPolicy)
item.HardStopPolicy = decodeObject(hardStopPolicy)
item.PriorityDemotePolicy = decodeObject(priorityDemotePolicy)
item.Metadata = decodeObject(metadata)
return item, nil
}
func normalizeRunnerPolicyInput(input RunnerPolicyInput) RunnerPolicyInput {
input.PolicyKey = strings.TrimSpace(input.PolicyKey)
if input.PolicyKey == "" {
input.PolicyKey = "default-runner-v1"
}
input.Name = strings.TrimSpace(input.Name)
if input.Name == "" {
input.Name = "默认全局调度策略"
}
input.Description = strings.TrimSpace(input.Description)
input.Status = strings.TrimSpace(input.Status)
if input.Status == "" {
input.Status = "active"
}
return input
}
func defaultRunnerPolicy() RunnerPolicy {
now := time.Now()
return RunnerPolicy{
PolicyKey: "default-runner-v1",
Name: "默认全局调度策略",
Description: "控制多个候选平台之间的故障切换;模型运行策略只可覆盖 failoverPolicy不能覆盖 hardStopPolicy。",
FailoverPolicy: defaultRunnerFailoverPolicy(),
HardStopPolicy: defaultRunnerHardStopPolicy(),
PriorityDemotePolicy: defaultRunnerPriorityDemotePolicy(),
Metadata: map[string]any{"source": "code-default"},
Status: "active",
CreatedAt: now,
UpdatedAt: now,
}
}
func defaultRunnerPriorityDemotePolicy() map[string]any {
return map[string]any{
"enabled": true,
"demoteStep": 100,
"categories": []any{"network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded"},
"codes": []any{"network", "timeout", "stream_read_error", "rate_limit", "server_error", "overloaded"},
"statusCodes": []any{408, 429, 500, 502, 503, 504},
"keywords": []any{"timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "429", "5xx"},
}
}
func defaultRunnerFailoverPolicy() map[string]any {
return map[string]any{
"enabled": true,
"maxPlatforms": 99,
"maxDurationSeconds": 600,
"allowCategories": []any{"network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded", "auth_error"},
"denyCategories": []any{"request_error", "unsupported_model", "user_permission", "insufficient_balance"},
"allowCodes": []any{"auth_failed", "invalid_api_key", "missing_credentials"},
"allowKeywords": []any{"timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "auth_failed", "invalid_api_key", "missing_credentials", "unauthorized", "forbidden", "429", "5xx"},
"denyKeywords": []any{"invalid_parameter", "missing required", "bad request"},
"allowStatusCodes": []any{401, 403, 408, 429, 500, 502, 503, 504},
"denyStatusCodes": []any{},
}
}
func defaultRunnerHardStopPolicy() map[string]any {
return map[string]any{
"enabled": true,
"categories": []any{"request_error", "unsupported_model", "user_permission", "insufficient_balance"},
"codes": []any{"bad_request", "invalid_request", "invalid_parameter", "missing_required", "unsupported_kind", "unsupported_model", "insufficient_balance", "permission_denied"},
"statusCodes": []any{},
"keywords": []any{"invalid_parameter", "missing required", "bad request", "insufficient balance"},
}
}

View File

@ -134,6 +134,28 @@ WHERE id = $1::uuid`, platformID, cooldownSeconds)
return err
}
func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string, demoteStep int) error {
if strings.TrimSpace(platformID) == "" {
return nil
}
if demoteStep <= 0 {
demoteStep = 100
}
_, err := s.pool.Exec(ctx, `
UPDATE integration_platforms target
SET dynamic_priority = GREATEST(
COALESCE(target.dynamic_priority, target.priority),
COALESCE((
SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority))
FROM integration_platforms peer
WHERE peer.deleted_at IS NULL
), target.priority) + $2::int
),
updated_at = now()
WHERE target.id = $1::uuid`, platformID, demoteStep)
return err
}
func scanRuntimePolicySet(scanner runtimePolicyScanner) (RuntimePolicySet, error) {
var item RuntimePolicySet
var rateLimitPolicy []byte

View File

@ -3,6 +3,7 @@ package store
import (
"context"
"encoding/json"
"strconv"
"strings"
"time"
@ -234,6 +235,27 @@ func (s *Store) ListTaskAttempts(ctx context.Context, taskID string) ([]TaskAtte
return attemptsByTaskID[taskID], nil
}
func (s *Store) AppendTaskAttemptTrace(ctx context.Context, taskID string, attemptNo int, entry map[string]any) error {
entryJSON, _ := json.Marshal(emptyObjectIfNil(entry))
_, err := s.pool.Exec(ctx, `
UPDATE gateway_task_attempts
SET metrics = jsonb_set(
COALESCE(metrics, '{}'::jsonb),
'{trace}',
(
CASE
WHEN jsonb_typeof(COALESCE(metrics->'trace', '[]'::jsonb)) = 'array'
THEN COALESCE(metrics->'trace', '[]'::jsonb)
ELSE '[]'::jsonb
END
) || jsonb_build_array($3::jsonb),
true
)
WHERE task_id = $1::uuid
AND attempt_no = $2`, taskID, attemptNo, string(entryJSON))
return err
}
func (s *Store) listTaskAttemptsByTaskIDs(ctx context.Context, taskIDs []string) (map[string][]TaskAttempt, error) {
itemsByTaskID := map[string][]TaskAttempt{}
if len(taskIDs) == 0 {
@ -331,6 +353,7 @@ func enrichTaskAttemptFromMetrics(item *TaskAttempt) {
item.ModelAlias = firstNonEmpty(item.ModelAlias, taskAttemptMetricString(item.Metrics, "modelAlias"))
item.ModelType = firstNonEmpty(item.ModelType, taskAttemptMetricString(item.Metrics, "modelType"))
item.ClientID = firstNonEmpty(item.ClientID, taskAttemptMetricString(item.Metrics, "clientId"))
item.StatusCode = taskAttemptMetricInt(item.Metrics, "statusCode")
}
func taskAttemptMetricString(metrics map[string]any, key string) string {
@ -338,6 +361,25 @@ func taskAttemptMetricString(metrics map[string]any, key string) string {
return strings.TrimSpace(value)
}
func taskAttemptMetricInt(metrics map[string]any, key string) int {
switch value := metrics[key].(type) {
case int:
return value
case int64:
return int(value)
case float64:
return int(value)
case json.Number:
next, _ := value.Int64()
return int(next)
case string:
next, _ := strconv.Atoi(strings.TrimSpace(value))
return next
default:
return 0
}
}
func (s *Store) FinishTaskAttempt(ctx context.Context, input FinishTaskAttemptInput) error {
responseJSON, _ := json.Marshal(emptyObjectIfNil(input.ResponseSnapshot))
usageJSON, _ := json.Marshal(emptyObjectIfNil(input.Usage))

View File

@ -0,0 +1,73 @@
CREATE TABLE IF NOT EXISTS gateway_runner_policies (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
policy_key text NOT NULL UNIQUE,
name text NOT NULL,
description text,
failover_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
hard_stop_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
ALTER TABLE IF EXISTS gateway_runner_policies
ADD COLUMN IF NOT EXISTS priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb;
INSERT INTO gateway_runner_policies (
policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status
)
VALUES (
'default-runner-v1',
'默认全局调度策略',
'控制多个候选平台之间的故障切换;模型运行策略只可覆盖 failoverPolicy不能覆盖 hardStopPolicy。',
'{
"enabled": true,
"maxPlatforms": 99,
"maxDurationSeconds": 600,
"allowCategories": ["network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded", "auth_error"],
"denyCategories": ["request_error", "unsupported_model", "user_permission", "insufficient_balance"],
"allowCodes": ["auth_failed", "invalid_api_key", "missing_credentials"],
"allowKeywords": ["timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "auth_failed", "invalid_api_key", "missing_credentials", "unauthorized", "forbidden", "429", "5xx"],
"denyKeywords": ["invalid_parameter", "missing required", "bad request"],
"allowStatusCodes": [401, 403, 408, 429, 500, 502, 503, 504],
"denyStatusCodes": [],
"actions": {
"auth_error": "disable_and_next",
"rate_limit": "cooldown_and_next",
"provider_5xx": "next",
"request_error": "stop"
}
}'::jsonb,
'{
"enabled": true,
"categories": ["request_error", "unsupported_model", "user_permission", "insufficient_balance"],
"codes": ["bad_request", "invalid_request", "invalid_parameter", "missing_required", "unsupported_kind", "unsupported_model", "insufficient_balance", "permission_denied"],
"statusCodes": [],
"keywords": ["invalid_parameter", "missing required", "bad request", "insufficient balance"]
}'::jsonb,
'{
"enabled": true,
"demoteStep": 100,
"categories": ["network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded"],
"codes": ["network", "timeout", "stream_read_error", "rate_limit", "server_error", "overloaded"],
"statusCodes": [408, 429, 500, 502, 503, 504],
"keywords": ["timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "429", "5xx"]
}'::jsonb,
'{"seed":"0026_runner_policies"}'::jsonb,
'active'
)
ON CONFLICT (policy_key) DO UPDATE
SET name = EXCLUDED.name,
description = EXCLUDED.description,
failover_policy = gateway_runner_policies.failover_policy || EXCLUDED.failover_policy,
hard_stop_policy = gateway_runner_policies.hard_stop_policy || EXCLUDED.hard_stop_policy,
priority_demote_policy = gateway_runner_policies.priority_demote_policy || EXCLUDED.priority_demote_policy,
metadata = gateway_runner_policies.metadata || EXCLUDED.metadata,
updated_at = now();
UPDATE model_runtime_policy_sets
SET description = '默认包含 TPM/RPM/并发、平台内调用重试、自动禁用和优先级降级关键词。'
WHERE policy_key = 'default-runtime-v1'
AND description = '默认包含 TPM/RPM/并发、失败重试、自动禁用和优先级降级关键词。';

View File

@ -0,0 +1,17 @@
UPDATE gateway_runner_policies
SET failover_policy = jsonb_set(
jsonb_set(
failover_policy,
'{maxDurationSeconds}',
COALESCE(failover_policy -> 'maxDurationSeconds', '600'::jsonb),
true
),
'{maxPlatforms}',
CASE
WHEN COALESCE(failover_policy ->> 'maxPlatforms', '') IN ('', '3') THEN '99'::jsonb
ELSE failover_policy -> 'maxPlatforms'
END,
true
),
updated_at = now()
WHERE policy_key = 'default-runner-v1';

View File

@ -0,0 +1,22 @@
UPDATE gateway_runner_policies
SET failover_policy = failover_policy
|| jsonb_build_object(
'allowCategories', jsonb_build_array('network', 'timeout', 'stream_error', 'rate_limit', 'provider_5xx', 'provider_overloaded', 'auth_error'),
'denyCategories', jsonb_build_array('request_error', 'unsupported_model', 'user_permission', 'insufficient_balance'),
'allowCodes', jsonb_build_array('auth_failed', 'invalid_api_key', 'missing_credentials'),
'allowKeywords', jsonb_build_array('timeout', 'network', 'rate_limit', 'overloaded', 'temporarily_unavailable', 'server_error', 'auth_failed', 'invalid_api_key', 'missing_credentials', 'unauthorized', 'forbidden', '429', '5xx'),
'denyKeywords', jsonb_build_array('invalid_parameter', 'missing required', 'bad request'),
'allowStatusCodes', jsonb_build_array(401, 403, 408, 429, 500, 502, 503, 504),
'denyStatusCodes', '[]'::jsonb,
'actions', COALESCE(failover_policy->'actions', '{}'::jsonb) || jsonb_build_object('auth_error', 'disable_and_next')
),
hard_stop_policy = hard_stop_policy
|| jsonb_build_object(
'categories', jsonb_build_array('request_error', 'unsupported_model', 'user_permission', 'insufficient_balance'),
'codes', jsonb_build_array('bad_request', 'invalid_request', 'invalid_parameter', 'missing_required', 'unsupported_kind', 'unsupported_model', 'insufficient_balance', 'permission_denied'),
'statusCodes', '[]'::jsonb,
'keywords', jsonb_build_array('invalid_parameter', 'missing required', 'bad request', 'insufficient balance')
),
metadata = metadata || jsonb_build_object('platformAuthFailoverPolicy', '0028_runner_policy_platform_auth_failover'),
updated_at = now()
WHERE policy_key = 'default-runner-v1';

View File

@ -8,6 +8,7 @@ import type {
GatewayApiKey,
GatewayAuditLog,
GatewayNetworkProxyConfig,
GatewayRunnerPolicy,
GatewayTenantUpsertRequest,
GatewayTask,
GatewayUserUpsertRequest,
@ -43,6 +44,7 @@ import {
deleteUserGroup,
getHealth,
getNetworkProxyConfig,
getRunnerPolicy,
getTask,
getWalletSummary,
listAccessRules,
@ -58,11 +60,11 @@ import {
listPlatforms,
listPricingRules,
listPricingRuleSets,
listPublicBaseModels,
listPublicCatalogProviders,
listRuntimePolicySets,
listTasks,
listWalletTransactions,
listPublicBaseModels,
listPublicCatalogProviders,
listRateLimitWindows,
listTenants,
listUserGroups,
@ -135,6 +137,7 @@ type DataKey =
| 'baseModels'
| 'pricingRules'
| 'pricingRuleSets'
| 'runnerPolicy'
| 'runtimePolicySets'
| 'rateLimitWindows'
| 'tenants'
@ -175,6 +178,7 @@ export function App() {
const [baseModels, setBaseModels] = useState<BaseModelCatalogItem[]>([]);
const [pricingRules, setPricingRules] = useState<PricingRule[]>([]);
const [pricingRuleSets, setPricingRuleSets] = useState<PricingRuleSet[]>([]);
const [runnerPolicy, setRunnerPolicy] = useState<GatewayRunnerPolicy | null>(null);
const [runtimePolicySets, setRuntimePolicySets] = useState<RuntimePolicySet[]>([]);
const [accessRules, setAccessRules] = useState<GatewayAccessRule[]>([]);
const [auditLogs, setAuditLogs] = useState<GatewayAuditLog[]>([]);
@ -217,9 +221,10 @@ export function App() {
setPricingRuleSets,
token,
});
const { removeRuntimePolicySet, saveRuntimePolicySet } = useRuntimePolicySetOperations({
const { removeRuntimePolicySet, saveRunnerPolicy, saveRuntimePolicySet } = useRuntimePolicySetOperations({
setCoreMessage,
setCoreState,
setRunnerPolicy,
setRuntimePolicySets,
token,
});
@ -269,6 +274,7 @@ export function App() {
modelCatalog,
models,
networkProxyConfig,
runnerPolicy,
platforms,
pricingRules,
pricingRuleSets,
@ -282,7 +288,7 @@ export function App() {
users,
walletAccounts,
walletTransactions,
}), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]);
}), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runnerPolicy, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]);
async function refresh(nextToken = token) {
await ensureRouteData(nextToken, true);
@ -377,6 +383,9 @@ export function App() {
case 'pricingRuleSets':
setPricingRuleSets((await listPricingRuleSets(nextToken)).items);
return;
case 'runnerPolicy':
setRunnerPolicy(await getRunnerPolicy(nextToken));
return;
case 'runtimePolicySets':
setRuntimePolicySets((await listRuntimePolicySets(nextToken)).items);
return;
@ -774,6 +783,7 @@ export function App() {
setBaseModels([]);
setPricingRules([]);
setPricingRuleSets([]);
setRunnerPolicy(null);
setRuntimePolicySets([]);
setAccessRules([]);
setAuditLogs([]);
@ -948,6 +958,7 @@ export function App() {
onSavePlatform={savePlatformWithModels}
onSaveProvider={saveProvider}
onSavePricingRuleSet={savePricingRuleSet}
onSaveRunnerPolicy={saveRunnerPolicy}
onSaveRuntimePolicySet={saveRuntimePolicySet}
onBatchAccessRules={batchSaveAccessRules}
onSaveAccessRule={saveAccessRule}
@ -1127,7 +1138,7 @@ function dataKeysForRoute(
case 'pricing':
return ['pricingRuleSets'];
case 'runtime':
return ['runtimePolicySets'];
return ['runtimePolicySets', 'runnerPolicy'];
case 'baseModels':
return ['baseModels', 'providers', 'pricingRuleSets', 'runtimePolicySets'];
case 'platforms':

View File

@ -10,6 +10,8 @@ import type {
GatewayAccessRuleUpsertRequest,
GatewayApiKey,
GatewayAuditLog,
GatewayRunnerPolicy,
GatewayRunnerPolicyUpsertRequest,
GatewayTenant,
GatewayTenantUpsertRequest,
GatewayNetworkProxyConfig,
@ -239,6 +241,21 @@ export async function listRuntimePolicySets(token: string): Promise<ListResponse
return request<ListResponse<RuntimePolicySet>>('/api/admin/runtime/policy-sets', { token });
}
export async function getRunnerPolicy(token: string): Promise<GatewayRunnerPolicy> {
return request<GatewayRunnerPolicy>('/api/admin/runtime/runner-policy', { token });
}
export async function updateRunnerPolicy(
token: string,
input: GatewayRunnerPolicyUpsertRequest,
): Promise<GatewayRunnerPolicy> {
return request<GatewayRunnerPolicy>('/api/admin/runtime/runner-policy', {
body: input,
method: 'PATCH',
token,
});
}
export async function createRuntimePolicySet(
token: string,
input: RuntimePolicySetUpsertRequest,

View File

@ -5,6 +5,7 @@ import type {
GatewayApiKey,
GatewayAuditLog,
GatewayNetworkProxyConfig,
GatewayRunnerPolicy,
GatewayTask,
GatewayTenant,
GatewayUser,
@ -28,6 +29,7 @@ export interface ConsoleData {
modelCatalog: ModelCatalogResponse;
models: PlatformModel[];
networkProxyConfig: GatewayNetworkProxyConfig | null;
runnerPolicy: GatewayRunnerPolicy | null;
platforms: IntegrationPlatform[];
pricingRules: PricingRule[];
pricingRuleSets: PricingRuleSet[];

View File

@ -1,14 +1,31 @@
import type { Dispatch, SetStateAction } from 'react';
import type { RuntimePolicySet, RuntimePolicySetUpsertRequest } from '@easyai-ai-gateway/contracts';
import { createRuntimePolicySet, deleteRuntimePolicySet, updateRuntimePolicySet } from '../api';
import type { GatewayRunnerPolicy, GatewayRunnerPolicyUpsertRequest, RuntimePolicySet, RuntimePolicySetUpsertRequest } from '@easyai-ai-gateway/contracts';
import { createRuntimePolicySet, deleteRuntimePolicySet, updateRunnerPolicy, updateRuntimePolicySet } from '../api';
import type { LoadState } from '../types';
export function useRuntimePolicySetOperations(input: {
setCoreMessage: Dispatch<SetStateAction<string>>;
setCoreState: Dispatch<SetStateAction<LoadState>>;
setRunnerPolicy: Dispatch<SetStateAction<GatewayRunnerPolicy | null>>;
setRuntimePolicySets: Dispatch<SetStateAction<RuntimePolicySet[]>>;
token: string;
}) {
async function saveRunnerPolicy(payload: GatewayRunnerPolicyUpsertRequest) {
if (!input.token) throw new Error('请先登录后再维护调度策略');
input.setCoreState('loading');
input.setCoreMessage('');
try {
const policy = await updateRunnerPolicy(input.token, payload);
input.setRunnerPolicy(policy);
input.setCoreState('ready');
input.setCoreMessage('全局调度策略已更新。');
} catch (err) {
input.setCoreState('error');
input.setCoreMessage(err instanceof Error ? err.message : '全局调度策略保存失败');
throw err;
}
}
async function saveRuntimePolicySet(payload: RuntimePolicySetUpsertRequest, policySetId?: string) {
if (!input.token) throw new Error('请先登录后再维护运行策略');
input.setCoreState('loading');
@ -43,5 +60,5 @@ export function useRuntimePolicySetOperations(input: {
}
}
return { removeRuntimePolicySet, saveRuntimePolicySet };
return { removeRuntimePolicySet, saveRunnerPolicy, saveRuntimePolicySet };
}

View File

@ -6,6 +6,7 @@ import type {
GatewayAccessRuleBatchRequest,
GatewayAccessRuleUpsertRequest,
GatewayTenantUpsertRequest,
GatewayRunnerPolicyUpsertRequest,
GatewayUserUpsertRequest,
PricingRuleSetUpsertRequest,
RuntimePolicySetUpsertRequest,
@ -62,6 +63,7 @@ export function AdminPage(props: {
onSavePlatform: (input: PlatformWithModelsInput) => Promise<void>;
onSaveProvider: (input: CatalogProviderUpsertRequest, providerId?: string) => Promise<void>;
onSavePricingRuleSet: (input: PricingRuleSetUpsertRequest, ruleSetId?: string) => Promise<void>;
onSaveRunnerPolicy: (input: GatewayRunnerPolicyUpsertRequest) => Promise<void>;
onSaveRuntimePolicySet: (input: RuntimePolicySetUpsertRequest, policySetId?: string) => Promise<void>;
onSaveAccessRule: (input: GatewayAccessRuleUpsertRequest, ruleId?: string) => Promise<void>;
onSaveTenant: (input: GatewayTenantUpsertRequest, tenantId?: string) => Promise<void>;
@ -102,9 +104,11 @@ export function AdminPage(props: {
{props.section === 'runtime' && (
<RuntimePoliciesPanel
message={props.operationMessage}
runnerPolicy={props.data.runnerPolicy}
runtimePolicySets={props.data.runtimePolicySets}
state={props.state}
onDeleteRuntimePolicySet={props.onDeleteRuntimePolicySet}
onSaveRunnerPolicy={props.onSaveRunnerPolicy}
onSaveRuntimePolicySet={props.onSaveRuntimePolicySet}
/>
)}

View File

@ -851,6 +851,15 @@ function TaskAttemptPopoverContent(props: { task: GatewayTask }) {
</span>
<small>{taskAttemptMeta(attempt)}</small>
{attempt.status === 'failed' && <span className="taskRecordAttemptError">{taskAttemptFailureReason(attempt)}</span>}
{taskAttemptTrace(attempt).length > 0 && (
<span className="taskRecordAttemptTrace">
{taskAttemptTrace(attempt).map((entry, index) => (
<span key={`${attempt.id || attempt.attemptNo}-trace-${index}`} className="taskRecordAttemptTraceItem">
{taskAttemptTraceText(entry)}
</span>
))}
</span>
)}
</span>
))}
</span>
@ -879,9 +888,11 @@ function taskAttemptStatusText(status: string) {
}
function taskAttemptMeta(attempt: NonNullable<GatewayTask['attempts']>[number]) {
const statusCode = taskAttemptStatusCode(attempt);
const values = [
attempt.providerModelName || attempt.modelName || attempt.modelAlias,
attempt.requestId ? `RequestID ${attempt.requestId}` : '',
statusCode ? `状态码 ${statusCode}` : '',
attempt.responseDurationMs ? formatDuration(attempt.responseDurationMs) : '',
].filter(Boolean);
return values.join(' · ') || attempt.clientId || '-';
@ -894,8 +905,97 @@ function taskAttemptFailureReason(attempt: NonNullable<GatewayTask['attempts']>[
metadataString(attempt.metrics, 'message'),
);
const code = firstText(attempt.errorCode, metadataString(attempt.metrics, 'errorCode'));
if (detail && code && detail !== code) return `${detail}${code}`;
return detail || code || '失败';
const statusCode = taskAttemptStatusCode(attempt);
const statusText = statusCode ? `状态码 ${statusCode}` : '';
const category = taskAttemptFailureCategory(attempt);
const categoryText = category ? `错误分类 ${category}` : '';
if (detail && code && detail !== code) return [detail, code, statusText, categoryText].filter(Boolean).join(' · ');
return [detail || code || '失败', statusText, categoryText].filter(Boolean).join(' · ');
}
function taskAttemptStatusCode(attempt: NonNullable<GatewayTask['attempts']>[number]) {
const value = attempt.statusCode ?? metadataNumber(attempt.metrics, 'statusCode');
return value && Number.isFinite(value) ? Math.trunc(value) : null;
}
function taskAttemptTrace(attempt: NonNullable<GatewayTask['attempts']>[number]) {
const raw = attempt.metrics?.trace;
if (!Array.isArray(raw)) return [];
return raw.filter((item): item is Record<string, unknown> => Boolean(item) && typeof item === 'object' && !Array.isArray(item));
}
function taskAttemptTraceText(entry: Record<string, unknown>) {
const event = objectString(entry, 'event');
const action = objectString(entry, 'action');
const reason = objectString(entry, 'reason');
const statusCode = metadataNumber(entry, 'statusCode');
const errorCode = objectString(entry, 'errorCode');
const category = objectString(entry, 'category');
const policy = taskAttemptTracePolicy(entry);
const values = [
taskAttemptTraceEventLabel(event, action),
statusCode ? `状态码 ${Math.trunc(statusCode)}` : '',
errorCode ? `错误 ${errorCode}` : '',
category ? `错误分类 ${category}` : '',
policy,
reason ? `原因 ${taskAttemptTraceReasonLabel(reason)}` : '',
].filter(Boolean);
return values.join(' · ');
}
function taskAttemptFailureCategory(attempt: NonNullable<GatewayTask['attempts']>[number]) {
const category = firstText(
metadataString(attempt.metrics, 'errorCategory'),
metadataString(attempt.metrics, 'category'),
);
if (category) return category;
for (const entry of taskAttemptTrace(attempt)) {
const traceCategory = objectString(entry, 'category');
if (traceCategory) return traceCategory;
}
return '';
}
function taskAttemptTracePolicy(entry: Record<string, unknown>) {
const source = objectString(entry, 'policySource');
const policy = objectString(entry, 'policy');
const rule = objectString(entry, 'policyRule');
const matchedValue = objectString(entry, 'matchedValue');
const sourceLabel = source || policy;
const policyPath = [sourceLabel, rule].filter(Boolean).join('.');
if (!policyPath) return '';
return matchedValue ? `策略 ${policyPath}=${matchedValue}` : `策略 ${policyPath}`;
}
function taskAttemptTraceEventLabel(event: string, action: string) {
if (event === 'failure') return '失败';
if (event === 'same_client_retry') return action === 'retry' ? '本平台重试' : '本平台停止重试';
if (event === 'failover_next') return '切换下个平台';
if (event === 'failover_stop') return '停止切换平台';
if (event === 'priority_demoted') return '优先级降级';
return event || action || '链路事件';
}
function taskAttemptTraceReasonLabel(reason: string) {
const labels: Record<string, string> = {
client_call_failed: '客户端调用失败',
retry_disabled: '模型重试关闭',
retry_deny_policy: '命中模型拒绝重试规则',
retry_allow_policy: '命中模型允许重试规则',
client_retryable: '客户端标记可重试',
client_non_retryable: '客户端标记不可重试',
same_client_max_attempts: '达到本平台最大尝试次数',
failover_time_budget_exceeded: '超过全局切换时间预算',
runner_policy_disabled: '全局调度策略停用',
hard_stop_policy: '命中硬拒绝规则',
failover_disabled: '平台切换关闭',
failover_deny_policy: '命中拒绝切换规则',
failover_allow_policy: '命中允许切换规则',
max_platforms_reached: '达到最大平台数',
no_next_platform: '没有更多候选平台',
priority_demote_policy: '命中优先级降级规则',
};
return labels[reason] ?? reason;
}
function formatCellValue(value: unknown) {

View File

@ -158,7 +158,7 @@ export function PlatformManagementPanel(props: {
<CardHeader>
<div>
<CardTitle></CardTitle>
<p className="mutedText">Base URL</p>
<p className="mutedText">Base URL</p>
</div>
<Button type="button" onClick={openCreateDialog}>
<Plus size={15} />
@ -303,7 +303,7 @@ export function PlatformManagementPanel(props: {
<Input value={form.defaultDiscountFactor} inputMode="decimal" onChange={(event) => setForm({ ...form, defaultDiscountFactor: event.target.value })} />
</Label>
<ToggleField checked={form.retryEnabled} label="失败后重试下一个客户端" onChange={(checked) => setForm({ ...form, retryEnabled: checked })} />
<ToggleField checked={form.retryEnabled} label="失败后同平台重试" onChange={(checked) => setForm({ ...form, retryEnabled: checked })} />
<Label>
<Input value={form.retryMaxAttempts} inputMode="numeric" disabled={!form.retryEnabled} onChange={(event) => setForm({ ...form, retryMaxAttempts: event.target.value })} />
@ -370,7 +370,7 @@ function ModelBindingPolicy(props: { form: PlatformWizardForm; onChange: (value:
<ToggleField checked={form.modelOverrideRetry} label="覆盖模型重试策略" onChange={(checked) => onChange({ ...form, modelOverrideRetry: checked })} />
{form.modelOverrideRetry && (
<>
<ToggleField checked={form.modelRetryEnabled} label="模型失败重试" onChange={(checked) => onChange({ ...form, modelRetryEnabled: checked })} />
<ToggleField checked={form.modelRetryEnabled} label="模型同平台重试" onChange={(checked) => onChange({ ...form, modelRetryEnabled: checked })} />
<Label>
<Input value={form.modelRetryMaxAttempts} inputMode="numeric" disabled={!form.modelRetryEnabled} onChange={(event) => onChange({ ...form, modelRetryMaxAttempts: event.target.value })} />
@ -1119,7 +1119,7 @@ function platformRuntimeSummary(platform: IntegrationPlatform) {
const retryPolicy = platform.retryPolicy ?? {};
const retryEnabled = readBoolean(retryPolicy, 'enabled', true);
const maxAttempts = readNumber(retryPolicy, 'maxAttempts') ?? 2;
return `优先级 ${platform.priority} · ${retryEnabled ? `最多重${maxAttempts}` : '不重试'} · ${proxyModeText(readNetworkProxyConfig(platform.config ?? {}).proxyMode)}`;
return `优先级 ${platform.priority} · ${retryEnabled ? `同平台最多尝${maxAttempts}` : '同平台不重试'} · ${proxyModeText(readNetworkProxyConfig(platform.config ?? {}).proxyMode)}`;
}
function proxyModeText(mode: PlatformWizardForm['proxyMode']) {

View File

@ -1,9 +1,13 @@
import { useState, type FormEvent } from 'react';
import { Gauge, Pencil, Plus, RotateCcw, ShieldCheck, Trash2 } from 'lucide-react';
import type { RuntimePolicySet, RuntimePolicySetUpsertRequest } from '@easyai-ai-gateway/contracts';
import { Badge, Button, Card, CardContent, CardHeader, CardTitle, ConfirmDialog, FormDialog, Input, Label, Textarea } from '../../components/ui';
import { useEffect, useState, type FormEvent } from 'react';
import { Select as AntSelect } from 'antd';
import { Gauge, Pencil, Plus, RotateCcw, Route, Save, ShieldCheck, Trash2 } from 'lucide-react';
import type { GatewayRunnerPolicy, GatewayRunnerPolicyUpsertRequest, RuntimePolicySet, RuntimePolicySetUpsertRequest } from '@easyai-ai-gateway/contracts';
import { Badge, Button, Card, CardContent, CardHeader, CardTitle, ConfirmDialog, FormDialog, Input, Label, Select, Tabs, Textarea } from '../../components/ui';
import type { LoadState } from '../../types';
type RuntimePanelTab = 'model' | 'runner';
type RunnerPolicyStrategy = 'failover' | 'hardStop' | 'priorityDemote';
type RuntimePolicyForm = {
policyKey: string;
name: string;
@ -13,31 +17,69 @@ type RuntimePolicyForm = {
concurrency: string;
retryEnabled: boolean;
retryMaxAttempts: string;
retryAllowKeywords: string;
retryDenyKeywords: string;
retryAllowKeywords: string[];
retryDenyKeywords: string[];
autoDisableEnabled: boolean;
autoDisableThreshold: string;
autoDisableKeywords: string;
autoDisableKeywords: string[];
degradeEnabled: boolean;
degradeCooldownSeconds: string;
degradeKeywords: string;
degradeKeywords: string[];
metadataJson: string;
status: string;
};
type RunnerPolicyForm = {
name: string;
description: string;
failoverEnabled: boolean;
maxPlatforms: string;
maxDurationSeconds: string;
allowCategories: string[];
denyCategories: string[];
allowCodes: string[];
denyCodes: string[];
allowKeywords: string[];
denyKeywords: string[];
allowStatusCodes: string[];
denyStatusCodes: string[];
failoverActions: Record<string, unknown>;
hardStopEnabled: boolean;
hardStopCategories: string[];
hardStopCodes: string[];
hardStopStatusCodes: string[];
hardStopKeywords: string[];
priorityDemoteEnabled: boolean;
priorityDemoteStep: string;
priorityDemoteCategories: string[];
priorityDemoteCodes: string[];
priorityDemoteStatusCodes: string[];
priorityDemoteKeywords: string[];
metadataJson: string;
status: string;
};
export function RuntimePoliciesPanel(props: {
message: string;
runnerPolicy: GatewayRunnerPolicy | null;
runtimePolicySets: RuntimePolicySet[];
state: LoadState;
onDeleteRuntimePolicySet: (policySetId: string) => Promise<void>;
onSaveRunnerPolicy: (input: GatewayRunnerPolicyUpsertRequest) => Promise<void>;
onSaveRuntimePolicySet: (input: RuntimePolicySetUpsertRequest, policySetId?: string) => Promise<void>;
}) {
const [activeTab, setActiveTab] = useState<RuntimePanelTab>('model');
const [dialogOpen, setDialogOpen] = useState(false);
const [editingId, setEditingId] = useState('');
const [form, setForm] = useState<RuntimePolicyForm>(() => createDefaultForm());
const [runnerForm, setRunnerForm] = useState<RunnerPolicyForm>(() => runnerPolicyToForm(null));
const [localError, setLocalError] = useState('');
const [pendingDeletePolicy, setPendingDeletePolicy] = useState<RuntimePolicySet | null>(null);
useEffect(() => {
setRunnerForm(runnerPolicyToForm(props.runnerPolicy));
}, [props.runnerPolicy?.id, props.runnerPolicy?.updatedAt]);
function openCreateDialog() {
setEditingId('');
setLocalError('');
@ -80,62 +122,91 @@ export function RuntimePoliciesPanel(props: {
}
}
async function submitRunnerPolicy(event: FormEvent<HTMLFormElement>) {
event.preventDefault();
setLocalError('');
try {
await props.onSaveRunnerPolicy(runnerFormToPayload(runnerForm));
} catch (err) {
setLocalError(err instanceof Error ? err.message : '全局调度策略保存失败');
}
}
return (
<div className="pageStack">
<Card>
<CardHeader>
<div>
<CardTitle></CardTitle>
<p className="mutedText"></p>
<p className="mutedText"></p>
</div>
<Button type="button" onClick={openCreateDialog}>
{activeTab === 'model' && <Button type="button" onClick={openCreateDialog}>
<Plus size={15} />
</Button>
</Button>}
</CardHeader>
<CardContent>
{(props.message || localError) && <p className="formMessage">{localError || props.message}</p>}
<Tabs
value={activeTab}
tabs={[
{ value: 'model', label: '模型运行策略', icon: <ShieldCheck size={15} /> },
{ value: 'runner', label: '全局调度策略', icon: <Route size={15} /> },
]}
onValueChange={setActiveTab}
/>
</CardContent>
</Card>
<section className="runtimePolicyGrid">
{props.runtimePolicySets.map((policy) => (
<article className="runtimePolicyCard" key={policy.id}>
<header>
<div className="iconBox"><ShieldCheck size={18} /></div>
<div>
<strong>{policy.name}</strong>
<span>{policy.policyKey}</span>
{activeTab === 'runner' && (
<RunnerPolicyEditor
form={runnerForm}
loading={props.state === 'loading'}
onChange={setRunnerForm}
onSubmit={submitRunnerPolicy}
/>
)}
{activeTab === 'model' && (
<section className="runtimePolicyGrid">
{props.runtimePolicySets.map((policy) => (
<article className="runtimePolicyCard" key={policy.id}>
<header>
<div className="iconBox"><ShieldCheck size={18} /></div>
<div>
<strong>{policy.name}</strong>
<span>{policy.policyKey}</span>
</div>
<Badge variant={policy.status === 'active' ? 'success' : 'secondary'}>{policy.status}</Badge>
</header>
{policy.description && <p>{policy.description}</p>}
<div className="runtimePolicySummary">
<span><Gauge size={13} />{rateLimitSummary(policy)}</span>
<span>{retrySummary(policy)}</span>
<span>{autoDisableSummary(policy)}</span>
<span>{degradeSummary(policy)}</span>
</div>
<Badge variant={policy.status === 'active' ? 'success' : 'secondary'}>{policy.status}</Badge>
</header>
{policy.description && <p>{policy.description}</p>}
<div className="runtimePolicySummary">
<span><Gauge size={13} />{rateLimitSummary(policy)}</span>
<span>{retrySummary(policy)}</span>
<span>{autoDisableSummary(policy)}</span>
<span>{degradeSummary(policy)}</span>
</div>
<footer>
<Button type="button" variant="outline" size="sm" onClick={() => editPolicy(policy)}>
<Pencil size={14} />
</Button>
<Button
type="button"
variant="destructive"
size="sm"
disabled={isDefaultPolicy(policy)}
title={isDefaultPolicy(policy) ? '默认运行策略不能删除' : undefined}
onClick={() => setPendingDeletePolicy(policy)}
>
<Trash2 size={14} />
</Button>
</footer>
</article>
))}
</section>
<footer>
<Button type="button" variant="outline" size="sm" onClick={() => editPolicy(policy)}>
<Pencil size={14} />
</Button>
<Button
type="button"
variant="destructive"
size="sm"
disabled={isDefaultPolicy(policy)}
title={isDefaultPolicy(policy) ? '默认运行策略不能删除' : undefined}
onClick={() => setPendingDeletePolicy(policy)}
>
<Trash2 size={14} />
</Button>
</footer>
</article>
))}
</section>
)}
<FormDialog
bodyClassName="runtimePolicyFormBody"
@ -172,9 +243,9 @@ export function RuntimePoliciesPanel(props: {
</section>
<section className="runtimePolicySection spanTwo">
<header><strong></strong><span>/</span></header>
<header><strong></strong><span>/</span></header>
<div className="runtimePolicyRows">
<Toggle checked={form.retryEnabled} label="允许失败重试" onChange={(checked) => setForm({ ...form, retryEnabled: checked })} />
<Toggle checked={form.retryEnabled} label="允许同平台重试" onChange={(checked) => setForm({ ...form, retryEnabled: checked })} />
<Label><Input value={form.retryMaxAttempts} inputMode="numeric" onChange={(event) => setForm({ ...form, retryMaxAttempts: event.target.value })} /></Label>
<KeywordField label="允许重试关键词" value={form.retryAllowKeywords} onChange={(value) => setForm({ ...form, retryAllowKeywords: value })} />
<KeywordField label="拒绝重试关键词" value={form.retryDenyKeywords} onChange={(value) => setForm({ ...form, retryDenyKeywords: value })} />
@ -218,15 +289,244 @@ function Toggle(props: { checked: boolean; label: string; onChange: (checked: bo
);
}
function KeywordField(props: { label: string; value: string; onChange: (value: string) => void }) {
function RunnerPolicyEditor(props: {
form: RunnerPolicyForm;
loading: boolean;
onChange: (form: RunnerPolicyForm) => void;
onSubmit: (event: FormEvent<HTMLFormElement>) => void;
}) {
const [activeStrategy, setActiveStrategy] = useState<RunnerPolicyStrategy>('failover');
const patch = (next: Partial<RunnerPolicyForm>) => props.onChange({ ...props.form, ...next });
const strategyDefinitions = [
{
value: 'failover' as const,
title: '平台间故障切换',
description: '当前平台内部重试耗尽后是否尝试下一个候选平台',
enabled: props.form.failoverEnabled,
icon: <Route size={15} />,
},
{
value: 'hardStop' as const,
title: '硬拒绝规则',
description: '参数、余额、权限等错误直接失败,不受模型覆盖影响',
enabled: props.form.hardStopEnabled,
icon: <ShieldCheck size={15} />,
},
{
value: 'priorityDemote' as const,
title: '优先级降级',
description: '命中后将失败平台的动态优先级调整到当前最后',
enabled: props.form.priorityDemoteEnabled,
icon: <Gauge size={15} />,
},
];
const activeDefinition = strategyDefinitions.find((item) => item.value === activeStrategy) ?? strategyDefinitions[0];
return (
<Label className="spanTwo">
<Card>
<CardHeader className="runnerPolicyHeader">
<div className="runnerPolicyHeaderText">
<CardTitle>{props.form.name}</CardTitle>
<p className="mutedText">{props.form.description}</p>
</div>
<Label className="runnerPolicyHeaderStatus">
<Select value={props.form.status} onChange={(event) => patch({ status: event.target.value })}>
<option value="active"></option>
<option value="disabled"></option>
</Select>
</Label>
</CardHeader>
<CardContent>
<form className="runtimePolicyFormBody runnerPolicyForm" onSubmit={props.onSubmit}>
<div className="capabilityWorkbench runnerPolicyWorkbench spanTwo">
<aside className="capabilitySidebar">
<div className="capabilitySidebarTitle">
<Route size={15} />
<strong></strong>
</div>
<div className="capabilityList">
{strategyDefinitions.map((item) => (
<button className="capabilityListItem" data-active={item.value === activeStrategy} key={item.value} type="button" onClick={() => setActiveStrategy(item.value)}>
<span>
<strong>{item.title}</strong>
<small>{item.description}</small>
</span>
<Badge variant={item.enabled ? 'success' : 'secondary'}>{item.enabled ? '启用' : '关闭'}</Badge>
</button>
))}
</div>
</aside>
<div className="capabilityFormPanel runnerPolicyDetailPanel">
<header>
{activeDefinition.icon}
<div>
<strong>{activeDefinition.title}</strong>
<small>{activeDefinition.description}</small>
</div>
<span>{activeDefinition.enabled ? '启用' : '关闭'}</span>
</header>
{activeStrategy === 'failover' && (
<div className="runtimePolicyRows runnerPolicyDetailRows">
<Toggle checked={props.form.failoverEnabled} label="启用平台间切换" onChange={(checked) => patch({ failoverEnabled: checked })} />
<Label>
<Input value={props.form.maxPlatforms} inputMode="numeric" onChange={(event) => patch({ maxPlatforms: event.target.value })} />
<span className="runtimeFieldHint"> 99</span>
</Label>
<Label>
<Input value={props.form.maxDurationSeconds} inputMode="numeric" onChange={(event) => patch({ maxDurationSeconds: event.target.value })} />
<span className="runtimeFieldHint"> 600 </span>
</Label>
<KeywordField label="允许分类" value={props.form.allowCategories} onChange={(value) => patch({ allowCategories: value })} />
<KeywordField label="拒绝分类" value={props.form.denyCategories} onChange={(value) => patch({ denyCategories: value })} />
<KeywordField label="允许错误码" value={props.form.allowCodes} onChange={(value) => patch({ allowCodes: value })} />
<KeywordField label="拒绝错误码" value={props.form.denyCodes} onChange={(value) => patch({ denyCodes: value })} />
<KeywordField label="允许关键词" value={props.form.allowKeywords} onChange={(value) => patch({ allowKeywords: value })} />
<KeywordField label="拒绝关键词" value={props.form.denyKeywords} onChange={(value) => patch({ denyKeywords: value })} />
<KeywordField label="允许状态码" value={props.form.allowStatusCodes} onChange={(value) => patch({ allowStatusCodes: value })} />
<KeywordField label="拒绝状态码" value={props.form.denyStatusCodes} onChange={(value) => patch({ denyStatusCodes: value })} />
</div>
)}
{activeStrategy === 'hardStop' && (
<div className="runtimePolicyRows runnerPolicyDetailRows">
<Toggle checked={props.form.hardStopEnabled} label="启用硬拒绝" onChange={(checked) => patch({ hardStopEnabled: checked })} />
<KeywordField label="硬拒绝分类" value={props.form.hardStopCategories} onChange={(value) => patch({ hardStopCategories: value })} />
<KeywordField label="硬拒绝错误码" value={props.form.hardStopCodes} onChange={(value) => patch({ hardStopCodes: value })} />
<KeywordField label="硬拒绝状态码" value={props.form.hardStopStatusCodes} onChange={(value) => patch({ hardStopStatusCodes: value })} />
<span className="runtimeFieldHint spanTwo">401/403 </span>
<KeywordField label="硬拒绝关键词" value={props.form.hardStopKeywords} onChange={(value) => patch({ hardStopKeywords: value })} />
</div>
)}
{activeStrategy === 'priorityDemote' && (
<div className="runtimePolicyRows runnerPolicyDetailRows">
<Toggle checked={props.form.priorityDemoteEnabled} label="启用优先级降级" onChange={(checked) => patch({ priorityDemoteEnabled: checked })} />
<Label>
<Input value={props.form.priorityDemoteStep} inputMode="numeric" onChange={(event) => patch({ priorityDemoteStep: event.target.value })} />
<span className="runtimeFieldHint"> 100</span>
</Label>
<KeywordField label="降级分类" value={props.form.priorityDemoteCategories} onChange={(value) => patch({ priorityDemoteCategories: value })} />
<KeywordField label="降级错误码" value={props.form.priorityDemoteCodes} onChange={(value) => patch({ priorityDemoteCodes: value })} />
<KeywordField label="降级状态码" value={props.form.priorityDemoteStatusCodes} onChange={(value) => patch({ priorityDemoteStatusCodes: value })} />
<KeywordField label="降级关键词" value={props.form.priorityDemoteKeywords} onChange={(value) => patch({ priorityDemoteKeywords: value })} />
</div>
)}
</div>
</div>
<Label className="spanTwo"> JSON<Textarea value={props.form.metadataJson} rows={4} onChange={(event) => patch({ metadataJson: event.target.value })} /></Label>
<div className="runtimePolicyActions spanTwo">
<Button type="submit" disabled={props.loading}>
<Save size={15} />
</Button>
</div>
</form>
</CardContent>
</Card>
);
}
function KeywordField(props: { label: string; value: string[]; onChange: (value: string[]) => void }) {
const options = props.value.map((item) => ({ label: item, value: item }));
return (
<Label className="spanTwo runtimeTagField">
{props.label}
<Input value={props.value} placeholder="多个关键词用逗号或换行分隔" onChange={(event) => props.onChange(event.target.value)} />
<AntSelect
allowClear
className="runtimeTagInput"
maxTagCount="responsive"
mode="tags"
options={options}
placeholder="输入后回车生成标签"
tokenSeparators={[',', '\n']}
value={props.value}
onChange={(value) => props.onChange(cleanTags(value))}
/>
</Label>
);
}
function runnerPolicyToForm(policy: GatewayRunnerPolicy | null): RunnerPolicyForm {
const failover = readObject(policy?.failoverPolicy);
const hardStop = readObject(policy?.hardStopPolicy);
const priorityDemote = readObject(policy?.priorityDemotePolicy);
return {
name: policy?.name ?? '默认全局调度策略',
description: policy?.description ?? '控制多个候选平台之间的故障切换;模型运行策略只可覆盖 failoverPolicy不能覆盖 hardStopPolicy。',
failoverEnabled: readBool(failover.enabled, true),
maxPlatforms: String(readNumber(failover.maxPlatforms, 99)),
maxDurationSeconds: String(readNumber(failover.maxDurationSeconds, 600)),
allowCategories: tagsFromValue(failover.allowCategories ?? ['network', 'timeout', 'stream_error', 'rate_limit', 'provider_5xx', 'provider_overloaded', 'auth_error']),
denyCategories: tagsFromValue(failover.denyCategories ?? ['request_error', 'unsupported_model', 'user_permission', 'insufficient_balance']),
allowCodes: tagsFromValue(failover.allowCodes ?? ['auth_failed', 'invalid_api_key', 'missing_credentials']),
denyCodes: tagsFromValue(failover.denyCodes ?? []),
allowKeywords: tagsFromValue(failover.allowKeywords ?? ['timeout', 'network', 'rate_limit', 'overloaded', 'temporarily_unavailable', 'server_error', 'auth_failed', 'invalid_api_key', 'missing_credentials', 'unauthorized', 'forbidden', '429', '5xx']),
denyKeywords: tagsFromValue(failover.denyKeywords ?? ['invalid_parameter', 'missing required', 'bad request']),
allowStatusCodes: tagsFromValue(failover.allowStatusCodes ?? [401, 403, 408, 429, 500, 502, 503, 504]),
denyStatusCodes: tagsFromValue(failover.denyStatusCodes ?? []),
failoverActions: Object.keys(readObject(failover.actions)).length > 0 ? readObject(failover.actions) : defaultFailoverActions(),
hardStopEnabled: readBool(hardStop.enabled, true),
hardStopCategories: tagsFromValue(hardStop.categories ?? ['request_error', 'unsupported_model', 'user_permission', 'insufficient_balance']),
hardStopCodes: tagsFromValue(hardStop.codes ?? ['bad_request', 'invalid_request', 'invalid_parameter', 'missing_required', 'unsupported_kind', 'unsupported_model', 'insufficient_balance', 'permission_denied']),
hardStopStatusCodes: tagsFromValue(hardStop.statusCodes ?? []),
hardStopKeywords: tagsFromValue(hardStop.keywords ?? ['invalid_parameter', 'missing required', 'bad request', 'insufficient balance']),
priorityDemoteEnabled: readBool(priorityDemote.enabled, true),
priorityDemoteStep: String(readNumber(priorityDemote.demoteStep, 100)),
priorityDemoteCategories: tagsFromValue(priorityDemote.categories ?? ['network', 'timeout', 'stream_error', 'rate_limit', 'provider_5xx', 'provider_overloaded']),
priorityDemoteCodes: tagsFromValue(priorityDemote.codes ?? ['network', 'timeout', 'stream_read_error', 'rate_limit', 'server_error', 'overloaded']),
priorityDemoteStatusCodes: tagsFromValue(priorityDemote.statusCodes ?? [408, 429, 500, 502, 503, 504]),
priorityDemoteKeywords: tagsFromValue(priorityDemote.keywords ?? ['timeout', 'network', 'rate_limit', 'overloaded', 'temporarily_unavailable', 'server_error', '429', '5xx']),
metadataJson: JSON.stringify(policy?.metadata ?? {}, null, 2),
status: policy?.status ?? 'active',
};
}
function runnerFormToPayload(form: RunnerPolicyForm): GatewayRunnerPolicyUpsertRequest {
return {
policyKey: 'default-runner-v1',
name: form.name.trim() || '默认全局调度策略',
description: form.description.trim() || undefined,
failoverPolicy: {
enabled: form.failoverEnabled,
maxPlatforms: positiveInt(form.maxPlatforms, 99),
maxDurationSeconds: positiveInt(form.maxDurationSeconds, 600),
allowCategories: cleanTags(form.allowCategories),
denyCategories: cleanTags(form.denyCategories),
allowCodes: cleanTags(form.allowCodes),
denyCodes: cleanTags(form.denyCodes),
allowKeywords: cleanTags(form.allowKeywords),
denyKeywords: cleanTags(form.denyKeywords),
allowStatusCodes: parseNumberTags(form.allowStatusCodes),
denyStatusCodes: parseNumberTags(form.denyStatusCodes),
actions: Object.keys(form.failoverActions).length > 0 ? form.failoverActions : defaultFailoverActions(),
},
hardStopPolicy: {
enabled: form.hardStopEnabled,
categories: cleanTags(form.hardStopCategories),
codes: cleanTags(form.hardStopCodes),
statusCodes: parseNumberTags(form.hardStopStatusCodes),
keywords: cleanTags(form.hardStopKeywords),
},
priorityDemotePolicy: {
enabled: form.priorityDemoteEnabled,
demoteStep: positiveInt(form.priorityDemoteStep, 100),
categories: cleanTags(form.priorityDemoteCategories),
codes: cleanTags(form.priorityDemoteCodes),
statusCodes: parseNumberTags(form.priorityDemoteStatusCodes),
keywords: cleanTags(form.priorityDemoteKeywords),
},
metadata: parseJson(form.metadataJson),
status: form.status.trim() || 'active',
};
}
function createDefaultForm(policyKey = 'default-runtime-v1'): RuntimePolicyForm {
return {
policyKey,
@ -237,19 +537,28 @@ function createDefaultForm(policyKey = 'default-runtime-v1'): RuntimePolicyForm
concurrency: '6',
retryEnabled: true,
retryMaxAttempts: '2',
retryAllowKeywords: 'rate_limit, timeout, server_error, network, 429, 5xx',
retryDenyKeywords: 'invalid_api_key, insufficient_quota, billing_not_active, permission_denied',
retryAllowKeywords: ['rate_limit', 'timeout', 'server_error', 'network', '429', '5xx'],
retryDenyKeywords: ['invalid_api_key', 'insufficient_quota', 'billing_not_active', 'permission_denied'],
autoDisableEnabled: false,
autoDisableThreshold: '3',
autoDisableKeywords: 'invalid_api_key, account_deactivated, permission_denied, billing_not_active',
autoDisableKeywords: ['invalid_api_key', 'account_deactivated', 'permission_denied', 'billing_not_active'],
degradeEnabled: true,
degradeCooldownSeconds: '300',
degradeKeywords: 'rate_limit, quota, timeout, temporarily_unavailable, overloaded',
degradeKeywords: ['rate_limit', 'quota', 'timeout', 'temporarily_unavailable', 'overloaded'],
metadataJson: '{}',
status: 'active',
};
}
function defaultFailoverActions(): Record<string, unknown> {
return {
auth_error: 'disable_and_next',
rate_limit: 'cooldown_and_next',
provider_5xx: 'next',
request_error: 'stop',
};
}
function policyToForm(policy: RuntimePolicySet): RuntimePolicyForm {
const rateRules = Array.isArray(policy.rateLimitPolicy?.rules) ? policy.rateLimitPolicy.rules : [];
const retry = readObject(policy.retryPolicy);
@ -264,14 +573,14 @@ function policyToForm(policy: RuntimePolicySet): RuntimePolicyForm {
concurrency: String(readRateLimit(rateRules, 'concurrent') || ''),
retryEnabled: readBool(retry.enabled, true),
retryMaxAttempts: String(readNumber(retry.maxAttempts, 2)),
retryAllowKeywords: stringifyKeywords(retry.allowKeywords),
retryDenyKeywords: stringifyKeywords(retry.denyKeywords),
retryAllowKeywords: tagsFromValue(retry.allowKeywords),
retryDenyKeywords: tagsFromValue(retry.denyKeywords),
autoDisableEnabled: readBool(disable.enabled, false),
autoDisableThreshold: String(readNumber(disable.threshold, 3)),
autoDisableKeywords: stringifyKeywords(disable.keywords),
autoDisableKeywords: tagsFromValue(disable.keywords),
degradeEnabled: readBool(degrade.enabled, true),
degradeCooldownSeconds: String(readNumber(degrade.cooldownSeconds, 300)),
degradeKeywords: stringifyKeywords(degrade.keywords),
degradeKeywords: tagsFromValue(degrade.keywords),
metadataJson: JSON.stringify(policy.metadata ?? {}, null, 2),
status: policy.status || 'active',
};
@ -286,18 +595,18 @@ function formToPayload(form: RuntimePolicyForm): RuntimePolicySetUpsertRequest {
retryPolicy: {
enabled: form.retryEnabled,
maxAttempts: positiveInt(form.retryMaxAttempts, 2),
allowKeywords: parseKeywords(form.retryAllowKeywords),
denyKeywords: parseKeywords(form.retryDenyKeywords),
allowKeywords: cleanTags(form.retryAllowKeywords),
denyKeywords: cleanTags(form.retryDenyKeywords),
},
autoDisablePolicy: {
enabled: form.autoDisableEnabled,
threshold: positiveInt(form.autoDisableThreshold, 3),
keywords: parseKeywords(form.autoDisableKeywords),
keywords: cleanTags(form.autoDisableKeywords),
},
degradePolicy: {
enabled: form.degradeEnabled,
cooldownSeconds: positiveInt(form.degradeCooldownSeconds, 300),
keywords: parseKeywords(form.degradeKeywords),
keywords: cleanTags(form.degradeKeywords),
},
metadata: parseJson(form.metadataJson),
status: form.status.trim() || 'active',
@ -332,7 +641,7 @@ function rateLimitSummary(policy: RuntimePolicySet) {
function retrySummary(policy: RuntimePolicySet) {
const retry = readObject(policy.retryPolicy);
return readBool(retry.enabled, false) ? `${readNumber(retry.maxAttempts, 2)}` : '不重试';
return readBool(retry.enabled, false) ? `同平台尝${readNumber(retry.maxAttempts, 2)}` : '同平台不重试';
}
function autoDisableSummary(policy: RuntimePolicySet) {
@ -350,12 +659,31 @@ function readRateLimit(rules: unknown[], metric: string) {
return readNumber(readObject(rule).limit, 0);
}
function parseKeywords(value: string) {
return value.split(/[,\n]/).map((item) => item.trim()).filter(Boolean);
function stringifyKeywords(value: unknown) {
return tagsFromValue(value).join(', ');
}
function stringifyKeywords(value: unknown) {
return Array.isArray(value) ? value.map(String).join(', ') : '';
function tagsFromValue(value: unknown) {
if (Array.isArray(value)) return cleanTags(value.map((item) => String(item)));
return typeof value === 'string' ? cleanTags([value]) : [];
}
function cleanTags(value: string[]) {
const tags: string[] = [];
const seen = new Set<string>();
for (const raw of value) {
for (const item of raw.split(/[,\n]/)) {
const tag = item.trim();
if (!tag || seen.has(tag)) continue;
seen.add(tag);
tags.push(tag);
}
}
return tags;
}
function parseNumberTags(value: string[]) {
return cleanTags(value).map((item) => Number.parseInt(item, 10)).filter((item) => Number.isFinite(item) && item > 0);
}
function positiveInt(value: string, fallback: number) {

View File

@ -424,6 +424,19 @@ strong {
overflow-wrap: anywhere;
}
.taskRecordAttemptTrace {
display: grid;
gap: 0.25rem;
padding-top: 0.1rem;
}
.taskRecordAttemptTraceItem {
color: var(--text-soft);
font-size: var(--font-size-xs);
line-height: 1.45;
overflow-wrap: anywhere;
}
.taskRecordJsonButton {
width: 100%;
justify-content: flex-start;

View File

@ -1536,6 +1536,70 @@
gap: 10px;
}
.runtimeTagInput {
width: 100%;
}
.runtimeTagInput .ant-select-selector {
min-height: 2.375rem;
border-color: var(--border) !important;
border-radius: 0.5rem !important;
background: var(--background) !important;
box-shadow: none !important;
}
.runtimeTagInput .ant-select-selection-placeholder {
color: var(--muted-foreground);
}
.runtimeTagInput .ant-select-selection-item {
border-color: var(--border);
border-radius: 999px;
background: var(--surface-subtle);
}
.runtimeFieldHint {
color: var(--muted-foreground);
font-size: var(--font-size-xs);
font-weight: var(--font-weight-regular);
line-height: 1.45;
}
.runnerPolicyForm {
padding: 0;
background: transparent;
}
.runnerPolicyHeaderText {
min-width: 0;
}
.runnerPolicyHeaderText .mutedText {
margin: 4px 0 0;
}
.runnerPolicyHeaderStatus {
min-width: 220px;
max-width: 240px;
}
.runnerPolicyWorkbench {
grid-template-columns: 240px minmax(0, 1fr);
}
.runnerPolicyDetailPanel {
min-height: 360px;
}
.runnerPolicyDetailRows {
padding: 8px 10px;
}
.runtimePolicyActions {
display: flex;
justify-content: flex-end;
}
@media (max-width: 1180px) {
.modelCards,
.providerCatalogGrid,
@ -1549,6 +1613,16 @@
}
@media (max-width: 860px) {
.runnerPolicyHeader {
align-items: flex-start;
flex-direction: column;
}
.runnerPolicyHeaderStatus {
width: 100%;
max-width: none;
}
.subPageLayout,
.modelsPage {
grid-template-columns: 1fr;

View File

@ -266,10 +266,36 @@ export interface RuntimePolicySetUpsertRequest {
export interface RuntimePolicyOverride {
rateLimitPolicy?: RateLimitPolicy | Record<string, unknown>;
retryPolicy?: Record<string, unknown>;
failoverPolicy?: Record<string, unknown>;
autoDisablePolicy?: Record<string, unknown>;
degradePolicy?: Record<string, unknown>;
}
export interface GatewayRunnerPolicy {
id: string;
policyKey: string;
name: string;
description?: string;
failoverPolicy?: Record<string, unknown>;
hardStopPolicy?: Record<string, unknown>;
priorityDemotePolicy?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status: 'active' | 'disabled' | string;
createdAt: string;
updatedAt: string;
}
export interface GatewayRunnerPolicyUpsertRequest {
policyKey?: string;
name?: string;
description?: string;
failoverPolicy?: Record<string, unknown>;
hardStopPolicy?: Record<string, unknown>;
priorityDemotePolicy?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status?: 'active' | 'disabled' | string;
}
export interface GatewayUser {
id: string;
userKey: string;
@ -787,6 +813,7 @@ export interface GatewayTaskAttempt {
retryable: boolean;
simulated: boolean;
requestId?: string;
statusCode?: number;
usage?: Record<string, unknown>;
metrics?: Record<string, unknown>;
requestSnapshot?: Record<string, unknown>;