easyai-ai-gateway/apps/api/internal/store/runtime_policies.go

291 lines
9.7 KiB
Go

package store
import (
"context"
"database/sql"
"encoding/json"
"strings"
"time"
"github.com/jackc/pgx/v5"
)
// runtimePolicyColumns is the column list shared by the SELECT and RETURNING
// clauses that read model_runtime_policy_sets in this file. The column order
// must stay in sync with the Scan destinations in scanRuntimePolicySet. A NULL
// description is coalesced to the empty string so it can be scanned into a
// plain string.
const runtimePolicyColumns = `
id::text, policy_key, name, COALESCE(description, ''), rate_limit_policy, retry_policy,
auto_disable_policy, degrade_policy, metadata, status, created_at, updated_at`
// RuntimePolicySetInput is the payload accepted by CreateRuntimePolicySet and
// UpdateRuntimePolicySet. Its string fields are trimmed by
// normalizeRuntimePolicyInput before they are written.
type RuntimePolicySetInput struct {
PolicyKey string `json:"policyKey"`
Name string `json:"name"`
// Description is written as NULL when it is empty (NULLIF in the write queries).
Description string `json:"description"`
// The four policy maps and Metadata are JSON-encoded before being bound as
// query parameters. Each one is passed through emptyObjectIfNil first
// (presumably turning nil into an empty object; confirm against that helper).
RateLimitPolicy map[string]any `json:"rateLimitPolicy"`
RetryPolicy map[string]any `json:"retryPolicy"`
AutoDisablePolicy map[string]any `json:"autoDisablePolicy"`
DegradePolicy map[string]any `json:"degradePolicy"`
Metadata map[string]any `json:"metadata"`
// Status defaults to "active" when it is blank after trimming.
Status string `json:"status"`
}
// runtimePolicyScanner is the minimal Scan contract required by the scan
// helpers in this file. It lets them accept both the rows iterator returned by
// Query and the single-row result returned by QueryRow.
type runtimePolicyScanner interface {
Scan(dest ...any) error
}
// PlatformDynamicPriorityState is the priority snapshot of one integration
// platform, as returned by UpdatePlatformDynamicPriority.
type PlatformDynamicPriorityState struct {
PlatformID string `json:"platformId"`
// Priority is the value of the platform's priority column.
Priority int `json:"priority"`
// DynamicPriority is nil when the dynamic_priority column is NULL.
DynamicPriority *int `json:"dynamicPriority,omitempty"`
// EffectivePriority is COALESCE(dynamic_priority, priority).
EffectivePriority int `json:"effectivePriority"`
UpdatedAt time.Time `json:"updatedAt"`
}
// ListRuntimePolicySets returns every row of model_runtime_policy_sets ordered
// by policy_key ascending. The returned slice is non-nil even when no rows are
// found. A query or scan failure yields a nil slice and the error.
func (s *Store) ListRuntimePolicySets(ctx context.Context) ([]RuntimePolicySet, error) {
	const listQuery = `SELECT ` + runtimePolicyColumns + ` FROM model_runtime_policy_sets ORDER BY policy_key ASC`

	rows, queryErr := s.pool.Query(ctx, listQuery)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	policySets := []RuntimePolicySet{}
	for rows.Next() {
		policySet, scanErr := scanRuntimePolicySet(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		policySets = append(policySets, policySet)
	}

	// An iteration error is reported together with whatever was collected.
	return policySets, rows.Err()
}
func (s *Store) CreateRuntimePolicySet(ctx context.Context, input RuntimePolicySetInput) (RuntimePolicySet, error) {
input = normalizeRuntimePolicyInput(input)
rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy))
retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy))
autoDisablePolicy, _ := json.Marshal(emptyObjectIfNil(input.AutoDisablePolicy))
degradePolicy, _ := json.Marshal(emptyObjectIfNil(input.DegradePolicy))
metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata))
return scanRuntimePolicySet(s.pool.QueryRow(ctx, `
INSERT INTO model_runtime_policy_sets (
policy_key, name, description, rate_limit_policy, retry_policy, auto_disable_policy, degrade_policy, metadata, status
)
VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8, $9)
RETURNING `+runtimePolicyColumns,
input.PolicyKey, input.Name, input.Description, rateLimitPolicy, retryPolicy,
autoDisablePolicy, degradePolicy, metadata, input.Status,
))
}
func (s *Store) UpdateRuntimePolicySet(ctx context.Context, id string, input RuntimePolicySetInput) (RuntimePolicySet, error) {
input = normalizeRuntimePolicyInput(input)
rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy))
retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy))
autoDisablePolicy, _ := json.Marshal(emptyObjectIfNil(input.AutoDisablePolicy))
degradePolicy, _ := json.Marshal(emptyObjectIfNil(input.DegradePolicy))
metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata))
return scanRuntimePolicySet(s.pool.QueryRow(ctx, `
UPDATE model_runtime_policy_sets
SET policy_key = $2,
name = $3,
description = NULLIF($4, ''),
rate_limit_policy = $5,
retry_policy = $6,
auto_disable_policy = $7,
degrade_policy = $8,
metadata = $9,
status = $10,
updated_at = now()
WHERE id = $1::uuid
RETURNING `+runtimePolicyColumns,
id, input.PolicyKey, input.Name, input.Description, rateLimitPolicy, retryPolicy,
autoDisablePolicy, degradePolicy, metadata, input.Status,
))
}
// DeleteRuntimePolicySet removes the policy set identified by id.
//
// The row's policy_key is looked up first; any lookup error (including a
// missing row) is returned as-is. The policy set whose key is
// "default-runtime-v1" is never deleted and yields ErrProtectedDefault. If the
// DELETE itself affects no rows, pgx.ErrNoRows is returned.
func (s *Store) DeleteRuntimePolicySet(ctx context.Context, id string) error {
	const protectedPolicyKey = "default-runtime-v1"

	var key string
	lookup := s.pool.QueryRow(ctx, `SELECT policy_key FROM model_runtime_policy_sets WHERE id = $1::uuid`, id)
	if err := lookup.Scan(&key); err != nil {
		return err
	}
	if key == protectedPolicyKey {
		return ErrProtectedDefault
	}

	tag, err := s.pool.Exec(ctx, `DELETE FROM model_runtime_policy_sets WHERE id = $1::uuid`, id)
	switch {
	case err != nil:
		return err
	case tag.RowsAffected() == 0:
		return pgx.ErrNoRows
	default:
		return nil
	}
}
// DisableCandidatePlatform sets status to 'disabled' and refreshes updated_at
// on the integration platform identified by platformID. A blank platformID is
// a no-op that returns nil. The number of affected rows is not checked, so an
// unknown id is not reported as an error.
func (s *Store) DisableCandidatePlatform(ctx context.Context, platformID string) error {
	const disableQuery = `
UPDATE integration_platforms
SET status = 'disabled',
updated_at = now()
WHERE id = $1::uuid`

	if len(strings.TrimSpace(platformID)) == 0 {
		return nil
	}
	if _, err := s.pool.Exec(ctx, disableQuery, platformID); err != nil {
		return err
	}
	return nil
}
// CooldownCandidatePlatform sets cooldown_until on the integration platform
// identified by platformID to now() plus cooldownSeconds, and refreshes
// updated_at. A non-positive cooldownSeconds falls back to 300 seconds. A blank
// platformID is a no-op that returns nil. The number of affected rows is not
// checked.
func (s *Store) CooldownCandidatePlatform(ctx context.Context, platformID string, cooldownSeconds int) error {
	const fallbackCooldownSeconds = 300
	const cooldownQuery = `
UPDATE integration_platforms
SET cooldown_until = now() + ($2::int * interval '1 second'),
updated_at = now()
WHERE id = $1::uuid`

	if len(strings.TrimSpace(platformID)) == 0 {
		return nil
	}
	seconds := cooldownSeconds
	if seconds < 1 {
		seconds = fallbackCooldownSeconds
	}
	_, execErr := s.pool.Exec(ctx, cooldownQuery, platformID, seconds)
	return execErr
}
// CooldownCandidatePlatformModel sets cooldown_until on the platform_models row
// identified by platformModelID to now() plus cooldownSeconds, and refreshes
// updated_at. A non-positive cooldownSeconds falls back to 300 seconds. A blank
// platformModelID is a no-op that returns nil. The number of affected rows is
// not checked.
func (s *Store) CooldownCandidatePlatformModel(ctx context.Context, platformModelID string, cooldownSeconds int) error {
	const fallbackCooldownSeconds = 300
	const cooldownQuery = `
UPDATE platform_models
SET cooldown_until = now() + ($2::int * interval '1 second'),
updated_at = now()
WHERE id = $1::uuid`

	if len(strings.TrimSpace(platformModelID)) == 0 {
		return nil
	}
	seconds := cooldownSeconds
	if seconds < 1 {
		seconds = fallbackCooldownSeconds
	}
	_, execErr := s.pool.Exec(ctx, cooldownQuery, platformModelID, seconds)
	return execErr
}
// DemoteCandidatePlatformPriority rewrites the dynamic_priority of the platform
// identified by platformID and returns the new value.
//
// The query works in three steps:
//   - current_model confirms that platformModelID belongs to platformID; if it
//     does not, nothing is updated.
//   - peer_priority takes the highest COALESCE(dynamic_priority, priority) among
//     other platforms that are enabled, not deleted and not cooling down, and
//     that have an enabled, non-cooling model matching requestedModel. A model
//     with a non-empty alias matches only by alias; otherwise it matches by
//     model name, canonical model key or provider model name. When modelType is
//     non-empty the peer model's model_type must contain it.
//   - the target platform's dynamic_priority becomes that maximum plus one, or
//     its own priority plus one when no peer qualifies.
//
// Presumably a larger number ranks the platform later in selection; confirm
// against the code that orders candidates.
//
// If platformID, platformModelID or requestedModel is blank, the call is a
// no-op returning (0, nil). If no row is updated, the Scan error from the empty
// result is returned.
func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string, platformModelID string, requestedModel string, modelType string) (int, error) {
	const demoteQuery = `
WITH current_model AS (
SELECT id, platform_id
FROM platform_models
WHERE id = $2::uuid
AND platform_id = $1::uuid
),
peer_priority AS (
SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority)) AS max_priority
FROM current_model current
JOIN platform_models peer_model ON peer_model.platform_id <> current.platform_id
JOIN integration_platforms peer ON peer.id = peer_model.platform_id
LEFT JOIN base_model_catalog peer_base ON peer_base.id = peer_model.base_model_id
WHERE peer.status = 'enabled'
AND peer.deleted_at IS NULL
AND (peer.cooldown_until IS NULL OR peer.cooldown_until <= now())
AND peer_model.enabled = true
AND (peer_model.cooldown_until IS NULL OR peer_model.cooldown_until <= now())
AND (NULLIF($4::text, '') IS NULL OR peer_model.model_type @> jsonb_build_array($4::text))
AND (
(COALESCE(peer_model.model_alias, '') <> '' AND peer_model.model_alias = $3::text)
OR (
COALESCE(peer_model.model_alias, '') = ''
AND (
peer_model.model_name = $3::text
OR peer_base.canonical_model_key = $3::text
OR peer_base.provider_model_name = $3::text
)
)
)
)
UPDATE integration_platforms target
SET dynamic_priority = COALESCE((SELECT max_priority FROM peer_priority), target.priority) + 1,
updated_at = now()
WHERE target.id = $1::uuid
AND target.deleted_at IS NULL
AND EXISTS (SELECT 1 FROM current_model)
RETURNING dynamic_priority`

	// All three identifiers are required; modelType is optional.
	for _, required := range []string{platformID, platformModelID, requestedModel} {
		if strings.TrimSpace(required) == "" {
			return 0, nil
		}
	}

	var demoted int
	row := s.pool.QueryRow(ctx, demoteQuery, platformID, platformModelID, requestedModel, modelType)
	scanErr := row.Scan(&demoted)
	return demoted, scanErr
}
// UpdatePlatformDynamicPriority sets dynamic_priority on the non-deleted
// platform identified by platformID and returns the resulting priority state.
//
// A non-nil dynamicPriority is written as given. A nil dynamicPriority resets
// the column by copying the platform's priority into it; the column is not set
// to NULL. A blank platformID returns pgx.ErrNoRows without touching the
// database. When no row matches, the Scan error from the empty result is
// returned.
func (s *Store) UpdatePlatformDynamicPriority(ctx context.Context, platformID string, dynamicPriority *int) (PlatformDynamicPriorityState, error) {
	const updateQuery = `
UPDATE integration_platforms
SET dynamic_priority = CASE WHEN $2::boolean THEN priority ELSE $3::int END,
updated_at = now()
WHERE id = $1::uuid
AND deleted_at IS NULL
RETURNING id::text, priority, dynamic_priority, COALESCE(dynamic_priority, priority), updated_at`

	if strings.TrimSpace(platformID) == "" {
		return PlatformDynamicPriorityState{}, pgx.ErrNoRows
	}

	// $2 selects the reset branch; $3 is only read when $2 is false.
	resetToPriority := true
	var override int
	if dynamicPriority != nil {
		resetToPriority = false
		override = *dynamicPriority
	}

	row := s.pool.QueryRow(ctx, updateQuery, platformID, resetToPriority, override)
	return scanPlatformDynamicPriorityState(row)
}
// scanPlatformDynamicPriorityState reads one row in the column order
// (id, priority, dynamic_priority, effective priority, updated_at) into a
// PlatformDynamicPriorityState. A NULL dynamic_priority becomes a nil
// DynamicPriority pointer. On a Scan error the zero value is returned with the
// error.
func scanPlatformDynamicPriorityState(scanner runtimePolicyScanner) (PlatformDynamicPriorityState, error) {
	var state PlatformDynamicPriorityState
	var nullableDynamic sql.NullInt64

	err := scanner.Scan(
		&state.PlatformID,
		&state.Priority,
		&nullableDynamic,
		&state.EffectivePriority,
		&state.UpdatedAt,
	)
	if err != nil {
		return PlatformDynamicPriorityState{}, err
	}

	state.DynamicPriority = intPointerFromNull(nullableDynamic)
	return state, nil
}
// intPointerFromNull converts a nullable SQL integer into an optional int:
// nil when value is not valid, otherwise a pointer to a fresh copy of the
// value converted to int.
func intPointerFromNull(value sql.NullInt64) *int {
	if value.Valid {
		n := int(value.Int64)
		return &n
	}
	return nil
}
// scanRuntimePolicySet reads one row laid out as runtimePolicyColumns into a
// RuntimePolicySet. The five policy and metadata columns are scanned as raw
// bytes and then turned into objects with decodeObject. On a Scan error the
// zero value is returned with the error.
func scanRuntimePolicySet(scanner runtimePolicyScanner) (RuntimePolicySet, error) {
	var policySet RuntimePolicySet
	var rawRateLimit, rawRetry, rawAutoDisable, rawDegrade, rawMetadata []byte

	err := scanner.Scan(
		&policySet.ID,
		&policySet.PolicyKey,
		&policySet.Name,
		&policySet.Description,
		&rawRateLimit,
		&rawRetry,
		&rawAutoDisable,
		&rawDegrade,
		&rawMetadata,
		&policySet.Status,
		&policySet.CreatedAt,
		&policySet.UpdatedAt,
	)
	if err != nil {
		return RuntimePolicySet{}, err
	}

	policySet.RateLimitPolicy = decodeObject(rawRateLimit)
	policySet.RetryPolicy = decodeObject(rawRetry)
	policySet.AutoDisablePolicy = decodeObject(rawAutoDisable)
	policySet.DegradePolicy = decodeObject(rawDegrade)
	policySet.Metadata = decodeObject(rawMetadata)
	return policySet, nil
}
// normalizeRuntimePolicyInput returns a copy of input with PolicyKey, Name,
// Description and Status stripped of surrounding whitespace. A Status that is
// blank after trimming is replaced by "active". The map fields are returned
// untouched.
func normalizeRuntimePolicyInput(input RuntimePolicySetInput) RuntimePolicySetInput {
	// input is a by-value copy, so trimming through pointers never affects the
	// caller's struct.
	for _, field := range []*string{&input.PolicyKey, &input.Name, &input.Description, &input.Status} {
		*field = strings.TrimSpace(*field)
	}
	if len(input.Status) == 0 {
		input.Status = "active"
	}
	return input
}