package store import ( "context" "database/sql" "encoding/json" "strings" "time" "github.com/jackc/pgx/v5" ) const runtimePolicyColumns = ` id::text, policy_key, name, COALESCE(description, ''), rate_limit_policy, retry_policy, auto_disable_policy, degrade_policy, metadata, status, created_at, updated_at` type RuntimePolicySetInput struct { PolicyKey string `json:"policyKey"` Name string `json:"name"` Description string `json:"description"` RateLimitPolicy map[string]any `json:"rateLimitPolicy"` RetryPolicy map[string]any `json:"retryPolicy"` AutoDisablePolicy map[string]any `json:"autoDisablePolicy"` DegradePolicy map[string]any `json:"degradePolicy"` Metadata map[string]any `json:"metadata"` Status string `json:"status"` } type runtimePolicyScanner interface { Scan(dest ...any) error } type PlatformDynamicPriorityState struct { PlatformID string `json:"platformId"` Priority int `json:"priority"` DynamicPriority *int `json:"dynamicPriority,omitempty"` EffectivePriority int `json:"effectivePriority"` UpdatedAt time.Time `json:"updatedAt"` } func (s *Store) ListRuntimePolicySets(ctx context.Context) ([]RuntimePolicySet, error) { rows, err := s.pool.Query(ctx, `SELECT `+runtimePolicyColumns+` FROM model_runtime_policy_sets ORDER BY policy_key ASC`) if err != nil { return nil, err } defer rows.Close() items := make([]RuntimePolicySet, 0) for rows.Next() { item, err := scanRuntimePolicySet(rows) if err != nil { return nil, err } items = append(items, item) } return items, rows.Err() } func (s *Store) CreateRuntimePolicySet(ctx context.Context, input RuntimePolicySetInput) (RuntimePolicySet, error) { input = normalizeRuntimePolicyInput(input) rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy)) retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy)) autoDisablePolicy, _ := json.Marshal(emptyObjectIfNil(input.AutoDisablePolicy)) degradePolicy, _ := json.Marshal(emptyObjectIfNil(input.DegradePolicy)) metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata)) return scanRuntimePolicySet(s.pool.QueryRow(ctx, ` INSERT INTO model_runtime_policy_sets ( policy_key, name, description, rate_limit_policy, retry_policy, auto_disable_policy, degrade_policy, metadata, status ) VALUES ($1, $2, NULLIF($3, ''), $4, $5, $6, $7, $8, $9) RETURNING `+runtimePolicyColumns, input.PolicyKey, input.Name, input.Description, rateLimitPolicy, retryPolicy, autoDisablePolicy, degradePolicy, metadata, input.Status, )) } func (s *Store) UpdateRuntimePolicySet(ctx context.Context, id string, input RuntimePolicySetInput) (RuntimePolicySet, error) { input = normalizeRuntimePolicyInput(input) rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy)) retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy)) autoDisablePolicy, _ := json.Marshal(emptyObjectIfNil(input.AutoDisablePolicy)) degradePolicy, _ := json.Marshal(emptyObjectIfNil(input.DegradePolicy)) metadata, _ := json.Marshal(emptyObjectIfNil(input.Metadata)) return scanRuntimePolicySet(s.pool.QueryRow(ctx, ` UPDATE model_runtime_policy_sets SET policy_key = $2, name = $3, description = NULLIF($4, ''), rate_limit_policy = $5, retry_policy = $6, auto_disable_policy = $7, degrade_policy = $8, metadata = $9, status = $10, updated_at = now() WHERE id = $1::uuid RETURNING `+runtimePolicyColumns, id, input.PolicyKey, input.Name, input.Description, rateLimitPolicy, retryPolicy, autoDisablePolicy, degradePolicy, metadata, input.Status, )) } func (s *Store) DeleteRuntimePolicySet(ctx context.Context, id string) error { var policyKey string if err := s.pool.QueryRow(ctx, `SELECT policy_key FROM model_runtime_policy_sets WHERE id = $1::uuid`, id).Scan(&policyKey); err != nil { return err } if policyKey == "default-runtime-v1" { return ErrProtectedDefault } result, err := s.pool.Exec(ctx, `DELETE FROM model_runtime_policy_sets WHERE id = $1::uuid`, id) if err != nil { return err } if result.RowsAffected() == 0 { return pgx.ErrNoRows } return nil } func (s *Store) DisableCandidatePlatform(ctx context.Context, platformID string) error { if strings.TrimSpace(platformID) == "" { return nil } _, err := s.pool.Exec(ctx, ` UPDATE integration_platforms SET status = 'disabled', updated_at = now() WHERE id = $1::uuid`, platformID) return err } func (s *Store) CooldownCandidatePlatform(ctx context.Context, platformID string, cooldownSeconds int) error { if strings.TrimSpace(platformID) == "" { return nil } if cooldownSeconds <= 0 { cooldownSeconds = 300 } _, err := s.pool.Exec(ctx, ` UPDATE integration_platforms SET cooldown_until = now() + ($2::int * interval '1 second'), updated_at = now() WHERE id = $1::uuid`, platformID, cooldownSeconds) return err } func (s *Store) CooldownCandidatePlatformModel(ctx context.Context, platformModelID string, cooldownSeconds int) error { if strings.TrimSpace(platformModelID) == "" { return nil } if cooldownSeconds <= 0 { cooldownSeconds = 300 } _, err := s.pool.Exec(ctx, ` UPDATE platform_models SET cooldown_until = now() + ($2::int * interval '1 second'), updated_at = now() WHERE id = $1::uuid`, platformModelID, cooldownSeconds) return err } func (s *Store) DemoteCandidatePlatformPriority(ctx context.Context, platformID string, platformModelID string, requestedModel string, modelType string) (int, error) { if strings.TrimSpace(platformID) == "" || strings.TrimSpace(platformModelID) == "" || strings.TrimSpace(requestedModel) == "" { return 0, nil } var dynamicPriority int err := s.pool.QueryRow(ctx, ` WITH current_model AS ( SELECT id, platform_id FROM platform_models WHERE id = $2::uuid AND platform_id = $1::uuid ), peer_priority AS ( SELECT MAX(COALESCE(peer.dynamic_priority, peer.priority)) AS max_priority FROM current_model current JOIN platform_models peer_model ON peer_model.platform_id <> current.platform_id JOIN integration_platforms peer ON peer.id = peer_model.platform_id LEFT JOIN base_model_catalog peer_base ON peer_base.id = peer_model.base_model_id WHERE peer.status = 'enabled' AND peer.deleted_at IS NULL AND (peer.cooldown_until IS NULL OR peer.cooldown_until <= now()) AND peer_model.enabled = true AND (peer_model.cooldown_until IS NULL OR peer_model.cooldown_until <= now()) AND (NULLIF($4::text, '') IS NULL OR peer_model.model_type @> jsonb_build_array($4::text)) AND ( (COALESCE(peer_model.model_alias, '') <> '' AND peer_model.model_alias = $3::text) OR ( COALESCE(peer_model.model_alias, '') = '' AND ( peer_model.model_name = $3::text OR peer_base.canonical_model_key = $3::text OR peer_base.provider_model_name = $3::text ) ) ) ) UPDATE integration_platforms target SET dynamic_priority = COALESCE((SELECT max_priority FROM peer_priority), target.priority) + 1, updated_at = now() WHERE target.id = $1::uuid AND target.deleted_at IS NULL AND EXISTS (SELECT 1 FROM current_model) RETURNING dynamic_priority`, platformID, platformModelID, requestedModel, modelType).Scan(&dynamicPriority) return dynamicPriority, err } func (s *Store) UpdatePlatformDynamicPriority(ctx context.Context, platformID string, dynamicPriority *int) (PlatformDynamicPriorityState, error) { if strings.TrimSpace(platformID) == "" { return PlatformDynamicPriorityState{}, pgx.ErrNoRows } value := 0 reset := dynamicPriority == nil if dynamicPriority != nil { value = *dynamicPriority } return scanPlatformDynamicPriorityState(s.pool.QueryRow(ctx, ` UPDATE integration_platforms SET dynamic_priority = CASE WHEN $2::boolean THEN priority ELSE $3::int END, updated_at = now() WHERE id = $1::uuid AND deleted_at IS NULL RETURNING id::text, priority, dynamic_priority, COALESCE(dynamic_priority, priority), updated_at`, platformID, reset, value)) } func scanPlatformDynamicPriorityState(scanner runtimePolicyScanner) (PlatformDynamicPriorityState, error) { var item PlatformDynamicPriorityState var dynamicPriority sql.NullInt64 if err := scanner.Scan( &item.PlatformID, &item.Priority, &dynamicPriority, &item.EffectivePriority, &item.UpdatedAt, ); err != nil { return PlatformDynamicPriorityState{}, err } item.DynamicPriority = intPointerFromNull(dynamicPriority) return item, nil } func intPointerFromNull(value sql.NullInt64) *int { if !value.Valid { return nil } converted := int(value.Int64) return &converted } func scanRuntimePolicySet(scanner runtimePolicyScanner) (RuntimePolicySet, error) { var item RuntimePolicySet var rateLimitPolicy []byte var retryPolicy []byte var autoDisablePolicy []byte var degradePolicy []byte var metadata []byte if err := scanner.Scan( &item.ID, &item.PolicyKey, &item.Name, &item.Description, &rateLimitPolicy, &retryPolicy, &autoDisablePolicy, °radePolicy, &metadata, &item.Status, &item.CreatedAt, &item.UpdatedAt, ); err != nil { return RuntimePolicySet{}, err } item.RateLimitPolicy = decodeObject(rateLimitPolicy) item.RetryPolicy = decodeObject(retryPolicy) item.AutoDisablePolicy = decodeObject(autoDisablePolicy) item.DegradePolicy = decodeObject(degradePolicy) item.Metadata = decodeObject(metadata) return item, nil } func normalizeRuntimePolicyInput(input RuntimePolicySetInput) RuntimePolicySetInput { input.PolicyKey = strings.TrimSpace(input.PolicyKey) input.Name = strings.TrimSpace(input.Name) input.Description = strings.TrimSpace(input.Description) input.Status = strings.TrimSpace(input.Status) if input.Status == "" { input.Status = "active" } return input }