From ba850a06c6a37aae814c39224313835cfcf6a5f8 Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 12 May 2026 03:22:29 +0800 Subject: [PATCH] feat: improve model rate limit tracking --- apps/api/cmd/gateway/main.go | 11 + .../httpapi/core_flow_integration_test.go | 42 ++- apps/api/internal/httpapi/handlers.go | 27 +- apps/api/internal/httpapi/server.go | 1 + apps/api/internal/runner/limits.go | 19 + apps/api/internal/runner/limits_test.go | 29 ++ apps/api/internal/runner/runtime_policy.go | 8 +- apps/api/internal/runner/service.go | 14 +- apps/api/internal/store/candidates.go | 93 +++++ apps/api/internal/store/platform_models.go | 2 + apps/api/internal/store/postgres.go | 19 +- apps/api/internal/store/rate_limit_status.go | 249 +++++++++++++ .../internal/store/rate_limit_status_test.go | 32 ++ apps/api/internal/store/rate_limits.go | 342 +++++++++++++++--- apps/api/internal/store/runtime_policies.go | 15 + apps/api/internal/store/runtime_types.go | 27 +- .../0029_rate_limit_reservations.sql | 23 ++ .../0030_platform_model_cooldown.sql | 5 + apps/web/src/App.tsx | 42 ++- apps/web/src/api.ts | 5 + apps/web/src/app-state.ts | 2 + apps/web/src/pages/AdminPage.tsx | 5 +- .../pages/admin/PlatformManagementPanel.tsx | 177 +++++++-- apps/web/src/styles/pages.css | 98 +++++ packages/contracts/src/index.ts | 32 ++ 25 files changed, 1223 insertions(+), 96 deletions(-) create mode 100644 apps/api/internal/runner/limits_test.go create mode 100644 apps/api/internal/store/rate_limit_status.go create mode 100644 apps/api/internal/store/rate_limit_status_test.go create mode 100644 apps/api/migrations/0029_rate_limit_reservations.sql create mode 100644 apps/api/migrations/0030_platform_model_cooldown.sql diff --git a/apps/api/cmd/gateway/main.go b/apps/api/cmd/gateway/main.go index 255e296..e3f329a 100644 --- a/apps/api/cmd/gateway/main.go +++ b/apps/api/cmd/gateway/main.go @@ -30,6 +30,17 @@ func main() { os.Exit(1) } defer db.Close() + if recovery, err := db.RecoverInterruptedRuntimeState(ctx); err != nil { + logger.Error("recover interrupted runtime state failed", "error", err) + os.Exit(1) + } else if recovery.ReleasedConcurrencyLeases > 0 || recovery.ReleasedRateReservations > 0 || recovery.FailedAttempts > 0 || recovery.FailedTasks > 0 { + logger.Warn("interrupted runtime state recovered", + "releasedConcurrencyLeases", recovery.ReleasedConcurrencyLeases, + "releasedRateReservations", recovery.ReleasedRateReservations, + "failedAttempts", recovery.FailedAttempts, + "failedTasks", recovery.FailedTasks, + ) + } server := &http.Server{ Addr: cfg.HTTPAddr, diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go index 4ea62b6..3257ee3 100644 --- a/apps/api/internal/httpapi/core_flow_integration_test.go +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -665,6 +665,23 @@ WHERE reference_type = 'gateway_task' "runtimePolicySetId": rateLimitPolicySet.ID, "runtimePolicyOverride": map[string]any{}, }, http.StatusCreated, &rateLimitPlatformModel) + var rateLimitFailedTask struct { + Task struct { + Status string `json:"status"` + ErrorCode string `json:"errorCode"` + } `json:"task"` + } + doJSON(t, server.URL, http.MethodPost, "/api/v1/chat/completions", apiKeyResponse.Secret, map[string]any{ + "model": rateLimitedModel, + "runMode": "simulation", + "simulation": true, + "simulationDurationMs": 5, + "simulationProfile": "non_retryable_failure", + "messages": []map[string]any{{"role": "user", "content": "failed first"}}, + }, http.StatusAccepted, &rateLimitFailedTask) + if rateLimitFailedTask.Task.Status != "failed" || rateLimitFailedTask.Task.ErrorCode != "bad_request" { + t.Fatalf("failed rate-limited task should fail before consuming rpm: %+v", rateLimitFailedTask.Task) + } var rateLimitTaskOne struct { Task struct { Status string `json:"status"` @@ -863,7 +880,9 @@ WHERE reference_type = 'gateway_task' {platformID: degradedPlatform.ID, runtimePolicySetID: degradePolicySet.ID}, {platformID: degradeSuccessPlatform.ID}, } { - var platformModel map[string]any + var platformModel struct { + ID string `json:"id"` + } doJSON(t, server.URL, http.MethodPost, "/api/admin/platforms/"+item.platformID+"/models", loginResponse.AccessToken, map[string]any{ "canonicalModelKey": "openai:gpt-4o-mini", "modelName": degradeModel, @@ -887,14 +906,23 @@ WHERE reference_type = 'gateway_task' "messages": []map[string]any{{"role": "user", "content": "degrade please"}}, }, http.StatusAccepted, °radeTask) if degradeTask.Task.Status != "succeeded" { - t.Fatalf("degrade task should fail over after cooling down failed platform: %+v", degradeTask.Task) + t.Fatalf("degrade task should fail over after cooling down failed model: %+v", degradeTask.Task) } - var cooledDown bool - if err := testPool.QueryRow(ctx, `SELECT COALESCE(cooldown_until > now(), false) FROM integration_platforms WHERE id = $1::uuid`, degradedPlatform.ID).Scan(&cooledDown); err != nil { - t.Fatalf("read degraded platform cooldown: %v", err) + var cooledDownModel bool + var cooledDownPlatform bool + if err := testPool.QueryRow(ctx, ` +SELECT COALESCE(m.cooldown_until > now(), false), COALESCE(p.cooldown_until > now(), false) +FROM platform_models m +JOIN integration_platforms p ON p.id = m.platform_id +WHERE m.platform_id = $1::uuid + AND m.model_name = $2`, degradedPlatform.ID, degradeModel).Scan(&cooledDownModel, &cooledDownPlatform); err != nil { + t.Fatalf("read degraded model cooldown: %v", err) } - if !cooledDown { - t.Fatal("degrade policy should set platform cooldown_until") + if !cooledDownModel { + t.Fatal("degrade policy should set platform model cooldown_until") + } + if cooledDownPlatform { + t.Fatal("degrade policy should not cool down the entire platform") } var autoDisablePolicySet struct { diff --git a/apps/api/internal/httpapi/handlers.go b/apps/api/internal/httpapi/handlers.go index 2bad30b..0ab9d15 100644 --- a/apps/api/internal/httpapi/handlers.go +++ b/apps/api/internal/httpapi/handlers.go @@ -490,7 +490,7 @@ func (s *Server) estimatePricing(w http.ResponseWriter, r *http.Request) { estimate, err := s.runner.Estimate(r.Context(), kind, model, body, user) if err != nil { if errors.Is(err, store.ErrNoModelCandidate) { - writeError(w, http.StatusNotFound, "no enabled platform model matches request") + writeError(w, statusFromRunError(err), err.Error(), store.ModelCandidateErrorCode(err)) return } s.logger.Error("estimate pricing failed", "error", err) @@ -510,6 +510,16 @@ func (s *Server) listRateLimitWindows(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"items": items}) } +func (s *Server) listModelRateLimitStatuses(w http.ResponseWriter, r *http.Request) { + items, err := s.store.ListModelRateLimitStatuses(r.Context()) + if err != nil { + s.logger.Error("list model rate limit statuses failed", "error", err) + writeError(w, http.StatusInternalServerError, "list model rate limit statuses failed") + return + } + writeJSON(w, http.StatusOK, map[string]any{"items": items}) +} + func (s *Server) createTask(kind string, compatible bool) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { user, ok := auth.UserFromContext(r.Context()) @@ -557,7 +567,7 @@ func (s *Server) createTask(kind string, compatible bool) http.Handler { if runErr != nil { status := statusFromRunError(runErr) errorPayload := map[string]any{ - "code": clients.ErrorCode(runErr), + "code": runErrorCode(runErr), "message": runErr.Error(), "status": status, } @@ -581,7 +591,7 @@ func (s *Server) createTask(kind string, compatible bool) http.Handler { } result, runErr := s.runner.Execute(r.Context(), task, user) if runErr != nil { - writeError(w, statusFromRunError(runErr), runErr.Error(), clients.ErrorCode(runErr)) + writeError(w, statusFromRunError(runErr), runErr.Error(), runErrorCode(runErr)) return } writeJSON(w, http.StatusOK, result.Output) @@ -634,10 +644,14 @@ func scopeForTaskKind(kind string) string { func statusFromRunError(err error) int { switch { + case store.ModelCandidateErrorCode(err) == "platform_cooling_down" || store.ModelCandidateErrorCode(err) == "model_cooling_down": + return http.StatusTooManyRequests case errors.Is(err, store.ErrNoModelCandidate): return http.StatusNotFound case errors.Is(err, store.ErrRateLimited): return http.StatusTooManyRequests + case clients.ErrorCode(err) == "rate_limit": + return http.StatusTooManyRequests case errors.Is(err, store.ErrInsufficientWalletBalance): return http.StatusPaymentRequired default: @@ -645,6 +659,13 @@ func statusFromRunError(err error) int { } } +func runErrorCode(err error) string { + if errors.Is(err, store.ErrNoModelCandidate) { + return store.ModelCandidateErrorCode(err) + } + return clients.ErrorCode(err) +} + func (s *Server) listTasks(w http.ResponseWriter, r *http.Request) { user, ok := auth.UserFromContext(r.Context()) if !ok { diff --git a/apps/api/internal/httpapi/server.go b/apps/api/internal/httpapi/server.go index 15156e6..8b2158a 100644 --- a/apps/api/internal/httpapi/server.go +++ b/apps/api/internal/httpapi/server.go @@ -107,6 +107,7 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han mux.Handle("GET /api/v1/models", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listPlayableModels))) mux.Handle("GET /api/v1/playground/models", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listPlayableModels))) mux.Handle("GET /api/admin/runtime/rate-limit-windows", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listRateLimitWindows))) + mux.Handle("GET /api/admin/runtime/model-rate-limits", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listModelRateLimitStatuses))) mux.Handle("POST /api/v1/chat/completions", server.auth.Require(auth.PermissionBasic, server.createTask("chat.completions", false))) mux.Handle("POST /api/v1/responses", server.auth.Require(auth.PermissionBasic, server.createTask("responses", false))) mux.Handle("POST /api/v1/images/generations", server.auth.Require(auth.PermissionBasic, server.createTask("images.generations", false))) diff --git a/apps/api/internal/runner/limits.go b/apps/api/internal/runner/limits.go index c83d366..1cc0744 100644 --- a/apps/api/internal/runner/limits.go +++ b/apps/api/internal/runner/limits.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/easyai/easyai-ai-gateway/apps/api/internal/auth" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) @@ -108,3 +109,21 @@ func estimateRequestTokens(body map[string]any) int { } return len([]rune(text))/4 + 1 } + +func tokenUsageAmounts(usage clients.Usage) map[string]float64 { + out := map[string]float64{} + if usage.InputTokens > 0 { + out["tpm_input"] = float64(usage.InputTokens) + } + if usage.OutputTokens > 0 { + out["tpm_output"] = float64(usage.OutputTokens) + } + total := usage.TotalTokens + if total <= 0 { + total = usage.InputTokens + usage.OutputTokens + } + if total > 0 { + out["tpm_total"] = float64(total) + } + return out +} diff --git a/apps/api/internal/runner/limits_test.go b/apps/api/internal/runner/limits_test.go new file mode 100644 index 0000000..3c1cbb1 --- /dev/null +++ b/apps/api/internal/runner/limits_test.go @@ -0,0 +1,29 @@ +package runner + +import ( + "testing" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" +) + +func TestTokenUsageAmountsUsesActualUsageForTPM(t *testing.T) { + got := tokenUsageAmounts(clients.Usage{InputTokens: 12, OutputTokens: 8, TotalTokens: 21}) + + if got["tpm_input"] != 12 { + t.Fatalf("expected input token amount 12, got %v", got["tpm_input"]) + } + if got["tpm_output"] != 8 { + t.Fatalf("expected output token amount 8, got %v", got["tpm_output"]) + } + if got["tpm_total"] != 21 { + t.Fatalf("expected total token amount 21, got %v", got["tpm_total"]) + } +} + +func TestTokenUsageAmountsFallsBackToInputOutputTotal(t *testing.T) { + got := tokenUsageAmounts(clients.Usage{InputTokens: 3, OutputTokens: 5}) + + if got["tpm_total"] != 8 { + t.Fatalf("expected total token fallback 8, got %v", got["tpm_total"]) + } +} diff --git a/apps/api/internal/runner/runtime_policy.go b/apps/api/internal/runner/runtime_policy.go index 42417c6..0e0dc71 100644 --- a/apps/api/internal/runner/runtime_policy.go +++ b/apps/api/internal/runner/runtime_policy.go @@ -29,8 +29,8 @@ func (s *Service) applyCandidateFailurePolicies(ctx context.Context, taskID stri degradePolicy := effectiveRuntimePolicy(candidate.DegradePolicy, candidate.RuntimePolicyOverride, "degradePolicy") if failurePolicyMatches(degradePolicy, code, message) { cooldownSeconds := intFromPolicy(degradePolicy, "cooldownSeconds") - if err := s.store.CooldownCandidatePlatform(ctx, candidate.PlatformID, cooldownSeconds); err == nil { - _ = s.emit(ctx, taskID, "task.policy.degraded", "running", "degrade", 0.5, "candidate platform cooled down by failure policy", map[string]any{ + if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, cooldownSeconds); err == nil { + _ = s.emit(ctx, taskID, "task.policy.degraded", "running", "degrade", 0.5, "candidate model cooled down by failure policy", map[string]any{ "platformId": candidate.PlatformID, "platformModelId": candidate.PlatformModelID, "cooldownSeconds": cooldownSeconds, @@ -52,8 +52,8 @@ func (s *Service) applyFailoverAction(ctx context.Context, taskID string, candid }, decision.Match, decision.Info), simulated) } case "cooldown_and_next": - if err := s.store.CooldownCandidatePlatform(ctx, candidate.PlatformID, decision.CooldownSeconds); err == nil { - _ = s.emit(ctx, taskID, "task.policy.failover_cooled_down", "running", "failover", 0.51, "candidate platform cooled down by runner failover policy", addPolicyTracePayload(map[string]any{ + if err := s.store.CooldownCandidatePlatformModel(ctx, candidate.PlatformModelID, decision.CooldownSeconds); err == nil { + _ = s.emit(ctx, taskID, "task.policy.failover_cooled_down", "running", "failover", 0.51, "candidate model cooled down by runner failover policy", addPolicyTracePayload(map[string]any{ "platformId": candidate.PlatformID, "platformModelId": candidate.PlatformModelID, "cooldownSeconds": decision.CooldownSeconds, diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index f0fb66e..b468e72 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -64,7 +64,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } candidates, err := s.store.ListModelCandidates(ctx, task.Model, modelType, user) if err != nil { - failed, finishErr := s.failTask(ctx, task.ID, "no_model_candidate", err.Error(), task.RunMode == "simulation", err) + failed, finishErr := s.failTask(ctx, task.ID, store.ModelCandidateErrorCode(err), err.Error(), task.RunMode == "simulation", err) if finishErr != nil { return Result{}, finishErr } @@ -279,7 +279,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user return clients.Response{}, err } reservations := s.rateLimitReservations(ctx, user, candidate, body) - limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, reservations) + limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, attemptID, reservations) if err != nil { clientErr := &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: false} _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ @@ -292,6 +292,12 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user }) return clients.Response{}, clientErr } + rateReservationsFinalized := false + defer func() { + if !rateReservationsFinalized { + _ = s.store.ReleaseRateLimitReservations(context.WithoutCancel(ctx), limitResult.Reservations, "attempt_failed") + } + }() defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs) if err := s.store.RecordClientAssignment(ctx, candidate); err != nil { @@ -397,6 +403,10 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user return clients.Response{}, err } } + if err := s.store.CommitRateLimitReservations(ctx, limitResult.Reservations, tokenUsageAmounts(response.Usage)); err != nil { + return clients.Response{}, err + } + rateReservationsFinalized = true if err := s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ AttemptID: attemptID, Status: "succeeded", diff --git a/apps/api/internal/store/candidates.go b/apps/api/internal/store/candidates.go index 2c04b3f..ff56b12 100644 --- a/apps/api/internal/store/candidates.go +++ b/apps/api/internal/store/candidates.go @@ -3,6 +3,7 @@ package store import ( "context" "fmt" + "strings" "github.com/easyai/easyai-ai-gateway/apps/api/internal/auth" ) @@ -38,6 +39,7 @@ WHERE p.status = 'enabled' AND m.enabled = true AND m.model_type @> jsonb_build_array($2) AND (p.cooldown_until IS NULL OR p.cooldown_until <= now()) + AND (m.cooldown_until IS NULL OR m.cooldown_until <= now()) AND ( (COALESCE(m.model_alias, '') <> '' AND m.model_alias = $1) OR ( @@ -151,6 +153,11 @@ ORDER BY effective_priority ASC, return nil, err } if len(items) == 0 { + if unavailableErr, err := s.modelCandidateCooldownError(ctx, model, modelType); err != nil { + return nil, err + } else if unavailableErr != nil { + return nil, unavailableErr + } return nil, ErrNoModelCandidate } items, err = s.filterCandidatesByAccessRules(ctx, user, items) @@ -162,3 +169,89 @@ ORDER BY effective_priority ASC, } return items, nil } + +func (s *Store) modelCandidateCooldownError(ctx context.Context, model string, modelType string) (error, error) { + rows, err := s.pool.Query(ctx, ` +SELECT p.name, + COALESCE(NULLIF(m.display_name, ''), NULLIF(m.model_alias, ''), m.model_name), + COALESCE(to_char(p.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), + GREATEST(COALESCE(EXTRACT(EPOCH FROM p.cooldown_until - now()), 0), 0)::float8, + COALESCE(to_char(m.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), + GREATEST(COALESCE(EXTRACT(EPOCH FROM m.cooldown_until - now()), 0), 0)::float8 +FROM platform_models m +JOIN integration_platforms p ON p.id = m.platform_id +LEFT JOIN base_model_catalog b ON b.id = m.base_model_id +WHERE p.status = 'enabled' + AND p.deleted_at IS NULL + AND m.enabled = true + AND m.model_type @> jsonb_build_array($2) + AND ( + (COALESCE(m.model_alias, '') <> '' AND m.model_alias = $1) + OR ( + COALESCE(m.model_alias, '') = '' + AND ( + m.model_name = $1 + OR b.canonical_model_key = $1 + OR b.provider_model_name = $1 + ) + ) + ) +ORDER BY GREATEST(COALESCE(p.cooldown_until, to_timestamp(0)), COALESCE(m.cooldown_until, to_timestamp(0))) DESC, + p.priority ASC, + m.created_at ASC`, model, modelType) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var platformName string + var displayName string + var platformCooldownUntil string + var platformRemainingSeconds float64 + var modelCooldownUntil string + var modelRemainingSeconds float64 + if err := rows.Scan( + &platformName, + &displayName, + &platformCooldownUntil, + &platformRemainingSeconds, + &modelCooldownUntil, + &modelRemainingSeconds, + ); err != nil { + return nil, err + } + if modelRemainingSeconds > 0 { + return &ModelCandidateUnavailableError{ + Code: "model_cooling_down", + Message: cooldownErrorMessage("模型", displayName, modelRemainingSeconds, modelCooldownUntil), + }, nil + } + if platformRemainingSeconds > 0 { + return &ModelCandidateUnavailableError{ + Code: "platform_cooling_down", + Message: cooldownErrorMessage("平台", platformName, platformRemainingSeconds, platformCooldownUntil), + }, nil + } + } + if err := rows.Err(); err != nil { + return nil, err + } + return nil, nil +} + +func cooldownErrorMessage(scope string, name string, remainingSeconds float64, cooldownUntil string) string { + name = strings.TrimSpace(name) + if name == "" { + name = "候选" + } + remainingMinutes := remainingSeconds / 60 + if remainingMinutes < 0.1 { + remainingMinutes = 0.1 + } + message := fmt.Sprintf("%s %s 冷却中,剩余 %.1f 分钟", scope, name, remainingMinutes) + if strings.TrimSpace(cooldownUntil) != "" { + message += ",预计恢复时间 " + cooldownUntil + } + return message +} diff --git a/apps/api/internal/store/platform_models.go b/apps/api/internal/store/platform_models.go index b4d92a4..b324bc5 100644 --- a/apps/api/internal/store/platform_models.go +++ b/apps/api/internal/store/platform_models.go @@ -193,6 +193,7 @@ RETURNING id::text, platform_id::text, COALESCE(base_model_id::text, ''), model_ capabilities, pricing_mode, COALESCE(discount_factor, 0)::float8, COALESCE(pricing_rule_set_id::text, ''), billing_config_override, billing_config, permission_config, retry_policy, rate_limit_policy, COALESCE(runtime_policy_set_id::text, ''), runtime_policy_override, + COALESCE(to_char(cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), enabled, created_at, updated_at`, input.PlatformID, baseID, @@ -234,6 +235,7 @@ RETURNING id::text, platform_id::text, COALESCE(base_model_id::text, ''), model_ &rateLimitPolicyBytes, &model.RuntimePolicySetID, &runtimePolicyOverrideBytes, + &model.CooldownUntil, &model.Enabled, &model.CreatedAt, &model.UpdatedAt, diff --git a/apps/api/internal/store/postgres.go b/apps/api/internal/store/postgres.go index 5f316c1..c308a10 100644 --- a/apps/api/internal/store/postgres.go +++ b/apps/api/internal/store/postgres.go @@ -70,6 +70,7 @@ type Platform struct { CredentialsPreview map[string]any `json:"credentialsPreview,omitempty"` RetryPolicy map[string]any `json:"retryPolicy,omitempty"` RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` + CooldownUntil string `json:"cooldownUntil,omitempty"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } @@ -151,6 +152,7 @@ type PlatformModel struct { RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` RuntimePolicySetID string `json:"runtimePolicySetId,omitempty"` RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride,omitempty"` + CooldownUntil string `json:"cooldownUntil,omitempty"` Enabled bool `json:"enabled"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` @@ -478,7 +480,9 @@ func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) { rows, err := s.pool.Query(ctx, ` SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), - config, credentials, retry_policy, rate_limit_policy, created_at, updated_at + config, credentials, retry_policy, rate_limit_policy, + COALESCE(to_char(cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), + created_at, updated_at FROM integration_platforms ORDER BY priority ASC, created_at DESC`) if err != nil { @@ -510,6 +514,7 @@ ORDER BY priority ASC, created_at DESC`) &credentialsBytes, &retryPolicyBytes, &rateLimitPolicyBytes, + &platform.CooldownUntil, &platform.CreatedAt, &platform.UpdatedAt, ); err != nil { @@ -555,7 +560,9 @@ VALUES ( ) RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), - config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`, + config, credentials, retry_policy, rate_limit_policy, + COALESCE(to_char(cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), + created_at, updated_at`, input.Provider, input.PlatformKey, input.Name, strings.TrimSpace(input.InternalName), input.BaseURL, input.AuthType, credentials, config, input.DefaultPricingMode, input.DefaultDiscountFactor, input.PricingRuleSetID, input.Priority, string(retryPolicy), string(rateLimitPolicy), @@ -576,6 +583,7 @@ RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), C &credentialsResultBytes, &retryPolicyBytes, &rateLimitPolicyBytes, + &platform.CooldownUntil, &platform.CreatedAt, &platform.UpdatedAt, ) @@ -636,7 +644,9 @@ SET provider = $2, WHERE id = $1::uuid RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), - config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`, + config, credentials, retry_policy, rate_limit_policy, + COALESCE(to_char(cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), + created_at, updated_at`, id, input.Provider, input.PlatformKey, @@ -669,6 +679,7 @@ RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), C &credentialsResultBytes, &retryPolicyBytes, &rateLimitPolicyBytes, + &platform.CooldownUntil, &platform.CreatedAt, &platform.UpdatedAt, ) @@ -746,6 +757,7 @@ SELECT m.id::text, m.platform_id::text, COALESCE(m.base_model_id::text, ''), p.p m.capability_override, m.capabilities, m.pricing_mode, COALESCE(m.discount_factor, 0)::float8, COALESCE(m.pricing_rule_set_id::text, ''), m.billing_config_override, m.billing_config, m.permission_config, m.retry_policy, m.rate_limit_policy, COALESCE(m.runtime_policy_set_id::text, ''), m.runtime_policy_override, + COALESCE(to_char(m.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), m.enabled, m.created_at, m.updated_at FROM platform_models m JOIN integration_platforms p ON p.id = m.platform_id @@ -791,6 +803,7 @@ ORDER BY m.model_type ASC, m.model_name ASC`, args...) &rateLimitPolicy, &model.RuntimePolicySetID, &runtimePolicyOverride, + &model.CooldownUntil, &model.Enabled, &model.CreatedAt, &model.UpdatedAt, diff --git a/apps/api/internal/store/rate_limit_status.go b/apps/api/internal/store/rate_limit_status.go new file mode 100644 index 0000000..ca177df --- /dev/null +++ b/apps/api/internal/store/rate_limit_status.go @@ -0,0 +1,249 @@ +package store + +import ( + "context" + "sort" + "strings" +) + +type RateLimitMetricStatus struct { + CurrentValue float64 `json:"currentValue"` + UsedValue float64 `json:"usedValue"` + ReservedValue float64 `json:"reservedValue"` + LimitValue float64 `json:"limitValue"` + Limited bool `json:"limited"` + Ratio float64 `json:"ratio"` + ResetAt string `json:"resetAt,omitempty"` +} + +type ModelRateLimitStatus struct { + PlatformModelID string `json:"platformModelId"` + PlatformID string `json:"platformId"` + PlatformName string `json:"platformName"` + Provider string `json:"provider"` + ModelName string `json:"modelName"` + ProviderModelName string `json:"providerModelName,omitempty"` + ModelAlias string `json:"modelAlias,omitempty"` + DisplayName string `json:"displayName"` + ModelType []string `json:"modelType"` + Enabled bool `json:"enabled"` + RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` + PlatformCooldownUntil string `json:"platformCooldownUntil,omitempty"` + ModelCooldownUntil string `json:"modelCooldownUntil,omitempty"` + Concurrent RateLimitMetricStatus `json:"concurrent"` + RPM RateLimitMetricStatus `json:"rpm"` + TPM RateLimitMetricStatus `json:"tpm"` + LoadRatio float64 `json:"loadRatio"` +} + +func (s *Store) ListModelRateLimitStatuses(ctx context.Context) ([]ModelRateLimitStatus, error) { + rows, err := s.pool.Query(ctx, ` +SELECT m.id::text, m.platform_id::text, p.name, p.provider, + m.model_name, COALESCE(NULLIF(m.provider_model_name, ''), m.model_name), COALESCE(m.model_alias, ''), + m.model_type, m.display_name, m.enabled, + p.rate_limit_policy, COALESCE(rp.rate_limit_policy, '{}'::jsonb), COALESCE(NULLIF(m.runtime_policy_override, '{}'::jsonb), b.runtime_policy_override, '{}'::jsonb), m.rate_limit_policy, + COALESCE(to_char(p.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), + COALESCE(to_char(m.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''), + COALESCE(con.active, 0)::float8, + COALESCE(rpm.used_value, 0)::float8, COALESCE(rpm.reserved_value, 0)::float8, COALESCE(rpm.reset_at::text, ''), + COALESCE(tpm.used_value, 0)::float8, COALESCE(tpm.reserved_value, 0)::float8, COALESCE(tpm.reset_at::text, '') +FROM platform_models m +JOIN integration_platforms p ON p.id = m.platform_id +LEFT JOIN base_model_catalog b ON b.id = m.base_model_id +LEFT JOIN model_runtime_policy_sets rp ON rp.id = COALESCE(m.runtime_policy_set_id, b.runtime_policy_set_id) +LEFT JOIN ( + SELECT scope_key, SUM(lease_value) AS active + FROM gateway_concurrency_leases + WHERE scope_type = 'platform_model' + AND released_at IS NULL + AND expires_at > now() + GROUP BY scope_key +) con ON con.scope_key = m.id::text +LEFT JOIN ( + SELECT DISTINCT ON (scope_key) scope_key, used_value, reserved_value, reset_at + FROM gateway_rate_limit_counters + WHERE scope_type = 'platform_model' + AND metric = 'rpm' + AND reset_at > now() + ORDER BY scope_key, window_start DESC +) rpm ON rpm.scope_key = m.id::text +LEFT JOIN ( + SELECT scope_key, SUM(used_value) AS used_value, SUM(reserved_value) AS reserved_value, MAX(reset_at) AS reset_at + FROM gateway_rate_limit_counters + WHERE scope_type = 'platform_model' + AND metric LIKE 'tpm%' + AND reset_at > now() + GROUP BY scope_key +) tpm ON tpm.scope_key = m.id::text +WHERE p.deleted_at IS NULL +ORDER BY p.priority ASC, m.model_name ASC`) + if err != nil { + return nil, err + } + defer rows.Close() + + items := make([]ModelRateLimitStatus, 0) + for rows.Next() { + var item ModelRateLimitStatus + var modelTypeBytes []byte + var platformPolicyBytes []byte + var runtimePolicyBytes []byte + var runtimeOverrideBytes []byte + var modelPolicyBytes []byte + var platformCooldownUntil string + var modelCooldownUntil string + var concurrentCurrent float64 + var rpmUsed float64 + var rpmReserved float64 + var rpmResetAt string + var tpmUsed float64 + var tpmReserved float64 + var tpmResetAt string + if err := rows.Scan( + &item.PlatformModelID, + &item.PlatformID, + &item.PlatformName, + &item.Provider, + &item.ModelName, + &item.ProviderModelName, + &item.ModelAlias, + &modelTypeBytes, + &item.DisplayName, + &item.Enabled, + &platformPolicyBytes, + &runtimePolicyBytes, + &runtimeOverrideBytes, + &modelPolicyBytes, + &platformCooldownUntil, + &modelCooldownUntil, + &concurrentCurrent, + &rpmUsed, + &rpmReserved, + &rpmResetAt, + &tpmUsed, + &tpmReserved, + &tpmResetAt, + ); err != nil { + return nil, err + } + item.ModelType = decodeStringArray(modelTypeBytes) + policy := effectiveModelRateLimitPolicy( + decodeObject(platformPolicyBytes), + decodeObject(runtimePolicyBytes), + decodeObject(runtimeOverrideBytes), + decodeObject(modelPolicyBytes), + ) + item.PlatformCooldownUntil = platformCooldownUntil + item.ModelCooldownUntil = modelCooldownUntil + item.RateLimitPolicy = policy + item.Concurrent = metricStatus(concurrentCurrent, concurrentCurrent, 0, rateLimitForMetric(policy, "concurrent"), "") + item.RPM = metricStatus(rpmUsed+rpmReserved, rpmUsed, rpmReserved, rateLimitForMetric(policy, "rpm"), rpmResetAt) + item.TPM = metricStatus(tpmUsed+tpmReserved, tpmUsed, tpmReserved, tpmLimit(policy), tpmResetAt) + item.LoadRatio = maxFloat(item.Concurrent.Ratio, item.RPM.Ratio, item.TPM.Ratio) + items = append(items, item) + } + if err := rows.Err(); err != nil { + return nil, err + } + sort.SliceStable(items, func(i, j int) bool { + if items[i].LoadRatio == items[j].LoadRatio { + return strings.ToLower(items[i].DisplayName) < strings.ToLower(items[j].DisplayName) + } + return items[i].LoadRatio > items[j].LoadRatio + }) + return items, nil +} + +func effectiveModelRateLimitPolicy(platformPolicy map[string]any, runtimePolicy map[string]any, runtimeOverride map[string]any, modelPolicy map[string]any) map[string]any { + policy := platformPolicy + if hasRateLimitRules(runtimePolicy) { + policy = shallowMergeMap(policy, runtimePolicy) + } + if nested, ok := runtimeOverride["rateLimitPolicy"].(map[string]any); ok && len(nested) > 0 { + policy = shallowMergeMap(policy, nested) + } + if hasRateLimitRules(modelPolicy) { + policy = shallowMergeMap(policy, modelPolicy) + } + if hasRateLimitRules(policy) { + return policy + } + return nil +} + +func hasRateLimitRules(policy map[string]any) bool { + rules, _ := policy["rules"].([]any) + return len(rules) > 0 +} + +func shallowMergeMap(base map[string]any, override map[string]any) map[string]any { + out := map[string]any{} + for key, value := range base { + out[key] = value + } + for key, value := range override { + out[key] = value + } + return out +} + +func rateLimitForMetric(policy map[string]any, metric string) float64 { + rules, _ := policy["rules"].([]any) + for _, rawRule := range rules { + rule, _ := rawRule.(map[string]any) + if strings.TrimSpace(stringValue(rule["metric"])) == metric { + return floatValue(rule["limit"]) + } + } + return 0 +} + +func tpmLimit(policy map[string]any) float64 { + if limit := rateLimitForMetric(policy, "tpm_total"); limit > 0 { + return limit + } + return rateLimitForMetric(policy, "tpm_input") + rateLimitForMetric(policy, "tpm_output") +} + +func metricStatus(current float64, used float64, reserved float64, limit float64, resetAt string) RateLimitMetricStatus { + status := RateLimitMetricStatus{ + CurrentValue: current, + UsedValue: used, + ReservedValue: reserved, + LimitValue: limit, + Limited: limit > 0, + ResetAt: resetAt, + } + if status.Limited { + status.Ratio = current / limit + } + return status +} + +func maxFloat(values ...float64) float64 { + out := 0.0 + for _, value := range values { + if value > out { + out = value + } + } + return out +} + +func stringValue(value any) string { + text, _ := value.(string) + return strings.TrimSpace(text) +} + +func floatValue(value any) float64 { + switch typed := value.(type) { + case int: + return float64(typed) + case int64: + return float64(typed) + case float64: + return typed + default: + return 0 + } +} diff --git a/apps/api/internal/store/rate_limit_status_test.go b/apps/api/internal/store/rate_limit_status_test.go new file mode 100644 index 0000000..d0e176f --- /dev/null +++ b/apps/api/internal/store/rate_limit_status_test.go @@ -0,0 +1,32 @@ +package store + +import "testing" + +func TestEffectiveModelRateLimitPolicyTreatsModelRulesAsAuthoritative(t *testing.T) { + policy := effectiveModelRateLimitPolicy( + map[string]any{"rules": []any{ + map[string]any{"metric": "rpm", "limit": 500}, + map[string]any{"metric": "tpm_total", "limit": 100000}, + }}, + map[string]any{"rules": []any{ + map[string]any{"metric": "rpm", "limit": 120}, + map[string]any{"metric": "tpm_total", "limit": 240000}, + map[string]any{"metric": "concurrent", "limit": 6}, + }}, + map[string]any{}, + map[string]any{"rules": []any{ + map[string]any{"metric": "rpm", "limit": 30}, + map[string]any{"metric": "concurrent", "limit": 2}, + }}, + ) + + if got := rateLimitForMetric(policy, "rpm"); got != 30 { + t.Fatalf("expected model rpm limit to win, got %v", got) + } + if got := rateLimitForMetric(policy, "concurrent"); got != 2 { + t.Fatalf("expected model concurrent limit to win, got %v", got) + } + if got := rateLimitForMetric(policy, "tpm_total"); got != 0 { + t.Fatalf("expected missing model tpm limit to mean unlimited, got %v", got) + } +} diff --git a/apps/api/internal/store/rate_limits.go b/apps/api/internal/store/rate_limits.go index 578d599..7bea2cf 100644 --- a/apps/api/internal/store/rate_limits.go +++ b/apps/api/internal/store/rate_limits.go @@ -3,9 +3,19 @@ package store import ( "context" "errors" + "time" + + "github.com/jackc/pgx/v5" ) -func (s *Store) ReserveRateLimits(ctx context.Context, taskID string, reservations []RateLimitReservation) (RateLimitResult, error) { +type RuntimeRecoveryResult struct { + ReleasedConcurrencyLeases int64 `json:"releasedConcurrencyLeases"` + ReleasedRateReservations int64 `json:"releasedRateReservations"` + FailedAttempts int64 `json:"failedAttempts"` + FailedTasks int64 `json:"failedTasks"` +} + +func (s *Store) ReserveRateLimits(ctx context.Context, taskID string, attemptID string, reservations []RateLimitReservation) (RateLimitResult, error) { tx, err := s.pool.Begin(ctx) if err != nil { return RateLimitResult{}, err @@ -24,70 +34,122 @@ func (s *Store) ReserveRateLimits(ctx context.Context, taskID string, reservatio reservation.WindowSeconds = 60 } if reservation.Metric == "concurrent" { - if reservation.LeaseTTLSeconds <= 0 { - reservation.LeaseTTLSeconds = 120 + leaseID, err := reserveConcurrencyLease(ctx, tx, taskID, attemptID, reservation) + if err != nil { + return RateLimitResult{}, err } - var active float64 - if err := tx.QueryRow(ctx, ` + result.LeaseIDs = append(result.LeaseIDs, leaseID) + continue + } + normalized, err := reserveCounterWindow(ctx, tx, taskID, attemptID, reservation) + if err != nil { + return RateLimitResult{}, err + } + result.Reservations = append(result.Reservations, normalized) + } + return result, tx.Commit(ctx) +} + +func reserveConcurrencyLease(ctx context.Context, tx pgx.Tx, taskID string, attemptID string, reservation RateLimitReservation) (string, error) { + if reservation.LeaseTTLSeconds <= 0 { + reservation.LeaseTTLSeconds = 120 + } + var active float64 + if err := tx.QueryRow(ctx, ` SELECT COALESCE(SUM(lease_value), 0)::float8 FROM gateway_concurrency_leases WHERE scope_type = $1 AND scope_key = $2 AND released_at IS NULL AND expires_at > now()`, - reservation.ScopeType, - reservation.ScopeKey, - ).Scan(&active); err != nil { - return RateLimitResult{}, err - } - if active+reservation.Amount > reservation.Limit { - return RateLimitResult{}, ErrRateLimited - } - var leaseID string - if err := tx.QueryRow(ctx, ` -INSERT INTO gateway_concurrency_leases (task_id, scope_type, scope_key, lease_value, expires_at) -VALUES ($1::uuid, $2, $3, $4, now() + ($5::int * interval '1 second')) + reservation.ScopeType, + reservation.ScopeKey, + ).Scan(&active); err != nil { + return "", err + } + if active+reservation.Amount > reservation.Limit { + return "", ErrRateLimited + } + var leaseID string + if err := tx.QueryRow(ctx, ` +INSERT INTO gateway_concurrency_leases (task_id, attempt_id, scope_type, scope_key, lease_value, expires_at) +VALUES ($1::uuid, NULLIF($2, '')::uuid, $3, $4, $5, now() + ($6::int * interval '1 second')) RETURNING id::text`, - taskID, - reservation.ScopeType, - reservation.ScopeKey, - reservation.Amount, - reservation.LeaseTTLSeconds, - ).Scan(&leaseID); err != nil { - return RateLimitResult{}, err - } - result.LeaseIDs = append(result.LeaseIDs, leaseID) - continue - } - tag, err := tx.Exec(ctx, ` + taskID, + attemptID, + reservation.ScopeType, + reservation.ScopeKey, + reservation.Amount, + reservation.LeaseTTLSeconds, + ).Scan(&leaseID); err != nil { + return "", err + } + return leaseID, nil +} + +func reserveCounterWindow(ctx context.Context, tx pgx.Tx, taskID string, attemptID string, reservation RateLimitReservation) (RateLimitReservation, error) { + usedAmount := 0.0 + reservedAmount := reservation.Amount + var windowStart time.Time + err := tx.QueryRow(ctx, ` INSERT INTO gateway_rate_limit_counters ( scope_type, scope_key, metric, window_start, limit_value, used_value, reserved_value, reset_at ) VALUES ( - $1, $2, $3, date_trunc('minute', now()), $4, $5, 0, - date_trunc('minute', now()) + ($6::int * interval '1 second') + $1, $2, $3, date_trunc('minute', now()), $4, $5, $6, + date_trunc('minute', now()) + ($7::int * interval '1 second') ) ON CONFLICT (scope_type, scope_key, metric, window_start) DO UPDATE SET limit_value = EXCLUDED.limit_value, used_value = gateway_rate_limit_counters.used_value + EXCLUDED.used_value, + reserved_value = gateway_rate_limit_counters.reserved_value + EXCLUDED.reserved_value, reset_at = EXCLUDED.reset_at, updated_at = now() -WHERE gateway_rate_limit_counters.used_value + EXCLUDED.used_value <= EXCLUDED.limit_value`, - reservation.ScopeType, - reservation.ScopeKey, - reservation.Metric, - reservation.Limit, - reservation.Amount, - reservation.WindowSeconds, - ) - if err != nil { - return RateLimitResult{}, err - } - if tag.RowsAffected() == 0 { - return RateLimitResult{}, ErrRateLimited +WHERE gateway_rate_limit_counters.used_value + gateway_rate_limit_counters.reserved_value + EXCLUDED.used_value + EXCLUDED.reserved_value <= EXCLUDED.limit_value +RETURNING window_start`, + reservation.ScopeType, + reservation.ScopeKey, + reservation.Metric, + reservation.Limit, + usedAmount, + reservedAmount, + reservation.WindowSeconds, + ).Scan(&windowStart) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return RateLimitReservation{}, ErrRateLimited } + return RateLimitReservation{}, err } - return result, tx.Commit(ctx) + reservation.WindowStart = windowStart + if err := tx.QueryRow(ctx, ` +INSERT INTO gateway_rate_limit_reservations ( + task_id, attempt_id, scope_type, scope_key, metric, window_start, limit_value, reserved_amount, status +) +VALUES ( + $1::uuid, NULLIF($2, '')::uuid, $3, $4, $5, $6, $7, $8, 'reserved' +) +RETURNING id::text`, + taskID, + attemptID, + reservation.ScopeType, + reservation.ScopeKey, + reservation.Metric, + windowStart, + reservation.Limit, + reservedAmount, + ).Scan(&reservation.ReservationID); err != nil { + return RateLimitReservation{}, err + } + return reservation, nil +} + +func (s *Store) CommitRateLimitReservations(ctx context.Context, reservations []RateLimitReservation, actualByMetric map[string]float64) error { + return s.finishRateLimitReservations(ctx, reservations, actualByMetric, "committed", "success") +} + +func (s *Store) ReleaseRateLimitReservations(ctx context.Context, reservations []RateLimitReservation, reason string) error { + return s.finishRateLimitReservations(ctx, reservations, nil, "released", reason) } func (s *Store) ReleaseConcurrencyLeases(ctx context.Context, leaseIDs []string) error { @@ -107,3 +169,193 @@ WHERE id = $1::uuid AND released_at IS NULL`, leaseID); err != nil && !errors.Is } return nil } + +func (s *Store) RecoverInterruptedRuntimeState(ctx context.Context) (RuntimeRecoveryResult, error) { + tx, err := s.pool.Begin(ctx) + if err != nil { + return RuntimeRecoveryResult{}, err + } + defer tx.Rollback(ctx) + + result := RuntimeRecoveryResult{} + rows, err := tx.Query(ctx, ` +UPDATE gateway_rate_limit_reservations +SET status = 'released', + reason = 'server_restarted', + finalized_at = now(), + updated_at = now() +WHERE status = 'reserved' +RETURNING scope_type, scope_key, metric, window_start, reserved_amount::float8`) + if err != nil { + return RuntimeRecoveryResult{}, err + } + for rows.Next() { + var reservation RateLimitReservation + if err := rows.Scan(&reservation.ScopeType, &reservation.ScopeKey, &reservation.Metric, &reservation.WindowStart, &reservation.Amount); err != nil { + rows.Close() + return RuntimeRecoveryResult{}, err + } + if err := releaseCounterReservation(ctx, tx, reservation.ScopeType, reservation.ScopeKey, reservation.Metric, reservation.WindowStart, reservation.Amount); err != nil { + rows.Close() + return RuntimeRecoveryResult{}, err + } + result.ReleasedRateReservations++ + } + if err := rows.Err(); err != nil { + rows.Close() + return RuntimeRecoveryResult{}, err + } + rows.Close() + + tag, err := tx.Exec(ctx, ` +UPDATE gateway_concurrency_leases +SET released_at = now() +WHERE released_at IS NULL + AND expires_at > now()`) + if err != nil { + return RuntimeRecoveryResult{}, err + } + result.ReleasedConcurrencyLeases = tag.RowsAffected() + + tag, err = tx.Exec(ctx, ` +UPDATE gateway_task_attempts +SET status = 'failed', + retryable = false, + error_code = 'server_restarted', + error_message = 'attempt interrupted by service restart', + finished_at = now() +WHERE status = 'running'`) + if err != nil { + return RuntimeRecoveryResult{}, err + } + result.FailedAttempts = tag.RowsAffected() + + taskRows, err := tx.Query(ctx, ` +UPDATE gateway_tasks +SET status = 'failed', + error = 'task interrupted by service restart', + error_code = 'server_restarted', + error_message = 'task interrupted by service restart', + finished_at = now(), + updated_at = now() +WHERE status IN ('queued', 'running') +RETURNING id::text`) + if err != nil { + return RuntimeRecoveryResult{}, err + } + taskIDs := make([]string, 0) + for taskRows.Next() { + var taskID string + if err := taskRows.Scan(&taskID); err != nil { + taskRows.Close() + return RuntimeRecoveryResult{}, err + } + taskIDs = append(taskIDs, taskID) + } + if err := taskRows.Err(); err != nil { + taskRows.Close() + return RuntimeRecoveryResult{}, err + } + taskRows.Close() + + for _, taskID := range taskIDs { + if _, err := tx.Exec(ctx, ` +INSERT INTO gateway_task_events (task_id, seq, event_type, status, phase, progress, message, payload, simulated) +VALUES ( + $1::uuid, + COALESCE((SELECT MAX(seq) + 1 FROM gateway_task_events WHERE task_id = $1::uuid), 1), + 'task.recovered', + 'failed', + 'recovered', + 1, + 'task interrupted by service restart', + '{"code":"server_restarted"}'::jsonb, + false +)`, taskID); err != nil { + return RuntimeRecoveryResult{}, err + } + } + result.FailedTasks = int64(len(taskIDs)) + + return result, tx.Commit(ctx) +} + +func (s *Store) finishRateLimitReservations(ctx context.Context, reservations []RateLimitReservation, actualByMetric map[string]float64, status string, reason string) error { + if len(reservations) == 0 { + return nil + } + tx, err := s.pool.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + for _, reservation := range reservations { + if reservation.ReservationID == "" { + continue + } + actualAmount := actualByMetric[reservation.Metric] + if status == "committed" && actualAmount <= 0 { + actualAmount = reservation.Amount + } + var stored RateLimitReservation + err := tx.QueryRow(ctx, ` +UPDATE gateway_rate_limit_reservations +SET status = $2, + reason = NULLIF($3, ''), + actual_amount = CASE WHEN $2 = 'committed' THEN $4 ELSE actual_amount END, + finalized_at = now(), + updated_at = now() +WHERE id = $1::uuid + AND status = 'reserved' +RETURNING scope_type, scope_key, metric, window_start, reserved_amount::float8`, + reservation.ReservationID, + status, + reason, + actualAmount, + ).Scan(&stored.ScopeType, &stored.ScopeKey, &stored.Metric, &stored.WindowStart, &stored.Amount) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + continue + } + return err + } + if status == "committed" { + if err := commitCounterReservation(ctx, tx, stored.ScopeType, stored.ScopeKey, stored.Metric, stored.WindowStart, stored.Amount, actualAmount); err != nil { + return err + } + continue + } + if err := releaseCounterReservation(ctx, tx, stored.ScopeType, stored.ScopeKey, stored.Metric, stored.WindowStart, stored.Amount); err != nil { + return err + } + } + return tx.Commit(ctx) +} + +func commitCounterReservation(ctx context.Context, tx pgx.Tx, scopeType string, scopeKey string, metric string, windowStart time.Time, reservedAmount float64, actualAmount float64) error { + _, err := tx.Exec(ctx, ` +UPDATE gateway_rate_limit_counters +SET reserved_value = GREATEST(reserved_value - $5, 0), + used_value = used_value + $6, + updated_at = now() +WHERE scope_type = $1 + AND scope_key = $2 + AND metric = $3 + AND window_start = $4`, + scopeType, scopeKey, metric, windowStart, reservedAmount, actualAmount) + return err +} + +func releaseCounterReservation(ctx context.Context, tx pgx.Tx, scopeType string, scopeKey string, metric string, windowStart time.Time, reservedAmount float64) error { + _, err := tx.Exec(ctx, ` +UPDATE gateway_rate_limit_counters +SET reserved_value = GREATEST(reserved_value - $5, 0), + updated_at = now() +WHERE scope_type = $1 + AND scope_key = $2 + AND metric = $3 + AND window_start = $4`, + scopeType, scopeKey, metric, windowStart, reservedAmount) + return err +} diff --git a/apps/api/internal/store/runtime_policies.go b/apps/api/internal/store/runtime_policies.go index dc15697..dc28538 100644 --- a/apps/api/internal/store/runtime_policies.go +++ b/apps/api/internal/store/runtime_policies.go @@ -134,6 +134,21 @@ WHERE id = $1::uuid`, platformID, cooldownSeconds) return err } +func (s *Store) CooldownCandidatePlatformModel(ctx context.Context, platformModelID string, cooldownSeconds int) error { + if strings.TrimSpace(platformModelID) == "" { + return nil + } + if cooldownSeconds <= 0 { + cooldownSeconds = 300 + } + _, err := s.pool.Exec(ctx, ` +UPDATE platform_models +SET cooldown_until = now() + ($2::int * interval '1 second'), + updated_at = now() +WHERE id = $1::uuid`, platformModelID, cooldownSeconds) + return err +} + func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string, demoteStep int) error { if strings.TrimSpace(platformID) == "" { return nil diff --git a/apps/api/internal/store/runtime_types.go b/apps/api/internal/store/runtime_types.go index faeb7e7..f08b848 100644 --- a/apps/api/internal/store/runtime_types.go +++ b/apps/api/internal/store/runtime_types.go @@ -2,6 +2,7 @@ package store import ( "errors" + "strings" "time" ) @@ -10,6 +11,27 @@ var ( ErrRateLimited = errors.New("rate limit exceeded") ) +type ModelCandidateUnavailableError struct { + Code string + Message string +} + +func (e *ModelCandidateUnavailableError) Error() string { + return e.Message +} + +func (e *ModelCandidateUnavailableError) Unwrap() error { + return ErrNoModelCandidate +} + +func ModelCandidateErrorCode(err error) string { + var candidateErr *ModelCandidateUnavailableError + if errors.As(err, &candidateErr) && strings.TrimSpace(candidateErr.Code) != "" { + return candidateErr.Code + } + return "no_model_candidate" +} + type CreatePlatformModelInput struct { PlatformID string `json:"platformId"` BaseModelID string `json:"baseModelId"` @@ -81,6 +103,7 @@ type RuntimeModelCandidate struct { } type RateLimitReservation struct { + ReservationID string ScopeType string ScopeKey string Metric string @@ -88,10 +111,12 @@ type RateLimitReservation struct { Amount float64 WindowSeconds int LeaseTTLSeconds int + WindowStart time.Time } type RateLimitResult struct { - LeaseIDs []string + LeaseIDs []string + Reservations []RateLimitReservation } type CreateTaskAttemptInput struct { diff --git a/apps/api/migrations/0029_rate_limit_reservations.sql b/apps/api/migrations/0029_rate_limit_reservations.sql new file mode 100644 index 0000000..19e690b --- /dev/null +++ b/apps/api/migrations/0029_rate_limit_reservations.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS gateway_rate_limit_reservations ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE, + attempt_id uuid REFERENCES gateway_task_attempts(id) ON DELETE SET NULL, + scope_type text NOT NULL, + scope_key text NOT NULL, + metric text NOT NULL, + window_start timestamptz NOT NULL, + limit_value numeric NOT NULL, + reserved_amount numeric NOT NULL DEFAULT 0, + actual_amount numeric NOT NULL DEFAULT 0, + status text NOT NULL DEFAULT 'reserved', + reason text, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + finalized_at timestamptz +); + +CREATE INDEX IF NOT EXISTS idx_rate_limit_reservations_active + ON gateway_rate_limit_reservations(status, scope_type, scope_key, metric, window_start); + +CREATE INDEX IF NOT EXISTS idx_rate_limit_reservations_task + ON gateway_rate_limit_reservations(task_id, attempt_id); diff --git a/apps/api/migrations/0030_platform_model_cooldown.sql b/apps/api/migrations/0030_platform_model_cooldown.sql new file mode 100644 index 0000000..ad4b09d --- /dev/null +++ b/apps/api/migrations/0030_platform_model_cooldown.sql @@ -0,0 +1,5 @@ +ALTER TABLE IF EXISTS platform_models + ADD COLUMN IF NOT EXISTS cooldown_until timestamptz; + +CREATE INDEX IF NOT EXISTS idx_platform_models_cooldown + ON platform_models(cooldown_until); diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index f5011fc..82a8849 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -18,6 +18,7 @@ import type { GatewayWalletTransaction, IntegrationPlatform, ModelCatalogResponse, + ModelRateLimitStatus, PlatformModel, PricingRule, PricingRuleSet, @@ -54,6 +55,7 @@ import { listBaseModels, listCatalogProviders, listModelCatalog, + listModelRateLimitStatuses, listModels, listPlayableApiKeys, listPlayableModels, @@ -140,6 +142,7 @@ type DataKey = | 'runnerPolicy' | 'runtimePolicySets' | 'rateLimitWindows' + | 'modelRateLimits' | 'tenants' | 'users' | 'userGroups' @@ -183,6 +186,7 @@ export function App() { const [accessRules, setAccessRules] = useState([]); const [auditLogs, setAuditLogs] = useState([]); const [rateLimitWindows, setRateLimitWindows] = useState([]); + const [modelRateLimits, setModelRateLimits] = useState([]); const [tenants, setTenants] = useState([]); const [users, setUsers] = useState([]); const [userGroups, setUserGroups] = useState([]); @@ -239,6 +243,23 @@ export function App() { useEffect(() => { void ensureRouteData(token); }, [activePage, adminSection, taskListRequestKey, transactionListRequestKey, workspaceSection, token]); + useEffect(() => { + if (!token || activePage !== 'admin' || adminSection !== 'platforms') return undefined; + const timer = window.setInterval(() => { + void Promise.all([listModelRateLimitStatuses(token), listPlatforms(token)]) + .then(([rateLimitResponse, platformResponse]) => { + setModelRateLimits(rateLimitResponse.items); + setPlatforms(platformResponse.items); + loadedDataKeysRef.current.add('modelRateLimits'); + loadedDataKeysRef.current.add('platforms'); + }) + .catch(() => { + loadedDataKeysRef.current.delete('modelRateLimits'); + loadedDataKeysRef.current.delete('platforms'); + }); + }, 3000); + return () => window.clearInterval(timer); + }, [activePage, adminSection, token]); useEffect(() => { function handlePopState() { applyRoute(parseAppRoute()); @@ -278,9 +299,10 @@ export function App() { platforms, pricingRules, pricingRuleSets, - providers, - rateLimitWindows, - runtimePolicySets, + providers, + rateLimitWindows, + modelRateLimits, + runtimePolicySets, taskResult, tasks, tenants, @@ -288,7 +310,7 @@ export function App() { users, walletAccounts, walletTransactions, - }), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runnerPolicy, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]); + }), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, modelRateLimits, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runnerPolicy, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]); async function refresh(nextToken = token) { await ensureRouteData(nextToken, true); @@ -392,6 +414,9 @@ export function App() { case 'rateLimitWindows': setRateLimitWindows((await listRateLimitWindows(nextToken)).items); return; + case 'modelRateLimits': + setModelRateLimits((await listModelRateLimitStatuses(nextToken)).items); + return; case 'tenants': setTenants((await listTenants(nextToken)).items); return; @@ -514,7 +539,7 @@ export function App() { const modelsResponse = await replacePlatformModels(token, platform.id, modelBindings); setPlatforms((current) => [platformForState, ...current.filter((item) => item.id !== platform.id)]); setModels((current) => [...current.filter((model) => model.platformId !== platform.id), ...modelsResponse.items]); - invalidateDataKeys('modelCatalog'); + invalidateDataKeys('modelCatalog', 'modelRateLimits'); setCoreState('ready'); setCoreMessage(input.platformId ? `平台已更新,当前绑定 ${input.models.length} 个模型。` @@ -533,7 +558,7 @@ export function App() { await deletePlatform(token, platformId); setPlatforms((current) => current.filter((item) => item.id !== platformId)); setModels((current) => current.filter((item) => item.platformId !== platformId)); - invalidateDataKeys('modelCatalog'); + invalidateDataKeys('modelCatalog', 'modelRateLimits'); setCoreState('ready'); setCoreMessage('平台已删除。'); } catch (err) { @@ -788,6 +813,7 @@ export function App() { setAccessRules([]); setAuditLogs([]); setRateLimitWindows([]); + setModelRateLimits([]); setTenants([]); setUsers([]); setUserGroups([]); @@ -1132,7 +1158,7 @@ function dataKeysForRoute( if (activePage !== 'admin') return []; switch (adminSection) { case 'overview': - return ['platforms', 'models', 'providers', 'pricingRules', 'runtimePolicySets', 'rateLimitWindows', 'tenants', 'users', 'userGroups', 'accessRules']; + return ['platforms', 'models', 'providers', 'pricingRules', 'runtimePolicySets', 'rateLimitWindows', 'modelRateLimits', 'tenants', 'users', 'userGroups', 'accessRules']; case 'globalModels': return ['providers']; case 'pricing': @@ -1142,7 +1168,7 @@ function dataKeysForRoute( case 'baseModels': return ['baseModels', 'providers', 'pricingRuleSets', 'runtimePolicySets']; case 'platforms': - return ['platforms', 'models', 'providers', 'baseModels', 'pricingRuleSets', 'networkProxyConfig']; + return ['platforms', 'models', 'modelRateLimits', 'providers', 'baseModels', 'pricingRuleSets', 'networkProxyConfig']; case 'tenants': return ['tenants', 'userGroups']; case 'users': diff --git a/apps/web/src/api.ts b/apps/web/src/api.ts index 6256420..0d96d30 100644 --- a/apps/web/src/api.ts +++ b/apps/web/src/api.ts @@ -22,6 +22,7 @@ import type { IntegrationPlatform, ListResponse, ModelCatalogResponse, + ModelRateLimitStatus, PlatformModel, PlayableGatewayApiKey, PricingRule, @@ -696,6 +697,10 @@ export async function listRateLimitWindows(token: string): Promise>('/api/admin/runtime/rate-limit-windows', { token }); } +export async function listModelRateLimitStatuses(token: string): Promise> { + return request>('/api/admin/runtime/model-rate-limits', { token }); +} + export async function getNetworkProxyConfig(token: string): Promise { return request('/api/admin/config/network-proxy', { token }); } diff --git a/apps/web/src/app-state.ts b/apps/web/src/app-state.ts index 440b5b0..31ec637 100644 --- a/apps/web/src/app-state.ts +++ b/apps/web/src/app-state.ts @@ -13,6 +13,7 @@ import type { GatewayWalletTransaction, IntegrationPlatform, ModelCatalogResponse, + ModelRateLimitStatus, PlatformModel, PricingRule, PricingRuleSet, @@ -35,6 +36,7 @@ export interface ConsoleData { pricingRuleSets: PricingRuleSet[]; providers: CatalogProvider[]; rateLimitWindows: RateLimitWindow[]; + modelRateLimits: ModelRateLimitStatus[]; runtimePolicySets: RuntimePolicySet[]; taskResult: GatewayTask | null; tasks: GatewayTask[]; diff --git a/apps/web/src/pages/AdminPage.tsx b/apps/web/src/pages/AdminPage.tsx index 480aab4..b1a55ab 100644 --- a/apps/web/src/pages/AdminPage.tsx +++ b/apps/web/src/pages/AdminPage.tsx @@ -137,8 +137,9 @@ export function AdminPage(props: { Promise; }) { const defaultProvider = props.providers[0]?.providerKey ?? props.baseModels[0]?.providerKey ?? ''; + const [now, setNow] = useState(() => Date.now()); const [dialogOpen, setDialogOpen] = useState(false); - const [viewMode, setViewMode] = useState<'platforms' | 'models'>('platforms'); + const [viewMode, setViewMode] = useState<'platforms' | 'models' | 'limits'>('platforms'); const [modelQuery, setModelQuery] = useState(''); const [selectedPlatformId, setSelectedPlatformId] = useState(''); const [validationMessage, setValidationMessage] = useState(''); @@ -72,6 +74,11 @@ export function PlatformManagementPanel(props: { }); }, [modelQuery, platformMap, props.platformModels, selectedModelPlatformId]); + useEffect(() => { + const timer = window.setInterval(() => setNow(Date.now()), 1000); + return () => window.clearInterval(timer); + }, []); + function openCreateDialog() { const provider = providerOptions[0] ?? ''; const providerName = providerDisplayName(provider, providerMap); @@ -166,12 +173,14 @@ export function PlatformManagementPanel(props: { -
- - -
- {viewMode === 'platforms' ? ( - + + + + + {viewMode === 'platforms' ? ( + - ) : ( - - )} + now={now} + onPlatformChange={setSelectedPlatformId} + /> + ) : ( + + )} {props.message &&

{props.message}

}
@@ -391,6 +403,7 @@ function ModelBindingPolicy(props: { form: PlatformWizardForm; onChange: (value: } function PlatformTable(props: { + now: number; platformModelCount: Map; platforms: IntegrationPlatform[]; providerMap: Map; @@ -428,6 +441,7 @@ function PlatformTable(props: { const pricing = platformPricingSummary(platform, props.pricingRuleSets); const rateLimit = platformRateLimitSummary(platform.rateLimitPolicy); const runtime = platformRuntimeSummary(platform); + const platformCooldownMs = cooldownRemainingMs(platform.cooldownUntil, props.now); return ( @@ -458,8 +472,14 @@ function PlatformTable(props: { {props.platformModelCount.get(platform.id) ?? 0} - {platform.status} - {runtime} + + {platformCooldownMs > 0 ? ( + 冷却中 + ) : ( + {platform.status} + )} + + {platformCooldownMs > 0 ? `剩余 ${formatCooldownRemaining(platformCooldownMs)}` : runtime} @@ -487,6 +507,7 @@ function PlatformModelTable(props: { platformMap: Map; platforms: IntegrationPlatform[]; providerMap: Map; + now: number; onModelQueryChange: (value: string) => void; onPlatformChange: (value: string) => void; }) { @@ -520,12 +541,14 @@ function PlatformModelTable(props: { const provider = platform ? props.providerMap.get(platform.provider) : undefined; const baseModel = findBaseModelForPlatformModel(platform, props.baseModels, model); const modelIconPath = readPlatformModelIconPath(model, baseModel); + const modelCooldownMs = cooldownRemainingMs(model.cooldownUntil, props.now); return ( {model.modelType.join(', ')}, - {model.enabled ? 'enabled' : 'disabled'}, + {model.modelType.join(', ')}, + ...(modelCooldownMs > 0 ? [冷却中 · {formatCooldownRemaining(modelCooldownMs)}] : []), + {model.enabled ? 'enabled' : 'disabled'}, ]} chips={platformModelChips(model)} iconPath={modelIconPath || provider?.iconPath} @@ -545,6 +568,62 @@ function PlatformModelTable(props: { ) )} + + ); + } + +function RateLimitStatusTable(props: { statuses: ModelRateLimitStatus[]; platformMap: Map; now: number }) { + if (!props.statuses.length) { + return ; + } + return ( +
+
+ 按综合满载率从高到低排序 + 每 3 秒刷新一次;TPM 显示已结算 + 预占。 +
+
+ + + 模型 + 平台 + 并发 + TPM + RPM + 状态 + 满载率 + + {props.statuses.map((status) => { + const platform = props.platformMap.get(status.platformId); + return ( + + + + {status.displayName || status.modelAlias || status.modelName} + {status.providerModelName || status.modelName} + + + + + {platform ? platformDisplayName(platform) : status.platformName} + {status.provider} + + + {metricCell(status.concurrent)} + {metricCell(status.tpm, true)} + {metricCell(status.rpm)} + {modelRuntimeStatusCell(status, props.now)} + + + {formatPercent(status.loadRatio)} + + + + + ); + })} +
+
); } @@ -1112,7 +1191,63 @@ function rateLimitMetricText(metric: string) { concurrent: '并发', queue_size: '队列', }; - return labels[metric] ?? metric; + return labels[metric] ?? metric; + } + +function metricCell(metric: ModelRateLimitStatus['rpm'], includeReserved = false) { + if (!metric.limited) return {formatLimit(metric.currentValue)} / 不限未配置上限; + return ( + + {formatLimit(metric.currentValue)} / {formatLimit(metric.limitValue)} + {includeReserved && metric.reservedValue > 0 ? `已用 ${formatLimit(metric.usedValue)} · 预占 ${formatLimit(metric.reservedValue)}` : `窗口 ${formatPercent(metric.ratio)}`} + + ); +} + +function modelRuntimeStatusCell(status: ModelRateLimitStatus, now: number) { + const modelCooldownMs = cooldownRemainingMs(status.modelCooldownUntil, now); + const platformCooldownMs = cooldownRemainingMs(status.platformCooldownUntil, now); + if (modelCooldownMs > 0) { + return ( + + 模型冷却中 + 剩余 {formatCooldownRemaining(modelCooldownMs)} + + ); + } + if (platformCooldownMs > 0) { + return ( + + 平台冷却中 + 剩余 {formatCooldownRemaining(platformCooldownMs)} + + ); + } + return ( + + {status.enabled ? '可用' : '已停用'} + {status.enabled ? '参与路由' : '不参与路由'} + + ); +} + +function cooldownRemainingMs(cooldownUntil: string | undefined, now: number) { + if (!cooldownUntil) return 0; + const until = Date.parse(cooldownUntil); + if (!Number.isFinite(until)) return 0; + return Math.max(until - now, 0); +} + +function formatCooldownRemaining(milliseconds: number) { + const minutes = milliseconds / 60000; + if (minutes >= 1) return `${trimNumber(Math.ceil(minutes * 10) / 10)} 分钟`; + const seconds = Math.ceil(milliseconds / 1000); + return `${Math.max(seconds, 1)} 秒`; +} + +function formatPercent(value: number) { + if (!Number.isFinite(value) || value <= 0) return '0%'; + return `${trimNumber(value * 100)}%`; } function platformRuntimeSummary(platform: IntegrationPlatform) { diff --git a/apps/web/src/styles/pages.css b/apps/web/src/styles/pages.css index afaca74..ed795ae 100644 --- a/apps/web/src/styles/pages.css +++ b/apps/web/src/styles/pages.css @@ -964,6 +964,104 @@ gap: 12px; } +.platformLimitView { + display: grid; + grid-template-rows: auto minmax(0, 1fr); + gap: 10px; + min-height: 0; + max-height: calc(100dvh - 220px); + overflow: hidden; +} + +.platformLimitHeader { + display: flex; + align-items: center; + justify-content: space-between; + gap: 12px; + color: var(--muted-foreground); + font-size: var(--font-size-xs); +} + +.platformLimitHeader span { + display: inline-flex; + align-items: center; + gap: 6px; + color: var(--text-strong); + font-weight: var(--font-weight-semibold); +} + +.platformLimitTableViewport { + min-width: 0; + min-height: 0; + max-height: calc(100dvh - 270px); + overflow: auto; + border: 1px solid var(--border-subtle); + border-radius: 10px; + background: #fff; + overscroll-behavior: contain; +} + +.platformLimitTable { + width: max-content; + min-width: 100%; + overflow: visible; + border: 0; + border-radius: 0; +} + +.platformLimitTable .shTableHeader { + position: sticky; + top: 0; + z-index: 2; +} + +.platformLimitTable .shTableRow { + grid-template-columns: clamp(150px, 16vw, 220px) minmax(148px, 1fr) minmax(104px, max-content) minmax(136px, max-content) minmax(122px, max-content) minmax(132px, max-content) minmax(128px, max-content); + min-width: 920px; +} + +.platformLimitTable .shTableHead, +.platformLimitTable .shTableCell { + padding-right: 10px; + padding-left: 10px; +} + +.rateMetricCell, +.rateLoadCell { + display: grid; + min-width: 0; + gap: 4px; +} + +.rateMetricCell strong, +.rateLoadCell strong { + color: var(--text-strong); + font-size: var(--font-size-sm); +} + +.rateMetricCell small { + overflow: hidden; + color: var(--muted-foreground); + font-size: var(--font-size-xs); + text-overflow: ellipsis; + white-space: nowrap; +} + +.rateLoadTrack { + display: block; + height: 6px; + overflow: hidden; + border-radius: 999px; + background: #eef2f6; +} + +.rateLoadTrack i { + display: block; + height: 100%; + border-radius: inherit; + background: #0f766e; +} + .platformModelToolbar { display: grid; grid-template-columns: minmax(220px, 0.6fr) minmax(260px, 1fr); diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 9085d1d..2f47344 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -69,6 +69,7 @@ export interface IntegrationPlatform { credentialsPreview?: Record; retryPolicy?: RateLimitPolicy | Record; rateLimitPolicy?: RateLimitPolicy; + cooldownUntil?: string; createdAt: string; updatedAt: string; } @@ -644,6 +645,7 @@ export interface PlatformModel { rateLimitPolicy?: RateLimitPolicy; runtimePolicySetId?: string; runtimePolicyOverride?: RuntimePolicyOverride; + cooldownUntil?: string; enabled: boolean; createdAt: string; updatedAt: string; @@ -748,6 +750,36 @@ export interface RateLimitWindow { updatedAt: string; } +export interface RateLimitMetricStatus { + currentValue: number; + usedValue: number; + reservedValue: number; + limitValue: number; + limited: boolean; + ratio: number; + resetAt?: string; +} + +export interface ModelRateLimitStatus { + platformModelId: string; + platformId: string; + platformName: string; + provider: string; + modelName: string; + providerModelName?: string; + modelAlias?: string; + displayName: string; + modelType: string[]; + enabled: boolean; + rateLimitPolicy?: RateLimitPolicy | Record; + platformCooldownUntil?: string; + modelCooldownUntil?: string; + concurrent: RateLimitMetricStatus; + rpm: RateLimitMetricStatus; + tpm: RateLimitMetricStatus; + loadRatio: number; +} + export interface GatewayNetworkProxyConfig { globalHttpProxy?: string; globalHttpProxySet: boolean;