fix(rate-limits): treat empty policies as unlimited

This commit is contained in:
wangbo 2026-05-24 22:28:25 +08:00
parent 6d99e26e2a
commit 71950d2b4f
9 changed files with 168 additions and 45 deletions

View File

@ -476,16 +476,17 @@ func discountTitle(label string, discount float64) string {
}
func effectiveModelRateLimits(model store.PlatformModel, platform store.Platform, runtimePolicyMap map[string]store.RuntimePolicySet) ModelCatalogRateLimits {
overridePolicy := objectValue(model.RuntimePolicyOverride["rateLimitPolicy"])
overridePolicyRaw, hasOverridePolicy := model.RuntimePolicyOverride["rateLimitPolicy"]
overridePolicy := objectValue(overridePolicyRaw)
runtimePolicy := map[string]any(nil)
if model.RuntimePolicySetID != "" {
runtimePolicy = runtimePolicyMap[model.RuntimePolicySetID].RateLimitPolicy
}
policies := []map[string]any{
overridePolicy,
model.RateLimitPolicy,
runtimePolicy,
platform.RateLimitPolicy,
policies := []rateLimitPolicySource{
{policy: overridePolicy, authoritative: hasOverridePolicy},
{policy: model.RateLimitPolicy, authoritative: len(model.RateLimitPolicy) > 0},
{policy: runtimePolicy, authoritative: strings.TrimSpace(model.RuntimePolicySetID) != ""},
{policy: platform.RateLimitPolicy},
}
limits := ModelCatalogRateLimits{
RPM: firstRateLimit(policies, "rpm"),
@ -496,18 +497,26 @@ func effectiveModelRateLimits(model store.PlatformModel, platform store.Platform
return limits
}
func firstRateLimit(policies []map[string]any, metric string) *float64 {
for _, policy := range policies {
if value := readRateLimit(policy, metric); value != nil {
return value
// rateLimitPolicySource pairs one rate-limit policy with a flag that tells
// firstRateLimit how to treat a metric the policy does not define.
type rateLimitPolicySource struct {
// policy is the raw policy object that limits are read from.
policy map[string]any
// authoritative ends the lookup at this source: when the policy yields no
// limit for the requested metric, firstRateLimit returns a pointer to 0
// instead of falling through to the next source.
authoritative bool
}
// firstRateLimit scans policies in the order given and resolves the limit for
// metric.
//
// For each source, a limit found by readRateLimit is returned immediately.
// If the source has no limit for the metric but is marked authoritative, the
// scan stops there and a pointer to 0 is returned. When every source is
// exhausted without a match, the result is nil.
func firstRateLimit(policies []rateLimitPolicySource, metric string) *float64 {
	for i := range policies {
		limit, found := readRateLimit(policies[i].policy, metric)
		switch {
		case found:
			return floatPointer(limit)
		case policies[i].authoritative:
			// An authoritative source with no rule for this metric ends the
			// search with an explicit zero rather than inheriting from below.
			return floatPointer(0)
		}
	}
	return nil
}
func readRateLimit(policy map[string]any, metric string) *float64 {
func readRateLimit(policy map[string]any, metric string) (float64, bool) {
if len(policy) == 0 {
return nil
return 0, false
}
if rules, ok := policy["rules"].([]any); ok {
for _, item := range rules {
@ -516,22 +525,22 @@ func readRateLimit(policy map[string]any, metric string) *float64 {
continue
}
if limit, ok := numberValue(rule["limit"]); ok {
return &limit
return limit, true
}
}
}
for _, key := range rateLimitKeys(metric) {
if value, ok := numberValue(policy[key]); ok {
return &value
return value, true
}
}
platformLimits := objectValue(policy["platformLimits"])
for _, key := range rateLimitKeys(metric) {
if value, ok := numberValue(platformLimits[key]); ok {
return &value
return value, true
}
}
return nil
return 0, false
}
func rateLimitKeys(metric string) []string {
@ -1367,12 +1376,16 @@ func numberValue(value any) (float64, bool) {
}
func formatOptionalNumber(value *float64) string {
if value == nil {
return "-"
if value == nil || *value <= 0 {
return "不限"
}
return formatLimitNumber(*value)
}
// floatPointer returns a pointer to a freshly allocated copy of value, so each
// call yields an independent *float64.
func floatPointer(value float64) *float64 {
	ptr := new(float64)
	*ptr = value
	return ptr
}
func formatLimitNumber(value float64) string {
switch {
case absFloat(value) >= 10000:

View File

@ -83,11 +83,18 @@ func (s *Service) rateLimitReservations(ctx context.Context, user *auth.User, ca
func effectiveRateLimitPolicy(candidate store.RuntimeModelCandidate) map[string]any {
policy := candidate.PlatformRateLimitPolicy
if hasRules(candidate.RuntimeRateLimitPolicy) {
if strings.TrimSpace(candidate.RuntimePolicySetID) != "" {
policy = candidate.RuntimeRateLimitPolicy
} else if hasRules(candidate.RuntimeRateLimitPolicy) {
policy = mergeMap(policy, candidate.RuntimeRateLimitPolicy)
}
if nested, ok := candidate.RuntimePolicyOverride["rateLimitPolicy"].(map[string]any); ok && len(nested) > 0 {
policy = mergeMap(policy, nested)
if _, hasOverride := candidate.RuntimePolicyOverride["rateLimitPolicy"]; hasOverride {
nested, _ := candidate.RuntimePolicyOverride["rateLimitPolicy"].(map[string]any)
if len(nested) == 0 {
policy = nil
} else {
policy = mergeMap(policy, nested)
}
}
if hasRules(candidate.ModelRateLimitPolicy) {
policy = mergeMap(policy, candidate.ModelRateLimitPolicy)
@ -123,6 +130,9 @@ func reservationsFromPolicy(scopeType string, scopeKey string, scopeName string,
rule, _ := rawRule.(map[string]any)
metric := strings.TrimSpace(stringFromMap(rule, "metric"))
limit := floatFromAny(rule["limit"])
if metric == "" || limit <= 0 {
continue
}
amount := 1.0
if strings.HasPrefix(metric, "tpm") {
amount = float64(estimatedTokens)

View File

@ -4,6 +4,7 @@ import (
"testing"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/clients"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
)
func TestTokenUsageAmountsUsesActualUsageForTPM(t *testing.T) {
@ -27,3 +28,39 @@ func TestTokenUsageAmountsFallsBackToInputOutputTotal(t *testing.T) {
t.Fatalf("expected total token fallback 8, got %v", got["tpm_total"])
}
}
func TestEffectiveRateLimitPolicyTreatsEmptyRuntimePolicyAsUnlimited(t *testing.T) {
policy := effectiveRateLimitPolicy(store.RuntimeModelCandidate{
PlatformRateLimitPolicy: map[string]any{"rules": []any{
map[string]any{"metric": "rpm", "limit": 500},
}},
RuntimePolicySetID: "runtime-policy-1",
RuntimeRateLimitPolicy: map[string]any{"rules": []any{}},
})
if hasRules(policy) {
t.Fatalf("expected empty runtime policy to clear inherited limits, got %+v", policy)
}
}
// TestReservationsFromPolicySkipsNonPositiveLimits feeds reservationsFromPolicy
// three rules (limit -1, limit 0, limit 2) and expects exactly one reservation:
// the "concurrent" rule with the positive limit.
func TestReservationsFromPolicySkipsNonPositiveLimits(t *testing.T) {
	policy := map[string]any{"rules": []any{
		map[string]any{"metric": "rpm", "limit": -1},
		map[string]any{"metric": "tpm_total", "limit": 0},
		map[string]any{"metric": "concurrent", "limit": 2},
	}}
	input := map[string]any{"prompt": "hello"}

	reservations := reservationsFromPolicy("platform_model", "model-1", "Model 1", nil, policy, input)
	if count := len(reservations); count != 1 {
		t.Fatalf("expected only the positive concurrent rule to reserve, got %+v", reservations)
	}

	// Only the positive-limit rule should have survived.
	only := reservations[0]
	if only.Metric != "concurrent" || only.Limit != 2 {
		t.Fatalf("expected concurrent reservation with limit 2, got %+v", only)
	}
}

View File

@ -230,7 +230,7 @@ ORDER BY effective_priority ASC,
item.WaitingCount = maxFloat(queuedWaiting, stateWaitingCount)
item.LastAssignedUnix = lastAssignedUnix
applyRuntimeCandidateLoad(&item, runtimeCandidateLoadInput{
Policy: effectiveModelRateLimitPolicy(item.PlatformRateLimitPolicy, item.RuntimeRateLimitPolicy, item.RuntimePolicyOverride, item.ModelRateLimitPolicy),
Policy: effectiveModelRateLimitPolicy(item.PlatformRateLimitPolicy, item.RuntimeRateLimitPolicy, item.RuntimePolicySetID, item.RuntimePolicyOverride, item.ModelRateLimitPolicy),
ConcurrentActive: concurrentActive,
QueuedWaiting: queuedWaiting,
RPMUsed: rpmUsed,

View File

@ -88,7 +88,8 @@ func (s *Store) ListModelRateLimitStatuses(ctx context.Context) ([]ModelRateLimi
p.priority, p.dynamic_priority, COALESCE(p.dynamic_priority, p.priority),
m.model_name, COALESCE(NULLIF(m.provider_model_name, ''), m.model_name), COALESCE(m.model_alias, ''),
m.model_type, m.display_name, m.enabled,
p.rate_limit_policy, COALESCE(rp.rate_limit_policy, '{}'::jsonb), COALESCE(NULLIF(m.runtime_policy_override, '{}'::jsonb), b.runtime_policy_override, '{}'::jsonb), m.rate_limit_policy,
p.rate_limit_policy, COALESCE(rp.rate_limit_policy, '{}'::jsonb), COALESCE(m.runtime_policy_set_id::text, b.runtime_policy_set_id::text, ''),
COALESCE(NULLIF(m.runtime_policy_override, '{}'::jsonb), b.runtime_policy_override, '{}'::jsonb), m.rate_limit_policy,
COALESCE(to_char(p.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''),
COALESCE(to_char(m.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''),
COALESCE(con.active, 0)::float8,
@ -163,6 +164,7 @@ ORDER BY p.priority ASC, m.model_name ASC`)
var modelTypeBytes []byte
var platformPolicyBytes []byte
var runtimePolicyBytes []byte
var runtimePolicySetID string
var runtimeOverrideBytes []byte
var modelPolicyBytes []byte
var platformDynamicPriority sql.NullInt64
@ -193,6 +195,7 @@ ORDER BY p.priority ASC, m.model_name ASC`)
&item.Enabled,
&platformPolicyBytes,
&runtimePolicyBytes,
&runtimePolicySetID,
&runtimeOverrideBytes,
&modelPolicyBytes,
&platformCooldownUntil,
@ -213,6 +216,7 @@ ORDER BY p.priority ASC, m.model_name ASC`)
policy := effectiveModelRateLimitPolicy(
decodeObject(platformPolicyBytes),
decodeObject(runtimePolicyBytes),
runtimePolicySetID,
decodeObject(runtimeOverrideBytes),
decodeObject(modelPolicyBytes),
)
@ -432,13 +436,20 @@ func platformPolicyEventFromPayload(id string, taskID string, eventType string,
}
}
func effectiveModelRateLimitPolicy(platformPolicy map[string]any, runtimePolicy map[string]any, runtimeOverride map[string]any, modelPolicy map[string]any) map[string]any {
func effectiveModelRateLimitPolicy(platformPolicy map[string]any, runtimePolicy map[string]any, runtimePolicySetID string, runtimeOverride map[string]any, modelPolicy map[string]any) map[string]any {
policy := platformPolicy
if hasRateLimitRules(runtimePolicy) {
if strings.TrimSpace(runtimePolicySetID) != "" {
policy = runtimePolicy
} else if hasRateLimitRules(runtimePolicy) {
policy = shallowMergeMap(policy, runtimePolicy)
}
if nested, ok := runtimeOverride["rateLimitPolicy"].(map[string]any); ok && len(nested) > 0 {
policy = shallowMergeMap(policy, nested)
if _, hasOverride := runtimeOverride["rateLimitPolicy"]; hasOverride {
nested, _ := runtimeOverride["rateLimitPolicy"].(map[string]any)
if len(nested) == 0 {
policy = nil
} else {
policy = shallowMergeMap(policy, nested)
}
}
if hasRateLimitRules(modelPolicy) {
policy = shallowMergeMap(policy, modelPolicy)

View File

@ -16,6 +16,7 @@ func TestEffectiveModelRateLimitPolicyTreatsModelRulesAsAuthoritative(t *testing
map[string]any{"metric": "tpm_total", "limit": 240000},
map[string]any{"metric": "concurrent", "limit": 6},
}},
"runtime-policy-1",
map[string]any{},
map[string]any{"rules": []any{
map[string]any{"metric": "rpm", "limit": 30},
@ -34,6 +35,48 @@ func TestEffectiveModelRateLimitPolicyTreatsModelRulesAsAuthoritative(t *testing
}
}
// TestEffectiveModelRateLimitPolicyTreatsEmptyRuntimePolicyAsUnlimited verifies
// that when a runtime policy set ID is present and the runtime policy has an
// empty rules list, rateLimitForMetric reports 0 for both rpm and tpm_total
// despite the platform policy defining limits for them.
func TestEffectiveModelRateLimitPolicyTreatsEmptyRuntimePolicyAsUnlimited(t *testing.T) {
	platformPolicy := map[string]any{"rules": []any{
		map[string]any{"metric": "rpm", "limit": 500},
		map[string]any{"metric": "tpm_total", "limit": 100000},
	}}
	emptyRuntimePolicy := map[string]any{"rules": []any{}}

	policy := effectiveModelRateLimitPolicy(
		platformPolicy,
		emptyRuntimePolicy,
		"runtime-policy-1",
		map[string]any{},
		map[string]any{},
	)

	rpm := rateLimitForMetric(policy, "rpm")
	if rpm != 0 {
		t.Fatalf("expected empty runtime policy rpm to mean unlimited, got %v", rpm)
	}
	tpm := rateLimitForMetric(policy, "tpm_total")
	if tpm != 0 {
		t.Fatalf("expected empty runtime policy tpm to mean unlimited, got %v", tpm)
	}
}
// TestEffectiveModelRateLimitPolicyTreatsNegativeLimitAsUnlimited verifies that
// a runtime rpm rule with limit -1 is kept as -1 in the effective policy, and
// that metricStatus does not flag such a limit as Limited.
func TestEffectiveModelRateLimitPolicyTreatsNegativeLimitAsUnlimited(t *testing.T) {
	platformPolicy := map[string]any{"rules": []any{
		map[string]any{"metric": "rpm", "limit": 500},
	}}
	runtimePolicy := map[string]any{"rules": []any{
		map[string]any{"metric": "rpm", "limit": -1},
	}}

	policy := effectiveModelRateLimitPolicy(
		platformPolicy,
		runtimePolicy,
		"runtime-policy-1",
		map[string]any{},
		map[string]any{},
	)

	rpm := rateLimitForMetric(policy, "rpm")
	if rpm != -1 {
		t.Fatalf("expected negative runtime rpm marker to be preserved, got %v", rpm)
	}

	if status := metricStatus(10, 10, 0, rateLimitForMetric(policy, "rpm"), ""); status.Limited {
		t.Fatalf("expected negative runtime rpm marker to be reported as unlimited, got %+v", status)
	}
}
func TestPriorityDemotionRecordFromEventPayloadKeepsReason(t *testing.T) {
createdAt := time.Date(2026, 5, 12, 9, 30, 0, 0, time.UTC)
record := priorityDemotionRecordFromEventPayload("event-1", "task-1", "fallback message", map[string]any{

View File

@ -343,10 +343,10 @@ export function PlatformManagementPanel(props: {
</FormSection>
<FormSection icon={<ShieldCheck size={16} />} title="限流策略">
<Label>RPM / <Input value={form.rpmLimit} placeholder="不填则不限制" inputMode="numeric" onChange={(event) => setForm({ ...form, rpmLimit: event.target.value })} /></Label>
<Label>RPS / <Input value={form.rpsLimit} placeholder="不填则不限制" inputMode="numeric" onChange={(event) => setForm({ ...form, rpsLimit: event.target.value })} /></Label>
<Label>TPM / Token<Input value={form.tpmLimit} placeholder="不填则不限制" inputMode="numeric" onChange={(event) => setForm({ ...form, tpmLimit: event.target.value })} /></Label>
<Label><Input value={form.concurrencyLimit} placeholder="不填则不限制" inputMode="numeric" onChange={(event) => setForm({ ...form, concurrencyLimit: event.target.value })} /></Label>
<Label>RPM / <Input value={form.rpmLimit} placeholder="不填或负数为不限" inputMode="numeric" onChange={(event) => setForm({ ...form, rpmLimit: event.target.value })} /></Label>
<Label>RPS / <Input value={form.rpsLimit} placeholder="不填或负数为不限" inputMode="numeric" onChange={(event) => setForm({ ...form, rpsLimit: event.target.value })} /></Label>
<Label>TPM / Token<Input value={form.tpmLimit} placeholder="不填或负数为不限" inputMode="numeric" onChange={(event) => setForm({ ...form, tpmLimit: event.target.value })} /></Label>
<Label><Input value={form.concurrencyLimit} placeholder="不填或负数为不限" inputMode="numeric" onChange={(event) => setForm({ ...form, concurrencyLimit: event.target.value })} /></Label>
<div className="platformTogglePair">
<ToggleField checked={form.supportBase64Input} label="支持 Base64 输入" onChange={(checked) => setForm({ ...form, supportBase64Input: checked })} />
<ToggleField checked={form.supportUrlInput} label="支持 URL 输入" onChange={(checked) => setForm({ ...form, supportUrlInput: checked })} />
@ -1172,13 +1172,13 @@ function formatDiscountFactor(value: number | undefined) {
function platformRateLimitSummary(policy: IntegrationPlatform['rateLimitPolicy']) {
const rules = Array.isArray(policy?.rules) ? policy.rules : [];
if (!rules.length) {
return { title: '未设置', subtitle: '跟随全局或模型策略' };
return { title: '不限', subtitle: '未配置限流上限' };
}
const labels = rules
.filter((rule) => typeof rule.limit === 'number' && Number.isFinite(rule.limit))
.filter((rule) => typeof rule.limit === 'number' && Number.isFinite(rule.limit) && rule.limit > 0)
.map((rule) => `${rateLimitMetricText(rule.metric)} ${formatLimit(rule.limit)}`);
if (!labels.length) {
return { title: '未设置', subtitle: '跟随全局或模型策略' };
return { title: '不限', subtitle: '未配置限流上限' };
}
return {
title: labels.slice(0, 2).join(' · '),

View File

@ -270,9 +270,9 @@ export function RuntimePoliciesPanel(props: {
<section className="runtimePolicySection spanTwo">
<header><strong></strong><span>TPM / RPM / </span></header>
<div className="runtimePolicyRows">
<Label>RPM / <Input value={form.rpm} inputMode="numeric" onChange={(event) => setForm({ ...form, rpm: event.target.value })} /></Label>
<Label>TPM / Token<Input value={form.tpm} inputMode="numeric" onChange={(event) => setForm({ ...form, tpm: event.target.value })} /></Label>
<Label><Input value={form.concurrency} inputMode="numeric" onChange={(event) => setForm({ ...form, concurrency: event.target.value })} /></Label>
<Label>RPM / <Input value={form.rpm} placeholder="不填或负数为不限" inputMode="numeric" onChange={(event) => setForm({ ...form, rpm: event.target.value })} /></Label>
<Label>TPM / Token<Input value={form.tpm} placeholder="不填或负数为不限" inputMode="numeric" onChange={(event) => setForm({ ...form, tpm: event.target.value })} /></Label>
<Label><Input value={form.concurrency} placeholder="不填或负数为不限" inputMode="numeric" onChange={(event) => setForm({ ...form, concurrency: event.target.value })} /></Label>
</div>
</section>
@ -760,9 +760,9 @@ function policyToForm(policy: RuntimePolicySet): RuntimePolicyForm {
policyKey: policy.policyKey,
name: policy.name,
description: policy.description ?? '',
rpm: String(readRateLimit(rateRules, 'rpm') || ''),
tpm: String(readRateLimit(rateRules, 'tpm_total') || ''),
concurrency: String(readRateLimit(rateRules, 'concurrent') || ''),
rpm: formRateLimitText(readRateLimit(rateRules, 'rpm')),
tpm: formRateLimitText(readRateLimit(rateRules, 'tpm_total')),
concurrency: formRateLimitText(readRateLimit(rateRules, 'concurrent')),
retryEnabled: readBool(retry.enabled, true),
retryMaxAttempts: String(readNumber(retry.maxAttempts, 2)),
retryAllowKeywords: tagsFromValue(retry.allowKeywords),
@ -825,9 +825,9 @@ function isDefaultPolicy(policy: RuntimePolicySet) {
function rateLimitSummary(policy: RuntimePolicySet) {
const rules = Array.isArray(policy.rateLimitPolicy?.rules) ? policy.rateLimitPolicy.rules : [];
const rpm = readRateLimit(rules, 'rpm') || '-';
const tpm = readRateLimit(rules, 'tpm_total') || '-';
const concurrent = readRateLimit(rules, 'concurrent') || '-';
const rpm = rateLimitText(readRateLimit(rules, 'rpm'));
const tpm = rateLimitText(readRateLimit(rules, 'tpm_total'));
const concurrent = rateLimitText(readRateLimit(rules, 'concurrent'));
return `RPM ${rpm} / TPM ${tpm} / 并发 ${concurrent}`;
}
@ -848,7 +848,16 @@ function degradeSummary(policy: RuntimePolicySet) {
function readRateLimit(rules: unknown[], metric: string) {
const rule = rules.find((item) => readObject(item).metric === metric);
return readNumber(readObject(rule).limit, 0);
const limit = Number(readObject(rule).limit);
return Number.isFinite(limit) ? limit : undefined;
}
/**
 * Converts an optional rate limit into the text shown in a form input.
 * An undefined limit becomes an empty string; any number is stringified as-is.
 */
function formRateLimitText(value: number | undefined) {
  if (value === undefined) {
    return '';
  }
  return String(value);
}
/**
 * Renders a rate limit for summary display.
 * Only a defined value strictly greater than zero is shown as a number;
 * everything else (undefined, zero, negative, NaN) is shown as '不限'.
 */
function rateLimitText(value: number | undefined) {
  // `!(value > 0)` rather than `value <= 0` so NaN also falls into this branch.
  if (value === undefined || !(value > 0)) {
    return '不限';
  }
  return String(value);
}
function stringifyKeywords(value: unknown) {

View File

@ -397,7 +397,7 @@ function rateLimitPolicyPayload(form: Pick<PlatformWizardForm, 'rpmLimit' | 'rps
limitRule('tpm_total', form.tpmLimit),
limitRule('concurrent', form.concurrencyLimit),
].filter((rule): rule is NonNullable<ReturnType<typeof limitRule>> => Boolean(rule));
return rules.length ? { rules } : {};
return { rules };
}
function networkProxyPayload(form: PlatformWizardForm) {