From 682a491d2706b2358c369feb9e71ca7dc210cff7 Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 12 May 2026 21:32:23 +0800 Subject: [PATCH] Decouple stream cancellation and clarify failover rules --- .../httpapi/core_flow_integration_test.go | 62 ++++++ apps/api/internal/httpapi/handlers.go | 52 ++++- apps/api/internal/store/rate_limit_status.go | 125 ++++++++++- .../internal/store/rate_limit_status_test.go | 23 ++ .../web/src/pages/admin/RealtimeLoadPanel.tsx | 84 ++++++- .../src/pages/admin/RuntimePoliciesPanel.tsx | 209 +++++++++++++++++- apps/web/src/styles/pages.css | 71 ++++++ packages/contracts/src/index.ts | 20 ++ 8 files changed, 629 insertions(+), 17 deletions(-) diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go index 1e2cef7..48659b9 100644 --- a/apps/api/internal/httpapi/core_flow_integration_test.go +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -351,6 +351,48 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil { t.Fatalf("unexpected compatible chat response: %+v", compatChat) } + cancelMarker := "cancel-stream-" + suffixText + cancelCtx, cancelRequest := context.WithCancel(context.Background()) + cancelPayload := map[string]any{ + "model": defaultTextModel, + "runMode": "simulation", + "messages": []map[string]any{{"role": "user", "content": "cancelled stream"}}, + "stream": true, + "simulation": true, + "simulationDurationMs": 250, + "cancelTestId": cancelMarker, + } + cancelRaw, err := json.Marshal(cancelPayload) + if err != nil { + t.Fatalf("marshal cancelled stream payload: %v", err) + } + cancelReq, err := http.NewRequestWithContext(cancelCtx, http.MethodPost, server.URL+"/v1/chat/completions", bytes.NewReader(cancelRaw)) + if err != nil { + t.Fatalf("build cancelled stream request: %v", err) + } + cancelReq.Header.Set("Authorization", "Bearer "+apiKeyResponse.Secret) + cancelReq.Header.Set("Content-Type", "application/json") + cancelErrCh := make(chan error, 1) + go func() { + resp, err := http.DefaultClient.Do(cancelReq) + if resp != nil { + _, _ = io.ReadAll(resp.Body) + _ = resp.Body.Close() + } + cancelErrCh <- err + }() + cancelTaskID := waitForTaskIDByRequestMarker(t, ctx, testPool, cancelMarker, 2*time.Second) + cancelRequest() + select { + case <-cancelErrCh: + case <-time.After(time.Second): + t.Fatal("cancelled stream request did not return after client cancellation") + } + cancelledStreamTask := waitForTaskStatus(t, server.URL, apiKeyResponse.Secret, cancelTaskID, []string{"succeeded"}, 2*time.Second) + if cancelledStreamTask.Status != "succeeded" { + t.Fatalf("client-cancelled compatible stream should keep backend task running to success, got %+v", cancelledStreamTask) + } + var imageResponse struct { Task struct { ID string `json:"id"` @@ -1398,6 +1440,26 @@ func waitForTaskStatus(t *testing.T, baseURL string, token string, taskID string return detail } +func waitForTaskIDByRequestMarker(t *testing.T, ctx context.Context, pool *pgxpool.Pool, marker string, timeout time.Duration) string { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + var taskID string + err := pool.QueryRow(ctx, ` +SELECT id::text +FROM gateway_tasks +WHERE request->>'cancelTestId' = $1 +ORDER BY created_at DESC +LIMIT 1`, marker).Scan(&taskID) + if err == nil && taskID != "" { + return taskID + } + time.Sleep(50 * time.Millisecond) + } + t.Fatalf("task with request marker %s was not created within %s", marker, timeout) + return "" +} + func assertLoadAvoidanceSimulatedRetryChain(t *testing.T, ctx context.Context, testPool *pgxpool.Pool, baseURL string, adminToken string, runtimeToken string, suffixText string) { t.Helper() model := "load-avoidance-smoke-" + suffixText diff --git a/apps/api/internal/httpapi/handlers.go b/apps/api/internal/httpapi/handlers.go index 5b23c71..5492f85 100644 --- a/apps/api/internal/httpapi/handlers.go +++ b/apps/api/internal/httpapi/handlers.go @@ -1,6 +1,7 @@ package httpapi import ( + "context" "encoding/json" "errors" "fmt" @@ -564,10 +565,15 @@ func (s *Server) createTask(kind string, compatible bool) http.Handler { writeTaskAccepted(w, task) return } + runCtx, cancelRun := s.requestExecutionContext(r) + defer cancelRun() if compatible { if boolValue(body, "stream") { flusher := prepareCompatibleStream(w) - result, runErr := s.runner.ExecuteStream(r.Context(), task, user, func(delta string) error { + result, runErr := s.runner.ExecuteStream(runCtx, task, user, func(delta string) error { + if !requestStillConnected(r) { + return nil + } writeCompatibleDelta(w, kind, model, delta) if flusher != nil { flusher.Flush() @@ -575,6 +581,9 @@ func (s *Server) createTask(kind string, compatible bool) http.Handler { return nil }) if runErr != nil { + if !requestStillConnected(r) { + return + } status := statusFromRunError(runErr) errorPayload := map[string]any{ "code": runErrorCode(runErr), @@ -593,29 +602,66 @@ func (s *Server) createTask(kind string, compatible bool) http.Handler { } return } + if !requestStillConnected(r) { + return + } writeCompatibleDone(w, kind, model, result.Output) if flusher != nil { flusher.Flush() } return } - result, runErr := s.runner.Execute(r.Context(), task, user) + result, runErr := s.runner.Execute(runCtx, task, user) if runErr != nil { + if !requestStillConnected(r) { + return + } writeError(w, statusFromRunError(runErr), runErr.Error(), runErrorCode(runErr)) return } + if !requestStillConnected(r) { + return + } writeJSON(w, http.StatusOK, result.Output) return } - result, runErr := s.runner.Execute(r.Context(), task, user) + result, runErr := s.runner.Execute(runCtx, task, user) if runErr != nil { s.logger.Warn("task completed with failure", "kind", kind, "taskId", task.ID, "error", runErr) } + if !requestStillConnected(r) { + return + } writeTaskAccepted(w, result.Task) }) } +func (s *Server) requestExecutionContext(r *http.Request) (context.Context, context.CancelFunc) { + base := context.WithoutCancel(r.Context()) + if s.ctx == nil { + return base, func() {} + } + ctx, cancel := context.WithCancel(base) + go func() { + select { + case <-s.ctx.Done(): + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} + +func requestStillConnected(r *http.Request) bool { + select { + case <-r.Context().Done(): + return false + default: + return true + } +} + func asyncRequest(r *http.Request) bool { value := strings.TrimSpace(strings.ToLower(r.Header.Get("x-async"))) return value == "1" || value == "true" || value == "yes" || value == "on" diff --git a/apps/api/internal/store/rate_limit_status.go b/apps/api/internal/store/rate_limit_status.go index 7ea8a40..633a058 100644 --- a/apps/api/internal/store/rate_limit_status.go +++ b/apps/api/internal/store/rate_limit_status.go @@ -24,6 +24,8 @@ type ModelRateLimitStatus struct { PlatformID string `json:"platformId"` PlatformName string `json:"platformName"` Provider string `json:"provider"` + PlatformStatus string `json:"platformStatus"` + PlatformDisabledReason *PlatformPolicyEvent `json:"platformDisabledReason,omitempty"` PlatformPriority int `json:"platformPriority"` PlatformDynamicPriority *int `json:"platformDynamicPriority,omitempty"` PlatformEffectivePriority int `json:"platformEffectivePriority"` @@ -62,9 +64,27 @@ type PriorityDemotionRecord struct { CreatedAt time.Time `json:"createdAt"` } +type PlatformPolicyEvent struct { + ID string `json:"id"` + TaskID string `json:"taskId"` + PlatformID string `json:"platformId"` + PlatformModelID string `json:"platformModelId,omitempty"` + EventType string `json:"eventType"` + Reason string `json:"reason,omitempty"` + ErrorCode string `json:"errorCode,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + Category string `json:"category,omitempty"` + StatusCode int `json:"statusCode,omitempty"` + PolicySource string `json:"policySource,omitempty"` + Policy string `json:"policy,omitempty"` + PolicyRule string `json:"policyRule,omitempty"` + MatchedValue string `json:"matchedValue,omitempty"` + CreatedAt time.Time `json:"createdAt"` +} + func (s *Store) ListModelRateLimitStatuses(ctx context.Context) ([]ModelRateLimitStatus, error) { rows, err := s.pool.Query(ctx, ` - SELECT m.id::text, m.platform_id::text, p.name, p.provider, + SELECT m.id::text, m.platform_id::text, p.name, p.provider, p.status, p.priority, p.dynamic_priority, COALESCE(p.dynamic_priority, p.priority), m.model_name, COALESCE(NULLIF(m.provider_model_name, ''), m.model_name), COALESCE(m.model_alias, ''), m.model_type, m.display_name, m.enabled, @@ -161,6 +181,7 @@ ORDER BY p.priority ASC, m.model_name ASC`) &item.PlatformID, &item.PlatformName, &item.Provider, + &item.PlatformStatus, &item.PlatformPriority, &platformDynamicPriority, &item.PlatformEffectivePriority, @@ -212,8 +233,15 @@ ORDER BY p.priority ASC, m.model_name ASC`) if err != nil { return nil, err } + disabledReasons, err := s.listLatestPlatformDisabledReasons(ctx, items) + if err != nil { + return nil, err + } for index := range items { items[index].RecentPriorityDemotions = demotions[items[index].PlatformID] + if items[index].PlatformStatus != "enabled" { + items[index].PlatformDisabledReason = disabledReasons[items[index].PlatformID] + } } sort.SliceStable(items, func(i, j int) bool { if items[i].LoadRatio == items[j].LoadRatio { @@ -309,6 +337,101 @@ func priorityDemotionRecordFromEventPayload(id string, taskID string, message st } } +func (s *Store) listLatestPlatformDisabledReasons(ctx context.Context, statuses []ModelRateLimitStatus) (map[string]*PlatformPolicyEvent, error) { + out := map[string]*PlatformPolicyEvent{} + seen := map[string]bool{} + platformIDs := make([]string, 0, len(statuses)) + for _, status := range statuses { + platformID := strings.TrimSpace(status.PlatformID) + if platformID == "" || status.PlatformStatus == "enabled" || seen[platformID] { + continue + } + seen[platformID] = true + platformIDs = append(platformIDs, platformID) + } + if len(platformIDs) == 0 { + return out, nil + } + rows, err := s.pool.Query(ctx, ` +SELECT id::text, task_id::text, event_type, COALESCE(message, ''), payload, COALESCE(attempt_error_message, ''), created_at +FROM ( + SELECT e.*, + a.error_message AS attempt_error_message, + row_number() OVER ( + PARTITION BY e.payload->>'platformId' + ORDER BY e.created_at DESC, e.seq DESC + ) AS disabled_rank + FROM gateway_task_events e + LEFT JOIN LATERAL ( + SELECT error_message + FROM gateway_task_attempts attempt + WHERE attempt.task_id = e.task_id + AND attempt.platform_id::text = e.payload->>'platformId' + ORDER BY attempt.attempt_no DESC, attempt.started_at DESC + LIMIT 1 + ) a ON TRUE + WHERE e.event_type IN ('task.policy.failover_disabled', 'task.policy.auto_disabled') + AND e.payload->>'platformId' = ANY($1::text[]) +) ranked +WHERE disabled_rank = 1`, platformIDs) + if err != nil { + return nil, err + } + defer rows.Close() + for rows.Next() { + var id string + var taskID string + var eventType string + var message string + var payloadBytes []byte + var attemptErrorMessage string + var createdAt time.Time + if err := rows.Scan(&id, &taskID, &eventType, &message, &payloadBytes, &attemptErrorMessage, &createdAt); err != nil { + return nil, err + } + record := platformPolicyEventFromPayload(id, taskID, eventType, message, attemptErrorMessage, decodeObject(payloadBytes), createdAt) + if record.PlatformID == "" { + continue + } + out[record.PlatformID] = &record + } + return out, rows.Err() +} + +func platformPolicyEventFromPayload(id string, taskID string, eventType string, message string, attemptErrorMessage string, payload map[string]any, createdAt time.Time) PlatformPolicyEvent { + errorMessage := stringValue(payload["errorMessage"]) + if errorMessage == "" { + errorMessage = stringValue(payload["message"]) + } + if errorMessage == "" { + errorMessage = strings.TrimSpace(attemptErrorMessage) + } + if errorMessage == "" { + errorMessage = strings.TrimSpace(message) + } + errorCode := stringValue(payload["errorCode"]) + if errorCode == "" { + errorCode = stringValue(payload["code"]) + } + return PlatformPolicyEvent{ + ID: id, + TaskID: taskID, + PlatformID: stringValue(payload["platformId"]), + PlatformModelID: stringValue(payload["platformModelId"]), + EventType: eventType, + Reason: stringValue(payload["reason"]), + ErrorCode: errorCode, + ErrorMessage: errorMessage, + Category: stringValue(payload["category"]), + StatusCode: intValue(payload["statusCode"]), + PolicySource: stringValue(payload["policySource"]), + Policy: stringValue(payload["policy"]), + PolicyRule: stringValue(payload["policyRule"]), + MatchedValue: stringValue(payload["matchedValue"]), + CreatedAt: createdAt, + } +} + func effectiveModelRateLimitPolicy(platformPolicy map[string]any, runtimePolicy map[string]any, runtimeOverride map[string]any, modelPolicy map[string]any) map[string]any { policy := platformPolicy if hasRateLimitRules(runtimePolicy) { diff --git a/apps/api/internal/store/rate_limit_status_test.go b/apps/api/internal/store/rate_limit_status_test.go index f57413a..aacda62 100644 --- a/apps/api/internal/store/rate_limit_status_test.go +++ b/apps/api/internal/store/rate_limit_status_test.go @@ -61,3 +61,26 @@ func TestPriorityDemotionRecordFromEventPayloadKeepsReason(t *testing.T) { t.Fatalf("expected createdAt %s, got %s", createdAt, record.CreatedAt) } } + +func TestPlatformPolicyEventFromPayloadUsesAttemptErrorMessage(t *testing.T) { + createdAt := time.Date(2026, 5, 12, 10, 30, 0, 0, time.UTC) + record := platformPolicyEventFromPayload("event-1", "task-1", "task.policy.failover_disabled", "fallback event message", "upstream invalid api key", map[string]any{ + "platformId": "platform-1", + "platformModelId": "platform-model-1", + "reason": "failover_allow_policy", + "errorCode": "auth_failed", + "category": "auth_error", + "statusCode": float64(401), + "policySource": "gateway_runner_policies.failover_policy", + "policy": "failoverPolicy", + "policyRule": "allowCategories", + "matchedValue": "auth_error", + }, createdAt) + + if record.EventType != "task.policy.failover_disabled" || record.Reason != "failover_allow_policy" { + t.Fatalf("expected disabled event identity to survive, got %+v", record) + } + if record.ErrorMessage != "upstream invalid api key" || record.StatusCode != 401 { + t.Fatalf("expected disabled reason details from attempt, got %+v", record) + } +} diff --git a/apps/web/src/pages/admin/RealtimeLoadPanel.tsx b/apps/web/src/pages/admin/RealtimeLoadPanel.tsx index 8dbc5e6..449a587 100644 --- a/apps/web/src/pages/admin/RealtimeLoadPanel.tsx +++ b/apps/web/src/pages/admin/RealtimeLoadPanel.tsx @@ -1,7 +1,7 @@ import { useEffect, useMemo, useState, type FormEvent } from 'react'; import { Popover as AntPopover } from 'antd'; import { CheckCircle2, Gauge, History, RotateCcw, SlidersHorizontal } from 'lucide-react'; -import type { IntegrationPlatform, ModelRateLimitStatus, PlatformDynamicPriorityUpdateRequest, PriorityDemotionRecord } from '@easyai-ai-gateway/contracts'; +import type { IntegrationPlatform, ModelRateLimitStatus, PlatformDynamicPriorityUpdateRequest, PlatformPolicyEvent, PriorityDemotionRecord } from '@easyai-ai-gateway/contracts'; import { Badge, Button, Card, CardContent, CardHeader, CardTitle, EmptyState, FormDialog, Input, Label, Table, TableCell, TableHead, TableRow } from '../../components/ui'; export function RealtimeLoadPanel(props: { @@ -124,6 +124,7 @@ function RateLimitStatusTable(props: { 模型 平台 + 状态 平台优先级 满载率 @@ -132,7 +133,6 @@ function RateLimitStatusTable(props: { TPM RPM - 状态 {props.statuses.map((status) => { const platform = props.platformMap.get(status.platformId); @@ -150,6 +150,7 @@ function RateLimitStatusTable(props: { {status.provider} + {modelRuntimeStatusCell(status, platform, props.now)} {platformPriorityCell(status, platform, props.onAdjustPriority)} 0.8 ? 'true' : undefined}> @@ -160,7 +161,6 @@ function RateLimitStatusTable(props: { {concurrencyMetricCell(status)} {metricCell(status.tpm, true)} {metricCell(status.rpm)} - {modelRuntimeStatusCell(status, props.now)} ); })} @@ -300,6 +300,32 @@ function PriorityDemotionPopover(props: { records: PriorityDemotionRecord[] }) { ); } +function PlatformDisabledReasonPopover(props: { record?: PlatformPolicyEvent }) { + const record = props.record; + if (!record) { + return ( + + 暂无禁用原因记录 + + ); + } + return ( + + + + 禁用原因 + + + + {platformPolicyEventReasonText(record)} + + {platformPolicyEventMetaText(record)} + {record.errorMessage && {record.errorMessage}} + + + ); +} + function platformPrioritySubtitle(status: ModelRateLimitStatus, platform: IntegrationPlatform | undefined, demotionCount: number) { const staticPriority = platformStaticPriority(status, platform); const dynamicPriority = platformDynamicPriority(status, platform); @@ -338,12 +364,45 @@ function priorityDemotionPolicyText(record: PriorityDemotionRecord) { function priorityDemotionReasonLabel(reason: string | undefined) { const labels: Record = { priority_demote_policy: '命中优先级降级规则', + failover_allow_policy: '命中跨平台切换规则', + auto_disable_policy: '命中自动禁用规则', + client_retryable: '客户端错误可重试', hard_stop_policy: '命中硬拒绝规则', runner_policy_disabled: '全局调度策略停用', }; return reason ? labels[reason] ?? reason : '优先级降级'; } +function platformPolicyEventReasonText(record: PlatformPolicyEvent) { + const category = record.category ? `错误分类 ${record.category}` : ''; + const code = record.errorCode ? `错误 ${record.errorCode}` : ''; + const statusCode = record.statusCode ? `状态码 ${record.statusCode}` : ''; + return [platformPolicyEventReasonLabel(record), statusCode, code, category].filter(Boolean).join(' · '); +} + +function platformPolicyEventMetaText(record: PlatformPolicyEvent) { + const policyPath = [record.policySource || record.policy, record.policyRule].filter(Boolean).join('.'); + const policy = policyPath ? (record.matchedValue ? `策略 ${policyPath}=${record.matchedValue}` : `策略 ${policyPath}`) : ''; + return [ + formatDateTime(record.createdAt), + shortId(record.taskId) ? `任务 ${shortId(record.taskId)}` : '', + policy, + ].filter(Boolean).join(' · ') || '-'; +} + +function platformPolicyEventReasonLabel(record: PlatformPolicyEvent) { + if (record.reason) { + return priorityDemotionReasonLabel(record.reason); + } + if (record.eventType === 'task.policy.auto_disabled') { + return '命中自动禁用规则'; + } + if (record.eventType === 'task.policy.failover_disabled') { + return '跨平台切换时禁用'; + } + return '平台已禁用'; +} + function metricCell(metric: ModelRateLimitStatus['rpm'], includeReserved = false) { if (!metric.limited) return {formatLimit(metric.currentValue)} / 不限{includeReserved ? reservedMetricText(metric) : '未配置上限'}; return ( @@ -388,9 +447,10 @@ function shortId(value: string | undefined) { return value.length > 8 ? value.slice(0, 8) : value; } -function modelRuntimeStatusCell(status: ModelRateLimitStatus, now: number) { +function modelRuntimeStatusCell(status: ModelRateLimitStatus, platform: IntegrationPlatform | undefined, now: number) { const modelCooldownMs = cooldownRemainingMs(status.modelCooldownUntil, now); const platformCooldownMs = cooldownRemainingMs(status.platformCooldownUntil, now); + const platformStatus = platform?.status || status.platformStatus || 'enabled'; if (modelCooldownMs > 0) { return ( @@ -399,6 +459,22 @@ function modelRuntimeStatusCell(status: ModelRateLimitStatus, now: number) { ); } + if (platformStatus !== 'enabled') { + const badge = 已禁用; + return ( + } + overlayClassName="priorityDemotionAntPopover" + placement="bottomLeft" + trigger={['hover', 'focus']} + > + + {badge} + + + ); + } if (platformCooldownMs > 0) { return ( diff --git a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx index 9ba6d4b..3a98ed5 100644 --- a/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx +++ b/apps/web/src/pages/admin/RuntimePoliciesPanel.tsx @@ -58,6 +58,39 @@ type RunnerPolicyForm = { status: string; }; +const failoverActionDefinitions = [ + { + value: 'next', + title: '仅轮转', + description: '这些错误分类只尝试下一个平台,不修改当前平台状态。', + }, + { + value: 'disable_and_next', + title: '禁用后轮转', + description: '这些错误分类会先禁用当前平台,再尝试下一个平台。', + }, + { + value: 'cooldown_and_next', + title: '冷却后轮转', + description: '这些错误分类会先让当前平台模型进入冷却,再尝试下一个平台。', + }, +] as const; + +const failoverCategoryOptions = [ + 'network', + 'timeout', + 'stream_error', + 'rate_limit', + 'provider_5xx', + 'provider_overloaded', + 'auth_error', + 'request_error', + 'unsupported_model', + 'user_permission', + 'insufficient_balance', + 'client_error', +].map((item) => ({ label: item, value: item })); + export function RuntimePoliciesPanel(props: { message: string; runnerPolicy: GatewayRunnerPolicy | null; @@ -380,14 +413,22 @@ function RunnerPolicyEditor(props: { patch({ maxDurationSeconds: event.target.value })} /> 从任务开始执行计时,超过后不再继续重试,默认 600 秒。 - patch({ allowCategories: value })} /> - patch({ denyCategories: value })} /> - patch({ allowCodes: value })} /> - patch({ denyCodes: value })} /> - patch({ allowKeywords: value })} /> - patch({ denyKeywords: value })} /> - patch({ allowStatusCodes: value })} /> - patch({ denyStatusCodes: value })} /> + patch(value)} + /> +
+ 补充触发条件(可自定义扩展) + 这些条件只决定是否进入平台间切换;分类命中后的处理方式以上面的分组为准。 +
+ patch({ allowCodes: value })} /> + patch({ denyCodes: value })} /> + patch({ allowKeywords: value })} /> + patch({ denyKeywords: value })} /> + patch({ allowStatusCodes: value })} /> + patch({ denyStatusCodes: value })} /> )} @@ -448,6 +489,65 @@ function KeywordField(props: { label: string; value: string[]; onChange: (value: ); } +function FailoverCategoryRoutingEditor(props: { + actions: Record; + allowCategories: string[]; + denyCategories: string[]; + onChange: (value: Pick) => void; +}) { + const groups = failoverCategoryRoutingGroups(props.allowCategories, props.denyCategories, props.actions); + const options = categoryOptions(props.allowCategories, props.denyCategories, Object.keys(props.actions)); + const updateGroup = (group: string, value: string[]) => { + props.onChange(updateFailoverCategoryRouting(props.allowCategories, props.denyCategories, props.actions, group, value)); + }; + return ( +
+
+ 错误分类处理方式(一般保持默认无需修改) + 每个错误分类只放在一个分组;前三组会进入平台间切换,拒绝轮转不会尝试后续平台。 +
+
+ {failoverActionDefinitions.map((action) => ( + + ))} + +
+
+ ); +} + function runnerPolicyToForm(policy: GatewayRunnerPolicy | null): RunnerPolicyForm { const failover = readObject(policy?.failoverPolicy); const hardStop = readObject(policy?.hardStopPolicy); @@ -499,7 +599,7 @@ function runnerFormToPayload(form: RunnerPolicyForm): GatewayRunnerPolicyUpsertR denyKeywords: cleanTags(form.denyKeywords), allowStatusCodes: parseNumberTags(form.allowStatusCodes), denyStatusCodes: parseNumberTags(form.denyStatusCodes), - actions: Object.keys(form.failoverActions).length > 0 ? form.failoverActions : defaultFailoverActions(), + actions: normalizedFailoverActions(form), }, hardStopPolicy: { enabled: form.hardStopEnabled, @@ -552,6 +652,97 @@ function defaultFailoverActions(): Record { }; } +function failoverCategoryRoutingGroups(allowCategories: string[], denyCategories: string[], actions: Record) { + const actionSet = new Set(failoverActionDefinitions.map((item) => item.value)); + const assignments = new Map(); + for (const category of cleanTags(allowCategories)) { + const rawAction = actions[category]; + const action = typeof rawAction === 'string' ? rawAction : ''; + assignments.set(category, actionSet.has(action as (typeof failoverActionDefinitions)[number]['value']) ? action : 'next'); + } + for (const [category, rawAction] of Object.entries(actions)) { + if (assignments.has(category)) continue; + const action = typeof rawAction === 'string' ? rawAction : ''; + if (action === 'stop') { + assignments.set(category, 'deny'); + } else if (actionSet.has(action as (typeof failoverActionDefinitions)[number]['value'])) { + assignments.set(category, action); + } + } + for (const category of cleanTags(denyCategories)) { + assignments.set(category, 'deny'); + } + const groups: Record = {}; + for (const [category, group] of assignments.entries()) { + groups[group] = [...(groups[group] ?? []), category]; + } + for (const key of [...failoverActionDefinitions.map((item) => item.value), 'deny']) { + groups[key] = cleanTags(groups[key] ?? []); + } + return groups; +} + +function updateFailoverCategoryRouting( + allowCategories: string[], + denyCategories: string[], + actions: Record, + group: string, + value: string[], +): Pick { + const groups = failoverCategoryRoutingGroups(allowCategories, denyCategories, actions); + groups[group] = cleanTags(value); + const nextActions: Record = {}; + const knownCategories = new Set(); + const nextAllowCategories: string[] = []; + for (const action of failoverActionDefinitions) { + for (const category of cleanTags(groups[action.value] ?? [])) { + knownCategories.add(category); + nextAllowCategories.push(category); + if (action.value !== 'next') { + nextActions[category] = action.value; + } + } + } + const nextDenyCategories = cleanTags(groups.deny ?? []); + for (const category of nextDenyCategories) { + knownCategories.add(category); + } + for (const [category, rawAction] of Object.entries(actions)) { + if (!knownCategories.has(category) && typeof rawAction === 'string' && rawAction !== 'next' && rawAction !== 'stop') { + nextActions[category] = rawAction; + } + } + return { + allowCategories: cleanTags(nextAllowCategories), + denyCategories: nextDenyCategories, + failoverActions: nextActions, + }; +} + +function normalizedFailoverActions(form: RunnerPolicyForm) { + const groups = failoverCategoryRoutingGroups(form.allowCategories, form.denyCategories, form.failoverActions); + const nextActions: Record = {}; + for (const action of failoverActionDefinitions) { + if (action.value === 'next') continue; + for (const category of cleanTags(groups[action.value] ?? [])) { + nextActions[category] = action.value; + } + } + return nextActions; +} + +function categoryOptions(...values: string[][]) { + const knownValues = new Set(failoverCategoryOptions.map((option) => option.value)); + const options = [...failoverCategoryOptions]; + for (const value of values.flat()) { + const tag = String(value).trim(); + if (!tag || knownValues.has(tag)) continue; + knownValues.add(tag); + options.push({ label: tag, value: tag }); + } + return options; +} + function policyToForm(policy: RuntimePolicySet): RuntimePolicyForm { const rateRules = Array.isArray(policy.rateLimitPolicy?.rules) ? policy.rateLimitPolicy.rules : []; const retry = readObject(policy.retryPolicy); diff --git a/apps/web/src/styles/pages.css b/apps/web/src/styles/pages.css index ea68b4b..105bcaf 100644 --- a/apps/web/src/styles/pages.css +++ b/apps/web/src/styles/pages.css @@ -1881,6 +1881,71 @@ padding: 8px 10px; } +.runnerActionMatrix { + display: grid; + gap: 10px; +} + +.runnerActionIntro { + display: flex; + align-items: baseline; + justify-content: space-between; + gap: 12px; +} + +.runnerActionIntro strong { + color: var(--text-normal); +} + +.runnerActionIntro small, +.runnerActionGroup small { + color: var(--muted-foreground); + font-size: var(--font-size-xs); + font-weight: var(--font-weight-regular); + line-height: 1.4; +} + +.runnerActionGrid { + display: grid; + grid-template-columns: repeat(2, minmax(0, 1fr)); + gap: 10px; +} + +.runnerActionGroup { + display: grid; + gap: 8px; + padding: 10px; + border: 1px solid var(--border); + border-radius: 8px; + background: #fff; +} + +.runnerActionGroup > span { + display: grid; + gap: 2px; +} + +.runnerActionGroup strong { + color: var(--text-normal); + font-size: var(--font-size-sm); +} + +.runnerSupplementalRules { + display: grid; + gap: 2px; + padding-top: 6px; +} + +.runnerSupplementalRules strong { + color: var(--text-normal); +} + +.runnerSupplementalRules small { + color: var(--muted-foreground); + font-size: var(--font-size-xs); + line-height: 1.4; +} + .runtimePolicyActions { display: flex; justify-content: flex-end; @@ -1933,6 +1998,7 @@ .runtimePolicyGrid, .runtimePolicyFormBody, .runtimePolicyRows, + .runnerActionGrid, .accessPermissionGrid, .accessTreeToolbar, .apiKeyCreateDialogBody, @@ -1940,6 +2006,11 @@ grid-template-columns: 1fr; } + .runnerActionIntro { + align-items: flex-start; + flex-direction: column; + } + .platformModelChoice { grid-template-columns: 1fr auto; } diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 166f903..e9eace9 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -780,6 +780,24 @@ export interface PriorityDemotionRecord { createdAt: string; } +export interface PlatformPolicyEvent { + id: string; + taskId: string; + platformId: string; + platformModelId?: string; + eventType: string; + reason?: string; + errorCode?: string; + errorMessage?: string; + category?: string; + statusCode?: number; + policySource?: string; + policy?: string; + policyRule?: string; + matchedValue?: string; + createdAt: string; +} + export interface PlatformDynamicPriorityUpdateRequest { dynamicPriority?: number; reset?: boolean; @@ -798,6 +816,8 @@ export interface ModelRateLimitStatus { platformId: string; platformName: string; provider: string; + platformStatus?: string; + platformDisabledReason?: PlatformPolicyEvent; platformPriority: number; platformDynamicPriority?: number; platformEffectivePriority: number;