421 lines
14 KiB
Go
421 lines
14 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// RateLimitMetricStatus is a snapshot of one rate-limited metric
// (concurrency, RPM or TPM) for a single platform model, as assembled by
// metricStatus.
type RateLimitMetricStatus struct {
	// CurrentValue is the amount counted against the limit: UsedValue plus
	// ReservedValue for RPM/TPM, and the active lease total for concurrency.
	CurrentValue float64 `json:"currentValue"`
	// UsedValue is the consumed amount (equal to CurrentValue for concurrency).
	UsedValue float64 `json:"usedValue"`
	// ReservedValue is the reserved-but-not-yet-used amount (always 0 for
	// concurrency).
	ReservedValue float64 `json:"reservedValue"`
	// LimitValue is the configured limit; values <= 0 mean "no limit".
	LimitValue float64 `json:"limitValue"`
	// Limited reports whether LimitValue > 0.
	Limited bool `json:"limited"`
	// Ratio is CurrentValue / LimitValue when Limited, otherwise 0.
	Ratio float64 `json:"ratio"`
	// ResetAt is the counter window reset time as text; empty when there is
	// no active window (and always empty for concurrency).
	ResetAt string `json:"resetAt,omitempty"`
}
|
|
|
|
// ModelRateLimitStatus describes the live rate-limit state of one platform
// model, as returned by ListModelRateLimitStatuses.
type ModelRateLimitStatus struct {
	PlatformModelID  string `json:"platformModelId"`
	PlatformID       string `json:"platformId"`
	PlatformName     string `json:"platformName"`
	Provider         string `json:"provider"`
	PlatformPriority int    `json:"platformPriority"`
	// PlatformDynamicPriority mirrors the nullable platform dynamic_priority
	// column; presumably nil when the column is NULL (see intPointerFromNull).
	PlatformDynamicPriority *int `json:"platformDynamicPriority,omitempty"`
	// PlatformEffectivePriority is the dynamic priority when set, otherwise
	// the static platform priority.
	PlatformEffectivePriority int    `json:"platformEffectivePriority"`
	ModelName                 string `json:"modelName"`
	// ProviderModelName is the provider-side model name, falling back to
	// ModelName when the stored value is empty.
	ProviderModelName string   `json:"providerModelName,omitempty"`
	ModelAlias        string   `json:"modelAlias,omitempty"`
	DisplayName       string   `json:"displayName"`
	ModelType         []string `json:"modelType"`
	Enabled           bool     `json:"enabled"`
	// RateLimitPolicy is the merged effective policy; nil when the merged
	// policy has no rules.
	RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
	// PlatformCooldownUntil and ModelCooldownUntil are ISO-8601 UTC timestamps
	// with millisecond precision, or empty when no cooldown is set.
	PlatformCooldownUntil string                `json:"platformCooldownUntil,omitempty"`
	ModelCooldownUntil    string                `json:"modelCooldownUntil,omitempty"`
	Concurrent            RateLimitMetricStatus `json:"concurrent"`
	// QueuedTasks counts distinct queued async tasks attributed to this model.
	QueuedTasks float64               `json:"queuedTasks"`
	RPM         RateLimitMetricStatus `json:"rpm"`
	TPM         RateLimitMetricStatus `json:"tpm"`
	// LoadRatio is the largest of the Concurrent, RPM and TPM ratios
	// (never below 0).
	LoadRatio float64 `json:"loadRatio"`
	// RecentPriorityDemotions holds the latest priority-demotion events for
	// this model's platform (shared by all models of that platform).
	RecentPriorityDemotions []PriorityDemotionRecord `json:"recentPriorityDemotions,omitempty"`
}
|
|
|
|
// PriorityDemotionRecord is a flattened "task.policy.priority_demoted" task
// event; see priorityDemotionRecordFromEventPayload for how each field is
// read from the event payload.
type PriorityDemotionRecord struct {
	// ID is the event id and TaskID the task that emitted it.
	ID              string `json:"id"`
	TaskID          string `json:"taskId"`
	PlatformID      string `json:"platformId"`
	PlatformModelID string `json:"platformModelId,omitempty"`
	Reason          string `json:"reason,omitempty"`
	// ErrorCode comes from payload "errorCode", falling back to "code".
	ErrorCode string `json:"errorCode,omitempty"`
	// ErrorMessage comes from payload "errorMessage", then "message", then the
	// event's own message column.
	ErrorMessage    string    `json:"errorMessage,omitempty"`
	Category        string    `json:"category,omitempty"`
	StatusCode      int       `json:"statusCode,omitempty"`
	PolicySource    string    `json:"policySource,omitempty"`
	Policy          string    `json:"policy,omitempty"`
	PolicyRule      string    `json:"policyRule,omitempty"`
	MatchedValue    string    `json:"matchedValue,omitempty"`
	DynamicPriority int       `json:"dynamicPriority,omitempty"`
	CreatedAt       time.Time `json:"createdAt"`
}
|
|
|
|
func (s *Store) ListModelRateLimitStatuses(ctx context.Context) ([]ModelRateLimitStatus, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT m.id::text, m.platform_id::text, p.name, p.provider,
|
|
p.priority, p.dynamic_priority, COALESCE(p.dynamic_priority, p.priority),
|
|
m.model_name, COALESCE(NULLIF(m.provider_model_name, ''), m.model_name), COALESCE(m.model_alias, ''),
|
|
m.model_type, m.display_name, m.enabled,
|
|
p.rate_limit_policy, COALESCE(rp.rate_limit_policy, '{}'::jsonb), COALESCE(NULLIF(m.runtime_policy_override, '{}'::jsonb), b.runtime_policy_override, '{}'::jsonb), m.rate_limit_policy,
|
|
COALESCE(to_char(p.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''),
|
|
COALESCE(to_char(m.cooldown_until AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"'), ''),
|
|
COALESCE(con.active, 0)::float8,
|
|
COALESCE(queued.waiting, 0)::float8,
|
|
COALESCE(rpm.used_value, 0)::float8, COALESCE(rpm.reserved_value, 0)::float8, COALESCE(rpm.reset_at::text, ''),
|
|
COALESCE(tpm.used_value, 0)::float8, COALESCE(tpm.reserved_value, 0)::float8, COALESCE(tpm.reset_at::text, '')
|
|
FROM platform_models m
|
|
JOIN integration_platforms p ON p.id = m.platform_id
|
|
LEFT JOIN base_model_catalog b ON b.id = m.base_model_id
|
|
LEFT JOIN model_runtime_policy_sets rp ON rp.id = COALESCE(m.runtime_policy_set_id, b.runtime_policy_set_id)
|
|
LEFT JOIN (
|
|
SELECT scope_key, SUM(lease_value) AS active
|
|
FROM gateway_concurrency_leases
|
|
WHERE scope_type = 'platform_model'
|
|
AND released_at IS NULL
|
|
AND expires_at > now()
|
|
GROUP BY scope_key
|
|
) con ON con.scope_key = m.id::text
|
|
LEFT JOIN (
|
|
SELECT queued_sources.platform_model_id, COUNT(DISTINCT queued_sources.task_id) AS waiting
|
|
FROM (
|
|
SELECT t.id::text AS task_id, qm.id::text AS platform_model_id
|
|
FROM gateway_tasks t
|
|
JOIN integration_platforms qp ON TRUE
|
|
JOIN platform_models qm ON qm.platform_id = qp.id
|
|
WHERE t.async_mode = true
|
|
AND t.status = 'queued'
|
|
AND NULLIF(t.model_type, '') IS NOT NULL
|
|
AND qm.model_type @> jsonb_build_array(t.model_type)
|
|
AND t.queue_key = qp.platform_key || ':' || t.model_type || ':' || COALESCE(NULLIF(qm.provider_model_name, ''), qm.model_name)
|
|
AND NOT EXISTS (SELECT 1 FROM gateway_task_attempts existing_attempt WHERE existing_attempt.task_id = t.id)
|
|
UNION ALL
|
|
SELECT latest.task_id::text AS task_id, latest.platform_model_id
|
|
FROM (
|
|
SELECT DISTINCT ON (a.task_id) a.task_id, a.platform_model_id::text AS platform_model_id
|
|
FROM gateway_tasks t
|
|
JOIN gateway_task_attempts a ON a.task_id = t.id
|
|
WHERE t.async_mode = true
|
|
AND t.status = 'queued'
|
|
AND a.platform_model_id IS NOT NULL
|
|
ORDER BY a.task_id, a.attempt_no DESC, a.started_at DESC
|
|
) latest
|
|
) queued_sources
|
|
GROUP BY queued_sources.platform_model_id
|
|
) queued ON queued.platform_model_id = m.id::text
|
|
LEFT JOIN (
|
|
SELECT DISTINCT ON (scope_key) scope_key, used_value, reserved_value, reset_at
|
|
FROM gateway_rate_limit_counters
|
|
WHERE scope_type = 'platform_model'
|
|
AND metric = 'rpm'
|
|
AND reset_at > now()
|
|
ORDER BY scope_key, window_start DESC
|
|
) rpm ON rpm.scope_key = m.id::text
|
|
LEFT JOIN (
|
|
SELECT scope_key, SUM(used_value) AS used_value, SUM(reserved_value) AS reserved_value, MAX(reset_at) AS reset_at
|
|
FROM gateway_rate_limit_counters
|
|
WHERE scope_type = 'platform_model'
|
|
AND metric LIKE 'tpm%'
|
|
AND reset_at > now()
|
|
GROUP BY scope_key
|
|
) tpm ON tpm.scope_key = m.id::text
|
|
WHERE p.deleted_at IS NULL
|
|
ORDER BY p.priority ASC, m.model_name ASC`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
items := make([]ModelRateLimitStatus, 0)
|
|
for rows.Next() {
|
|
var item ModelRateLimitStatus
|
|
var modelTypeBytes []byte
|
|
var platformPolicyBytes []byte
|
|
var runtimePolicyBytes []byte
|
|
var runtimeOverrideBytes []byte
|
|
var modelPolicyBytes []byte
|
|
var platformDynamicPriority sql.NullInt64
|
|
var platformCooldownUntil string
|
|
var modelCooldownUntil string
|
|
var concurrentCurrent float64
|
|
var queuedTasks float64
|
|
var rpmUsed float64
|
|
var rpmReserved float64
|
|
var rpmResetAt string
|
|
var tpmUsed float64
|
|
var tpmReserved float64
|
|
var tpmResetAt string
|
|
if err := rows.Scan(
|
|
&item.PlatformModelID,
|
|
&item.PlatformID,
|
|
&item.PlatformName,
|
|
&item.Provider,
|
|
&item.PlatformPriority,
|
|
&platformDynamicPriority,
|
|
&item.PlatformEffectivePriority,
|
|
&item.ModelName,
|
|
&item.ProviderModelName,
|
|
&item.ModelAlias,
|
|
&modelTypeBytes,
|
|
&item.DisplayName,
|
|
&item.Enabled,
|
|
&platformPolicyBytes,
|
|
&runtimePolicyBytes,
|
|
&runtimeOverrideBytes,
|
|
&modelPolicyBytes,
|
|
&platformCooldownUntil,
|
|
&modelCooldownUntil,
|
|
&concurrentCurrent,
|
|
&queuedTasks,
|
|
&rpmUsed,
|
|
&rpmReserved,
|
|
&rpmResetAt,
|
|
&tpmUsed,
|
|
&tpmReserved,
|
|
&tpmResetAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
item.PlatformDynamicPriority = intPointerFromNull(platformDynamicPriority)
|
|
item.ModelType = decodeStringArray(modelTypeBytes)
|
|
policy := effectiveModelRateLimitPolicy(
|
|
decodeObject(platformPolicyBytes),
|
|
decodeObject(runtimePolicyBytes),
|
|
decodeObject(runtimeOverrideBytes),
|
|
decodeObject(modelPolicyBytes),
|
|
)
|
|
item.PlatformCooldownUntil = platformCooldownUntil
|
|
item.ModelCooldownUntil = modelCooldownUntil
|
|
item.RateLimitPolicy = policy
|
|
item.QueuedTasks = queuedTasks
|
|
item.Concurrent = metricStatus(concurrentCurrent, concurrentCurrent, 0, rateLimitForMetric(policy, "concurrent"), "")
|
|
item.RPM = metricStatus(rpmUsed+rpmReserved, rpmUsed, rpmReserved, rateLimitForMetric(policy, "rpm"), rpmResetAt)
|
|
item.TPM = metricStatus(tpmUsed+tpmReserved, tpmUsed, tpmReserved, tpmLimit(policy), tpmResetAt)
|
|
item.LoadRatio = maxFloat(item.Concurrent.Ratio, item.RPM.Ratio, item.TPM.Ratio)
|
|
items = append(items, item)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
demotions, err := s.listRecentPriorityDemotionsByPlatform(ctx, items, 10)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for index := range items {
|
|
items[index].RecentPriorityDemotions = demotions[items[index].PlatformID]
|
|
}
|
|
sort.SliceStable(items, func(i, j int) bool {
|
|
if items[i].LoadRatio == items[j].LoadRatio {
|
|
return strings.ToLower(items[i].DisplayName) < strings.ToLower(items[j].DisplayName)
|
|
}
|
|
return items[i].LoadRatio > items[j].LoadRatio
|
|
})
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) listRecentPriorityDemotionsByPlatform(ctx context.Context, statuses []ModelRateLimitStatus, limit int) (map[string][]PriorityDemotionRecord, error) {
|
|
out := map[string][]PriorityDemotionRecord{}
|
|
if limit <= 0 || len(statuses) == 0 {
|
|
return out, nil
|
|
}
|
|
seen := map[string]bool{}
|
|
platformIDs := make([]string, 0, len(statuses))
|
|
for _, status := range statuses {
|
|
platformID := strings.TrimSpace(status.PlatformID)
|
|
if platformID == "" || seen[platformID] {
|
|
continue
|
|
}
|
|
seen[platformID] = true
|
|
platformIDs = append(platformIDs, platformID)
|
|
}
|
|
if len(platformIDs) == 0 {
|
|
return out, nil
|
|
}
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id::text, task_id::text, COALESCE(message, ''), payload, created_at
|
|
FROM (
|
|
SELECT e.*,
|
|
row_number() OVER (
|
|
PARTITION BY e.payload->>'platformId'
|
|
ORDER BY e.created_at DESC, e.seq DESC
|
|
) AS demotion_rank
|
|
FROM gateway_task_events e
|
|
WHERE e.event_type = 'task.policy.priority_demoted'
|
|
AND e.payload->>'platformId' = ANY($1::text[])
|
|
) ranked
|
|
WHERE demotion_rank <= $2
|
|
ORDER BY payload->>'platformId' ASC, created_at DESC`, platformIDs, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var id string
|
|
var taskID string
|
|
var message string
|
|
var payloadBytes []byte
|
|
var createdAt time.Time
|
|
if err := rows.Scan(&id, &taskID, &message, &payloadBytes, &createdAt); err != nil {
|
|
return nil, err
|
|
}
|
|
record := priorityDemotionRecordFromEventPayload(id, taskID, message, decodeObject(payloadBytes), createdAt)
|
|
if record.PlatformID == "" {
|
|
continue
|
|
}
|
|
out[record.PlatformID] = append(out[record.PlatformID], record)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func priorityDemotionRecordFromEventPayload(id string, taskID string, message string, payload map[string]any, createdAt time.Time) PriorityDemotionRecord {
|
|
errorMessage := stringValue(payload["errorMessage"])
|
|
if errorMessage == "" {
|
|
errorMessage = stringValue(payload["message"])
|
|
}
|
|
if errorMessage == "" {
|
|
errorMessage = strings.TrimSpace(message)
|
|
}
|
|
errorCode := stringValue(payload["errorCode"])
|
|
if errorCode == "" {
|
|
errorCode = stringValue(payload["code"])
|
|
}
|
|
return PriorityDemotionRecord{
|
|
ID: id,
|
|
TaskID: taskID,
|
|
PlatformID: stringValue(payload["platformId"]),
|
|
PlatformModelID: stringValue(payload["platformModelId"]),
|
|
Reason: stringValue(payload["reason"]),
|
|
ErrorCode: errorCode,
|
|
ErrorMessage: errorMessage,
|
|
Category: stringValue(payload["category"]),
|
|
StatusCode: intValue(payload["statusCode"]),
|
|
PolicySource: stringValue(payload["policySource"]),
|
|
Policy: stringValue(payload["policy"]),
|
|
PolicyRule: stringValue(payload["policyRule"]),
|
|
MatchedValue: stringValue(payload["matchedValue"]),
|
|
DynamicPriority: intValue(payload["dynamicPriority"]),
|
|
CreatedAt: createdAt,
|
|
}
|
|
}
|
|
|
|
func effectiveModelRateLimitPolicy(platformPolicy map[string]any, runtimePolicy map[string]any, runtimeOverride map[string]any, modelPolicy map[string]any) map[string]any {
|
|
policy := platformPolicy
|
|
if hasRateLimitRules(runtimePolicy) {
|
|
policy = shallowMergeMap(policy, runtimePolicy)
|
|
}
|
|
if nested, ok := runtimeOverride["rateLimitPolicy"].(map[string]any); ok && len(nested) > 0 {
|
|
policy = shallowMergeMap(policy, nested)
|
|
}
|
|
if hasRateLimitRules(modelPolicy) {
|
|
policy = shallowMergeMap(policy, modelPolicy)
|
|
}
|
|
if hasRateLimitRules(policy) {
|
|
return policy
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// hasRateLimitRules reports whether policy has a non-empty "rules" array.
// A nil policy, a missing key or a non-array value all report false.
func hasRateLimitRules(policy map[string]any) bool {
	if rules, ok := policy["rules"].([]any); ok {
		return len(rules) != 0
	}
	return false
}
|
|
|
|
// shallowMergeMap returns a new map holding base's entries overlaid by
// override's. Only top-level keys are merged; nested values are shared, not
// copied. Neither input is modified, and the result is never nil.
func shallowMergeMap(base map[string]any, override map[string]any) map[string]any {
	merged := make(map[string]any, len(base)+len(override))
	for _, source := range [...]map[string]any{base, override} {
		for key, value := range source {
			merged[key] = value
		}
	}
	return merged
}
|
|
|
|
func rateLimitForMetric(policy map[string]any, metric string) float64 {
|
|
rules, _ := policy["rules"].([]any)
|
|
for _, rawRule := range rules {
|
|
rule, _ := rawRule.(map[string]any)
|
|
if strings.TrimSpace(stringValue(rule["metric"])) == metric {
|
|
return floatValue(rule["limit"])
|
|
}
|
|
}
|
|
return 0
|
|
}
|
|
|
|
func tpmLimit(policy map[string]any) float64 {
|
|
if limit := rateLimitForMetric(policy, "tpm_total"); limit > 0 {
|
|
return limit
|
|
}
|
|
return rateLimitForMetric(policy, "tpm_input") + rateLimitForMetric(policy, "tpm_output")
|
|
}
|
|
|
|
func metricStatus(current float64, used float64, reserved float64, limit float64, resetAt string) RateLimitMetricStatus {
|
|
status := RateLimitMetricStatus{
|
|
CurrentValue: current,
|
|
UsedValue: used,
|
|
ReservedValue: reserved,
|
|
LimitValue: limit,
|
|
Limited: limit > 0,
|
|
ResetAt: resetAt,
|
|
}
|
|
if status.Limited {
|
|
status.Ratio = current / limit
|
|
}
|
|
return status
|
|
}
|
|
|
|
// maxFloat returns the largest of values, floored at 0: with no arguments or
// only non-positive ones it returns 0. NaN values never win the comparison.
func maxFloat(values ...float64) float64 {
	best := 0.0
	for i := 0; i < len(values); i++ {
		if values[i] > best {
			best = values[i]
		}
	}
	return best
}
|
|
|
|
// stringValue returns value with surrounding whitespace trimmed when it is a
// plain string, and "" for any other type (including nil).
func stringValue(value any) string {
	if text, ok := value.(string); ok {
		return strings.TrimSpace(text)
	}
	return ""
}
|
|
|
|
// floatValue converts a loosely typed value (typically decoded from JSON) to
// float64.
//
// All Go integer and float kinds are converted directly. Strings are parsed
// as decimal numbers after trimming whitespace, mirroring intValue's
// acceptance of numeric strings, so a limit stored as "60" is not silently
// treated as 0. Anything else yields 0, including nil, bools, maps and
// unparsable or out-of-range strings.
func floatValue(value any) float64 {
	switch typed := value.(type) {
	case float64:
		return typed
	case float32:
		return float64(typed)
	case int:
		return float64(typed)
	case int8:
		return float64(typed)
	case int16:
		return float64(typed)
	case int32:
		return float64(typed)
	case int64:
		return float64(typed)
	case uint:
		return float64(typed)
	case uint8:
		return float64(typed)
	case uint16:
		return float64(typed)
	case uint32:
		return float64(typed)
	case uint64:
		return float64(typed)
	case string:
		parsed, err := strconv.ParseFloat(strings.TrimSpace(typed), 64)
		if err != nil {
			return 0
		}
		return parsed
	default:
		return 0
	}
}
|
|
|
|
// intValue converts a loosely typed value (typically decoded from JSON) to
// int. It accepts int, int64, float64 (truncated toward zero) and strings
// parsed with strconv.Atoi after trimming whitespace. For strings, the result
// of Atoi is returned even when it reports an error. Any other type yields 0.
func intValue(value any) int {
	if text, isString := value.(string); isString {
		n, _ := strconv.Atoi(strings.TrimSpace(text))
		return n
	}
	switch n := value.(type) {
	case int:
		return n
	case int64:
		return int(n)
	case float64:
		return int(n)
	}
	return 0
}
|