From 05632172d0971244c2851421335ae84c441bc3f9 Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 12 May 2026 02:16:42 +0800 Subject: [PATCH] feat: add runner failover policies and traces --- .../httpapi/runtime_policy_handlers.go | 25 + apps/api/internal/httpapi/server.go | 2 + apps/api/internal/runner/recording.go | 12 +- apps/api/internal/runner/retry_decision.go | 446 +++++++++++++++++ .../internal/runner/retry_decision_test.go | 152 ++++++ apps/api/internal/runner/runtime_policy.go | 40 ++ apps/api/internal/runner/service.go | 243 +++++++-- apps/api/internal/runner/trace.go | 128 +++++ apps/api/internal/store/postgres.go | 20 + apps/api/internal/store/runner_policies.go | 168 +++++++ apps/api/internal/store/runtime_policies.go | 22 + apps/api/internal/store/tasks_runtime.go | 42 ++ apps/api/migrations/0026_runner_policies.sql | 73 +++ .../0027_runner_policy_failover_budget.sql | 17 + ...8_runner_policy_platform_auth_failover.sql | 22 + apps/web/src/App.tsx | 21 +- apps/web/src/api.ts | 17 + apps/web/src/app-state.ts | 2 + .../hooks/useRuntimePolicySetOperations.ts | 23 +- apps/web/src/pages/AdminPage.tsx | 4 + apps/web/src/pages/WorkspacePage.tsx | 104 +++- .../pages/admin/PlatformManagementPanel.tsx | 8 +- .../src/pages/admin/RuntimePoliciesPanel.tsx | 468 +++++++++++++++--- apps/web/src/styles.css | 13 + apps/web/src/styles/pages.css | 74 +++ packages/contracts/src/index.ts | 27 + 26 files changed, 2033 insertions(+), 140 deletions(-) create mode 100644 apps/api/internal/runner/retry_decision.go create mode 100644 apps/api/internal/runner/retry_decision_test.go create mode 100644 apps/api/internal/runner/trace.go create mode 100644 apps/api/internal/store/runner_policies.go create mode 100644 apps/api/migrations/0026_runner_policies.sql create mode 100644 apps/api/migrations/0027_runner_policy_failover_budget.sql create mode 100644 apps/api/migrations/0028_runner_policy_platform_auth_failover.sql diff --git a/apps/api/internal/httpapi/runtime_policy_handlers.go b/apps/api/internal/httpapi/runtime_policy_handlers.go index 716fd24..236f76b 100644 --- a/apps/api/internal/httpapi/runtime_policy_handlers.go +++ b/apps/api/internal/httpapi/runtime_policy_handlers.go @@ -19,6 +19,31 @@ func (s *Server) listRuntimePolicySets(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"items": items}) } +func (s *Server) getRunnerPolicy(w http.ResponseWriter, r *http.Request) { + item, err := s.store.GetActiveRunnerPolicy(r.Context()) + if err != nil { + s.logger.Error("get runner policy failed", "error", err) + writeError(w, http.StatusInternalServerError, "get runner policy failed") + return + } + writeJSON(w, http.StatusOK, item) +} + +func (s *Server) updateRunnerPolicy(w http.ResponseWriter, r *http.Request) { + var input store.RunnerPolicyInput + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + item, err := s.store.UpsertDefaultRunnerPolicy(r.Context(), input) + if err != nil { + s.logger.Error("update runner policy failed", "error", err) + writeError(w, http.StatusInternalServerError, "update runner policy failed") + return + } + writeJSON(w, http.StatusOK, item) +} + func (s *Server) createRuntimePolicySet(w http.ResponseWriter, r *http.Request) { var input store.RuntimePolicySetInput if err := json.NewDecoder(r.Body).Decode(&input); err != nil { diff --git a/apps/api/internal/httpapi/server.go b/apps/api/internal/httpapi/server.go index 713ba76..15156e6 100644 --- a/apps/api/internal/httpapi/server.go +++ b/apps/api/internal/httpapi/server.go @@ -90,6 +90,8 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han mux.Handle("POST /api/admin/runtime/policy-sets", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createRuntimePolicySet))) mux.Handle("PATCH /api/admin/runtime/policy-sets/{policySetID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateRuntimePolicySet))) mux.Handle("DELETE /api/admin/runtime/policy-sets/{policySetID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.deleteRuntimePolicySet))) + mux.Handle("GET /api/admin/runtime/runner-policy", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.getRunnerPolicy))) + mux.Handle("PATCH /api/admin/runtime/runner-policy", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateRunnerPolicy))) mux.Handle("GET /api/admin/config/network-proxy", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.getNetworkProxyConfig))) mux.Handle("GET /api/admin/platforms", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listPlatforms))) mux.Handle("POST /api/admin/platforms", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createPlatform))) diff --git a/apps/api/internal/runner/recording.go b/apps/api/internal/runner/recording.go index 1ed1a54..d70a46c 100644 --- a/apps/api/internal/runner/recording.go +++ b/apps/api/internal/runner/recording.go @@ -182,9 +182,12 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim metrics := map[string]any{ "simulated": simulated, } + retryable := clients.IsRetryable(err) if err != nil { + info := failureInfoFromError(err) metrics["error"] = err.Error() - metrics["retryable"] = clients.IsRetryable(err) + metrics["errorCategory"] = info.Category + metrics["retryable"] = retryable } if meta.StatusCode > 0 { metrics["statusCode"] = meta.StatusCode @@ -195,6 +198,9 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim if meta.ResponseDurationMS > 0 { metrics["responseDurationMs"] = meta.ResponseDurationMS } + if err != nil { + metrics["trace"] = []any{failureTraceEntry(err, retryable)} + } return meta.RequestID, metrics, meta.ResponseStartedAt, meta.ResponseFinishedAt, meta.ResponseDurationMS } @@ -227,12 +233,16 @@ func summarizeAttempts(attempts []store.TaskAttempt) []map[string]any { "requestId": attempt.RequestID, "retryable": attempt.Retryable, "simulated": attempt.Simulated, + "statusCode": attempt.StatusCode, "errorCode": attempt.ErrorCode, "errorMessage": attempt.ErrorMessage, "responseDurationMs": attempt.ResponseDurationMS, "startedAt": attempt.StartedAt, "finishedAt": attempt.FinishedAt, } + if trace, ok := attempt.Metrics["trace"]; ok { + item["trace"] = trace + } items = append(items, item) } return items diff --git a/apps/api/internal/runner/retry_decision.go b/apps/api/internal/runner/retry_decision.go new file mode 100644 index 0000000..b32daaa --- /dev/null +++ b/apps/api/internal/runner/retry_decision.go @@ -0,0 +1,446 @@ +package runner + +import ( + "fmt" + "strings" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" +) + +type failureInfo struct { + Code string + Message string + Status int + Category string + Target string +} + +type policyRuleMatch struct { + Source string + Policy string + Rule string + Value string +} + +type retryDecision struct { + Retry bool + Reason string + Match policyRuleMatch + Info failureInfo +} + +type failoverDecision struct { + Retry bool + Action string + Reason string + CooldownSeconds int + Match policyRuleMatch + Info failureInfo +} + +type priorityDemoteDecision struct { + Demote bool + Reason string + Step int + Match policyRuleMatch + Info failureInfo +} + +func shouldRetrySameClient(candidate store.RuntimeModelCandidate, err error) bool { + return retryDecisionForCandidate(candidate, err).Retry +} + +func retryDecisionForCandidate(candidate store.RuntimeModelCandidate, err error) retryDecision { + policy := effectiveRetryPolicy(candidate) + info := failureInfoFromError(err) + if !boolFromPolicy(policy, "enabled", true) { + return retryDecision{Retry: false, Reason: "retry_disabled", Match: policyRuleMatch{Source: "model_runtime_policy_sets.retry_policy", Policy: "retryPolicy", Rule: "enabled", Value: "false"}, Info: info} + } + if match, ok := retryPolicyDenyMatch(policy, info, "model_runtime_policy_sets.retry_policy", "retryPolicy"); ok { + return retryDecision{Retry: false, Reason: "retry_deny_policy", Match: match, Info: info} + } + if match, ok := retryPolicyAllowMatch(policy, info, "model_runtime_policy_sets.retry_policy", "retryPolicy"); ok { + return retryDecision{Retry: true, Reason: "retry_allow_policy", Match: match, Info: info} + } + if clients.IsRetryable(err) { + return retryDecision{Retry: true, Reason: "client_retryable", Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "true"}, Info: info} + } + return retryDecision{Retry: false, Reason: "client_non_retryable", Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "false"}, Info: info} +} + +func failoverDecisionForCandidate(runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, err error) failoverDecision { + info := failureInfoFromError(err) + if strings.TrimSpace(runnerPolicy.Status) != "" && runnerPolicy.Status != "active" { + return failoverDecision{Retry: false, Action: "stop", Reason: "runner_policy_disabled", Match: policyRuleMatch{Source: "gateway_runner_policies", Policy: "runnerPolicy", Rule: "status", Value: runnerPolicy.Status}, Info: info} + } + if match, ok := hardStopPolicyMatch(runnerPolicy.HardStopPolicy, info); ok { + return failoverDecision{Retry: false, Action: "stop", Reason: "hard_stop_policy", Match: match, Info: info} + } + overridePolicy := failoverOverridePolicy(candidate.RuntimePolicyOverride) + policy := effectiveFailoverPolicy(runnerPolicy.FailoverPolicy, candidate.RuntimePolicyOverride) + if !boolFromPolicy(policy, "enabled", true) { + source := "gateway_runner_policies.failover_policy" + if _, ok := overridePolicy["enabled"]; ok { + source = "runtime_policy_override.failoverPolicy" + } + return failoverDecision{Retry: false, Action: "stop", Reason: "failover_disabled", Match: policyRuleMatch{Source: source, Policy: "failoverPolicy", Rule: "enabled", Value: "false"}, Info: info} + } + if match, ok := failoverDenyMatchWithSources(runnerPolicy.FailoverPolicy, overridePolicy, info); ok { + return failoverDecision{Retry: false, Action: "stop", Reason: "failover_deny_policy", Match: match, Info: info} + } + action := failoverAction(policy, info) + cooldownSeconds := intFromPolicy(policy, "cooldownSeconds") + if cooldownSeconds <= 0 { + cooldownSeconds = 300 + } + if match, ok := failoverAllowMatchWithSources(runnerPolicy.FailoverPolicy, overridePolicy, info); ok { + return failoverDecision{Retry: true, Action: action, Reason: "failover_allow_policy", CooldownSeconds: cooldownSeconds, Match: match, Info: info} + } + if clients.IsRetryable(err) { + return failoverDecision{Retry: true, Action: action, Reason: "client_retryable", CooldownSeconds: cooldownSeconds, Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "true"}, Info: info} + } + return failoverDecision{Retry: false, Action: "stop", Reason: "client_non_retryable", Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "false"}, Info: info} +} + +func shouldDemoteCandidatePriority(runnerPolicy store.RunnerPolicy, err error) (bool, int) { + decision := priorityDemoteDecisionForCandidate(runnerPolicy, err) + return decision.Demote, decision.Step +} + +func priorityDemoteDecisionForCandidate(runnerPolicy store.RunnerPolicy, err error) priorityDemoteDecision { + info := failureInfoFromError(err) + if strings.TrimSpace(runnerPolicy.Status) != "" && runnerPolicy.Status != "active" { + return priorityDemoteDecision{Demote: false, Reason: "runner_policy_disabled", Info: info} + } + if hardStopPolicyMatches(runnerPolicy.HardStopPolicy, info) { + return priorityDemoteDecision{Demote: false, Reason: "hard_stop_policy", Info: info} + } + policy := runnerPolicy.PriorityDemotePolicy + if !boolFromPolicy(policy, "enabled", false) { + return priorityDemoteDecision{Demote: false, Reason: "priority_demote_disabled", Info: info} + } + if match, ok := priorityDemotePolicyMatch(policy, info); ok { + step := intFromPolicy(policy, "demoteStep") + if step <= 0 { + step = 100 + } + return priorityDemoteDecision{Demote: true, Reason: "priority_demote_policy", Step: step, Match: match, Info: info} + } + return priorityDemoteDecision{Demote: false, Reason: "priority_demote_no_match", Info: info} +} + +func effectiveFailoverPolicy(base map[string]any, override map[string]any) map[string]any { + policy := base + if nested := failoverOverridePolicy(override); len(nested) > 0 { + policy = mergeMap(policy, nested) + } + return policy +} + +func failoverOverridePolicy(override map[string]any) map[string]any { + if nested, ok := override["failoverPolicy"].(map[string]any); ok { + return nested + } + return nil +} + +func failureInfoFromError(err error) failureInfo { + code := strings.ToLower(strings.TrimSpace(clients.ErrorCode(err))) + message := "" + if err != nil { + message = err.Error() + } + status := clients.ErrorResponseMetadata(err).StatusCode + category := failureCategory(code, status, message) + target := strings.ToLower(strings.TrimSpace(fmt.Sprintf("%s %s %d %s", code, category, status, message))) + return failureInfo{ + Code: code, + Message: message, + Status: status, + Category: category, + Target: target, + } +} + +func failureCategory(code string, status int, message string) string { + target := strings.ToLower(code + " " + message) + switch { + case code == "insufficient_balance": + return "insufficient_balance" + case code == "rate_limit" || status == 429: + return "rate_limit" + case code == "network": + return "network" + case code == "timeout" || status == 408 || strings.Contains(target, "timeout"): + return "timeout" + case code == "stream_read_error": + return "stream_error" + case code == "overloaded" || strings.Contains(target, "overloaded"): + return "provider_overloaded" + case status >= 500 || code == "server_error": + return "provider_5xx" + case code == "permission_denied": + return "user_permission" + case code == "auth_failed" || code == "invalid_api_key" || code == "missing_credentials" || status == 401 || status == 403 || providerAuthMessage(target): + return "auth_error" + case strings.Contains(code, "unsupported"): + return "unsupported_model" + case status == 400 || code == "bad_request" || code == "invalid_request" || code == "invalid_parameter" || code == "missing_required": + return "request_error" + case status > 400 && status < 500: + return "request_error" + default: + return "client_error" + } +} + +func providerAuthMessage(target string) bool { + return strings.Contains(target, "api key") || + strings.Contains(target, "apikey") || + strings.Contains(target, "unauthorized") || + strings.Contains(target, "authentication") || + strings.Contains(target, "auth failed") || + strings.Contains(target, "credential") +} + +func hardStopPolicyMatches(policy map[string]any, info failureInfo) bool { + _, ok := hardStopPolicyMatch(policy, info) + return ok +} + +func hardStopPolicyMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) { + if !boolFromPolicy(policy, "enabled", true) { + return policyRuleMatch{}, false + } + return firstPolicyMatch(policy, info, "gateway_runner_policies.hard_stop_policy", "hardStopPolicy", []policyMatchSpec{ + {Key: "categories", Value: info.Category, Kind: "string"}, + {Key: "codes", Value: info.Code, Kind: "string"}, + {Key: "statusCodes", IntValue: info.Status, Kind: "int"}, + {Key: "keywords", Value: info.Target, Kind: "keyword"}, + }) +} + +func retryPolicyDenyMatches(policy map[string]any, info failureInfo) bool { + _, ok := retryPolicyDenyMatch(policy, info, "", "") + return ok +} + +func retryPolicyDenyMatch(policy map[string]any, info failureInfo, source string, policyName string) (policyRuleMatch, bool) { + return firstPolicyMatch(policy, info, firstNonEmptyString(source, "effective_retry_policy"), firstNonEmptyString(policyName, "retryPolicy"), []policyMatchSpec{ + {Key: "denyCategories", Value: info.Category, Kind: "string"}, + {Key: "denyCodes", Value: info.Code, Kind: "string"}, + {Key: "denyStatusCodes", IntValue: info.Status, Kind: "int"}, + {Key: "denyKeywords", Value: info.Target, Kind: "keyword"}, + }) +} + +func retryPolicyAllowMatches(policy map[string]any, info failureInfo) bool { + _, ok := retryPolicyAllowMatch(policy, info, "", "") + return ok +} + +func retryPolicyAllowMatch(policy map[string]any, info failureInfo, source string, policyName string) (policyRuleMatch, bool) { + return firstPolicyMatch(policy, info, firstNonEmptyString(source, "effective_retry_policy"), firstNonEmptyString(policyName, "retryPolicy"), []policyMatchSpec{ + {Key: "allowCategories", Value: info.Category, Kind: "string"}, + {Key: "allowCodes", Value: info.Code, Kind: "string"}, + {Key: "allowStatusCodes", IntValue: info.Status, Kind: "int"}, + {Key: "allowKeywords", Value: info.Target, Kind: "keyword"}, + }) +} + +func failoverDenyMatches(policy map[string]any, info failureInfo) bool { + _, ok := failoverDenyMatch(policy, info) + return ok +} + +func failoverDenyMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) { + return retryPolicyDenyMatch(policy, info, "gateway_runner_policies.failover_policy", "failoverPolicy") +} + +func failoverDenyMatchWithSources(base map[string]any, override map[string]any, info failureInfo) (policyRuleMatch, bool) { + return retryPolicyMatchWithSources(base, override, "gateway_runner_policies.failover_policy", "runtime_policy_override.failoverPolicy", "failoverPolicy", []policyMatchSpec{ + {Key: "denyCategories", Value: info.Category, Kind: "string"}, + {Key: "denyCodes", Value: info.Code, Kind: "string"}, + {Key: "denyStatusCodes", IntValue: info.Status, Kind: "int"}, + {Key: "denyKeywords", Value: info.Target, Kind: "keyword"}, + }) +} + +func failoverAllowMatches(policy map[string]any, info failureInfo) bool { + _, ok := failoverAllowMatch(policy, info) + return ok +} + +func failoverAllowMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) { + return retryPolicyAllowMatch(policy, info, "gateway_runner_policies.failover_policy", "failoverPolicy") +} + +func failoverAllowMatchWithSources(base map[string]any, override map[string]any, info failureInfo) (policyRuleMatch, bool) { + return retryPolicyMatchWithSources(base, override, "gateway_runner_policies.failover_policy", "runtime_policy_override.failoverPolicy", "failoverPolicy", []policyMatchSpec{ + {Key: "allowCategories", Value: info.Category, Kind: "string"}, + {Key: "allowCodes", Value: info.Code, Kind: "string"}, + {Key: "allowStatusCodes", IntValue: info.Status, Kind: "int"}, + {Key: "allowKeywords", Value: info.Target, Kind: "keyword"}, + }) +} + +func priorityDemotePolicyMatch(policy map[string]any, info failureInfo) (policyRuleMatch, bool) { + return firstPolicyMatch(policy, info, "gateway_runner_policies.priority_demote_policy", "priorityDemotePolicy", []policyMatchSpec{ + {Key: "categories", Value: info.Category, Kind: "string"}, + {Key: "codes", Value: info.Code, Kind: "string"}, + {Key: "statusCodes", IntValue: info.Status, Kind: "int"}, + {Key: "keywords", Value: info.Target, Kind: "keyword"}, + }) +} + +func failoverAction(policy map[string]any, info failureInfo) string { + actions, _ := policy["actions"].(map[string]any) + if action := stringFromAny(actions[info.Category]); action != "" { + return action + } + return "next" +} + +func boolFromPolicy(policy map[string]any, key string, fallback bool) bool { + value, ok := policy[key].(bool) + if !ok { + return fallback + } + return value +} + +type policyMatchSpec struct { + Key string + Kind string + Value string + IntValue int +} + +func firstPolicyMatch(policy map[string]any, info failureInfo, source string, policyName string, specs []policyMatchSpec) (policyRuleMatch, bool) { + for _, spec := range specs { + switch spec.Kind { + case "string": + if value, ok := matchingStringListValue(policy, spec.Key, spec.Value); ok { + return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true + } + case "int": + if value, ok := matchingIntListValue(policy, spec.Key, spec.IntValue); ok { + return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: fmt.Sprintf("%d", value)}, true + } + case "keyword": + if value, ok := matchingKeywordValue(policy, spec.Key, spec.Value); ok { + return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true + } + } + } + return policyRuleMatch{}, false +} + +func retryPolicyMatchWithSources(base map[string]any, override map[string]any, baseSource string, overrideSource string, policyName string, specs []policyMatchSpec) (policyRuleMatch, bool) { + for _, spec := range specs { + if _, ok := override[spec.Key]; ok { + if match, matched := policyMatchSpecValue(override, spec, overrideSource, policyName); matched { + return match, true + } + continue + } + if match, matched := policyMatchSpecValue(base, spec, baseSource, policyName); matched { + return match, true + } + } + return policyRuleMatch{}, false +} + +func policyMatchSpecValue(policy map[string]any, spec policyMatchSpec, source string, policyName string) (policyRuleMatch, bool) { + switch spec.Kind { + case "string": + if value, ok := matchingStringListValue(policy, spec.Key, spec.Value); ok { + return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true + } + case "int": + if value, ok := matchingIntListValue(policy, spec.Key, spec.IntValue); ok { + return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: fmt.Sprintf("%d", value)}, true + } + case "keyword": + if value, ok := matchingKeywordValue(policy, spec.Key, spec.Value); ok { + return policyRuleMatch{Source: source, Policy: policyName, Rule: spec.Key, Value: value}, true + } + } + return policyRuleMatch{}, false +} + +func stringListContains(policy map[string]any, key string, value string) bool { + _, ok := matchingStringListValue(policy, key, value) + return ok +} + +func matchingStringListValue(policy map[string]any, key string, value string) (string, bool) { + value = strings.ToLower(strings.TrimSpace(value)) + if value == "" { + return "", false + } + for _, item := range stringListFromPolicy(policy, key) { + item = strings.TrimSpace(item) + if strings.ToLower(item) == value { + return item, true + } + } + return "", false +} + +func keywordListMatches(policy map[string]any, key string, target string) bool { + _, ok := matchingKeywordValue(policy, key, target) + return ok +} + +func matchingKeywordValue(policy map[string]any, key string, target string) (string, bool) { + target = strings.ToLower(strings.TrimSpace(target)) + if target == "" { + return "", false + } + for _, keyword := range stringListFromPolicy(policy, key) { + keyword = strings.TrimSpace(keyword) + if keyword != "" && strings.Contains(target, strings.ToLower(keyword)) { + return keyword, true + } + } + return "", false +} + +func intListContains(policy map[string]any, key string, value int) bool { + _, ok := matchingIntListValue(policy, key, value) + return ok +} + +func matchingIntListValue(policy map[string]any, key string, value int) (int, bool) { + if value == 0 { + return 0, false + } + for _, item := range intListFromPolicy(policy, key) { + if item == value { + return item, true + } + } + return 0, false +} + +func intListFromPolicy(policy map[string]any, key string) []int { + raw, ok := policy[key].([]any) + if !ok { + if typed, ok := policy[key].([]int); ok { + return typed + } + return nil + } + out := make([]int, 0, len(raw)) + for _, item := range raw { + switch typed := item.(type) { + case int: + out = append(out, typed) + case float64: + out = append(out, int(typed)) + } + } + return out +} diff --git a/apps/api/internal/runner/retry_decision_test.go b/apps/api/internal/runner/retry_decision_test.go new file mode 100644 index 0000000..ee1aca8 --- /dev/null +++ b/apps/api/internal/runner/retry_decision_test.go @@ -0,0 +1,152 @@ +package runner + +import ( + "testing" + "time" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" +) + +func TestShouldRetrySameClientUsesRuntimeRetryPolicyKeywords(t *testing.T) { + candidate := store.RuntimeModelCandidate{ + ModelRetryPolicy: map[string]any{ + "enabled": true, + "allowKeywords": []any{"temporary vendor blip"}, + "denyKeywords": []any{"bad request"}, + }, + } + + if shouldRetrySameClient(candidate, &clients.ClientError{Code: "bad_request", Message: "bad request timeout", Retryable: true}) { + t.Fatal("deny keywords should block same-client retry even when the client marks the error retryable") + } + if !shouldRetrySameClient(candidate, &clients.ClientError{Code: "custom_error", Message: "temporary vendor blip", Retryable: false}) { + t.Fatal("allow keywords should allow same-client retry when the client does not mark the error retryable") + } +} + +func TestFailoverBudgetDefaults(t *testing.T) { + candidates := make([]store.RuntimeModelCandidate, 150) + runnerPolicy := store.RunnerPolicy{Status: "active", FailoverPolicy: map[string]any{"enabled": true}} + + if got := maxPlatformsForCandidates(candidates, runnerPolicy); got != 99 { + t.Fatalf("default max platform budget should be 99, got %d", got) + } + if got := maxFailoverDurationForCandidates(candidates, runnerPolicy); got != 10*time.Minute { + t.Fatalf("default max failover duration should be 10 minutes, got %s", got) + } +} + +func TestFailoverTimeBudgetExceeded(t *testing.T) { + if !failoverTimeBudgetExceeded(time.Now().Add(-601*time.Second), 10*time.Minute) { + t.Fatal("failover time budget should stop retries after the configured duration") + } + if failoverTimeBudgetExceeded(time.Now().Add(-590*time.Second), 10*time.Minute) { + t.Fatal("failover time budget should allow retries before the configured duration") + } +} + +func TestFailoverHardStopBeatsModelOverride(t *testing.T) { + runnerPolicy := store.RunnerPolicy{ + Status: "active", + FailoverPolicy: map[string]any{ + "enabled": true, + "allowCategories": []any{"request_error"}, + }, + HardStopPolicy: map[string]any{ + "enabled": true, + "categories": []any{"request_error"}, + }, + } + candidate := store.RuntimeModelCandidate{ + RuntimePolicyOverride: map[string]any{ + "failoverPolicy": map[string]any{ + "enabled": true, + "allowCategories": []any{"request_error"}, + }, + }, + } + + decision := failoverDecisionForCandidate(runnerPolicy, candidate, &clients.ClientError{Code: "bad_request", StatusCode: 400, Retryable: true}) + if decision.Retry || decision.Reason != "hard_stop_policy" { + t.Fatalf("hard stop should block model-level failover override, got %+v", decision) + } +} + +func TestFailoverPolicyAllowsModelOverride(t *testing.T) { + runnerPolicy := store.RunnerPolicy{ + Status: "active", + FailoverPolicy: map[string]any{"enabled": true}, + HardStopPolicy: map[string]any{"enabled": true, "categories": []any{"request_error"}}, + } + candidate := store.RuntimeModelCandidate{ + RuntimePolicyOverride: map[string]any{ + "failoverPolicy": map[string]any{ + "allowKeywords": []any{"temporary upstream outage"}, + }, + }, + } + + decision := failoverDecisionForCandidate(runnerPolicy, candidate, &clients.ClientError{Code: "custom_error", Message: "temporary upstream outage", Retryable: false}) + if !decision.Retry || decision.Reason != "failover_allow_policy" { + t.Fatalf("model failoverPolicy override should allow cross-platform failover, got %+v", decision) + } +} + +func TestProviderAuthErrorsFailOverInsteadOfHardStop(t *testing.T) { + runnerPolicy := store.RunnerPolicy{ + Status: "active", + FailoverPolicy: map[string]any{ + "enabled": true, + "allowCategories": []any{"auth_error"}, + "allowCodes": []any{"auth_failed", "invalid_api_key", "missing_credentials"}, + "allowStatusCodes": []any{ + 401, + 403, + }, + "actions": map[string]any{"auth_error": "disable_and_next"}, + }, + HardStopPolicy: map[string]any{ + "enabled": true, + "categories": []any{"request_error", "unsupported_model", "user_permission", "insufficient_balance"}, + "codes": []any{"bad_request", "invalid_request", "invalid_parameter", "missing_required", "permission_denied"}, + "statusCodes": []any{}, + "keywords": []any{"invalid_parameter", "missing required", "bad request", "insufficient balance"}, + }, + } + + decision := failoverDecisionForCandidate(runnerPolicy, store.RuntimeModelCandidate{}, &clients.ClientError{Code: "auth_failed", StatusCode: 401, Retryable: false}) + if !decision.Retry || decision.Action != "disable_and_next" || decision.Reason != "failover_allow_policy" { + t.Fatalf("provider auth failures should switch platform, got %+v", decision) + } + + decision = failoverDecisionForCandidate(runnerPolicy, store.RuntimeModelCandidate{}, &clients.ClientError{Code: "http_400", Message: "invalid api key", StatusCode: 400, Retryable: false}) + if !decision.Retry || decision.Info.Category != "auth_error" { + t.Fatalf("provider auth-looking 400 should switch platform, got %+v", decision) + } +} + +func TestPriorityDemotePolicyIsKeywordGatedAndHardStopSafe(t *testing.T) { + runnerPolicy := store.RunnerPolicy{ + Status: "active", + HardStopPolicy: map[string]any{ + "enabled": true, + "categories": []any{"request_error"}, + }, + PriorityDemotePolicy: map[string]any{ + "enabled": true, + "demoteStep": 25, + "keywords": []any{"rate_limit"}, + }, + } + + shouldDemote, step := shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "rate_limit", Message: "rate_limit from upstream", Retryable: true}) + if !shouldDemote || step != 25 { + t.Fatalf("priority demotion should be enabled only by matched policy, got shouldDemote=%v step=%d", shouldDemote, step) + } + + shouldDemote, _ = shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "bad_request", StatusCode: 400, Retryable: true}) + if shouldDemote { + t.Fatal("priority demotion should not run for hard-stop request errors") + } +} diff --git a/apps/api/internal/runner/runtime_policy.go b/apps/api/internal/runner/runtime_policy.go index ab0c378..42417c6 100644 --- a/apps/api/internal/runner/runtime_policy.go +++ b/apps/api/internal/runner/runtime_policy.go @@ -40,6 +40,46 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri } } +func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candidate store.RuntimeModelCandidate, decision failoverDecision, simulated bool) { + switch decision.Action { + case "disable_and_next": + if err := s.store.DisableCandidatePlatform(ctx, candidate.PlatformID); err == nil { + _ = s.emit(ctx, taskID, "task.policy.failover_disabled", "running", "failover", 0.51, "candidate platform disabled by runner failover policy", addPolicyTracePayload(map[string]any{ + "platformId": candidate.PlatformID, + "platformModelId": candidate.PlatformModelID, + "action": decision.Action, + "reason": decision.Reason, + }, decision.Match, decision.Info), simulated) + } + case "cooldown_and_next": + if err := s.store.CooldownCandidatePlatform(ctx, candidate.PlatformID, decision.CooldownSeconds); err == nil { + _ = s.emit(ctx, taskID, "task.policy.failover_cooled_down", "running", "failover", 0.51, "candidate platform cooled down by runner failover policy", addPolicyTracePayload(map[string]any{ + "platformId": candidate.PlatformID, + "platformModelId": candidate.PlatformModelID, + "cooldownSeconds": decision.CooldownSeconds, + "action": decision.Action, + "reason": decision.Reason, + }, decision.Match, decision.Info), simulated) + } + } +} + +func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, attemptNo int, runnerPolicy store.RunnerPolicy, candidate store.RuntimeModelCandidate, cause error, simulated bool) { + decision := priorityDemoteDecisionForCandidate(runnerPolicy, cause) + if !decision.Demote { + return + } + if err := s.store.DemoteCandidatePlatformPriority(ctx, candidate.PlatformID, decision.Step); err == nil { + s.recordAttemptTrace(ctx, taskID, attemptNo, priorityDemoteTraceEntry(decision, candidate.PlatformID, candidate.PlatformModelID)) + _ = s.emit(ctx, taskID, "task.policy.priority_demoted", "running", "priority_demote", 0.52, "candidate platform priority demoted by runner policy", addPolicyTracePayload(map[string]any{ + "platformId": candidate.PlatformID, + "platformModelId": candidate.PlatformModelID, + "demoteStep": decision.Step, + "code": clients.ErrorCode(cause), + }, decision.Match, decision.Info), simulated) + } +} + func effectiveRuntimePolicy(base map[string]any, override map[string]any, key string) map[string]any { policy := base if nested, ok := override[key].(map[string]any); ok && len(nested) > 0 { diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index c2849aa..f0fb66e 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log/slog" + "strconv" "strings" "time" @@ -51,6 +52,7 @@ func (s *Service) ExecuteStream(ctx context.Context, task store.GatewayTask, use } func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *auth.User, onDelta clients.StreamDelta) (Result, error) { + executeStartedAt := time.Now() body := normalizeRequest(task.Kind, task.Request) modelType := modelTypeFromKind(task.Kind, body) if err := validateRequest(task.Kind, body); err != nil { @@ -88,64 +90,159 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut return Result{}, err } - maxAttempts := maxAttemptsForCandidates(candidates) + runnerPolicy, err := s.store.GetActiveRunnerPolicy(ctx) + if err != nil { + return Result{}, err + } + maxPlatforms := maxPlatformsForCandidates(candidates, runnerPolicy) + maxFailoverDuration := maxFailoverDurationForCandidates(candidates, runnerPolicy) + attemptNo := 0 var lastErr error for index, candidate := range candidates { - if index >= maxAttempts { + if index >= maxPlatforms { break } - attemptNo := index + 1 - response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta) - if err == nil { - billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate)) - record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate)) - record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics) - finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{ - TaskID: task.ID, - Result: response.Result, - Billings: billings, - RequestID: record.RequestID, - ResolvedModel: record.ResolvedModel, - Usage: record.Usage, - Metrics: record.Metrics, - BillingSummary: record.BillingSummary, - FinalChargeAmount: record.FinalChargeAmount, - ResponseStartedAt: record.ResponseStartedAt, - ResponseFinishedAt: record.ResponseFinishedAt, - ResponseDurationMS: record.ResponseDurationMS, - }) - if finishErr != nil { - return Result{}, finishErr - } - if settleErr := s.store.SettleTaskBilling(ctx, finished); settleErr != nil { - return Result{}, settleErr - } - if finished.FinalChargeAmount > 0 { - if err := s.emit(ctx, task.ID, "task.billing.settled", "succeeded", "billing", 0.98, "task billing settled", map[string]any{ - "amount": finished.FinalChargeAmount, - "currency": stringFromAny(record.BillingSummary["currency"]), + clientAttempts := clientAttemptsForCandidate(candidate) + var candidateErr error + for clientAttempt := 1; clientAttempt <= clientAttempts; clientAttempt++ { + attemptNo++ + response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta) + if err == nil { + billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate)) + record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate)) + record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics) + finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{ + TaskID: task.ID, + Result: response.Result, + Billings: billings, + RequestID: record.RequestID, + ResolvedModel: record.ResolvedModel, + Usage: record.Usage, + Metrics: record.Metrics, + BillingSummary: record.BillingSummary, + FinalChargeAmount: record.FinalChargeAmount, + ResponseStartedAt: record.ResponseStartedAt, + ResponseFinishedAt: record.ResponseFinishedAt, + ResponseDurationMS: record.ResponseDurationMS, + }) + if finishErr != nil { + return Result{}, finishErr + } + if settleErr := s.store.SettleTaskBilling(ctx, finished); settleErr != nil { + return Result{}, settleErr + } + if finished.FinalChargeAmount > 0 { + if err := s.emit(ctx, task.ID, "task.billing.settled", "succeeded", "billing", 0.98, "task billing settled", map[string]any{ + "amount": finished.FinalChargeAmount, + "currency": stringFromAny(record.BillingSummary["currency"]), + }, isSimulation(task, candidate)); err != nil { + return Result{}, err + } + } + if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{ + "result": response.Result, + "billings": billings, + "usage": record.Usage, + "metrics": record.Metrics, + "billingSummary": record.BillingSummary, + "requestId": record.RequestID, }, isSimulation(task, candidate)); err != nil { return Result{}, err } + return Result{Task: finished, Output: response.Result}, nil } - if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{ - "result": response.Result, - "billings": billings, - "usage": record.Usage, - "metrics": record.Metrics, - "billingSummary": record.BillingSummary, - "requestId": record.RequestID, + lastErr = err + candidateErr = err + retryDecision := retryDecisionForCandidate(candidate, err) + retryAction := "retry" + if !retryDecision.Retry { + retryAction = "stop" + } + if clientAttempt >= clientAttempts { + retryDecision.Retry = false + retryDecision.Reason = "same_client_max_attempts" + retryDecision.Match = policyRuleMatch{ + Source: "model_runtime_policy_sets.retry_policy", + Policy: "retryPolicy", + Rule: "maxAttempts", + Value: strconv.Itoa(clientAttempts), + } + retryDecision.Info = failureInfoFromError(err) + retryAction = "stop" + } + if failoverTimeBudgetExceeded(executeStartedAt, maxFailoverDuration) { + retryDecision.Retry = false + retryDecision.Reason = "failover_time_budget_exceeded" + retryDecision.Match = policyRuleMatch{ + Source: "gateway_runner_policies.failover_policy", + Policy: "failoverPolicy", + Rule: "maxDurationSeconds", + Value: strconv.Itoa(int(maxFailoverDuration.Seconds())), + } + retryDecision.Info = failureInfoFromError(err) + retryAction = "stop" + } + s.recordAttemptTrace(ctx, task.ID, attemptNo, retryTraceEntry(retryDecision, retryAction, clientAttempt, clientAttempts)) + if !retryDecision.Retry { + break + } + if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.45, "retrying same client", addPolicyTracePayload(map[string]any{ + "attempt": attemptNo, + "clientAttempt": clientAttempt, + "clientId": candidate.ClientID, + "error": err.Error(), + "reason": retryDecision.Reason, + "scope": "same_client", + }, retryDecision.Match, retryDecision.Info), isSimulation(task, candidate)); err != nil { + return Result{}, err + } + } + if candidateErr == nil || index+1 >= len(candidates) || index+1 >= maxPlatforms { + if candidateErr != nil { + s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, candidateErr, isSimulation(task, candidate)) + decision := failoverDecisionForCandidate(runnerPolicy, candidate, candidateErr) + if decision.Retry { + decision.Retry = false + decision.Action = "stop" + decision.Reason = "no_next_platform" + decision.Match = policyRuleMatch{Source: "runner_candidates", Policy: "candidateSelection", Rule: "candidateCount", Value: strconv.Itoa(len(candidates))} + if index+1 >= maxPlatforms { + decision.Reason = "max_platforms_reached" + decision.Match = policyRuleMatch{Source: "gateway_runner_policies.failover_policy", Policy: "failoverPolicy", Rule: "maxPlatforms", Value: strconv.Itoa(maxPlatforms)} + } + } + s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision)) + } + break + } + s.applyPriorityDemotePolicy(ctx, task.ID, attemptNo, runnerPolicy, candidate, candidateErr, isSimulation(task, candidate)) + if failoverTimeBudgetExceeded(executeStartedAt, maxFailoverDuration) { + elapsedSeconds := int(time.Since(executeStartedAt).Seconds()) + maxDurationSeconds := int(maxFailoverDuration.Seconds()) + s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTimeBudgetTraceEntry(elapsedSeconds, maxDurationSeconds, failureInfoFromError(candidateErr))) + if err := s.emit(ctx, task.ID, "task.failover.stopped", "running", "retry", 0.55, "failover time budget exceeded", map[string]any{ + "elapsedSeconds": elapsedSeconds, + "maxDurationSeconds": maxDurationSeconds, + "scope": "next_platform", + "statusCode": clients.ErrorResponseMetadata(candidateErr).StatusCode, }, isSimulation(task, candidate)); err != nil { return Result{}, err } - return Result{Task: finished, Output: response.Result}, nil - } - lastErr = err - retryable := clients.IsRetryable(err) - if !retryable || !retryEnabled(candidate) || attemptNo >= maxAttempts { break } - if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.55, "retrying next client", map[string]any{"attempt": attemptNo, "error": err.Error()}, isSimulation(task, candidate)); err != nil { + decision := failoverDecisionForCandidate(runnerPolicy, candidate, candidateErr) + s.recordAttemptTrace(ctx, task.ID, attemptNo, failoverTraceEntry(decision)) + if !decision.Retry { + break + } + s.applyFailoverAction(ctx, task.ID, candidate, decision, isSimulation(task, candidate)) + if err := s.emit(ctx, task.ID, "task.retrying", "running", "retry", 0.55, "retrying next client", addPolicyTracePayload(map[string]any{ + "attempt": attemptNo, + "action": decision.Action, + "error": candidateErr.Error(), + "reason": decision.Reason, + "scope": "next_platform", + }, decision.Match, decision.Info), isSimulation(task, candidate)); err != nil { return Result{}, err } } @@ -184,15 +281,16 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user reservations := s.rateLimitReservations(ctx, user, candidate, body) limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, reservations) if err != nil { + clientErr := &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: false} _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ AttemptID: attemptID, Status: "failed", Retryable: false, - Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false}), + Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(clientErr, false)}}), ErrorCode: "rate_limit", ErrorMessage: err.Error(), }) - return clients.Response{}, &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: false} + return clients.Response{}, clientErr } defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs) @@ -207,7 +305,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user AttemptID: attemptID, Status: "failed", Retryable: false, - Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false}), + Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(err, false)}}), ErrorCode: clients.ErrorCode(err), ErrorMessage: err.Error(), }) @@ -266,7 +364,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user ErrorCode: clients.ErrorCode(err), ErrorMessage: err.Error(), }) - _ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "metrics": metrics}, simulated) + _ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "statusCode": clients.ErrorResponseMetadata(err).StatusCode, "metrics": metrics}, simulated) s.applyCandidateFailurePolicies(ctx, task.ID, candidate, err, simulated) return clients.Response{}, err } @@ -275,6 +373,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user metrics := mergeMetrics(taskMetrics(task, user, body, candidate, response, simulated), map[string]any{ "error": err.Error(), "retryable": clients.IsRetryable(err), + "trace": []any{failureTraceEntry(err, clients.IsRetryable(err))}, }) _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ AttemptID: attemptID, @@ -429,20 +528,54 @@ func retryEnabled(candidate store.RuntimeModelCandidate) bool { return true } -func maxAttemptsForCandidates(candidates []store.RuntimeModelCandidate) int { +func clientAttemptsForCandidate(candidate store.RuntimeModelCandidate) int { + if !retryEnabled(candidate) { + return 1 + } + if value := intFromPolicy(effectiveRetryPolicy(candidate), "maxAttempts"); value > 0 { + return value + } + return 1 +} + +func maxPlatformsForCandidates(candidates []store.RuntimeModelCandidate, runnerPolicy store.RunnerPolicy) int { if len(candidates) == 0 { return 0 } - maxAttempts := len(candidates) + maxPlatforms := len(candidates) + if value := intFromPolicy(runnerPolicy.FailoverPolicy, "maxPlatforms"); value > 0 { + if value < maxPlatforms { + maxPlatforms = value + } + } else if maxPlatforms > 99 { + maxPlatforms = 99 + } for _, candidate := range candidates { - if value := intFromPolicy(effectiveRetryPolicy(candidate), "maxAttempts"); value > 0 && value < maxAttempts { - maxAttempts = value + if value := intFromPolicy(effectiveFailoverPolicy(runnerPolicy.FailoverPolicy, candidate.RuntimePolicyOverride), "maxPlatforms"); value > 0 && value < maxPlatforms { + maxPlatforms = value } } - if maxAttempts <= 0 { + if maxPlatforms <= 0 { return 1 } - return maxAttempts + return maxPlatforms +} + +func maxFailoverDurationForCandidates(candidates []store.RuntimeModelCandidate, runnerPolicy store.RunnerPolicy) time.Duration { + seconds := intFromPolicy(runnerPolicy.FailoverPolicy, "maxDurationSeconds") + if seconds <= 0 { + seconds = 600 + } + for _, candidate := range candidates { + if value := intFromPolicy(effectiveFailoverPolicy(runnerPolicy.FailoverPolicy, candidate.RuntimePolicyOverride), "maxDurationSeconds"); value > 0 && value < seconds { + seconds = value + } + } + return time.Duration(seconds) * time.Second +} + +func failoverTimeBudgetExceeded(start time.Time, maxDuration time.Duration) bool { + return maxDuration > 0 && time.Since(start) >= maxDuration } func normalizeRequest(kind string, body map[string]any) map[string]any { diff --git a/apps/api/internal/runner/trace.go b/apps/api/internal/runner/trace.go new file mode 100644 index 0000000..fc36887 --- /dev/null +++ b/apps/api/internal/runner/trace.go @@ -0,0 +1,128 @@ +package runner + +import ( + "context" + "fmt" + "time" +) + +func failureTraceEntry(err error, retryable bool) map[string]any { + info := failureInfoFromError(err) + entry := policyTraceEntry("failure", "client", "failed", "client_call_failed", policyRuleMatch{}, info) + entry["retryable"] = retryable + return entry +} + +func retryTraceEntry(decision retryDecision, action string, clientAttempt int, maxAttempts int) map[string]any { + entry := policyTraceEntry("same_client_retry", "same_client", action, decision.Reason, decision.Match, decision.Info) + entry["retry"] = decision.Retry + entry["clientAttempt"] = clientAttempt + entry["maxAttempts"] = maxAttempts + return entry +} + +func failoverTraceEntry(decision failoverDecision) map[string]any { + event := "failover_next" + if !decision.Retry { + event = "failover_stop" + } + entry := policyTraceEntry(event, "next_platform", decision.Action, decision.Reason, decision.Match, decision.Info) + entry["retry"] = decision.Retry + if decision.CooldownSeconds > 0 { + entry["cooldownSeconds"] = decision.CooldownSeconds + } + return entry +} + +func priorityDemoteTraceEntry(decision priorityDemoteDecision, platformID string, platformModelID string) map[string]any { + entry := policyTraceEntry("priority_demoted", "priority_demote", "demote", decision.Reason, decision.Match, decision.Info) + entry["demote"] = decision.Demote + entry["demoteStep"] = decision.Step + entry["platformId"] = platformID + entry["platformModelId"] = platformModelID + return entry +} + +func failoverTimeBudgetTraceEntry(elapsedSeconds int, maxDurationSeconds int, info failureInfo) map[string]any { + entry := policyTraceEntry("failover_stop", "next_platform", "stop", "failover_time_budget_exceeded", policyRuleMatch{ + Source: "gateway_runner_policies.failover_policy", + Policy: "failoverPolicy", + Rule: "maxDurationSeconds", + Value: fmt.Sprintf("%d", maxDurationSeconds), + }, info) + entry["elapsedSeconds"] = elapsedSeconds + entry["maxDurationSeconds"] = maxDurationSeconds + return entry +} + +func policyTraceEntry(event string, scope string, action string, reason string, match policyRuleMatch, info failureInfo) map[string]any { + entry := map[string]any{ + "event": event, + "scope": scope, + "action": action, + "reason": reason, + "createdAt": time.Now().Format(time.RFC3339Nano), + } + if info.Code != "" { + entry["errorCode"] = info.Code + } + if info.Message != "" { + entry["message"] = info.Message + } + if info.Status > 0 { + entry["statusCode"] = info.Status + } + if info.Category != "" { + entry["category"] = info.Category + } + if match.Source != "" { + entry["policySource"] = match.Source + } + if match.Policy != "" { + entry["policy"] = match.Policy + } + if match.Rule != "" { + entry["policyRule"] = match.Rule + } + if match.Value != "" { + entry["matchedValue"] = match.Value + } + return entry +} + +func addPolicyTracePayload(payload map[string]any, match policyRuleMatch, info failureInfo) map[string]any { + if payload == nil { + payload = map[string]any{} + } + if info.Code != "" { + payload["errorCode"] = info.Code + } + if info.Status > 0 { + payload["statusCode"] = info.Status + } + if info.Category != "" { + payload["category"] = info.Category + } + if match.Source != "" { + payload["policySource"] = match.Source + } + if match.Policy != "" { + payload["policy"] = match.Policy + } + if match.Rule != "" { + payload["policyRule"] = match.Rule + } + if match.Value != "" { + payload["matchedValue"] = match.Value + } + return payload +} + +func (s *Service) recordAttemptTrace(ctx context.Context, taskID string, attemptNo int, entry map[string]any) { + if attemptNo <= 0 || len(entry) == 0 { + return + } + if err := s.store.AppendTaskAttemptTrace(ctx, taskID, attemptNo, entry); err != nil { + s.logger.Warn("append task attempt trace failed", "taskID", taskID, "attempt", attemptNo, "error", err) + } +} diff --git a/apps/api/internal/store/postgres.go b/apps/api/internal/store/postgres.go index 34154ef..5f316c1 100644 --- a/apps/api/internal/store/postgres.go +++ b/apps/api/internal/store/postgres.go @@ -229,6 +229,20 @@ type RuntimePolicySet struct { UpdatedAt time.Time `json:"updatedAt"` } +type RunnerPolicy struct { + ID string `json:"id"` + PolicyKey string `json:"policyKey"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + FailoverPolicy map[string]any `json:"failoverPolicy,omitempty"` + HardStopPolicy map[string]any `json:"hardStopPolicy,omitempty"` + PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` + Status string `json:"status"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + type PricingRule struct { ID string `json:"id"` RuleSetID string `json:"ruleSetId,omitempty"` @@ -446,6 +460,7 @@ type TaskAttempt struct { Retryable bool `json:"retryable"` Simulated bool `json:"simulated"` RequestID string `json:"requestId,omitempty"` + StatusCode int `json:"statusCode,omitempty"` Usage map[string]any `json:"usage,omitempty"` Metrics map[string]any `json:"metrics,omitempty"` RequestSnapshot map[string]any `json:"requestSnapshot,omitempty"` @@ -1796,6 +1811,11 @@ func IsUniqueViolation(err error) bool { return isUniqueViolation(err) } +func IsUndefinedDatabaseObject(err error) bool { + var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && (pgErr.Code == "42P01" || pgErr.Code == "42703") +} + func isUniqueViolation(err error) bool { var pgErr *pgconn.PgError return errors.As(err, &pgErr) && pgErr.Code == "23505" diff --git a/apps/api/internal/store/runner_policies.go b/apps/api/internal/store/runner_policies.go new file mode 100644 index 0000000..45240ff --- /dev/null +++ b/apps/api/internal/store/runner_policies.go @@ -0,0 +1,168 @@ +package store + +import ( + "context" + "encoding/json" + "strings" + "time" + + "github.com/jackc/pgx/v5" +) + +const runnerPolicyColumns = ` +id::text, policy_key, name, COALESCE(description, ''), failover_policy, +hard_stop_policy, priority_demote_policy, metadata, status, created_at, updated_at` + +type RunnerPolicyInput struct { + PolicyKey string `json:"policyKey"` + Name string `json:"name"` + Description string `json:"description"` + FailoverPolicy map[string]any `json:"failoverPolicy"` + HardStopPolicy map[string]any `json:"hardStopPolicy"` + PriorityDemotePolicy map[string]any `json:"priorityDemotePolicy"` + Metadata map[string]any `json:"metadata"` + Status string `json:"status"` +} + +type runnerPolicyScanner interface { + Scan(dest ...any) error +} + +func (s *Store) GetActiveRunnerPolicy(ctx context.Context) (RunnerPolicy, error) { + item, err := scanRunnerPolicy(s.pool.QueryRow(ctx, ` +SELECT `+runnerPolicyColumns+` +FROM gateway_runner_policies +ORDER BY CASE WHEN policy_key = 'default-runner-v1' THEN 0 ELSE 1 END, + CASE WHEN status = 'active' THEN 0 ELSE 1 END, + updated_at DESC +LIMIT 1`)) + if err != nil { + if err == pgx.ErrNoRows || IsUndefinedDatabaseObject(err) { + return defaultRunnerPolicy(), nil + } + return RunnerPolicy{}, err + } + return item, nil +} + +func (s *Store) UpsertDefaultRunnerPolicy(ctx context.Context, input RunnerPolicyInput) (RunnerPolicy, error) { + input = normalizeRunnerPolicyInput(input) + failoverPolicy, _ := json.Marshal(emptyObjectIfNil(input.FailoverPolicy)) + hardStopPolicy, _ := json.Marshal(emptyObjectIfNil(input.HardStopPolicy)) + priorityDemotePolicy, _ := json.Marshal(emptyObjectIfNil(input.PriorityDemotePolicy)) + metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata)) + return scanRunnerPolicy(s.pool.QueryRow(ctx, ` +INSERT INTO gateway_runner_policies ( + policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status +) +VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8) +ON CONFLICT (policy_key) DO UPDATE +SET name = EXCLUDED.name, + description = EXCLUDED.description, + failover_policy = EXCLUDED.failover_policy, + hard_stop_policy = EXCLUDED.hard_stop_policy, + priority_demote_policy = EXCLUDED.priority_demote_policy, + metadata = EXCLUDED.metadata, + status = EXCLUDED.status, + updated_at = now() +RETURNING `+runnerPolicyColumns, + input.PolicyKey, input.Name, input.Description, failoverPolicy, hardStopPolicy, priorityDemotePolicy, metadata, input.Status, + )) +} + +func scanRunnerPolicy(scanner runnerPolicyScanner) (RunnerPolicy, error) { + var item RunnerPolicy + var failoverPolicy []byte + var hardStopPolicy []byte + var priorityDemotePolicy []byte + var metadata []byte + if err := scanner.Scan( + &item.ID, + &item.PolicyKey, + &item.Name, + &item.Description, + &failoverPolicy, + &hardStopPolicy, + &priorityDemotePolicy, + &metadata, + &item.Status, + &item.CreatedAt, + &item.UpdatedAt, + ); err != nil { + return RunnerPolicy{}, err + } + item.FailoverPolicy = decodeObject(failoverPolicy) + item.HardStopPolicy = decodeObject(hardStopPolicy) + item.PriorityDemotePolicy = decodeObject(priorityDemotePolicy) + item.Metadata = decodeObject(metadata) + return item, nil +} + +func normalizeRunnerPolicyInput(input RunnerPolicyInput) RunnerPolicyInput { + input.PolicyKey = strings.TrimSpace(input.PolicyKey) + if input.PolicyKey == "" { + input.PolicyKey = "default-runner-v1" + } + input.Name = strings.TrimSpace(input.Name) + if input.Name == "" { + input.Name = "默认全局调度策略" + } + input.Description = strings.TrimSpace(input.Description) + input.Status = strings.TrimSpace(input.Status) + if input.Status == "" { + input.Status = "active" + } + return input +} + +func defaultRunnerPolicy() RunnerPolicy { + now := time.Now() + return RunnerPolicy{ + PolicyKey: "default-runner-v1", + Name: "默认全局调度策略", + Description: "控制多个候选平台之间的故障切换;模型运行策略只可覆盖 failoverPolicy,不能覆盖 hardStopPolicy。", + FailoverPolicy: defaultRunnerFailoverPolicy(), + HardStopPolicy: defaultRunnerHardStopPolicy(), + PriorityDemotePolicy: defaultRunnerPriorityDemotePolicy(), + Metadata: map[string]any{"source": "code-default"}, + Status: "active", + CreatedAt: now, + UpdatedAt: now, + } +} + +func defaultRunnerPriorityDemotePolicy() map[string]any { + return map[string]any{ + "enabled": true, + "demoteStep": 100, + "categories": []any{"network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded"}, + "codes": []any{"network", "timeout", "stream_read_error", "rate_limit", "server_error", "overloaded"}, + "statusCodes": []any{408, 429, 500, 502, 503, 504}, + "keywords": []any{"timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "429", "5xx"}, + } +} + +func defaultRunnerFailoverPolicy() map[string]any { + return map[string]any{ + "enabled": true, + "maxPlatforms": 99, + "maxDurationSeconds": 600, + "allowCategories": []any{"network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded", "auth_error"}, + "denyCategories": []any{"request_error", "unsupported_model", "user_permission", "insufficient_balance"}, + "allowCodes": []any{"auth_failed", "invalid_api_key", "missing_credentials"}, + "allowKeywords": []any{"timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "auth_failed", "invalid_api_key", "missing_credentials", "unauthorized", "forbidden", "429", "5xx"}, + "denyKeywords": []any{"invalid_parameter", "missing required", "bad request"}, + "allowStatusCodes": []any{401, 403, 408, 429, 500, 502, 503, 504}, + "denyStatusCodes": []any{}, + } +} + +func defaultRunnerHardStopPolicy() map[string]any { + return map[string]any{ + "enabled": true, + "categories": []any{"request_error", "unsupported_model", "user_permission", "insufficient_balance"}, + "codes": []any{"bad_request", "invalid_request", "invalid_parameter", "missing_required", "unsupported_kind", "unsupported_model", "insufficient_balance", "permission_denied"}, + "statusCodes": []any{}, + "keywords": []any{"invalid_parameter", "missing required", "bad request", "insufficient balance"}, + } +} diff --git a/apps/api/internal/store/runtime_policies.go b/apps/api/internal/store/runtime_policies.go index 6c44db0..dc15697 100644 --- a/apps/api/internal/store/runtime_policies.go +++ b/apps/api/internal/store/runtime_policies.go @@ -134,6 +134,28 @@ WHERE id = $1::uuid`, platformID, cooldownSeconds) return err } +func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string, demoteStep int) error { + if strings.TrimSpace(platformID) == "" { + return nil + } + if demoteStep <= 0 { + demoteStep = 100 + } + _, err := s.pool.Exec(ctx, ` +UPDATE integration_platforms target +SET dynamic_priority = GREATEST( + COALESCE(target.dynamic_priority, target.priority), + COALESCE(( + SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority)) + FROM integration_platforms peer + WHERE peer.deleted_at IS NULL + ), target.priority) + $2::int + ), + updated_at = now() +WHERE target.id = $1::uuid`, platformID, demoteStep) + return err +} + func scanRuntimePolicySet(scanner runtimePolicyScanner) (RuntimePolicySet, error) { var item RuntimePolicySet var rateLimitPolicy []byte diff --git a/apps/api/internal/store/tasks_runtime.go b/apps/api/internal/store/tasks_runtime.go index 4b1f77d..ec88d2d 100644 --- a/apps/api/internal/store/tasks_runtime.go +++ b/apps/api/internal/store/tasks_runtime.go @@ -3,6 +3,7 @@ package store import ( "context" "encoding/json" + "strconv" "strings" "time" @@ -234,6 +235,27 @@ func (s *Store) ListTaskAttempts(ctx context.Context, taskID string) ([]TaskAtte return attemptsByTaskID[taskID], nil } +func (s *Store) AppendTaskAttemptTrace(ctx context.Context, taskID string, attemptNo int, entry map[string]any) error { + entryJSON, _ := json.Marshal(emptyObjectIfNil(entry)) + _, err := s.pool.Exec(ctx, ` +UPDATE gateway_task_attempts +SET metrics = jsonb_set( + COALESCE(metrics, '{}'::jsonb), + '{trace}', + ( + CASE + WHEN jsonb_typeof(COALESCE(metrics->'trace', '[]'::jsonb)) = 'array' + THEN COALESCE(metrics->'trace', '[]'::jsonb) + ELSE '[]'::jsonb + END + ) || jsonb_build_array($3::jsonb), + true + ) +WHERE task_id = $1::uuid + AND attempt_no = $2`, taskID, attemptNo, string(entryJSON)) + return err +} + func (s *Store) listTaskAttemptsByTaskIDs(ctx context.Context, taskIDs []string) (map[string][]TaskAttempt, error) { itemsByTaskID := map[string][]TaskAttempt{} if len(taskIDs) == 0 { @@ -331,6 +353,7 @@ func enrichTaskAttemptFromMetrics(item *TaskAttempt) { item.ModelAlias = firstNonEmpty(item.ModelAlias, taskAttemptMetricString(item.Metrics, "modelAlias")) item.ModelType = firstNonEmpty(item.ModelType, taskAttemptMetricString(item.Metrics, "modelType")) item.ClientID = firstNonEmpty(item.ClientID, taskAttemptMetricString(item.Metrics, "clientId")) + item.StatusCode = taskAttemptMetricInt(item.Metrics, "statusCode") } func taskAttemptMetricString(metrics map[string]any, key string) string { @@ -338,6 +361,25 @@ func taskAttemptMetricString(metrics map[string]any, key string) string { return strings.TrimSpace(value) } +func taskAttemptMetricInt(metrics map[string]any, key string) int { + switch value := metrics[key].(type) { + case int: + return value + case int64: + return int(value) + case float64: + return int(value) + case json.Number: + next, _ := value.Int64() + return int(next) + case string: + next, _ := strconv.Atoi(strings.TrimSpace(value)) + return next + default: + return 0 + } +} + func (s *Store) FinishTaskAttempt(ctx context.Context, input FinishTaskAttemptInput) error { responseJSON, _ := json.Marshal(emptyObjectIfNil(input.ResponseSnapshot)) usageJSON, _ := json.Marshal(emptyObjectIfNil(input.Usage)) diff --git a/apps/api/migrations/0026_runner_policies.sql b/apps/api/migrations/0026_runner_policies.sql new file mode 100644 index 0000000..3c94af2 --- /dev/null +++ b/apps/api/migrations/0026_runner_policies.sql @@ -0,0 +1,73 @@ +CREATE TABLE IF NOT EXISTS gateway_runner_policies ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + policy_key text NOT NULL UNIQUE, + name text NOT NULL, + description text, + failover_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + hard_stop_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'active', + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE IF EXISTS gateway_runner_policies + ADD COLUMN IF NOT EXISTS priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb; + +INSERT INTO gateway_runner_policies ( + policy_key, name, description, failover_policy, hard_stop_policy, priority_demote_policy, metadata, status +) +VALUES ( + 'default-runner-v1', + '默认全局调度策略', + '控制多个候选平台之间的故障切换;模型运行策略只可覆盖 failoverPolicy,不能覆盖 hardStopPolicy。', + '{ + "enabled": true, + "maxPlatforms": 99, + "maxDurationSeconds": 600, + "allowCategories": ["network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded", "auth_error"], + "denyCategories": ["request_error", "unsupported_model", "user_permission", "insufficient_balance"], + "allowCodes": ["auth_failed", "invalid_api_key", "missing_credentials"], + "allowKeywords": ["timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "auth_failed", "invalid_api_key", "missing_credentials", "unauthorized", "forbidden", "429", "5xx"], + "denyKeywords": ["invalid_parameter", "missing required", "bad request"], + "allowStatusCodes": [401, 403, 408, 429, 500, 502, 503, 504], + "denyStatusCodes": [], + "actions": { + "auth_error": "disable_and_next", + "rate_limit": "cooldown_and_next", + "provider_5xx": "next", + "request_error": "stop" + } + }'::jsonb, + '{ + "enabled": true, + "categories": ["request_error", "unsupported_model", "user_permission", "insufficient_balance"], + "codes": ["bad_request", "invalid_request", "invalid_parameter", "missing_required", "unsupported_kind", "unsupported_model", "insufficient_balance", "permission_denied"], + "statusCodes": [], + "keywords": ["invalid_parameter", "missing required", "bad request", "insufficient balance"] + }'::jsonb, + '{ + "enabled": true, + "demoteStep": 100, + "categories": ["network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded"], + "codes": ["network", "timeout", "stream_read_error", "rate_limit", "server_error", "overloaded"], + "statusCodes": [408, 429, 500, 502, 503, 504], + "keywords": ["timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "429", "5xx"] + }'::jsonb, + '{"seed":"0026_runner_policies"}'::jsonb, + 'active' +) +ON CONFLICT (policy_key) DO UPDATE +SET name = EXCLUDED.name, + description = EXCLUDED.description, + failover_policy = gateway_runner_policies.failover_policy || EXCLUDED.failover_policy, + hard_stop_policy = gateway_runner_policies.hard_stop_policy || EXCLUDED.hard_stop_policy, + priority_demote_policy = gateway_runner_policies.priority_demote_policy || EXCLUDED.priority_demote_policy, + metadata = gateway_runner_policies.metadata || EXCLUDED.metadata, + updated_at = now(); + +UPDATE model_runtime_policy_sets +SET description = '默认包含 TPM/RPM/并发、平台内调用重试、自动禁用和优先级降级关键词。' +WHERE policy_key = 'default-runtime-v1' + AND description = '默认包含 TPM/RPM/并发、失败重试、自动禁用和优先级降级关键词。'; diff --git a/apps/api/migrations/0027_runner_policy_failover_budget.sql b/apps/api/migrations/0027_runner_policy_failover_budget.sql new file mode 100644 index 0000000..690d323 --- /dev/null +++ b/apps/api/migrations/0027_runner_policy_failover_budget.sql @@ -0,0 +1,17 @@ +UPDATE gateway_runner_policies +SET failover_policy = jsonb_set( + jsonb_set( + failover_policy, + '{maxDurationSeconds}', + COALESCE(failover_policy -> 'maxDurationSeconds', '600'::jsonb), + true + ), + '{maxPlatforms}', + CASE + WHEN COALESCE(failover_policy ->> 'maxPlatforms', '') IN ('', '3') THEN '99'::jsonb + ELSE failover_policy -> 'maxPlatforms' + END, + true + ), + updated_at = now() +WHERE policy_key = 'default-runner-v1'; diff --git a/apps/api/migrations/0028_runner_policy_platform_auth_failover.sql b/apps/api/migrations/0028_runner_policy_platform_auth_failover.sql new file mode 100644 index 0000000..89488e2 --- /dev/null +++ b/apps/api/migrations/0028_runner_policy_platform_auth_failover.sql @@ -0,0 +1,22 @@ +UPDATE gateway_runner_policies +SET failover_policy = failover_policy + || jsonb_build_object( + 'allowCategories', jsonb_build_array('network', 'timeout', 'stream_error', 'rate_limit', 'provider_5xx', 'provider_overloaded', 'auth_error'), + 'denyCategories', jsonb_build_array('request_error', 'unsupported_model', 'user_permission', 'insufficient_balance'), + 'allowCodes', jsonb_build_array('auth_failed', 'invalid_api_key', 'missing_credentials'), + 'allowKeywords', jsonb_build_array('timeout', 'network', 'rate_limit', 'overloaded', 'temporarily_unavailable', 'server_error', 'auth_failed', 'invalid_api_key', 'missing_credentials', 'unauthorized', 'forbidden', '429', '5xx'), + 'denyKeywords', jsonb_build_array('invalid_parameter', 'missing required', 'bad request'), + 'allowStatusCodes', jsonb_build_array(401, 403, 408, 429, 500, 502, 503, 504), + 'denyStatusCodes', '[]'::jsonb, + 'actions', COALESCE(failover_policy->'actions', '{}'::jsonb) || jsonb_build_object('auth_error', 'disable_and_next') + ), + hard_stop_policy = hard_stop_policy + || jsonb_build_object( + 'categories', jsonb_build_array('request_error', 'unsupported_model', 'user_permission', 'insufficient_balance'), + 'codes', jsonb_build_array('bad_request', 'invalid_request', 'invalid_parameter', 'missing_required', 'unsupported_kind', 'unsupported_model', 'insufficient_balance', 'permission_denied'), + 'statusCodes', '[]'::jsonb, + 'keywords', jsonb_build_array('invalid_parameter', 'missing required', 'bad request', 'insufficient balance') + ), + metadata = metadata || jsonb_build_object('platformAuthFailoverPolicy', '0028_runner_policy_platform_auth_failover'), + updated_at = now() +WHERE policy_key = 'default-runner-v1'; diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index b57af61..f5011fc 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -8,6 +8,7 @@ import type { GatewayApiKey, GatewayAuditLog, GatewayNetworkProxyConfig, + GatewayRunnerPolicy, GatewayTenantUpsertRequest, GatewayTask, GatewayUserUpsertRequest, @@ -43,6 +44,7 @@ import { deleteUserGroup, getHealth, getNetworkProxyConfig, + getRunnerPolicy, getTask, getWalletSummary, listAccessRules, @@ -58,11 +60,11 @@ import { listPlatforms, listPricingRules, listPricingRuleSets, + listPublicBaseModels, + listPublicCatalogProviders, listRuntimePolicySets, listTasks, listWalletTransactions, - listPublicBaseModels, - listPublicCatalogProviders, listRateLimitWindows, listTenants, listUserGroups, @@ -135,6 +137,7 @@ type DataKey = | 'baseModels' | 'pricingRules' | 'pricingRuleSets' + | 'runnerPolicy' | 'runtimePolicySets' | 'rateLimitWindows' | 'tenants' @@ -175,6 +178,7 @@ export function App() { const [baseModels, setBaseModels] = useState([]); const [pricingRules, setPricingRules] = useState([]); const [pricingRuleSets, setPricingRuleSets] = useState([]); + const [runnerPolicy, setRunnerPolicy] = useState(null); const [runtimePolicySets, setRuntimePolicySets] = useState([]); const [accessRules, setAccessRules] = useState([]); const [auditLogs, setAuditLogs] = useState([]); @@ -217,9 +221,10 @@ export function App() { setPricingRuleSets, token, }); - const { removeRuntimePolicySet, saveRuntimePolicySet } = useRuntimePolicySetOperations({ + const { removeRuntimePolicySet, saveRunnerPolicy, saveRuntimePolicySet } = useRuntimePolicySetOperations({ setCoreMessage, setCoreState, + setRunnerPolicy, setRuntimePolicySets, token, }); @@ -269,6 +274,7 @@ export function App() { modelCatalog, models, networkProxyConfig, + runnerPolicy, platforms, pricingRules, pricingRuleSets, @@ -282,7 +288,7 @@ export function App() { users, walletAccounts, walletTransactions, - }), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]); + }), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runnerPolicy, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]); async function refresh(nextToken = token) { await ensureRouteData(nextToken, true); @@ -377,6 +383,9 @@ export function App() { case 'pricingRuleSets': setPricingRuleSets((await listPricingRuleSets(nextToken)).items); return; + case 'runnerPolicy': + setRunnerPolicy(await getRunnerPolicy(nextToken)); + return; case 'runtimePolicySets': setRuntimePolicySets((await listRuntimePolicySets(nextToken)).items); return; @@ -774,6 +783,7 @@ export function App() { setBaseModels([]); setPricingRules([]); setPricingRuleSets([]); + setRunnerPolicy(null); setRuntimePolicySets([]); setAccessRules([]); setAuditLogs([]); @@ -948,6 +958,7 @@ export function App() { onSavePlatform={savePlatformWithModels} onSaveProvider={saveProvider} onSavePricingRuleSet={savePricingRuleSet} + onSaveRunnerPolicy={saveRunnerPolicy} onSaveRuntimePolicySet={saveRuntimePolicySet} onBatchAccessRules={batchSaveAccessRules} onSaveAccessRule={saveAccessRule} @@ -1127,7 +1138,7 @@ function dataKeysForRoute( case 'pricing': return ['pricingRuleSets']; case 'runtime': - return ['runtimePolicySets']; + return ['runtimePolicySets', 'runnerPolicy']; case 'baseModels': return ['baseModels', 'providers', 'pricingRuleSets', 'runtimePolicySets']; case 'platforms': diff --git a/apps/web/src/api.ts b/apps/web/src/api.ts index dd7fbe6..6256420 100644 --- a/apps/web/src/api.ts +++ b/apps/web/src/api.ts @@ -10,6 +10,8 @@ import type { GatewayAccessRuleUpsertRequest, GatewayApiKey, GatewayAuditLog, + GatewayRunnerPolicy, + GatewayRunnerPolicyUpsertRequest, GatewayTenant, GatewayTenantUpsertRequest, GatewayNetworkProxyConfig, @@ -239,6 +241,21 @@ export async function listRuntimePolicySets(token: string): Promise>('/api/admin/runtime/policy-sets', { token }); } +export async function getRunnerPolicy(token: string): Promise { + return request('/api/admin/runtime/runner-policy', { token }); +} + +export async function updateRunnerPolicy( + token: string, + input: GatewayRunnerPolicyUpsertRequest, +): Promise { + return request('/api/admin/runtime/runner-policy', { + body: input, + method: 'PATCH', + token, + }); +} + export async function createRuntimePolicySet( token: string, input: RuntimePolicySetUpsertRequest, diff --git a/apps/web/src/app-state.ts b/apps/web/src/app-state.ts index 9b75cf8..440b5b0 100644 --- a/apps/web/src/app-state.ts +++ b/apps/web/src/app-state.ts @@ -5,6 +5,7 @@ import type { GatewayApiKey, GatewayAuditLog, GatewayNetworkProxyConfig, + GatewayRunnerPolicy, GatewayTask, GatewayTenant, GatewayUser, @@ -28,6 +29,7 @@ export interface ConsoleData { modelCatalog: ModelCatalogResponse; models: PlatformModel[]; networkProxyConfig: GatewayNetworkProxyConfig | null; + runnerPolicy: GatewayRunnerPolicy | null; platforms: IntegrationPlatform[]; pricingRules: PricingRule[]; pricingRuleSets: PricingRuleSet[]; diff --git a/apps/web/src/hooks/useRuntimePolicySetOperations.ts b/apps/web/src/hooks/useRuntimePolicySetOperations.ts index 8ef753c..e1c026d 100644 --- a/apps/web/src/hooks/useRuntimePolicySetOperations.ts +++ b/apps/web/src/hooks/useRuntimePolicySetOperations.ts @@ -1,14 +1,31 @@ import type { Dispatch, SetStateAction } from 'react'; -import type { RuntimePolicySet, RuntimePolicySetUpsertRequest } from '@easyai-ai-gateway/contracts'; -import { createRuntimePolicySet, deleteRuntimePolicySet, updateRuntimePolicySet } from '../api'; +import type { GatewayRunnerPolicy, GatewayRunnerPolicyUpsertRequest, RuntimePolicySet, RuntimePolicySetUpsertRequest } from '@easyai-ai-gateway/contracts'; +import { createRuntimePolicySet, deleteRuntimePolicySet, updateRunnerPolicy, updateRuntimePolicySet } from '../api'; import type { LoadState } from '../types'; export function useRuntimePolicySetOperations(input: { setCoreMessage: Dispatch>; setCoreState: Dispatch>; + setRunnerPolicy: Dispatch>; setRuntimePolicySets: Dispatch>; token: string; }) { + async function saveRunnerPolicy(payload: GatewayRunnerPolicyUpsertRequest) { + if (!input.token) throw new Error('请先登录后再维护调度策略'); + input.setCoreState('loading'); + input.setCoreMessage(''); + try { + const policy = await updateRunnerPolicy(input.token, payload); + input.setRunnerPolicy(policy); + input.setCoreState('ready'); + input.setCoreMessage('全局调度策略已更新。'); + } catch (err) { + input.setCoreState('error'); + input.setCoreMessage(err instanceof Error ? err.message : '全局调度策略保存失败'); + throw err; + } + } + async function saveRuntimePolicySet(payload: RuntimePolicySetUpsertRequest, policySetId?: string) { if (!input.token) throw new Error('请先登录后再维护运行策略'); input.setCoreState('loading'); @@ -43,5 +60,5 @@ export function useRuntimePolicySetOperations(input: { } } - return { removeRuntimePolicySet, saveRuntimePolicySet }; + return { removeRuntimePolicySet, saveRunnerPolicy, saveRuntimePolicySet }; } diff --git a/apps/web/src/pages/AdminPage.tsx b/apps/web/src/pages/AdminPage.tsx index a69e005..480aab4 100644 --- a/apps/web/src/pages/AdminPage.tsx +++ b/apps/web/src/pages/AdminPage.tsx @@ -6,6 +6,7 @@ import type { GatewayAccessRuleBatchRequest, GatewayAccessRuleUpsertRequest, GatewayTenantUpsertRequest, + GatewayRunnerPolicyUpsertRequest, GatewayUserUpsertRequest, PricingRuleSetUpsertRequest, RuntimePolicySetUpsertRequest, @@ -62,6 +63,7 @@ export function AdminPage(props: { onSavePlatform: (input: PlatformWithModelsInput) => Promise; onSaveProvider: (input: CatalogProviderUpsertRequest, providerId?: string) => Promise; onSavePricingRuleSet: (input: PricingRuleSetUpsertRequest, ruleSetId?: string) => Promise; + onSaveRunnerPolicy: (input: GatewayRunnerPolicyUpsertRequest) => Promise; onSaveRuntimePolicySet: (input: RuntimePolicySetUpsertRequest, policySetId?: string) => Promise; onSaveAccessRule: (input: GatewayAccessRuleUpsertRequest, ruleId?: string) => Promise; onSaveTenant: (input: GatewayTenantUpsertRequest, tenantId?: string) => Promise; @@ -102,9 +104,11 @@ export function AdminPage(props: { {props.section === 'runtime' && ( )} diff --git a/apps/web/src/pages/WorkspacePage.tsx b/apps/web/src/pages/WorkspacePage.tsx index 94cabe5..2f510eb 100644 --- a/apps/web/src/pages/WorkspacePage.tsx +++ b/apps/web/src/pages/WorkspacePage.tsx @@ -851,6 +851,15 @@ function TaskAttemptPopoverContent(props: { task: GatewayTask }) { {taskAttemptMeta(attempt)} {attempt.status === 'failed' && {taskAttemptFailureReason(attempt)}} + {taskAttemptTrace(attempt).length > 0 && ( + + {taskAttemptTrace(attempt).map((entry, index) => ( + + {taskAttemptTraceText(entry)} + + ))} + + )} ))} @@ -879,9 +888,11 @@ function taskAttemptStatusText(status: string) { } function taskAttemptMeta(attempt: NonNullable[number]) { + const statusCode = taskAttemptStatusCode(attempt); const values = [ attempt.providerModelName || attempt.modelName || attempt.modelAlias, attempt.requestId ? `RequestID ${attempt.requestId}` : '', + statusCode ? `状态码 ${statusCode}` : '', attempt.responseDurationMs ? formatDuration(attempt.responseDurationMs) : '', ].filter(Boolean); return values.join(' · ') || attempt.clientId || '-'; @@ -894,8 +905,97 @@ function taskAttemptFailureReason(attempt: NonNullable[ metadataString(attempt.metrics, 'message'), ); const code = firstText(attempt.errorCode, metadataString(attempt.metrics, 'errorCode')); - if (detail && code && detail !== code) return `${detail}(${code})`; - return detail || code || '失败'; + const statusCode = taskAttemptStatusCode(attempt); + const statusText = statusCode ? `状态码 ${statusCode}` : ''; + const category = taskAttemptFailureCategory(attempt); + const categoryText = category ? `错误分类 ${category}` : ''; + if (detail && code && detail !== code) return [detail, code, statusText, categoryText].filter(Boolean).join(' · '); + return [detail || code || '失败', statusText, categoryText].filter(Boolean).join(' · '); +} + +function taskAttemptStatusCode(attempt: NonNullable[number]) { + const value = attempt.statusCode ?? metadataNumber(attempt.metrics, 'statusCode'); + return value && Number.isFinite(value) ? Math.trunc(value) : null; +} + +function taskAttemptTrace(attempt: NonNullable[number]) { + const raw = attempt.metrics?.trace; + if (!Array.isArray(raw)) return []; + return raw.filter((item): item is Record => Boolean(item) && typeof item === 'object' && !Array.isArray(item)); +} + +function taskAttemptTraceText(entry: Record) { + const event = objectString(entry, 'event'); + const action = objectString(entry, 'action'); + const reason = objectString(entry, 'reason'); + const statusCode = metadataNumber(entry, 'statusCode'); + const errorCode = objectString(entry, 'errorCode'); + const category = objectString(entry, 'category'); + const policy = taskAttemptTracePolicy(entry); + const values = [ + taskAttemptTraceEventLabel(event, action), + statusCode ? `状态码 ${Math.trunc(statusCode)}` : '', + errorCode ? `错误 ${errorCode}` : '', + category ? `错误分类 ${category}` : '', + policy, + reason ? `原因 ${taskAttemptTraceReasonLabel(reason)}` : '', + ].filter(Boolean); + return values.join(' · '); +} + +function taskAttemptFailureCategory(attempt: NonNullable[number]) { + const category = firstText( + metadataString(attempt.metrics, 'errorCategory'), + metadataString(attempt.metrics, 'category'), + ); + if (category) return category; + for (const entry of taskAttemptTrace(attempt)) { + const traceCategory = objectString(entry, 'category'); + if (traceCategory) return traceCategory; + } + return ''; +} + +function taskAttemptTracePolicy(entry: Record) { + const source = objectString(entry, 'policySource'); + const policy = objectString(entry, 'policy'); + const rule = objectString(entry, 'policyRule'); + const matchedValue = objectString(entry, 'matchedValue'); + const sourceLabel = source || policy; + const policyPath = [sourceLabel, rule].filter(Boolean).join('.'); + if (!policyPath) return ''; + return matchedValue ? `策略 ${policyPath}=${matchedValue}` : `策略 ${policyPath}`; +} + +function taskAttemptTraceEventLabel(event: string, action: string) { + if (event === 'failure') return '失败'; + if (event === 'same_client_retry') return action === 'retry' ? '本平台重试' : '本平台停止重试'; + if (event === 'failover_next') return '切换下个平台'; + if (event === 'failover_stop') return '停止切换平台'; + if (event === 'priority_demoted') return '优先级降级'; + return event || action || '链路事件'; +} + +function taskAttemptTraceReasonLabel(reason: string) { + const labels: Record = { + client_call_failed: '客户端调用失败', + retry_disabled: '模型重试关闭', + retry_deny_policy: '命中模型拒绝重试规则', + retry_allow_policy: '命中模型允许重试规则', + client_retryable: '客户端标记可重试', + client_non_retryable: '客户端标记不可重试', + same_client_max_attempts: '达到本平台最大尝试次数', + failover_time_budget_exceeded: '超过全局切换时间预算', + runner_policy_disabled: '全局调度策略停用', + hard_stop_policy: '命中硬拒绝规则', + failover_disabled: '平台切换关闭', + failover_deny_policy: '命中拒绝切换规则', + failover_allow_policy: '命中允许切换规则', + max_platforms_reached: '达到最大平台数', + no_next_platform: '没有更多候选平台', + priority_demote_policy: '命中优先级降级规则', + }; + return labels[reason] ?? reason; } function formatCellValue(value: unknown) { diff --git a/apps/web/src/pages/admin/PlatformManagementPanel.tsx b/apps/web/src/pages/admin/PlatformManagementPanel.tsx index 3a10a4a..2aae1de 100644 --- a/apps/web/src/pages/admin/PlatformManagementPanel.tsx +++ b/apps/web/src/pages/admin/PlatformManagementPanel.tsx @@ -158,7 +158,7 @@ export function PlatformManagementPanel(props: {
平台管理 -

配置平台授权、Base URL、重试与限流策略,并选择接入全部或部分基准模型。

+

配置平台授权、Base URL、平台内重试与限流策略,并选择接入全部或部分基准模型。

+ }
{(props.message || localError) &&

{localError || props.message}

} + }, + { value: 'runner', label: '全局调度策略', icon: }, + ]} + onValueChange={setActiveTab} + />
-
- {props.runtimePolicySets.map((policy) => ( -
-
-
-
- {policy.name} - {policy.policyKey} + {activeTab === 'runner' && ( + + )} + + {activeTab === 'model' && ( +
+ {props.runtimePolicySets.map((policy) => ( +
+
+
+
+ {policy.name} + {policy.policyKey} +
+ {policy.status} +
+ {policy.description &&

{policy.description}

} +
+ {rateLimitSummary(policy)} + {retrySummary(policy)} + {autoDisableSummary(policy)} + {degradeSummary(policy)}
- {policy.status} -
- {policy.description &&

{policy.description}

} -
- {rateLimitSummary(policy)} - {retrySummary(policy)} - {autoDisableSummary(policy)} - {degradeSummary(policy)} -
-
- - -
-
- ))} -
+
+ + +
+ + ))} + + )}
-
重试策略允许/拒绝关键词控制是否继续尝试下一个客户端
+
平台内重试策略允许/拒绝关键词控制同一平台是否再次调用
- setForm({ ...form, retryEnabled: checked })} /> + setForm({ ...form, retryEnabled: checked })} /> setForm({ ...form, retryAllowKeywords: value })} /> setForm({ ...form, retryDenyKeywords: value })} /> @@ -218,15 +289,244 @@ function Toggle(props: { checked: boolean; label: string; onChange: (checked: bo ); } -function KeywordField(props: { label: string; value: string; onChange: (value: string) => void }) { +function RunnerPolicyEditor(props: { + form: RunnerPolicyForm; + loading: boolean; + onChange: (form: RunnerPolicyForm) => void; + onSubmit: (event: FormEvent) => void; +}) { + const [activeStrategy, setActiveStrategy] = useState('failover'); + const patch = (next: Partial) => props.onChange({ ...props.form, ...next }); + const strategyDefinitions = [ + { + value: 'failover' as const, + title: '平台间故障切换', + description: '当前平台内部重试耗尽后是否尝试下一个候选平台', + enabled: props.form.failoverEnabled, + icon: , + }, + { + value: 'hardStop' as const, + title: '硬拒绝规则', + description: '参数、余额、权限等错误直接失败,不受模型覆盖影响', + enabled: props.form.hardStopEnabled, + icon: , + }, + { + value: 'priorityDemote' as const, + title: '优先级降级', + description: '命中后将失败平台的动态优先级调整到当前最后', + enabled: props.form.priorityDemoteEnabled, + icon: , + }, + ]; + const activeDefinition = strategyDefinitions.find((item) => item.value === activeStrategy) ?? strategyDefinitions[0]; + return ( -