From 98abd247d6fa9205f27f0d4d2a769090e1d8af34 Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 12 May 2026 18:43:20 +0800 Subject: [PATCH] feat: add priority demotion controls --- .../httpapi/runtime_policy_handlers.go | 38 +++ apps/api/internal/httpapi/server.go | 1 + apps/api/internal/runner/retry_decision.go | 11 +- .../internal/runner/retry_decision_test.go | 13 +- apps/api/internal/runner/runtime_policy.go | 15 +- apps/api/internal/runner/trace.go | 6 +- apps/api/internal/store/candidates_test.go | 11 + apps/api/internal/store/postgres.go | 26 +- apps/api/internal/store/rate_limit_status.go | 179 ++++++++-- .../internal/store/rate_limit_status_test.go | 33 +- apps/api/internal/store/runner_policies.go | 1 - apps/api/internal/store/runtime_policies.go | 82 ++++- apps/api/migrations/0026_runner_policies.sql | 1 - ...4_runner_priority_demote_policy_repair.sql | 18 + .../0035_runner_priority_demote_auto_mode.sql | 5 + apps/web/src/App.tsx | 35 ++ apps/web/src/api.ts | 14 + apps/web/src/pages/AdminPage.tsx | 3 + .../web/src/pages/admin/RealtimeLoadPanel.tsx | 323 +++++++++++++++++- .../src/pages/admin/RuntimePoliciesPanel.tsx | 9 +- apps/web/src/styles/pages.css | 141 +++++++- packages/contracts/src/index.ts | 37 ++ 22 files changed, 917 insertions(+), 85 deletions(-) create mode 100644 apps/api/migrations/0034_runner_priority_demote_policy_repair.sql create mode 100644 apps/api/migrations/0035_runner_priority_demote_auto_mode.sql diff --git a/apps/api/internal/httpapi/runtime_policy_handlers.go b/apps/api/internal/httpapi/runtime_policy_handlers.go index 236f76b..95b0a3a 100644 --- a/apps/api/internal/httpapi/runtime_policy_handlers.go +++ b/apps/api/internal/httpapi/runtime_policy_handlers.go @@ -44,6 +44,44 @@ func (s *Server) updateRunnerPolicy(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, item) } +type updatePlatformDynamicPriorityRequest struct { + DynamicPriority *int `json:"dynamicPriority"` + Reset bool `json:"reset"` +} + +func (s *Server) updatePlatformDynamicPriority(w http.ResponseWriter, r *http.Request) { + var input updatePlatformDynamicPriorityRequest + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + var dynamicPriority *int + if input.Reset { + dynamicPriority = nil + } else { + if input.DynamicPriority == nil { + writeError(w, http.StatusBadRequest, "dynamicPriority is required unless reset is true") + return + } + if *input.DynamicPriority < 0 { + writeError(w, http.StatusBadRequest, "dynamicPriority must be greater than or equal to 0") + return + } + dynamicPriority = input.DynamicPriority + } + item, err := s.store.UpdatePlatformDynamicPriority(r.Context(), r.PathValue("platformID"), dynamicPriority) + if err != nil { + if store.IsNotFound(err) { + writeError(w, http.StatusNotFound, "platform not found") + return + } + s.logger.Error("update platform dynamic priority failed", "error", err) + writeError(w, http.StatusInternalServerError, "update platform dynamic priority failed") + return + } + writeJSON(w, http.StatusOK, item) +} + func (s *Server) createRuntimePolicySet(w http.ResponseWriter, r *http.Request) { var input store.RuntimePolicySetInput if err := json.NewDecoder(r.Body).Decode(&input); err != nil { diff --git a/apps/api/internal/httpapi/server.go b/apps/api/internal/httpapi/server.go index ae89c66..5cff51b 100644 --- a/apps/api/internal/httpapi/server.go +++ b/apps/api/internal/httpapi/server.go @@ -105,6 +105,7 @@ func NewServerWithContext(ctx context.Context, cfg config.Config, db *store.Stor mux.Handle("GET /api/admin/platforms", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listPlatforms))) mux.Handle("POST /api/admin/platforms", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createPlatform))) mux.Handle("PATCH /api/admin/platforms/{platformID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updatePlatform))) + mux.Handle("PATCH /api/admin/platforms/{platformID}/dynamic-priority", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updatePlatformDynamicPriority))) mux.Handle("DELETE /api/admin/platforms/{platformID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.deletePlatform))) mux.Handle("PUT /api/admin/platforms/{platformID}/models", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.replacePlatformModels))) mux.Handle("POST /api/admin/platforms/{platformID}/models", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createPlatformModel))) diff --git a/apps/api/internal/runner/retry_decision.go b/apps/api/internal/runner/retry_decision.go index 7d15356..ecc06bd 100644 --- a/apps/api/internal/runner/retry_decision.go +++ b/apps/api/internal/runner/retry_decision.go @@ -43,7 +43,6 @@ type failoverDecision struct { type priorityDemoteDecision struct { Demote bool Reason string - Step int Match policyRuleMatch Info failureInfo } @@ -110,9 +109,9 @@ func failoverDecisionForCandidate(runnerPolicy store.RunnerPolicy, candidate sto return failoverDecision{Retry: false, Action: "stop", Reason: "client_non_retryable", Match: policyRuleMatch{Source: "provider_client", Policy: "ClientError", Rule: "Retryable", Value: "false"}, Info: info} } -func shouldDemoteCandidatePriority(runnerPolicy store.RunnerPolicy, err error) (bool, int) { +func shouldDemoteCandidatePriority(runnerPolicy store.RunnerPolicy, err error) bool { decision := priorityDemoteDecisionForCandidate(runnerPolicy, err) - return decision.Demote, decision.Step + return decision.Demote } func priorityDemoteDecisionForCandidate(runnerPolicy store.RunnerPolicy, err error) priorityDemoteDecision { @@ -128,11 +127,7 @@ func priorityDemoteDecisionForCandidate(runnerPolicy store.RunnerPolicy, err err return priorityDemoteDecision{Demote: false, Reason: "priority_demote_disabled", Info: info} } if match, ok := priorityDemotePolicyMatch(policy, info); ok { - step := intFromPolicy(policy, "demoteStep") - if step <= 0 { - step = 100 - } - return priorityDemoteDecision{Demote: true, Reason: "priority_demote_policy", Step: step, Match: match, Info: info} + return priorityDemoteDecision{Demote: true, Reason: "priority_demote_policy", Match: match, Info: info} } return priorityDemoteDecision{Demote: false, Reason: "priority_demote_no_match", Info: info} } diff --git a/apps/api/internal/runner/retry_decision_test.go b/apps/api/internal/runner/retry_decision_test.go index 25d3981..22db362 100644 --- a/apps/api/internal/runner/retry_decision_test.go +++ b/apps/api/internal/runner/retry_decision_test.go @@ -171,19 +171,16 @@ func TestPriorityDemotePolicyIsKeywordGatedAndHardStopSafe(t *testing.T) { "categories": []any{"request_error"}, }, PriorityDemotePolicy: map[string]any{ - "enabled": true, - "demoteStep": 25, - "keywords": []any{"rate_limit"}, + "enabled": true, + "keywords": []any{"rate_limit"}, }, } - shouldDemote, step := shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "rate_limit", Message: "rate_limit from upstream", Retryable: true}) - if !shouldDemote || step != 25 { - t.Fatalf("priority demotion should be enabled only by matched policy, got shouldDemote=%v step=%d", shouldDemote, step) + if !shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "rate_limit", Message: "rate_limit from upstream", Retryable: true}) { + t.Fatal("priority demotion should be enabled only by matched policy") } - shouldDemote, _ = shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "bad_request", StatusCode: 400, Retryable: true}) - if shouldDemote { + if shouldDemoteCandidatePriority(runnerPolicy, &clients.ClientError{Code: "bad_request", StatusCode: 400, Retryable: true}) { t.Fatal("priority demotion should not run for hard-stop request errors") } } diff --git a/apps/api/internal/runner/runtime_policy.go b/apps/api/internal/runner/runtime_policy.go index 58ce14e..0fb2d59 100644 --- a/apps/api/internal/runner/runtime_policy.go +++ b/apps/api/internal/runner/runtime_policy.go @@ -73,17 +73,26 @@ func (s *Service) applyPriorityDemotePolicy(ctx context.Context, taskID string, if !decision.Demote { return } - if err := s.store.DemoteCandidatePlatformPriority(ctx, candidate.PlatformID, decision.Step); err == nil { - s.recordAttemptTrace(ctx, taskID, attemptNo, priorityDemoteTraceEntry(decision, candidate.PlatformID, candidate.PlatformModelID)) + if dynamicPriority, err := s.store.DemoteCandidatePlatformPriority(ctx, candidate.PlatformID); err == nil { + s.recordAttemptTrace(ctx, taskID, attemptNo, priorityDemoteTraceEntry(decision, candidate.PlatformID, candidate.PlatformModelID, dynamicPriority)) _ = s.emit(ctx, taskID, "task.policy.priority_demoted", "running", "priority_demote", 0.52, "candidate platform priority demoted by runner policy", addPolicyTracePayload(map[string]any{ "platformId": candidate.PlatformID, "platformModelId": candidate.PlatformModelID, - "demoteStep": decision.Step, + "dynamicPriority": dynamicPriority, "code": clients.ErrorCode(cause), + "reason": decision.Reason, + "errorMessage": messageFromError(cause), }, decision.Match, decision.Info), simulated) } } +func messageFromError(err error) string { + if err == nil { + return "" + } + return err.Error() +} + func effectiveRuntimePolicy(base map[string]any, override map[string]any, key string) map[string]any { policy := base if nested, ok := override[key].(map[string]any); ok && len(nested) > 0 { diff --git a/apps/api/internal/runner/trace.go b/apps/api/internal/runner/trace.go index fc36887..e0fcff5 100644 --- a/apps/api/internal/runner/trace.go +++ b/apps/api/internal/runner/trace.go @@ -34,10 +34,12 @@ func failoverTraceEntry(decision failoverDecision) map[string]any { return entry } -func priorityDemoteTraceEntry(decision priorityDemoteDecision, platformID string, platformModelID string) map[string]any { +func priorityDemoteTraceEntry(decision priorityDemoteDecision, platformID string, platformModelID string, dynamicPriority int) map[string]any { entry := policyTraceEntry("priority_demoted", "priority_demote", "demote", decision.Reason, decision.Match, decision.Info) entry["demote"] = decision.Demote - entry["demoteStep"] = decision.Step + if dynamicPriority > 0 { + entry["dynamicPriority"] = dynamicPriority + } entry["platformId"] = platformID entry["platformModelId"] = platformModelID return entry diff --git a/apps/api/internal/store/candidates_test.go b/apps/api/internal/store/candidates_test.go index 18659bd..5b41f7c 100644 --- a/apps/api/internal/store/candidates_test.go +++ b/apps/api/internal/store/candidates_test.go @@ -59,3 +59,14 @@ func TestRuntimeCandidateSortingAvoidsFullCandidatesButKeepsFallback(t *testing. t.Fatalf("expected full high-priority candidate to remain as avoided fallback, got %+v", candidates) } } + +func TestDefaultRunnerPriorityDemotePolicyUsesAutoMode(t *testing.T) { + policy := defaultRunnerPriorityDemotePolicy() + + if _, ok := policy["demoteStep"]; ok { + t.Fatal("priority demotion should be automatic and must not expose a demoteStep policy") + } + if policy["enabled"] != true { + t.Fatalf("expected default priority demotion to stay enabled, got %+v", policy["enabled"]) + } +} diff --git a/apps/api/internal/store/postgres.go b/apps/api/internal/store/postgres.go index adf0170..40a2245 100644 --- a/apps/api/internal/store/postgres.go +++ b/apps/api/internal/store/postgres.go @@ -3,6 +3,7 @@ package store import ( "context" "crypto/rand" + "database/sql" "encoding/base64" "encoding/json" "errors" @@ -67,6 +68,8 @@ type Platform struct { AuthType string `json:"authType"` Status string `json:"status"` Priority int `json:"priority"` + DynamicPriority *int `json:"dynamicPriority,omitempty"` + EffectivePriority int `json:"effectivePriority"` DefaultPricingMode string `json:"defaultPricingMode"` DefaultDiscountFactor float64 `json:"defaultDiscountFactor"` PricingRuleSetID string `json:"pricingRuleSetId,omitempty"` @@ -508,13 +511,14 @@ type TaskParamPreprocessingLog struct { func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) { rows, err := s.pool.Query(ctx, ` -SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, +SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, + priority, dynamic_priority, COALESCE(dynamic_priority, priority), default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), config, credentials, retry_policy, rate_limit_policy, COALESCE(to_char(cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), created_at, updated_at FROM integration_platforms -ORDER BY priority ASC, created_at DESC`) +ORDER BY COALESCE(dynamic_priority, priority) ASC, priority ASC, created_at DESC`) if err != nil { return nil, err } @@ -527,6 +531,7 @@ ORDER BY priority ASC, created_at DESC`) var credentialsBytes []byte var retryPolicyBytes []byte var rateLimitPolicyBytes []byte + var dynamicPriority sql.NullInt64 if err := rows.Scan( &platform.ID, &platform.Provider, @@ -537,6 +542,8 @@ ORDER BY priority ASC, created_at DESC`) &platform.AuthType, &platform.Status, &platform.Priority, + &dynamicPriority, + &platform.EffectivePriority, &platform.DefaultPricingMode, &platform.DefaultDiscountFactor, &platform.PricingRuleSetID, @@ -550,6 +557,7 @@ ORDER BY priority ASC, created_at DESC`) ); err != nil { return nil, err } + platform.DynamicPriority = intPointerFromNull(dynamicPriority) platform.Config = decodeObject(configBytes) platform.CredentialsPreview = maskCredentialsPreview(credentialsBytes) platform.RetryPolicy = decodeObject(retryPolicyBytes) @@ -578,6 +586,7 @@ func (s *Store) CreatePlatform(ctx context.Context, input CreatePlatformInput) ( var credentialsResultBytes []byte var retryPolicyBytes []byte var rateLimitPolicyBytes []byte + var dynamicPriority sql.NullInt64 err := s.pool.QueryRow(ctx, ` INSERT INTO integration_platforms ( provider, platform_key, name, internal_name, base_url, auth_type, credentials, config, @@ -588,7 +597,8 @@ VALUES ( $1, COALESCE(NULLIF($2, ''), 'platform_' || replace(gen_random_uuid()::text, '-', '')), $3, NULLIF($4, ''), $5, $6, $7, $8, $9, $10, NULLIF($11, '')::uuid, $12, $13, $14 ) -RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, +RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, + priority, dynamic_priority, COALESCE(dynamic_priority, priority), default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), config, credentials, retry_policy, rate_limit_policy, COALESCE(to_char(cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), @@ -606,6 +616,8 @@ RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), C &platform.AuthType, &platform.Status, &platform.Priority, + &dynamicPriority, + &platform.EffectivePriority, &platform.DefaultPricingMode, &platform.DefaultDiscountFactor, &platform.PricingRuleSetID, @@ -620,6 +632,7 @@ RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), C if err != nil { return Platform{}, err } + platform.DynamicPriority = intPointerFromNull(dynamicPriority) platform.Config = decodeObject(configBytes) platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes) platform.RetryPolicy = decodeObject(retryPolicyBytes) @@ -650,6 +663,7 @@ func (s *Store) UpdatePlatform(ctx context.Context, id string, input CreatePlatf var credentialsResultBytes []byte var retryPolicyBytes []byte var rateLimitPolicyBytes []byte + var dynamicPriority sql.NullInt64 err := s.pool.QueryRow(ctx, ` UPDATE integration_platforms SET provider = $2, @@ -672,7 +686,8 @@ SET provider = $2, rate_limit_policy = $15, updated_at = now() WHERE id = $1::uuid -RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, +RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, + priority, dynamic_priority, COALESCE(dynamic_priority, priority), default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), config, credentials, retry_policy, rate_limit_policy, COALESCE(to_char(cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), @@ -702,6 +717,8 @@ RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), C &platform.AuthType, &platform.Status, &platform.Priority, + &dynamicPriority, + &platform.EffectivePriority, &platform.DefaultPricingMode, &platform.DefaultDiscountFactor, &platform.PricingRuleSetID, @@ -716,6 +733,7 @@ RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), C if err != nil { return Platform{}, err } + platform.DynamicPriority = intPointerFromNull(dynamicPriority) platform.Config = decodeObject(configBytes) platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes) platform.RetryPolicy = decodeObject(retryPolicyBytes) diff --git a/apps/api/internal/store/rate_limit_status.go b/apps/api/internal/store/rate_limit_status.go index 1c4e4c5..7ea8a40 100644 --- a/apps/api/internal/store/rate_limit_status.go +++ b/apps/api/internal/store/rate_limit_status.go @@ -2,8 +2,11 @@ package store import ( "context" + "database/sql" "sort" + "strconv" "strings" + "time" ) type RateLimitMetricStatus struct { @@ -17,30 +20,53 @@ type RateLimitMetricStatus struct { } type ModelRateLimitStatus struct { - PlatformModelID string `json:"platformModelId"` - PlatformID string `json:"platformId"` - PlatformName string `json:"platformName"` - Provider string `json:"provider"` - ModelName string `json:"modelName"` - ProviderModelName string `json:"providerModelName,omitempty"` - ModelAlias string `json:"modelAlias,omitempty"` - DisplayName string `json:"displayName"` - ModelType []string `json:"modelType"` - Enabled bool `json:"enabled"` - RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` - PlatformCooldownUntil string `json:"platformCooldownUntil,omitempty"` - ModelCooldownUntil string `json:"modelCooldownUntil,omitempty"` - Concurrent RateLimitMetricStatus `json:"concurrent"` - QueuedTasks float64 `json:"queuedTasks"` - RPM RateLimitMetricStatus `json:"rpm"` - TPM RateLimitMetricStatus `json:"tpm"` - LoadRatio float64 `json:"loadRatio"` + PlatformModelID string `json:"platformModelId"` + PlatformID string `json:"platformId"` + PlatformName string `json:"platformName"` + Provider string `json:"provider"` + PlatformPriority int `json:"platformPriority"` + PlatformDynamicPriority *int `json:"platformDynamicPriority,omitempty"` + PlatformEffectivePriority int `json:"platformEffectivePriority"` + ModelName string `json:"modelName"` + ProviderModelName string `json:"providerModelName,omitempty"` + ModelAlias string `json:"modelAlias,omitempty"` + DisplayName string `json:"displayName"` + ModelType []string `json:"modelType"` + Enabled bool `json:"enabled"` + RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` + PlatformCooldownUntil string `json:"platformCooldownUntil,omitempty"` + ModelCooldownUntil string `json:"modelCooldownUntil,omitempty"` + Concurrent RateLimitMetricStatus `json:"concurrent"` + QueuedTasks float64 `json:"queuedTasks"` + RPM RateLimitMetricStatus `json:"rpm"` + TPM RateLimitMetricStatus `json:"tpm"` + LoadRatio float64 `json:"loadRatio"` + RecentPriorityDemotions []PriorityDemotionRecord `json:"recentPriorityDemotions,omitempty"` +} + +type PriorityDemotionRecord struct { + ID string `json:"id"` + TaskID string `json:"taskId"` + PlatformID string `json:"platformId"` + PlatformModelID string `json:"platformModelId,omitempty"` + Reason string `json:"reason,omitempty"` + ErrorCode string `json:"errorCode,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + Category string `json:"category,omitempty"` + StatusCode int `json:"statusCode,omitempty"` + PolicySource string `json:"policySource,omitempty"` + Policy string `json:"policy,omitempty"` + PolicyRule string `json:"policyRule,omitempty"` + MatchedValue string `json:"matchedValue,omitempty"` + DynamicPriority int `json:"dynamicPriority,omitempty"` + CreatedAt time.Time `json:"createdAt"` } func (s *Store) ListModelRateLimitStatuses(ctx context.Context) ([]ModelRateLimitStatus, error) { rows, err := s.pool.Query(ctx, ` - SELECT m.id::text, m.platform_id::text, p.name, p.provider, - m.model_name, COALESCE(NULLIF(m.provider_model_name, ''), m.model_name), COALESCE(m.model_alias, ''), + SELECT m.id::text, m.platform_id::text, p.name, p.provider, + p.priority, p.dynamic_priority, COALESCE(p.dynamic_priority, p.priority), + m.model_name, COALESCE(NULLIF(m.provider_model_name, ''), m.model_name), COALESCE(m.model_alias, ''), m.model_type, m.display_name, m.enabled, p.rate_limit_policy, COALESCE(rp.rate_limit_policy, '{}'::jsonb), COALESCE(NULLIF(m.runtime_policy_override, '{}'::jsonb), b.runtime_policy_override, '{}'::jsonb), m.rate_limit_policy, COALESCE(to_char(p.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), @@ -119,6 +145,7 @@ ORDER BY p.priority ASC, m.model_name ASC`) var runtimePolicyBytes []byte var runtimeOverrideBytes []byte var modelPolicyBytes []byte + var platformDynamicPriority sql.NullInt64 var platformCooldownUntil string var modelCooldownUntil string var concurrentCurrent float64 @@ -134,6 +161,9 @@ ORDER BY p.priority ASC, m.model_name ASC`) &item.PlatformID, &item.PlatformName, &item.Provider, + &item.PlatformPriority, + &platformDynamicPriority, + &item.PlatformEffectivePriority, &item.ModelName, &item.ProviderModelName, &item.ModelAlias, @@ -157,6 +187,7 @@ ORDER BY p.priority ASC, m.model_name ASC`) ); err != nil { return nil, err } + item.PlatformDynamicPriority = intPointerFromNull(platformDynamicPriority) item.ModelType = decodeStringArray(modelTypeBytes) policy := effectiveModelRateLimitPolicy( decodeObject(platformPolicyBytes), @@ -177,6 +208,13 @@ ORDER BY p.priority ASC, m.model_name ASC`) if err := rows.Err(); err != nil { return nil, err } + demotions, err := s.listRecentPriorityDemotionsByPlatform(ctx, items, 10) + if err != nil { + return nil, err + } + for index := range items { + items[index].RecentPriorityDemotions = demotions[items[index].PlatformID] + } sort.SliceStable(items, func(i, j int) bool { if items[i].LoadRatio == items[j].LoadRatio { return strings.ToLower(items[i].DisplayName) < strings.ToLower(items[j].DisplayName) @@ -186,6 +224,91 @@ ORDER BY p.priority ASC, m.model_name ASC`) return items, nil } +func (s *Store) listRecentPriorityDemotionsByPlatform(ctx context.Context, statuses []ModelRateLimitStatus, limit int) (map[string][]PriorityDemotionRecord, error) { + out := map[string][]PriorityDemotionRecord{} + if limit <= 0 || len(statuses) == 0 { + return out, nil + } + seen := map[string]bool{} + platformIDs := make([]string, 0, len(statuses)) + for _, status := range statuses { + platformID := strings.TrimSpace(status.PlatformID) + if platformID == "" || seen[platformID] { + continue + } + seen[platformID] = true + platformIDs = append(platformIDs, platformID) + } + if len(platformIDs) == 0 { + return out, nil + } + rows, err := s.pool.Query(ctx, ` + SELECT id::text, task_id::text, COALESCE(message, ''), payload, created_at + FROM ( + SELECT e.*, + row_number() OVER ( + PARTITION BY e.payload->>'platformId' + ORDER BY e.created_at DESC, e.seq DESC + ) AS demotion_rank + FROM gateway_task_events e + WHERE e.event_type = 'task.policy.priority_demoted' + AND e.payload->>'platformId' = ANY($1::text[]) + ) ranked + WHERE demotion_rank <= $2 + ORDER BY payload->>'platformId' ASC, created_at DESC`, platformIDs, limit) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var id string + var taskID string + var message string + var payloadBytes []byte + var createdAt time.Time + if err := rows.Scan(&id, &taskID, &message, &payloadBytes, &createdAt); err != nil { + return nil, err + } + record := priorityDemotionRecordFromEventPayload(id, taskID, message, decodeObject(payloadBytes), createdAt) + if record.PlatformID == "" { + continue + } + out[record.PlatformID] = append(out[record.PlatformID], record) + } + return out, rows.Err() +} + +func priorityDemotionRecordFromEventPayload(id string, taskID string, message string, payload map[string]any, createdAt time.Time) PriorityDemotionRecord { + errorMessage := stringValue(payload["errorMessage"]) + if errorMessage == "" { + errorMessage = stringValue(payload["message"]) + } + if errorMessage == "" { + errorMessage = strings.TrimSpace(message) + } + errorCode := stringValue(payload["errorCode"]) + if errorCode == "" { + errorCode = stringValue(payload["code"]) + } + return PriorityDemotionRecord{ + ID: id, + TaskID: taskID, + PlatformID: stringValue(payload["platformId"]), + PlatformModelID: stringValue(payload["platformModelId"]), + Reason: stringValue(payload["reason"]), + ErrorCode: errorCode, + ErrorMessage: errorMessage, + Category: stringValue(payload["category"]), + StatusCode: intValue(payload["statusCode"]), + PolicySource: stringValue(payload["policySource"]), + Policy: stringValue(payload["policy"]), + PolicyRule: stringValue(payload["policyRule"]), + MatchedValue: stringValue(payload["matchedValue"]), + DynamicPriority: intValue(payload["dynamicPriority"]), + CreatedAt: createdAt, + } +} + func effectiveModelRateLimitPolicy(platformPolicy map[string]any, runtimePolicy map[string]any, runtimeOverride map[string]any, modelPolicy map[string]any) map[string]any { policy := platformPolicy if hasRateLimitRules(runtimePolicy) { @@ -279,3 +402,19 @@ func floatValue(value any) float64 { return 0 } } + +func intValue(value any) int { + switch typed := value.(type) { + case int: + return typed + case int64: + return int(typed) + case float64: + return int(typed) + case string: + parsed, _ := strconv.Atoi(strings.TrimSpace(typed)) + return parsed + default: + return 0 + } +} diff --git a/apps/api/internal/store/rate_limit_status_test.go b/apps/api/internal/store/rate_limit_status_test.go index d0e176f..f57413a 100644 --- a/apps/api/internal/store/rate_limit_status_test.go +++ b/apps/api/internal/store/rate_limit_status_test.go @@ -1,6 +1,9 @@ package store -import "testing" +import ( + "testing" + "time" +) func TestEffectiveModelRateLimitPolicyTreatsModelRulesAsAuthoritative(t *testing.T) { policy := effectiveModelRateLimitPolicy( @@ -30,3 +33,31 @@ func TestEffectiveModelRateLimitPolicyTreatsModelRulesAsAuthoritative(t *testing t.Fatalf("expected missing model tpm limit to mean unlimited, got %v", got) } } + +func TestPriorityDemotionRecordFromEventPayloadKeepsReason(t *testing.T) { + createdAt := time.Date(2026, 5, 12, 9, 30, 0, 0, time.UTC) + record := priorityDemotionRecordFromEventPayload("event-1", "task-1", "fallback message", map[string]any{ + "platformId": "platform-1", + "platformModelId": "platform-model-1", + "reason": "priority_demote_policy", + "errorCode": "rate_limit", + "errorMessage": "upstream 429 rate limit", + "category": "rate_limit", + "statusCode": float64(429), + "policySource": "gateway_runner_policies.priority_demote_policy", + "policy": "priorityDemotePolicy", + "policyRule": "categories", + "matchedValue": "rate_limit", + "dynamicPriority": float64(1511), + }, createdAt) + + if record.Reason != "priority_demote_policy" || record.ErrorMessage != "upstream 429 rate limit" { + t.Fatalf("expected demotion reason and error message to survive, got %+v", record) + } + if record.StatusCode != 429 || record.DynamicPriority != 1511 { + t.Fatalf("expected numeric demotion metadata, got %+v", record) + } + if !record.CreatedAt.Equal(createdAt) { + t.Fatalf("expected createdAt %s, got %s", createdAt, record.CreatedAt) + } +} diff --git a/apps/api/internal/store/runner_policies.go b/apps/api/internal/store/runner_policies.go index 45240ff..2ea4fca 100644 --- a/apps/api/internal/store/runner_policies.go +++ b/apps/api/internal/store/runner_policies.go @@ -134,7 +134,6 @@ func defaultRunnerPolicy() RunnerPolicy { func defaultRunnerPriorityDemotePolicy() map[string]any { return map[string]any{ "enabled": true, - "demoteStep": 100, "categories": []any{"network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded"}, "codes": []any{"network", "timeout", "stream_read_error", "rate_limit", "server_error", "overloaded"}, "statusCodes": []any{408, 429, 500, 502, 503, 504}, diff --git a/apps/api/internal/store/runtime_policies.go b/apps/api/internal/store/runtime_policies.go index dc28538..ddd0c7a 100644 --- a/apps/api/internal/store/runtime_policies.go +++ b/apps/api/internal/store/runtime_policies.go @@ -2,8 +2,10 @@ package store import ( "context" + "database/sql" "encoding/json" "strings" + "time" "github.com/jackc/pgx/v5" ) @@ -28,6 +30,14 @@ type runtimePolicyScanner interface { Scan(dest ...any) error } +type PlatformDynamicPriorityState struct { + PlatformID string `json:"platformId"` + Priority int `json:"priority"` + DynamicPriority *int `json:"dynamicPriority,omitempty"` + EffectivePriority int `json:"effectivePriority"` + UpdatedAt time.Time `json:"updatedAt"` +} + func (s *Store) ListRuntimePolicySets(ctx context.Context) ([]RuntimePolicySet, error) { rows, err := s.pool.Query(ctx, `SELECT `+runtimePolicyColumns+` FROM model_runtime_policy_sets ORDER BY policy_key ASC`) if err != nil { @@ -149,26 +159,64 @@ WHERE id = $1::uuid`, platformModelID, cooldownSeconds) return err } -func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string, demoteStep int) error { +func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string) (int, error) { if strings.TrimSpace(platformID) == "" { + return 0, nil + } + var dynamicPriority int + err := s.pool.QueryRow(ctx, ` + UPDATE integration_platforms target + SET dynamic_priority = COALESCE(( + SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority)) + FROM integration_platforms peer + WHERE peer.deleted_at IS NULL + ), target.priority) + 1, + updated_at = now() + WHERE target.id = $1::uuid + RETURNING dynamic_priority`, platformID).Scan(&dynamicPriority) + return dynamicPriority, err +} + +func (s *Store) UpdatePlatformDynamicPriority(ctx context.Context, platformID string, dynamicPriority *int) (PlatformDynamicPriorityState, error) { + if strings.TrimSpace(platformID) == "" { + return PlatformDynamicPriorityState{}, pgx.ErrNoRows + } + value := 0 + reset := dynamicPriority == nil + if dynamicPriority != nil { + value = *dynamicPriority + } + return scanPlatformDynamicPriorityState(s.pool.QueryRow(ctx, ` +UPDATE integration_platforms +SET dynamic_priority = CASE WHEN $2::boolean THEN priority ELSE $3::int END, + updated_at = now() +WHERE id = $1::uuid + AND deleted_at IS NULL +RETURNING id::text, priority, dynamic_priority, COALESCE(dynamic_priority, priority), updated_at`, platformID, reset, value)) +} + +func scanPlatformDynamicPriorityState(scanner runtimePolicyScanner) (PlatformDynamicPriorityState, error) { + var item PlatformDynamicPriorityState + var dynamicPriority sql.NullInt64 + if err := scanner.Scan( + &item.PlatformID, + &item.Priority, + &dynamicPriority, + &item.EffectivePriority, + &item.UpdatedAt, + ); err != nil { + return PlatformDynamicPriorityState{}, err + } + item.DynamicPriority = intPointerFromNull(dynamicPriority) + return item, nil +} + +func intPointerFromNull(value sql.NullInt64) *int { + if !value.Valid { return nil } - if demoteStep <= 0 { - demoteStep = 100 - } - _, err := s.pool.Exec(ctx, ` -UPDATE integration_platforms target -SET dynamic_priority = GREATEST( - COALESCE(target.dynamic_priority, target.priority), - COALESCE(( - SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority)) - FROM integration_platforms peer - WHERE peer.deleted_at IS NULL - ), target.priority) + $2::int - ), - updated_at = now() -WHERE target.id = $1::uuid`, platformID, demoteStep) - return err + converted := int(value.Int64) + return &converted } func scanRuntimePolicySet(scanner runtimePolicyScanner) (RuntimePolicySet, error) { diff --git a/apps/api/migrations/0026_runner_policies.sql b/apps/api/migrations/0026_runner_policies.sql index 3c94af2..11fd9a2 100644 --- a/apps/api/migrations/0026_runner_policies.sql +++ b/apps/api/migrations/0026_runner_policies.sql @@ -49,7 +49,6 @@ VALUES ( }'::jsonb, '{ "enabled": true, - "demoteStep": 100, "categories": ["network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded"], "codes": ["network", "timeout", "stream_read_error", "rate_limit", "server_error", "overloaded"], "statusCodes": [408, 429, 500, 502, 503, 504], diff --git a/apps/api/migrations/0034_runner_priority_demote_policy_repair.sql b/apps/api/migrations/0034_runner_priority_demote_policy_repair.sql new file mode 100644 index 0000000..ede99a2 --- /dev/null +++ b/apps/api/migrations/0034_runner_priority_demote_policy_repair.sql @@ -0,0 +1,18 @@ +ALTER TABLE IF EXISTS gateway_runner_policies + ADD COLUMN IF NOT EXISTS priority_demote_policy jsonb NOT NULL DEFAULT '{}'::jsonb; + +UPDATE gateway_runner_policies +SET priority_demote_policy = CASE + WHEN COALESCE(priority_demote_policy, '{}'::jsonb) = '{}'::jsonb THEN + '{ + "enabled": true, + "categories": ["network", "timeout", "stream_error", "rate_limit", "provider_5xx", "provider_overloaded"], + "codes": ["network", "timeout", "stream_read_error", "rate_limit", "server_error", "overloaded"], + "statusCodes": [408, 429, 500, 502, 503, 504], + "keywords": ["timeout", "network", "rate_limit", "overloaded", "temporarily_unavailable", "server_error", "429", "5xx"] + }'::jsonb + ELSE priority_demote_policy + END, + metadata = metadata || jsonb_build_object('priorityDemotePolicyRepair', '0034_runner_priority_demote_policy_repair'), + updated_at = now() +WHERE policy_key = 'default-runner-v1'; diff --git a/apps/api/migrations/0035_runner_priority_demote_auto_mode.sql b/apps/api/migrations/0035_runner_priority_demote_auto_mode.sql new file mode 100644 index 0000000..b993adb --- /dev/null +++ b/apps/api/migrations/0035_runner_priority_demote_auto_mode.sql @@ -0,0 +1,5 @@ +UPDATE gateway_runner_policies +SET priority_demote_policy = priority_demote_policy - 'demoteStep', + metadata = metadata || jsonb_build_object('priorityDemoteAutoMode', '0035_runner_priority_demote_auto_mode'), + updated_at = now() +WHERE policy_key = 'default-runner-v1'; diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index 51fe812..0b909f7 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -19,6 +19,7 @@ import type { IntegrationPlatform, ModelCatalogResponse, ModelRateLimitStatus, + PlatformDynamicPriorityUpdateRequest, PlatformModel, PricingRule, PricingRuleSet, @@ -79,6 +80,7 @@ import { updateAccessRule, updateGatewayUser, updatePlatform, + updatePlatformDynamicPriority, updateTenant, updateUserGroup, } from './api'; @@ -558,6 +560,38 @@ export function App() { } } + async function savePlatformDynamicPriority(platformId: string, input: PlatformDynamicPriorityUpdateRequest) { + setCoreState('loading'); + setCoreMessage(''); + try { + const state = await updatePlatformDynamicPriority(token, platformId, input); + setPlatforms((current) => current.map((platform) => platform.id === platformId + ? { + ...platform, + dynamicPriority: state.dynamicPriority, + effectivePriority: state.effectivePriority, + priority: state.priority, + updatedAt: state.updatedAt, + } + : platform)); + setModelRateLimits((current) => current.map((status) => status.platformId === platformId + ? { + ...status, + platformDynamicPriority: state.dynamicPriority, + platformEffectivePriority: state.effectivePriority, + platformPriority: state.priority, + } + : status)); + invalidateDataKeys('modelCatalog', 'modelRateLimits', 'platforms', 'playgroundModels'); + setCoreState('ready'); + setCoreMessage(input.reset ? '平台动态优先级已重置。' : '平台动态优先级已更新。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : '更新平台动态优先级失败'); + throw err; + } + } + async function removePlatform(platformId: string) { setCoreState('loading'); setCoreMessage(''); @@ -994,6 +1028,7 @@ export function App() { onResetAllBaseModels={resetAllBaseModelsToDefault} onResetBaseModel={resetBaseModelToDefault} onSavePlatform={savePlatformWithModels} + onSavePlatformDynamicPriority={savePlatformDynamicPriority} onSaveProvider={saveProvider} onSavePricingRuleSet={savePricingRuleSet} onSaveRunnerPolicy={saveRunnerPolicy} diff --git a/apps/web/src/api.ts b/apps/web/src/api.ts index 20c471f..eeb407c 100644 --- a/apps/web/src/api.ts +++ b/apps/web/src/api.ts @@ -24,6 +24,8 @@ import type { ListResponse, ModelCatalogResponse, ModelRateLimitStatus, + PlatformDynamicPriorityState, + PlatformDynamicPriorityUpdateRequest, PlatformModel, PlayableGatewayApiKey, PricingRule, @@ -102,6 +104,18 @@ export async function listPlatforms(token: string): Promise>('/api/admin/platforms', { token }); } +export async function updatePlatformDynamicPriority( + token: string, + platformId: string, + input: PlatformDynamicPriorityUpdateRequest, +): Promise { + return request(`/api/admin/platforms/${platformId}/dynamic-priority`, { + body: input, + method: 'PATCH', + token, + }); +} + export async function listModels(token: string): Promise> { return request>('/api/admin/models', { token }); } diff --git a/apps/web/src/pages/AdminPage.tsx b/apps/web/src/pages/AdminPage.tsx index 56f5f32..6f6a18f 100644 --- a/apps/web/src/pages/AdminPage.tsx +++ b/apps/web/src/pages/AdminPage.tsx @@ -8,6 +8,7 @@ import type { GatewayTenantUpsertRequest, GatewayRunnerPolicyUpsertRequest, GatewayUserUpsertRequest, + PlatformDynamicPriorityUpdateRequest, PricingRuleSetUpsertRequest, RuntimePolicySetUpsertRequest, UserGroupUpsertRequest, @@ -63,6 +64,7 @@ export function AdminPage(props: { onResetBaseModel: (baseModelId: string) => Promise; onBatchAccessRules: (input: GatewayAccessRuleBatchRequest) => Promise; onSavePlatform: (input: PlatformWithModelsInput) => Promise; + onSavePlatformDynamicPriority: (platformId: string, input: PlatformDynamicPriorityUpdateRequest) => Promise; onSaveProvider: (input: CatalogProviderUpsertRequest, providerId?: string) => Promise; onSavePricingRuleSet: (input: PricingRuleSetUpsertRequest, ruleSetId?: string) => Promise; onSaveRunnerPolicy: (input: GatewayRunnerPolicyUpsertRequest) => Promise; @@ -154,6 +156,7 @@ export function AdminPage(props: { modelRateLimits={props.data.modelRateLimits} modelRateLimitsUpdatedAt={props.data.modelRateLimitsUpdatedAt} platforms={props.data.platforms} + onSavePlatformDynamicPriority={props.onSavePlatformDynamicPriority} /> )} {props.section === 'tenants' && } diff --git a/apps/web/src/pages/admin/RealtimeLoadPanel.tsx b/apps/web/src/pages/admin/RealtimeLoadPanel.tsx index 235b6a2..8dbc5e6 100644 --- a/apps/web/src/pages/admin/RealtimeLoadPanel.tsx +++ b/apps/web/src/pages/admin/RealtimeLoadPanel.tsx @@ -1,14 +1,19 @@ -import { useEffect, useMemo, useState } from 'react'; -import { Gauge } from 'lucide-react'; -import type { IntegrationPlatform, ModelRateLimitStatus } from '@easyai-ai-gateway/contracts'; -import { Badge, Card, CardContent, CardHeader, CardTitle, EmptyState, Table, TableCell, TableHead, TableRow } from '../../components/ui'; +import { useEffect, useMemo, useState, type FormEvent } from 'react'; +import { Popover as AntPopover } from 'antd'; +import { CheckCircle2, Gauge, History, RotateCcw, SlidersHorizontal } from 'lucide-react'; +import type { IntegrationPlatform, ModelRateLimitStatus, PlatformDynamicPriorityUpdateRequest, PriorityDemotionRecord } from '@easyai-ai-gateway/contracts'; +import { Badge, Button, Card, CardContent, CardHeader, CardTitle, EmptyState, FormDialog, Input, Label, Table, TableCell, TableHead, TableRow } from '../../components/ui'; export function RealtimeLoadPanel(props: { modelRateLimits: ModelRateLimitStatus[]; modelRateLimitsUpdatedAt: number | null; platforms: IntegrationPlatform[]; + onSavePlatformDynamicPriority: (platformId: string, input: PlatformDynamicPriorityUpdateRequest) => Promise; }) { const [now, setNow] = useState(() => Date.now()); + const [priorityDialog, setPriorityDialog] = useState(null); + const [priorityError, setPriorityError] = useState(''); + const [prioritySaving, setPrioritySaving] = useState(false); const platformMap = useMemo(() => new Map(props.platforms.map((item) => [item.id, item])), [props.platforms]); useEffect(() => { @@ -16,6 +21,50 @@ export function RealtimeLoadPanel(props: { return () => window.clearInterval(timer); }, []); + function openPriorityDialog(status: ModelRateLimitStatus, platform: IntegrationPlatform | undefined) { + setPriorityError(''); + setPriorityDialog({ + platform, + status, + value: formatPriority(platformEffectivePriority(status, platform)), + }); + } + + function closePriorityDialog() { + if (prioritySaving) return; + setPriorityDialog(null); + setPriorityError(''); + } + + async function submitPriorityForm(event: FormEvent) { + event.preventDefault(); + if (!priorityDialog) return; + const dynamicPriority = parsePriorityInput(priorityDialog.value); + if (dynamicPriority === null) { + setPriorityError('请输入大于等于 0 的整数。'); + return; + } + await savePriority(priorityDialog.status.platformId, { dynamicPriority }); + } + + async function resetPriority() { + if (!priorityDialog) return; + await savePriority(priorityDialog.status.platformId, { reset: true }); + } + + async function savePriority(platformId: string, input: PlatformDynamicPriorityUpdateRequest) { + setPrioritySaving(true); + setPriorityError(''); + try { + await props.onSavePlatformDynamicPriority(platformId, input); + setPriorityDialog(null); + } catch (err) { + setPriorityError(err instanceof Error ? err.message : '更新平台动态优先级失败'); + } finally { + setPrioritySaving(false); + } + } + return (
@@ -31,14 +80,36 @@ export function RealtimeLoadPanel(props: { platformMap={platformMap} statuses={props.modelRateLimits} updatedAt={props.modelRateLimitsUpdatedAt} + onAdjustPriority={openPriorityDialog} /> + setPriorityDialog((current) => current ? { ...current, value } : current)} + />
); } -function RateLimitStatusTable(props: { statuses: ModelRateLimitStatus[]; platformMap: Map; now: number; updatedAt: number | null }) { +type PriorityDialogState = { + platform: IntegrationPlatform | undefined; + status: ModelRateLimitStatus; + value: string; +}; + +function RateLimitStatusTable(props: { + statuses: ModelRateLimitStatus[]; + platformMap: Map; + now: number; + updatedAt: number | null; + onAdjustPriority: (status: ModelRateLimitStatus, platform: IntegrationPlatform | undefined) => void; +}) { if (!props.statuses.length) { return ; } @@ -53,6 +124,8 @@ function RateLimitStatusTable(props: { statuses: ModelRateLimitStatus[]; platfor 模型 平台 + 平台优先级 + 满载率 并发 正在执行 / 并发 / 排队 @@ -60,7 +133,6 @@ function RateLimitStatusTable(props: { statuses: ModelRateLimitStatus[]; platfor TPM RPM 状态 - 满载率 {props.statuses.map((status) => { const platform = props.platformMap.get(status.platformId); @@ -78,16 +150,17 @@ function RateLimitStatusTable(props: { statuses: ModelRateLimitStatus[]; platfor {status.provider} - {concurrencyMetricCell(status)} - {metricCell(status.tpm, true)} - {metricCell(status.rpm)} - {modelRuntimeStatusCell(status, props.now)} + {platformPriorityCell(status, platform, props.onAdjustPriority)} 0.8 ? 'true' : undefined}> {formatPercent(status.loadRatio)} + {concurrencyMetricCell(status)} + {metricCell(status.tpm, true)} + {metricCell(status.rpm)} + {modelRuntimeStatusCell(status, props.now)} ); })} @@ -101,6 +174,176 @@ function platformDisplayName(platform: IntegrationPlatform) { return platform.internalName?.trim() || platform.name; } +function platformPriorityCell( + status: ModelRateLimitStatus, + platform: IntegrationPlatform | undefined, + onAdjustPriority: (status: ModelRateLimitStatus, platform: IntegrationPlatform | undefined) => void, +) { + const records = status.recentPriorityDemotions ?? []; + const content = ; + return ( + + + + + + + ); +} + +function PlatformPriorityDialog(props: { + dialog: PriorityDialogState | null; + error: string; + saving: boolean; + onClose: () => void; + onReset: () => void; + onSubmit: (event: FormEvent) => void; + onValueChange: (value: string) => void; +}) { + const dialog = props.dialog; + const platform = dialog?.platform; + const status = dialog?.status; + return ( + + + + + + )} + open={Boolean(dialog)} + title="调整平台动态优先级" + onClose={props.onClose} + onSubmit={props.onSubmit} + > +
+ + {platform ? platformDisplayName(platform) : status?.platformName} + {status?.provider} + + + {formatPriority(platformEffectivePriority(status, platform))} + 当前生效 + +
+
+ 静态 {formatPriority(platformStaticPriority(status, platform))} + 动态 {formatPriority(platformDynamicPriority(status, platform))} +
+ + {props.error &&

{props.error}

} +
+ ); +} + +function PriorityDemotionPopover(props: { records: PriorityDemotionRecord[] }) { + if (!props.records.length) { + return ( + + 暂无优先级降级记录 + + ); + } + return ( + + + + 最近 10 条优先级降级 + + {props.records.map((record) => ( + + + {priorityDemotionReasonText(record)} + {priorityIsSet(record.dynamicPriority) ? 队尾 {formatPriority(record.dynamicPriority)} : null} + + {priorityDemotionMetaText(record)} + {record.errorMessage && {record.errorMessage}} + + ))} + + ); +} + +function platformPrioritySubtitle(status: ModelRateLimitStatus, platform: IntegrationPlatform | undefined, demotionCount: number) { + const staticPriority = platformStaticPriority(status, platform); + const dynamicPriority = platformDynamicPriority(status, platform); + const base = priorityIsSet(dynamicPriority) ? `静态 ${formatPriority(staticPriority)} · 动态 ${formatPriority(dynamicPriority)}` : `静态 ${formatPriority(staticPriority)}`; + return demotionCount ? `${base} · 降级 ${demotionCount}` : base; +} + +function priorityDemotionAriaLabel(status: ModelRateLimitStatus, platform: IntegrationPlatform | undefined) { + const count = status.recentPriorityDemotions?.length ?? 0; + return `平台优先级 ${formatPriority(platformEffectivePriority(status, platform))},最近 ${count} 条优先级降级记录`; +} + +function priorityDemotionReasonText(record: PriorityDemotionRecord) { + const category = record.category ? `错误分类 ${record.category}` : ''; + const code = record.errorCode ? `错误 ${record.errorCode}` : ''; + const statusCode = record.statusCode ? `状态码 ${record.statusCode}` : ''; + return [priorityDemotionReasonLabel(record.reason), statusCode, code, category].filter(Boolean).join(' · '); +} + +function priorityDemotionMetaText(record: PriorityDemotionRecord) { + const policy = priorityDemotionPolicyText(record); + const values = [ + formatDateTime(record.createdAt), + shortId(record.taskId) ? `任务 ${shortId(record.taskId)}` : '', + policy, + ].filter(Boolean); + return values.join(' · ') || '-'; +} + +function priorityDemotionPolicyText(record: PriorityDemotionRecord) { + const policyPath = [record.policySource || record.policy, record.policyRule].filter(Boolean).join('.'); + if (!policyPath) return ''; + return record.matchedValue ? `策略 ${policyPath}=${record.matchedValue}` : `策略 ${policyPath}`; +} + +function priorityDemotionReasonLabel(reason: string | undefined) { + const labels: Record = { + priority_demote_policy: '命中优先级降级规则', + hard_stop_policy: '命中硬拒绝规则', + runner_policy_disabled: '全局调度策略停用', + }; + return reason ? labels[reason] ?? reason : '优先级降级'; +} + function metricCell(metric: ModelRateLimitStatus['rpm'], includeReserved = false) { if (!metric.limited) return {formatLimit(metric.currentValue)} / 不限{includeReserved ? reservedMetricText(metric) : '未配置上限'}; return ( @@ -132,6 +375,19 @@ function formatTimeOfDay(timestamp: number | null) { return `${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`; } +function formatDateTime(value: string | undefined) { + if (!value) return ''; + const date = new Date(value); + if (Number.isNaN(date.getTime())) return value; + const pad = (item: number) => String(item).padStart(2, '0'); + return `${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`; +} + +function shortId(value: string | undefined) { + if (!value) return ''; + return value.length > 8 ? value.slice(0, 8) : value; +} + function modelRuntimeStatusCell(status: ModelRateLimitStatus, now: number) { const modelCooldownMs = cooldownRemainingMs(status.modelCooldownUntil, now); const platformCooldownMs = cooldownRemainingMs(status.platformCooldownUntil, now); @@ -184,6 +440,53 @@ function formatLimit(value: number) { return trimNumber(value); } +function platformEffectivePriority(status: ModelRateLimitStatus | undefined, platform: IntegrationPlatform | undefined) { + return firstPriority( + status?.platformEffectivePriority, + platform?.effectivePriority, + status?.platformDynamicPriority, + platform?.dynamicPriority, + status?.platformPriority, + platform?.priority, + ); +} + +function platformStaticPriority(status: ModelRateLimitStatus | undefined, platform: IntegrationPlatform | undefined) { + return firstPriority(status?.platformPriority, platform?.priority); +} + +function platformDynamicPriority(status: ModelRateLimitStatus | undefined, platform: IntegrationPlatform | undefined) { + return firstPriority( + status?.platformDynamicPriority, + platform?.dynamicPriority, + status?.platformEffectivePriority, + platform?.effectivePriority, + status?.platformPriority, + platform?.priority, + ); +} + +function firstPriority(...values: Array) { + return values.find(priorityIsSet); +} + +function priorityIsSet(value: number | undefined): value is number { + return typeof value === 'number' && Number.isFinite(value); +} + +function formatPriority(value: number | undefined) { + if (!Number.isFinite(value)) return '-'; + return String(Math.trunc(value ?? 0)); +} + +function parsePriorityInput(value: string) { + const trimmed = value.trim(); + if (!/^\d+$/.test(trimmed)) return null; + const parsed = Number(trimmed); + if (!Number.isSafeInteger(parsed)) return null; + return parsed; +} + function trimNumber(value: number) { return Number.isInteger(value) ? String(value) : value.toFixed(2).replace(/\.?0+$/, ''); } diff --git a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx index 5cd0bc2..9ba6d4b 100644 --- a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx +++ b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx @@ -50,7 +50,6 @@ type RunnerPolicyForm = { hardStopStatusCodes: string[]; hardStopKeywords: string[]; priorityDemoteEnabled: boolean; - priorityDemoteStep: string; priorityDemoteCategories: string[]; priorityDemoteCodes: string[]; priorityDemoteStatusCodes: string[]; @@ -406,11 +405,7 @@ function RunnerPolicyEditor(props: { {activeStrategy === 'priorityDemote' && (
patch({ priorityDemoteEnabled: checked })} /> - + 命中降级规则后,失败平台会自动调整到当前所有平台的优先级队尾。 patch({ priorityDemoteCategories: value })} /> patch({ priorityDemoteCodes: value })} /> patch({ priorityDemoteStatusCodes: value })} /> @@ -478,7 +473,6 @@ function runnerPolicyToForm(policy: GatewayRunnerPolicy | null): RunnerPolicyFor hardStopStatusCodes: tagsFromValue(hardStop.statusCodes ?? []), hardStopKeywords: tagsFromValue(hardStop.keywords ?? ['invalid_parameter', 'missing required', 'bad request', 'insufficient balance']), priorityDemoteEnabled: readBool(priorityDemote.enabled, true), - priorityDemoteStep: String(readNumber(priorityDemote.demoteStep, 100)), priorityDemoteCategories: tagsFromValue(priorityDemote.categories ?? ['network', 'timeout', 'stream_error', 'rate_limit', 'provider_5xx', 'provider_overloaded']), priorityDemoteCodes: tagsFromValue(priorityDemote.codes ?? ['network', 'timeout', 'stream_read_error', 'rate_limit', 'server_error', 'overloaded']), priorityDemoteStatusCodes: tagsFromValue(priorityDemote.statusCodes ?? [408, 429, 500, 502, 503, 504]), @@ -516,7 +510,6 @@ function runnerFormToPayload(form: RunnerPolicyForm): GatewayRunnerPolicyUpsertR }, priorityDemotePolicy: { enabled: form.priorityDemoteEnabled, - demoteStep: positiveInt(form.priorityDemoteStep, 100), categories: cleanTags(form.priorityDemoteCategories), codes: cleanTags(form.priorityDemoteCodes), statusCodes: parseNumberTags(form.priorityDemoteStatusCodes), diff --git a/apps/web/src/styles/pages.css b/apps/web/src/styles/pages.css index 2a242c0..ea68b4b 100644 --- a/apps/web/src/styles/pages.css +++ b/apps/web/src/styles/pages.css @@ -1016,8 +1016,8 @@ } .platformLimitTable .shTableRow { - grid-template-columns: minmax(180px, 1.15fr) minmax(160px, 0.95fr) 150px 170px 140px 132px 132px; - min-width: 1064px; + grid-template-columns: minmax(180px, 1.1fr) minmax(160px, 0.9fr) 160px 132px 150px 170px 140px 132px; + min-width: 1224px; } .platformLimitTable .shTableHead, @@ -1113,6 +1113,143 @@ background: var(--destructive); } +.platformPriorityCell { + display: grid; + width: 100%; + min-width: 0; + gap: 7px; + align-content: start; +} + +.platformPriorityCell .rateMetricCell small { + overflow: visible; + line-height: 1.35; + text-overflow: clip; + white-space: normal; +} + +.priorityDemotionTrigger { + display: grid; + width: 100%; + padding: 0; + border: 0; + background: transparent; + color: inherit; + cursor: default; + text-align: left; + font: inherit; +} + +.priorityAdjustButton { + justify-self: start; + min-height: 22px; + padding-inline: 8px; + border-color: var(--border); + color: var(--text-normal); + background: #fff; +} + +.priorityDemotionAntPopover { + z-index: 1200; +} + +.priorityDemotionPopover { + display: grid; + width: min(36rem, calc(100vw - 2rem)); + gap: 0.6rem; +} + +.priorityDemotionHeader, +.priorityDemotionItemHeader { + display: flex; + min-width: 0; + align-items: center; + justify-content: space-between; + gap: 0.5rem; +} + +.priorityDemotionHeader { + color: var(--text-strong); +} + +.priorityDemotionHeader strong, +.priorityDemotionItemHeader strong { + min-width: 0; + overflow-wrap: anywhere; +} + +.priorityDemotionItem { + display: grid; + gap: 0.35rem; + padding-bottom: 0.6rem; + border-bottom: 1px solid var(--border); +} + +.priorityDemotionItem:last-child { + padding-bottom: 0; + border-bottom: 0; +} + +.priorityDemotionItem small, +.priorityDemotionEmpty { + color: var(--text-soft); + font-size: var(--font-size-xs); + line-height: 1.4; +} + +.priorityDemotionError { + color: var(--destructive); + font-size: var(--font-size-xs); + line-height: 1.45; + overflow-wrap: anywhere; +} + +.platformPriorityDialog { + width: min(30rem, calc(100vw - 2rem)); +} + +.platformPriorityDialogBody { + display: grid; + gap: 1rem; +} + +.platformPriorityDialogSummary, +.platformPriorityDialogMetrics { + display: flex; + min-width: 0; + align-items: center; + justify-content: space-between; + gap: 1rem; +} + +.platformPriorityDialogSummary { + padding: 0.75rem; + border: 1px solid var(--border-subtle); + border-radius: 8px; + background: #f8fafc; +} + +.platformPriorityDialogSummary span { + display: grid; + min-width: 0; + gap: 3px; +} + +.platformPriorityDialogSummary span:last-child { + justify-items: end; + font-variant-numeric: tabular-nums; +} + +.platformPriorityDialogSummary strong { + color: var(--text-strong); +} + +.platformPriorityDialogSummary small, +.platformPriorityDialogMetrics { + color: var(--muted-foreground); + font-size: var(--font-size-xs); +} + .platformModelToolbar { display: grid; grid-template-columns: minmax(220px, 0.6fr) minmax(260px, 1fr); diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 7802fb2..166f903 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -62,6 +62,8 @@ export interface IntegrationPlatform { authType: string; status: 'enabled' | 'disabled' | string; priority: number; + dynamicPriority?: number; + effectivePriority?: number; defaultPricingMode: PricingMode; defaultDiscountFactor: number; pricingRuleSetId?: string; @@ -760,11 +762,45 @@ export interface RateLimitMetricStatus { resetAt?: string; } +export interface PriorityDemotionRecord { + id: string; + taskId: string; + platformId: string; + platformModelId?: string; + reason?: string; + errorCode?: string; + errorMessage?: string; + category?: string; + statusCode?: number; + policySource?: string; + policy?: string; + policyRule?: string; + matchedValue?: string; + dynamicPriority?: number; + createdAt: string; +} + +export interface PlatformDynamicPriorityUpdateRequest { + dynamicPriority?: number; + reset?: boolean; +} + +export interface PlatformDynamicPriorityState { + platformId: string; + priority: number; + dynamicPriority?: number; + effectivePriority: number; + updatedAt: string; +} + export interface ModelRateLimitStatus { platformModelId: string; platformId: string; platformName: string; provider: string; + platformPriority: number; + platformDynamicPriority?: number; + platformEffectivePriority: number; modelName: string; providerModelName?: string; modelAlias?: string; @@ -779,6 +815,7 @@ export interface ModelRateLimitStatus { rpm: RateLimitMetricStatus; tpm: RateLimitMetricStatus; loadRatio: number; + recentPriorityDemotions?: PriorityDemotionRecord[]; } export interface GatewayNetworkProxyConfig {