easyai-ai-gateway/apps/api/internal/store/postgres.go

1964 lines
63 KiB
Go

package store
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"strings"
"time"
"unicode"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/auth"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/crypto/bcrypt"
)
// Store exposes the gateway's PostgreSQL queries as methods on top of a pgx
// connection pool. Instances are built by Connect.
type Store struct {
// pool is the pgx connection pool every query in this file runs against.
pool *pgxpool.Pool
}
// Sentinel errors of the store package. The notes below paraphrase each
// message; in the visible part of this file only ErrLocalUserRequired has a
// call site (returned when localGatewayUserID yields an empty ID).
var (
// ErrInvalidCredentials carries the "invalid account or password" message.
ErrInvalidCredentials = errors.New("invalid account or password")
// ErrInvalidInvitation carries the "invalid or expired invitation code" message.
ErrInvalidInvitation = errors.New("invalid or expired invitation code")
// ErrLocalUserRequired is returned by API-key operations when the caller has
// no local gateway user ID.
ErrLocalUserRequired = errors.New("local gateway user is required")
// ErrProtectedDefault signals a refused delete of a protected default resource.
ErrProtectedDefault = errors.New("protected default resource cannot be deleted")
// ErrUserAlreadyExists carries the "user already exists" message.
ErrUserAlreadyExists = errors.New("user already exists")
// ErrWeakPassword states the minimum password length (8 characters) in its message.
ErrWeakPassword = errors.New("password must be at least 8 characters")
)
// Connect opens a pgx pool for databaseURL and verifies it with a ping before
// handing back a Store. If the ping fails the pool is closed again so no
// connections are left behind, and the ping error is returned.
func Connect(ctx context.Context, databaseURL string) (*Store, error) {
	db, err := pgxpool.New(ctx, databaseURL)
	if err != nil {
		return nil, err
	}

	pingErr := db.Ping(ctx)
	if pingErr != nil {
		// Release the pool we just created; the caller never sees it.
		db.Close()
		return nil, pingErr
	}

	store := &Store{pool: db}
	return store, nil
}
// Close closes the underlying connection pool.
func (s *Store) Close() {
s.pool.Close()
}
// Ping checks database reachability by pinging the connection pool and
// returns whatever error the pool reports.
func (s *Store) Ping(ctx context.Context) error {
	if err := s.pool.Ping(ctx); err != nil {
		return err
	}
	return nil
}
// Platform is the JSON representation of one integration_platforms row as
// produced by ListPlatforms, CreatePlatform and UpdatePlatform.
type Platform struct {
ID string `json:"id"`
Provider string `json:"provider"`
PlatformKey string `json:"platformKey"`
Name string `json:"name"`
// InternalName and BaseURL are read through COALESCE, so a NULL column becomes "".
InternalName string `json:"internalName,omitempty"`
BaseURL string `json:"baseUrl,omitempty"`
AuthType string `json:"authType"`
Status string `json:"status"`
Priority int `json:"priority"`
DefaultPricingMode string `json:"defaultPricingMode"`
DefaultDiscountFactor float64 `json:"defaultDiscountFactor"`
// PricingRuleSetID is the rule-set UUID as text, "" when the column is NULL.
PricingRuleSetID string `json:"pricingRuleSetId,omitempty"`
Config map[string]any `json:"config,omitempty"`
// CredentialsPreview holds the result of maskCredentialsPreview applied to
// the stored credentials column.
CredentialsPreview map[string]any `json:"credentialsPreview,omitempty"`
RetryPolicy map[string]any `json:"retryPolicy,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// CreatePlatformInput is the payload consumed by both CreatePlatform and
// UpdatePlatform.
type CreatePlatformInput struct {
Provider string `json:"provider"`
// PlatformKey may be empty: CreatePlatform then generates
// "platform_<uuid without dashes>", UpdatePlatform keeps the stored key.
PlatformKey string `json:"platformKey"`
Name string `json:"name"`
// InternalName is whitespace-trimmed before use; an empty value is stored as NULL.
InternalName string `json:"internalName"`
BaseURL string `json:"baseUrl"`
AuthType string `json:"authType"`
// Credentials: on update, nil leaves stored credentials untouched, an empty
// object clears them, anything else is merged into the stored object.
Credentials map[string]any `json:"credentials"`
Config map[string]any `json:"config"`
RetryPolicy map[string]any `json:"retryPolicy"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy"`
// DefaultPricingMode defaults to "inherit_discount" when empty.
DefaultPricingMode string `json:"defaultPricingMode"`
// DefaultDiscountFactor defaults to 1 when zero.
DefaultDiscountFactor float64 `json:"defaultDiscountFactor"`
// PricingRuleSetID is cast to uuid; an empty string is stored as NULL.
PricingRuleSetID string `json:"pricingRuleSetId"`
// Priority defaults to 100 when zero.
Priority int `json:"priority"`
}
// CreateAPIKeyInput is the payload accepted by CreateAPIKey.
type CreateAPIKeyInput struct {
// Name is whitespace-trimmed; CreateAPIKey substitutes "Default API Key" when it is empty.
Name string `json:"name"`
// Scopes defaults to chat, image and video when empty.
Scopes []string `json:"scopes"`
// ExpiresAt is carried as a string; how it is parsed is not visible in this
// part of the file.
ExpiresAt string `json:"expiresAt"`
}
// APIKey is the JSON representation of a gateway_api_keys row as selected by
// ListAPIKeys. It has no field for the key secret.
type APIKey struct {
ID string `json:"id"`
// Nullable identifier columns are read through COALESCE and appear as "" when NULL.
GatewayTenantID string `json:"gatewayTenantId,omitempty"`
GatewayUserID string `json:"gatewayUserId"`
TenantID string `json:"tenantId,omitempty"`
TenantKey string `json:"tenantKey,omitempty"`
UserID string `json:"userId,omitempty"`
KeyPrefix string `json:"keyPrefix"`
Name string `json:"name"`
Scopes []string `json:"scopes,omitempty"`
UserGroupID string `json:"userGroupId,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
QuotaPolicy map[string]any `json:"quotaPolicy,omitempty"`
Status string `json:"status"`
// ExpiresAt and LastUsedAt are the timestamp columns cast to text by the
// query; they are "" when the column is NULL.
ExpiresAt string `json:"expiresAt,omitempty"`
LastUsedAt string `json:"lastUsedAt,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// PlayableAPIKey is an APIKey together with its secret. Because APIKey is
// embedded, its fields are flattened into the same JSON object.
type PlayableAPIKey struct {
APIKey
// Secret is read from the key_secret column by listRecoverableAPIKeys, or
// copied from a freshly created key by ListPlayableAPIKeys.
Secret string `json:"secret"`
}
// CreatedAPIKey is the result type of CreateAPIKey: the key record nested
// under "apiKey" plus its secret.
type CreatedAPIKey struct {
APIKey APIKey `json:"apiKey"`
// Secret is presumably the value produced by generateAPIKeySecret; the
// assignment is outside the visible part of the file, so confirm there.
Secret string `json:"secret"`
}
// PlatformModel is the JSON representation of a platform_models row joined
// with its integration_platforms row, as produced by listModels.
type PlatformModel struct {
ID string `json:"id"`
PlatformID string `json:"platformId"`
// BaseModelID is "" when the base_model_id column is NULL.
BaseModelID string `json:"baseModelId,omitempty"`
// Provider and PlatformName come from the joined platform row (p.provider, p.name).
Provider string `json:"provider,omitempty"`
PlatformName string `json:"platformName,omitempty"`
ModelName string `json:"modelName"`
ModelAlias string `json:"modelAlias,omitempty"`
ModelType string `json:"modelType"`
DisplayName string `json:"displayName"`
CapabilityOverride map[string]any `json:"capabilityOverride,omitempty"`
Capabilities map[string]any `json:"capabilities,omitempty"`
PricingMode string `json:"pricingMode"`
// DiscountFactor is read as COALESCE(discount_factor, 0), so NULL becomes 0
// and is then omitted from JSON by omitempty.
DiscountFactor float64 `json:"discountFactor,omitempty"`
PricingRuleSetID string `json:"pricingRuleSetId,omitempty"`
BillingConfigOverride map[string]any `json:"billingConfigOverride,omitempty"`
BillingConfig map[string]any `json:"billingConfig,omitempty"`
PermissionConfig map[string]any `json:"permissionConfig,omitempty"`
RetryPolicy map[string]any `json:"retryPolicy,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
RuntimePolicySetID string `json:"runtimePolicySetId,omitempty"`
RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride,omitempty"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// AccessRule is the JSON shape of an access rule relating a subject
// (SubjectType/SubjectID) to a resource (ResourceType/ResourceID).
// NOTE(review): the queries that read or write it are not in the visible part
// of this file; field semantics below the names are unconfirmed.
type AccessRule struct {
ID string `json:"id"`
SubjectType string `json:"subjectType"`
SubjectID string `json:"subjectId"`
ResourceType string `json:"resourceType"`
ResourceID string `json:"resourceId"`
Effect string `json:"effect"`
Priority int `json:"priority"`
MinPermissionLevel int `json:"minPermissionLevel"`
Conditions map[string]any `json:"conditions,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// CatalogProvider is the element type returned by ListCatalogProviders, which
// reads the model_catalog_providers table. The column-to-field mapping lives
// in scanCatalogProvider, which is not in the visible part of this file.
type CatalogProvider struct {
ID string `json:"id"`
ProviderKey string `json:"providerKey"`
Code string `json:"code"`
DisplayName string `json:"displayName"`
ProviderType string `json:"providerType"`
IconPath string `json:"iconPath,omitempty"`
DefaultBaseURL string `json:"defaultBaseUrl,omitempty"`
DefaultAuthType string `json:"defaultAuthType,omitempty"`
Source string `json:"source,omitempty"`
CapabilitySchema map[string]any `json:"capabilitySchema,omitempty"`
DefaultRateLimitPolicy map[string]any `json:"defaultRateLimitPolicy,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// BaseModel is the JSON shape of a catalog base model.
// NOTE(review): the queries that populate it are not in the visible part of
// this file.
type BaseModel struct {
ID string `json:"id"`
ProviderKey string `json:"providerKey"`
CanonicalModelKey string `json:"canonicalModelKey"`
ProviderModelName string `json:"providerModelName"`
// ModelType uses StringList, a type declared outside the visible part of this file.
ModelType StringList `json:"modelType"`
ModelAlias string `json:"modelAlias"`
// DisplayName is tagged json:"-", so it is never read from or written to JSON.
DisplayName string `json:"-"`
Capabilities map[string]any `json:"capabilities,omitempty"`
BaseBillingConfig map[string]any `json:"baseBillingConfig,omitempty"`
DefaultRateLimitPolicy map[string]any `json:"defaultRateLimitPolicy,omitempty"`
PricingRuleSetID string `json:"pricingRuleSetId,omitempty"`
RuntimePolicySetID string `json:"runtimePolicySetId,omitempty"`
RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
CatalogType string `json:"catalogType"`
DefaultSnapshot map[string]any `json:"defaultSnapshot,omitempty"`
// CustomizedAt is carried as a string rather than time.Time.
CustomizedAt string `json:"customizedAt,omitempty"`
PricingVersion int `json:"pricingVersion"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// RuntimePolicySet is the JSON shape of a named bundle of runtime policies
// (rate limit, retry, auto-disable, degrade), each held as a free-form object.
// NOTE(review): the queries that populate it are not in the visible part of
// this file.
type RuntimePolicySet struct {
ID string `json:"id"`
PolicyKey string `json:"policyKey"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
RetryPolicy map[string]any `json:"retryPolicy,omitempty"`
AutoDisablePolicy map[string]any `json:"autoDisablePolicy,omitempty"`
DegradePolicy map[string]any `json:"degradePolicy,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// PricingRule is the JSON representation of a model_pricing_rules row as
// produced by ListPricingRules.
type PricingRule struct {
ID string `json:"id"`
// RuleSetID is "" when rule_set_id is NULL.
RuleSetID string `json:"ruleSetId,omitempty"`
RuleKey string `json:"ruleKey"`
DisplayName string `json:"displayName"`
ScopeType string `json:"scopeType"`
// ScopeID is "" when scope_id is NULL.
ScopeID string `json:"scopeId,omitempty"`
ResourceType string `json:"resourceType"`
Unit string `json:"unit"`
// BasePrice is the base_price column cast to float8 by the query.
BasePrice float64 `json:"basePrice"`
Currency string `json:"currency"`
BaseWeight map[string]any `json:"baseWeight,omitempty"`
DynamicWeight map[string]any `json:"dynamicWeight,omitempty"`
CalculatorType string `json:"calculatorType"`
DimensionSchema map[string]any `json:"dimensionSchema,omitempty"`
FormulaConfig map[string]any `json:"formulaConfig,omitempty"`
Priority int `json:"priority"`
Status string `json:"status"`
Metadata map[string]any `json:"metadata,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// PricingRuleSet is the JSON shape of a named group of pricing rules.
// NOTE(review): the queries that populate it, including how Rules is filled,
// are not in the visible part of this file.
type PricingRuleSet struct {
ID string `json:"id"`
RuleSetKey string `json:"ruleSetKey"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Category string `json:"category"`
Currency string `json:"currency"`
Status string `json:"status"`
Metadata map[string]any `json:"metadata,omitempty"`
// Rules nests the member rules; omitted from JSON when empty.
Rules []PricingRule `json:"rules,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// GatewayTenant is the JSON representation of a gateway_tenants row as
// produced by ListTenants.
type GatewayTenant struct {
ID string `json:"id"`
TenantKey string `json:"tenantKey"`
Source string `json:"source"`
// Nullable text/uuid columns below are read through COALESCE and are "" when NULL.
ExternalTenantID string `json:"externalTenantId,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
DefaultUserGroupID string `json:"defaultUserGroupId,omitempty"`
PlanKey string `json:"planKey,omitempty"`
BillingProfile map[string]any `json:"billingProfile,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
AuthPolicy map[string]any `json:"authPolicy,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
// SyncedAt and SourceUpdatedAt are timestamp columns cast to text by the
// query; "" when NULL.
SyncedAt string `json:"syncedAt,omitempty"`
SourceUpdatedAt string `json:"sourceUpdatedAt,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// LocalRegisterInput is the JSON payload for registering a local user.
// NOTE(review): the function consuming it is not in the visible part of this
// file; validation rules (e.g. those behind ErrWeakPassword and
// ErrInvalidInvitation) should be confirmed there.
type LocalRegisterInput struct {
Username string `json:"username"`
Email string `json:"email"`
Password string `json:"password"`
DisplayName string `json:"displayName"`
InvitationCode string `json:"invitationCode"`
}
// LocalLoginInput is the JSON payload for a local login.
// NOTE(review): the function consuming it is not in the visible part of this
// file, so what Account may contain (username, email, ...) is unconfirmed.
type LocalLoginInput struct {
Account string `json:"account"`
Password string `json:"password"`
}
// GatewayUser is the JSON representation of a gateway_users row as produced
// by ListUsers.
type GatewayUser struct {
ID string `json:"id"`
UserKey string `json:"userKey"`
Source string `json:"source"`
// Nullable text/uuid columns below are read through COALESCE and are "" when NULL.
ExternalUserID string `json:"externalUserId,omitempty"`
Username string `json:"username"`
DisplayName string `json:"displayName,omitempty"`
Email string `json:"email,omitempty"`
Phone string `json:"phone,omitempty"`
AvatarURL string `json:"avatarUrl,omitempty"`
GatewayTenantID string `json:"gatewayTenantId,omitempty"`
TenantID string `json:"tenantId,omitempty"`
TenantKey string `json:"tenantKey,omitempty"`
DefaultUserGroupID string `json:"defaultUserGroupId,omitempty"`
// Roles is decoded from the roles column with decodeStringArray.
Roles []string `json:"roles,omitempty"`
AuthProfile map[string]any `json:"authProfile,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
// LastLoginAt, SyncedAt and SourceUpdatedAt are timestamp columns cast to
// text by the query; "" when NULL.
LastLoginAt string `json:"lastLoginAt,omitempty"`
SyncedAt string `json:"syncedAt,omitempty"`
SourceUpdatedAt string `json:"sourceUpdatedAt,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// UserGroup is the JSON representation of a gateway_user_groups row as
// produced by ListUserGroups. The policy fields are free-form JSON objects.
type UserGroup struct {
ID string `json:"id"`
GroupKey string `json:"groupKey"`
Name string `json:"name"`
// Description is "" when the column is NULL.
Description string `json:"description,omitempty"`
Source string `json:"source"`
Priority int `json:"priority"`
RechargeDiscountPolicy map[string]any `json:"rechargeDiscountPolicy,omitempty"`
BillingDiscountPolicy map[string]any `json:"billingDiscountPolicy,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
QuotaPolicy map[string]any `json:"quotaPolicy,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// RateLimitWindow is the JSON shape of one rate-limit counter window,
// identified by ScopeType, ScopeKey, Metric and WindowStart.
// NOTE(review): the queries that maintain it are not in the visible part of
// this file; how UsedValue and ReservedValue relate to LimitValue is
// unconfirmed.
type RateLimitWindow struct {
ScopeType string `json:"scopeType"`
ScopeKey string `json:"scopeKey"`
Metric string `json:"metric"`
WindowStart time.Time `json:"windowStart"`
LimitValue float64 `json:"limitValue"`
UsedValue float64 `json:"usedValue"`
ReservedValue float64 `json:"reservedValue"`
ResetAt time.Time `json:"resetAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// CreateTaskInput is the JSON payload describing a task to create.
// NOTE(review): the function consuming it is not in the visible part of this
// file; allowed values for Kind and RunMode are unconfirmed.
type CreateTaskInput struct {
Kind string `json:"kind"`
Model string `json:"model"`
RunMode string `json:"runMode"`
// Request is the free-form request body stored with the task.
Request map[string]any `json:"request"`
}
// GatewayTask is the JSON representation of a gateway task. The columns
// selected for it are listed in gatewayTaskColumns; the code that scans them
// into this struct is not in the visible part of this file.
type GatewayTask struct {
ID string `json:"id"`
Kind string `json:"kind"`
RunMode string `json:"runMode"`
UserID string `json:"userId"`
GatewayUserID string `json:"gatewayUserId,omitempty"`
UserSource string `json:"userSource,omitempty"`
GatewayTenantID string `json:"gatewayTenantId,omitempty"`
TenantID string `json:"tenantId,omitempty"`
TenantKey string `json:"tenantKey,omitempty"`
APIKeyID string `json:"apiKeyId,omitempty"`
APIKeyName string `json:"apiKeyName,omitempty"`
APIKeyPrefix string `json:"apiKeyPrefix,omitempty"`
UserGroupID string `json:"userGroupId,omitempty"`
UserGroupKey string `json:"userGroupKey,omitempty"`
Model string `json:"model"`
ModelType string `json:"modelType,omitempty"`
RequestedModel string `json:"requestedModel,omitempty"`
ResolvedModel string `json:"resolvedModel,omitempty"`
RequestID string `json:"requestId,omitempty"`
Request map[string]any `json:"request,omitempty"`
Status string `json:"status"`
Result map[string]any `json:"result,omitempty"`
Billings []any `json:"billings,omitempty"`
// Usage, Metrics and BillingSummary have no omitempty, so they are always
// present in the JSON output (as null when the map is nil).
Usage map[string]any `json:"usage"`
Metrics map[string]any `json:"metrics"`
BillingSummary map[string]any `json:"billingSummary"`
FinalChargeAmount float64 `json:"finalChargeAmount"`
// The response/finish timestamps are carried as strings; gatewayTaskColumns
// casts the corresponding columns to text and substitutes '' for NULL.
ResponseStartedAt string `json:"responseStartedAt,omitempty"`
ResponseFinishedAt string `json:"responseFinishedAt,omitempty"`
ResponseDurationMS int64 `json:"responseDurationMs"`
FinishedAt string `json:"finishedAt,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// gatewayTaskColumns is the SELECT column list for gateway task rows.
// Nullable columns are wrapped in COALESCE so they come back as '', '{}',
// '[]' or 0 instead of NULL, uuid columns are cast to text,
// final_charge_amount is cast to float8, and the response_started_at,
// response_finished_at and finished_at timestamps are cast to text.
// Note that finished_at is the LAST column, after created_at/updated_at.
// NOTE(review): the Scan call consuming this list is not in the visible part
// of this file; presumably its destination order must match this one exactly.
const gatewayTaskColumns = `
id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source,
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(api_key_id, ''), COALESCE(api_key_name, ''), COALESCE(api_key_prefix, ''),
COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model,
COALESCE(model_type, ''), COALESCE(requested_model, ''), COALESCE(resolved_model, ''), COALESCE(request_id, ''),
request, status, COALESCE(result, '{}'::jsonb), COALESCE(billings, '[]'::jsonb),
COALESCE(usage, '{}'::jsonb), COALESCE(metrics, '{}'::jsonb), COALESCE(billing_summary, '{}'::jsonb),
COALESCE(final_charge_amount, 0)::float8, COALESCE(response_started_at::text, ''),
COALESCE(response_finished_at::text, ''), COALESCE(response_duration_ms, 0), COALESCE(error, ''),
created_at, updated_at, COALESCE(finished_at::text, '')`
// TaskEvent is the JSON shape of one event belonging to a task (TaskID),
// carrying a sequence number, an event type and an optional payload.
// NOTE(review): the queries that produce it are not in the visible part of
// this file; the meaning of Simulated and the range of Progress are
// unconfirmed.
type TaskEvent struct {
ID string `json:"id"`
TaskID string `json:"taskId"`
Seq int64 `json:"seq"`
EventType string `json:"eventType"`
Status string `json:"status,omitempty"`
Phase string `json:"phase,omitempty"`
Progress float64 `json:"progress,omitempty"`
Message string `json:"message,omitempty"`
Payload map[string]any `json:"payload,omitempty"`
Simulated bool `json:"simulated"`
CreatedAt time.Time `json:"createdAt"`
}
// ListPlatforms returns every integration_platforms row, ordered by ascending
// priority and then newest first. The stored credentials column is never
// copied verbatim: it is run through maskCredentialsPreview and exposed as
// CredentialsPreview. The result is an empty, non-nil slice when the table
// has no rows.
func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) {
	const query = `
SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''),
config, credentials, retry_policy, rate_limit_policy, created_at, updated_at
FROM integration_platforms
ORDER BY priority ASC, created_at DESC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := []Platform{}
	for rows.Next() {
		var (
			p              Platform
			rawConfig      []byte
			rawCredentials []byte
			rawRetry       []byte
			rawRateLimit   []byte
		)
		// Destination order mirrors the SELECT list above.
		scanErr := rows.Scan(
			&p.ID, &p.Provider, &p.PlatformKey, &p.Name,
			&p.InternalName, &p.BaseURL, &p.AuthType, &p.Status, &p.Priority,
			&p.DefaultPricingMode, &p.DefaultDiscountFactor, &p.PricingRuleSetID,
			&rawConfig, &rawCredentials, &rawRetry, &rawRateLimit,
			&p.CreatedAt, &p.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		p.Config = decodeObject(rawConfig)
		p.CredentialsPreview = maskCredentialsPreview(rawCredentials)
		p.RetryPolicy = decodeObject(rawRetry)
		p.RateLimitPolicy = decodeObject(rawRateLimit)
		result = append(result, p)
	}
	return result, rows.Err()
}
// CreatePlatform inserts a new integration_platforms row and returns it.
//
// Defaults applied before the insert: an empty DefaultPricingMode becomes
// "inherit_discount", a zero DefaultDiscountFactor becomes 1 and a zero
// Priority becomes 100. An empty PlatformKey is replaced in SQL by
// "platform_" followed by a dash-less random UUID; an empty (trimmed)
// InternalName and an empty PricingRuleSetID are stored as NULL.
//
// The JSON-valued inputs are marshalled up front and any marshalling error is
// returned before the database is touched, so an unencodable value can no
// longer reach the INSERT as nil bytes. On any error the zero Platform is
// returned. The returned Platform carries a masked CredentialsPreview rather
// than the raw credentials.
func (s *Store) CreatePlatform(ctx context.Context, input CreatePlatformInput) (Platform, error) {
	credentials, err := json.Marshal(emptyObjectIfNil(input.Credentials))
	if err != nil {
		return Platform{}, err
	}
	config, err := json.Marshal(emptyObjectIfNil(input.Config))
	if err != nil {
		return Platform{}, err
	}
	retryPolicy, err := json.Marshal(emptyObjectIfNil(input.RetryPolicy))
	if err != nil {
		return Platform{}, err
	}
	rateLimitPolicy, err := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy))
	if err != nil {
		return Platform{}, err
	}

	// Zero values are treated as "not provided" and replaced with defaults.
	if input.DefaultPricingMode == "" {
		input.DefaultPricingMode = "inherit_discount"
	}
	if input.DefaultDiscountFactor == 0 {
		input.DefaultDiscountFactor = 1
	}
	if input.Priority == 0 {
		input.Priority = 100
	}

	var platform Platform
	var configBytes []byte
	var credentialsResultBytes []byte
	var retryPolicyBytes []byte
	var rateLimitPolicyBytes []byte
	err = s.pool.QueryRow(ctx, `
INSERT INTO integration_platforms (
provider, platform_key, name, internal_name, base_url, auth_type, credentials, config,
default_pricing_mode, default_discount_factor, pricing_rule_set_id,
priority, retry_policy, rate_limit_policy
)
VALUES (
$1, COALESCE(NULLIF($2, ''), 'platform_' || replace(gen_random_uuid()::text, '-', '')), $3, NULLIF($4, ''), $5, $6, $7, $8,
$9, $10, NULLIF($11, '')::uuid, $12, $13, $14
)
RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''),
config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`,
		input.Provider, input.PlatformKey, input.Name, strings.TrimSpace(input.InternalName), input.BaseURL, input.AuthType, credentials, config,
		input.DefaultPricingMode, input.DefaultDiscountFactor, input.PricingRuleSetID, input.Priority,
		string(retryPolicy), string(rateLimitPolicy),
	).Scan(
		&platform.ID,
		&platform.Provider,
		&platform.PlatformKey,
		&platform.Name,
		&platform.InternalName,
		&platform.BaseURL,
		&platform.AuthType,
		&platform.Status,
		&platform.Priority,
		&platform.DefaultPricingMode,
		&platform.DefaultDiscountFactor,
		&platform.PricingRuleSetID,
		&configBytes,
		&credentialsResultBytes,
		&retryPolicyBytes,
		&rateLimitPolicyBytes,
		&platform.CreatedAt,
		&platform.UpdatedAt,
	)
	if err != nil {
		return Platform{}, err
	}

	platform.Config = decodeObject(configBytes)
	// Only the masked preview of the stored credentials leaves the store.
	platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes)
	platform.RetryPolicy = decodeObject(retryPolicyBytes)
	platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes)
	return platform, nil
}
// UpdatePlatform overwrites the integration_platforms row identified by id
// with the values in input and returns the updated row.
//
// The same defaults as CreatePlatform apply ("inherit_discount", discount
// factor 1, priority 100 for empty/zero inputs). An empty PlatformKey keeps
// the stored key. Credentials follow a three-way rule evaluated in SQL:
//   - input.Credentials == nil: stored credentials are left untouched;
//   - an empty object: stored credentials are cleared to {};
//   - otherwise: the given keys are merged over the stored object (jsonb ||).
//
// The JSON-valued inputs are marshalled up front and any marshalling error is
// returned before the database is touched, instead of being discarded. When
// no row matches id, the error from Scan is returned. On any error the zero
// Platform is returned.
func (s *Store) UpdatePlatform(ctx context.Context, id string, input CreatePlatformInput) (Platform, error) {
	// credentials stays a nil interface when the caller omitted the field, so
	// $8 is bound as NULL and the CASE expression keeps the stored value.
	var credentials any
	if input.Credentials != nil {
		credentialsBytes, err := json.Marshal(emptyObjectIfNil(input.Credentials))
		if err != nil {
			return Platform{}, err
		}
		credentials = string(credentialsBytes)
	}
	config, err := json.Marshal(emptyObjectIfNil(input.Config))
	if err != nil {
		return Platform{}, err
	}
	retryPolicy, err := json.Marshal(emptyObjectIfNil(input.RetryPolicy))
	if err != nil {
		return Platform{}, err
	}
	rateLimitPolicy, err := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy))
	if err != nil {
		return Platform{}, err
	}

	// Zero values are treated as "not provided" and replaced with defaults.
	if input.DefaultPricingMode == "" {
		input.DefaultPricingMode = "inherit_discount"
	}
	if input.DefaultDiscountFactor == 0 {
		input.DefaultDiscountFactor = 1
	}
	if input.Priority == 0 {
		input.Priority = 100
	}

	var platform Platform
	var configBytes []byte
	var credentialsResultBytes []byte
	var retryPolicyBytes []byte
	var rateLimitPolicyBytes []byte
	err = s.pool.QueryRow(ctx, `
UPDATE integration_platforms
SET provider = $2,
platform_key = COALESCE(NULLIF($3, ''), platform_key),
name = $4,
internal_name = NULLIF($5, ''),
base_url = $6,
auth_type = $7,
credentials = CASE
WHEN $8::jsonb IS NULL THEN credentials
WHEN $8::jsonb = '{}'::jsonb THEN '{}'::jsonb
ELSE credentials || $8::jsonb
END,
config = $9,
default_pricing_mode = $10,
default_discount_factor = $11,
pricing_rule_set_id = NULLIF($12, '')::uuid,
priority = $13,
retry_policy = $14,
rate_limit_policy = $15,
updated_at = now()
WHERE id = $1::uuid
RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''),
config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`,
		id,
		input.Provider,
		input.PlatformKey,
		input.Name,
		strings.TrimSpace(input.InternalName),
		input.BaseURL,
		input.AuthType,
		credentials,
		string(config),
		input.DefaultPricingMode,
		input.DefaultDiscountFactor,
		input.PricingRuleSetID,
		input.Priority,
		string(retryPolicy),
		string(rateLimitPolicy),
	).Scan(
		&platform.ID,
		&platform.Provider,
		&platform.PlatformKey,
		&platform.Name,
		&platform.InternalName,
		&platform.BaseURL,
		&platform.AuthType,
		&platform.Status,
		&platform.Priority,
		&platform.DefaultPricingMode,
		&platform.DefaultDiscountFactor,
		&platform.PricingRuleSetID,
		&configBytes,
		&credentialsResultBytes,
		&retryPolicyBytes,
		&rateLimitPolicyBytes,
		&platform.CreatedAt,
		&platform.UpdatedAt,
	)
	if err != nil {
		return Platform{}, err
	}

	platform.Config = decodeObject(configBytes)
	// Only the masked preview of the stored credentials leaves the store.
	platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes)
	platform.RetryPolicy = decodeObject(retryPolicyBytes)
	platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes)
	return platform, nil
}
// DeletePlatform removes the integration_platforms row with the given id.
// It returns pgx.ErrNoRows when no row was deleted, and the execution error
// when the statement itself fails.
func (s *Store) DeletePlatform(ctx context.Context, id string) error {
	const stmt = `DELETE FROM integration_platforms WHERE id = $1::uuid`

	tag, err := s.pool.Exec(ctx, stmt, id)
	switch {
	case err != nil:
		return err
	case tag.RowsAffected() == 0:
		// Nothing matched the id: report it the same way a missing row is
		// reported by single-row queries.
		return pgx.ErrNoRows
	default:
		return nil
	}
}
// ListModels returns the models of all platforms by calling listModels with
// an empty platform filter.
func (s *Store) ListModels(ctx context.Context) ([]PlatformModel, error) {
	const noPlatformFilter = ""
	return s.listModels(ctx, noPlatformFilter)
}
// ListPlatformModels returns the models belonging to platformID. The ID is
// whitespace-trimmed first; if nothing remains, listModels applies no filter
// and the models of every platform are returned.
func (s *Store) ListPlatformModels(ctx context.Context, platformID string) ([]PlatformModel, error) {
	trimmedID := strings.TrimSpace(platformID)
	return s.listModels(ctx, trimmedID)
}
// listModels reads platform_models joined with integration_platforms, ordered
// by model type and then model name. A non-empty platformID restricts the
// result to that platform; it is bound as a query parameter and only a fixed
// WHERE fragment is spliced into the SQL text. The result is an empty,
// non-nil slice when nothing matches.
func (s *Store) listModels(ctx context.Context, platformID string) ([]PlatformModel, error) {
	var (
		filter string
		params = []any{}
	)
	if platformID != "" {
		filter = "WHERE m.platform_id = $1::uuid"
		params = append(params, platformID)
	}

	query := `
SELECT m.id::text, m.platform_id::text, COALESCE(m.base_model_id::text, ''), p.provider, p.name,
m.model_name, COALESCE(m.model_alias, ''), m.model_type, m.display_name,
m.capability_override, m.capabilities, m.pricing_mode, COALESCE(m.discount_factor, 0)::float8,
COALESCE(m.pricing_rule_set_id::text, ''), m.billing_config_override, m.billing_config,
m.permission_config, m.retry_policy, m.rate_limit_policy, COALESCE(m.runtime_policy_set_id::text, ''), m.runtime_policy_override,
m.enabled, m.created_at, m.updated_at
FROM platform_models m
JOIN integration_platforms p ON p.id = m.platform_id
` + filter + `
ORDER BY m.model_type ASC, m.model_name ASC`

	rows, err := s.pool.Query(ctx, query, params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := []PlatformModel{}
	for rows.Next() {
		var (
			m                        PlatformModel
			rawCapabilityOverride    []byte
			rawCapabilities          []byte
			rawBillingOverride       []byte
			rawBillingConfig         []byte
			rawPermissionConfig      []byte
			rawRetryPolicy           []byte
			rawRateLimitPolicy       []byte
			rawRuntimePolicyOverride []byte
		)
		// Destination order mirrors the SELECT list above.
		scanErr := rows.Scan(
			&m.ID, &m.PlatformID, &m.BaseModelID, &m.Provider, &m.PlatformName,
			&m.ModelName, &m.ModelAlias, &m.ModelType, &m.DisplayName,
			&rawCapabilityOverride, &rawCapabilities, &m.PricingMode, &m.DiscountFactor,
			&m.PricingRuleSetID, &rawBillingOverride, &rawBillingConfig,
			&rawPermissionConfig, &rawRetryPolicy, &rawRateLimitPolicy, &m.RuntimePolicySetID, &rawRuntimePolicyOverride,
			&m.Enabled, &m.CreatedAt, &m.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		m.CapabilityOverride = decodeObject(rawCapabilityOverride)
		m.Capabilities = decodeObject(rawCapabilities)
		m.BillingConfigOverride = decodeObject(rawBillingOverride)
		m.BillingConfig = decodeObject(rawBillingConfig)
		m.PermissionConfig = decodeObject(rawPermissionConfig)
		m.RetryPolicy = decodeObject(rawRetryPolicy)
		m.RateLimitPolicy = decodeObject(rawRateLimitPolicy)
		m.RuntimePolicyOverride = decodeObject(rawRuntimePolicyOverride)
		result = append(result, m)
	}
	return result, rows.Err()
}
// ListCatalogProviders returns all model_catalog_providers rows ordered by
// provider_key. The column list comes from catalogProviderColumns and each
// row is converted by scanCatalogProvider. The result is an empty, non-nil
// slice when the table has no rows.
func (s *Store) ListCatalogProviders(ctx context.Context) ([]CatalogProvider, error) {
	query := `
SELECT ` + catalogProviderColumns + `
FROM model_catalog_providers
ORDER BY provider_key ASC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	providers := []CatalogProvider{}
	for rows.Next() {
		provider, scanErr := scanCatalogProvider(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		providers = append(providers, provider)
	}
	return providers, rows.Err()
}
// ListPricingRules returns every model_pricing_rules row, grouped by rule set
// (rules without a set sort first, since NULL is coalesced to ''), then by
// ascending priority, resource type and newest first. The result is an
// empty, non-nil slice when the table has no rows.
func (s *Store) ListPricingRules(ctx context.Context) ([]PricingRule, error) {
	const query = `
SELECT id::text, scope_type, COALESCE(scope_id::text, ''), resource_type, unit,
base_price::float8, currency, base_weight, dynamic_weight,
COALESCE(rule_set_id::text, ''), rule_key, display_name, calculator_type,
dimension_schema, formula_config, priority, status, metadata, created_at, updated_at
FROM model_pricing_rules
ORDER BY COALESCE(rule_set_id::text, ''), priority ASC, resource_type ASC, created_at DESC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	rules := []PricingRule{}
	for rows.Next() {
		var (
			rule               PricingRule
			rawBaseWeight      []byte
			rawDynamicWeight   []byte
			rawDimensionSchema []byte
			rawFormulaConfig   []byte
			rawMetadata        []byte
		)
		// Destination order mirrors the SELECT list, which differs from the
		// struct's field order (rule_set_id and rule_key come mid-row).
		scanErr := rows.Scan(
			&rule.ID, &rule.ScopeType, &rule.ScopeID, &rule.ResourceType, &rule.Unit,
			&rule.BasePrice, &rule.Currency, &rawBaseWeight, &rawDynamicWeight,
			&rule.RuleSetID, &rule.RuleKey, &rule.DisplayName, &rule.CalculatorType,
			&rawDimensionSchema, &rawFormulaConfig, &rule.Priority, &rule.Status, &rawMetadata,
			&rule.CreatedAt, &rule.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		rule.BaseWeight = decodeObject(rawBaseWeight)
		rule.DynamicWeight = decodeObject(rawDynamicWeight)
		rule.DimensionSchema = decodeObject(rawDimensionSchema)
		rule.FormulaConfig = decodeObject(rawFormulaConfig)
		rule.Metadata = decodeObject(rawMetadata)
		rules = append(rules, rule)
	}
	return rules, rows.Err()
}
// ListTenants returns all gateway_tenants rows whose deleted_at is NULL,
// newest first. The result is an empty, non-nil slice when nothing matches.
func (s *Store) ListTenants(ctx context.Context) ([]GatewayTenant, error) {
	const query = `
SELECT id::text, tenant_key, source, COALESCE(external_tenant_id, ''), name, COALESCE(description, ''),
COALESCE(default_user_group_id::text, ''), COALESCE(plan_key, ''), billing_profile, rate_limit_policy,
auth_policy, metadata, status, COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''),
created_at, updated_at
FROM gateway_tenants
WHERE deleted_at IS NULL
ORDER BY created_at DESC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	tenants := []GatewayTenant{}
	for rows.Next() {
		var (
			tenant            GatewayTenant
			rawBillingProfile []byte
			rawRateLimit      []byte
			rawAuthPolicy     []byte
			rawMetadata       []byte
		)
		// Destination order mirrors the SELECT list above.
		scanErr := rows.Scan(
			&tenant.ID, &tenant.TenantKey, &tenant.Source, &tenant.ExternalTenantID, &tenant.Name, &tenant.Description,
			&tenant.DefaultUserGroupID, &tenant.PlanKey, &rawBillingProfile, &rawRateLimit,
			&rawAuthPolicy, &rawMetadata, &tenant.Status, &tenant.SyncedAt, &tenant.SourceUpdatedAt,
			&tenant.CreatedAt, &tenant.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		tenant.BillingProfile = decodeObject(rawBillingProfile)
		tenant.RateLimitPolicy = decodeObject(rawRateLimit)
		tenant.AuthPolicy = decodeObject(rawAuthPolicy)
		tenant.Metadata = decodeObject(rawMetadata)
		tenants = append(tenants, tenant)
	}
	return tenants, rows.Err()
}
// ListUsers returns all gateway_users rows whose deleted_at is NULL, newest
// first. Roles are decoded with decodeStringArray; the other JSON columns
// with decodeObject. The result is an empty, non-nil slice when nothing
// matches.
func (s *Store) ListUsers(ctx context.Context) ([]GatewayUser, error) {
	const query = `
SELECT id::text, user_key, source, COALESCE(external_user_id, ''), username,
COALESCE(display_name, ''), COALESCE(email, ''), COALESCE(phone, ''), COALESCE(avatar_url, ''),
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata,
status, COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''),
created_at, updated_at
FROM gateway_users
WHERE deleted_at IS NULL
ORDER BY created_at DESC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	users := []GatewayUser{}
	for rows.Next() {
		var (
			u              GatewayUser
			rawRoles       []byte
			rawAuthProfile []byte
			rawMetadata    []byte
		)
		// Destination order mirrors the SELECT list above.
		scanErr := rows.Scan(
			&u.ID, &u.UserKey, &u.Source, &u.ExternalUserID, &u.Username,
			&u.DisplayName, &u.Email, &u.Phone, &u.AvatarURL,
			&u.GatewayTenantID, &u.TenantID, &u.TenantKey,
			&u.DefaultUserGroupID, &rawRoles, &rawAuthProfile, &rawMetadata,
			&u.Status, &u.LastLoginAt, &u.SyncedAt, &u.SourceUpdatedAt,
			&u.CreatedAt, &u.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		u.Roles = decodeStringArray(rawRoles)
		u.AuthProfile = decodeObject(rawAuthProfile)
		u.Metadata = decodeObject(rawMetadata)
		users = append(users, u)
	}
	return users, rows.Err()
}
// ListUserGroups returns every gateway_user_groups row ordered by ascending
// priority and then group key. The result is an empty, non-nil slice when
// the table has no rows.
func (s *Store) ListUserGroups(ctx context.Context) ([]UserGroup, error) {
	const query = `
SELECT id::text, group_key, name, COALESCE(description, ''), source, priority,
recharge_discount_policy, billing_discount_policy, rate_limit_policy, quota_policy, metadata,
status, created_at, updated_at
FROM gateway_user_groups
ORDER BY priority ASC, group_key ASC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	groups := []UserGroup{}
	for rows.Next() {
		var (
			group               UserGroup
			rawRechargeDiscount []byte
			rawBillingDiscount  []byte
			rawRateLimit        []byte
			rawQuota            []byte
			rawMetadata         []byte
		)
		// Destination order mirrors the SELECT list above.
		scanErr := rows.Scan(
			&group.ID, &group.GroupKey, &group.Name, &group.Description, &group.Source, &group.Priority,
			&rawRechargeDiscount, &rawBillingDiscount, &rawRateLimit, &rawQuota, &rawMetadata,
			&group.Status, &group.CreatedAt, &group.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		group.RechargeDiscountPolicy = decodeObject(rawRechargeDiscount)
		group.BillingDiscountPolicy = decodeObject(rawBillingDiscount)
		group.RateLimitPolicy = decodeObject(rawRateLimit)
		group.QuotaPolicy = decodeObject(rawQuota)
		group.Metadata = decodeObject(rawMetadata)
		groups = append(groups, group)
	}
	return groups, rows.Err()
}
// ListAPIKeys returns the non-deleted gateway_api_keys rows owned by the
// caller's local gateway user, newest first. When localGatewayUserID yields
// no ID for user, an empty list is returned without an error (unlike the
// secret-returning paths, which fail with ErrLocalUserRequired). The key
// secret is not selected.
func (s *Store) ListAPIKeys(ctx context.Context, user *auth.User) ([]APIKey, error) {
	ownerID := localGatewayUserID(user)
	if ownerID == "" {
		return []APIKey{}, nil
	}

	const query = `
SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at
FROM gateway_api_keys
WHERE gateway_user_id = $1::uuid AND deleted_at IS NULL
ORDER BY created_at DESC`

	rows, err := s.pool.Query(ctx, query, ownerID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	keys := []APIKey{}
	for rows.Next() {
		key, scanErr := scanAPIKey(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		keys = append(keys, key)
	}
	return keys, rows.Err()
}
// ListPlayableAPIKeys returns the caller's API keys together with their
// secrets, as found by listRecoverableAPIKeys. If that lookup yields none, a
// key named "Playground API Key" with the chat, image and video scopes is
// created via CreateAPIKey and returned as the only element. Errors from
// either step are returned unchanged.
func (s *Store) ListPlayableAPIKeys(ctx context.Context, user *auth.User) ([]PlayableAPIKey, error) {
	existing, err := s.listRecoverableAPIKeys(ctx, user)
	switch {
	case err != nil:
		return nil, err
	case len(existing) > 0:
		return existing, nil
	}

	// Nothing usable yet: provision a default key on the fly.
	input := CreateAPIKeyInput{
		Name:   "Playground API Key",
		Scopes: []string{"chat", "image", "video"},
	}
	fresh, err := s.CreateAPIKey(ctx, input, user)
	if err != nil {
		return nil, err
	}

	playable := PlayableAPIKey{APIKey: fresh.APIKey, Secret: fresh.Secret}
	return []PlayableAPIKey{playable}, nil
}
// listRecoverableAPIKeys loads the user's active, unexpired, non-deleted API
// keys that still have a stored secret, newest first, together with that
// secret. It returns ErrLocalUserRequired when user does not resolve to a
// local gateway user.
func (s *Store) listRecoverableAPIKeys(ctx context.Context, user *auth.User) ([]PlayableAPIKey, error) {
	const query = `
SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at, COALESCE(key_secret, '')
FROM gateway_api_keys
WHERE gateway_user_id = $1::uuid
AND status = 'active'
AND deleted_at IS NULL
AND COALESCE(key_secret, '') <> ''
AND (expires_at IS NULL OR expires_at > now())
ORDER BY created_at DESC`

	ownerID := localGatewayUserID(user)
	if ownerID == "" {
		return nil, ErrLocalUserRequired
	}

	rows, err := s.pool.Query(ctx, query, ownerID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	playable := []PlayableAPIKey{}
	for rows.Next() {
		var secret string
		key, scanErr := scanAPIKeyWithSecret(rows, &secret)
		if scanErr != nil {
			return nil, scanErr
		}
		playable = append(playable, PlayableAPIKey{APIKey: key, Secret: secret})
	}
	return playable, rows.Err()
}
// CreateAPIKey generates a new API key secret for the given local gateway
// user, stores it and returns the created record together with the plaintext
// secret.
//
// Defaults: a blank name becomes "Default API Key" and an empty scope list
// becomes chat, image and video. input.ExpiresAt is trimmed and stored as NULL
// when blank. Both the bcrypt hash of the secret and the secret itself are
// written (key_hash and key_secret columns).
//
// It returns ErrLocalUserRequired when user does not resolve to a local
// gateway user; any other error comes from secret generation, hashing, JSON
// encoding or the database.
func (s *Store) CreateAPIKey(ctx context.Context, input CreateAPIKeyInput, user *auth.User) (CreatedAPIKey, error) {
	gatewayUserID := localGatewayUserID(user)
	if gatewayUserID == "" {
		return CreatedAPIKey{}, ErrLocalUserRequired
	}
	name := strings.TrimSpace(input.Name)
	if name == "" {
		name = "Default API Key"
	}
	scopes := input.Scopes
	if len(scopes) == 0 {
		scopes = []string{"chat", "image", "video"}
	}
	secret, err := generateAPIKeySecret()
	if err != nil {
		return CreatedAPIKey{}, err
	}
	keyHash, err := bcrypt.GenerateFromPassword([]byte(secret), bcrypt.DefaultCost)
	if err != nil {
		return CreatedAPIKey{}, err
	}
	scopesJSON, err := json.Marshal(scopes)
	if err != nil {
		return CreatedAPIKey{}, err
	}

	// The RETURNING list matches the column order scanAPIKey expects, so the
	// shared helper does the scan and JSON decoding instead of a local copy.
	item, err := scanAPIKey(s.pool.QueryRow(ctx, `
INSERT INTO gateway_api_keys (
gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id,
key_prefix, key_secret, key_hash, name, scopes, expires_at
)
VALUES (NULLIF($1, '')::uuid, $2::uuid, NULLIF($3, ''), NULLIF($4, ''), NULLIF($5, ''),
$6, $7, $8, $9, $10::jsonb, NULLIF($11, '')::timestamptz)
RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at`,
		user.GatewayTenantID, gatewayUserID, user.TenantID, user.TenantKey, user.ID,
		apiKeyPrefix(secret), secret, string(keyHash), name, string(scopesJSON), strings.TrimSpace(input.ExpiresAt),
	))
	if err != nil {
		return CreatedAPIKey{}, err
	}
	return CreatedAPIKey{APIKey: item, Secret: secret}, nil
}
// DisableAPIKey sets the status of one of the user's non-deleted API keys to
// "disabled" and returns the updated record.
//
// It returns ErrLocalUserRequired when user does not resolve to a local
// gateway user. When no row matches apiKeyID for that user, the row-scan error
// is returned unchanged.
func (s *Store) DisableAPIKey(ctx context.Context, apiKeyID string, user *auth.User) (APIKey, error) {
	gatewayUserID := localGatewayUserID(user)
	if gatewayUserID == "" {
		return APIKey{}, ErrLocalUserRequired
	}

	// The RETURNING list matches the column order scanAPIKey expects, so the
	// shared helper does the scan and JSON decoding instead of a local copy.
	item, err := scanAPIKey(s.pool.QueryRow(ctx, `
UPDATE gateway_api_keys
SET status = 'disabled', updated_at = now()
WHERE id = $1::uuid AND gateway_user_id = $2::uuid AND deleted_at IS NULL
RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at`,
		apiKeyID, gatewayUserID,
	))
	if err != nil {
		return APIKey{}, err
	}
	return item, nil
}
// VerifyLocalAPIKey authenticates a gateway API key secret.
//
// Candidate keys are looked up by the secret's prefix among active,
// non-deleted, unexpired keys of active, non-deleted users; the first candidate
// whose bcrypt hash matches the secret wins. On success the key's last_used_at
// is refreshed and an auth.User describing the owner and the key is returned.
//
// It returns auth.ErrUnauthorized when the secret has no valid prefix or no
// candidate matches, and the underlying error for database failures
// (including a failed last_used_at update).
func (s *Store) VerifyLocalAPIKey(ctx context.Context, secret string) (*auth.User, error) {
	prefix := apiKeyPrefix(secret)
	if prefix == "" {
		return nil, auth.ErrUnauthorized
	}
	rows, err := s.pool.Query(ctx, `
SELECT k.id::text, k.key_hash, k.key_prefix, k.name, COALESCE(k.user_group_id::text, ''),
u.id::text, u.username, u.roles, COALESCE(u.gateway_tenant_id::text, ''),
COALESCE(u.tenant_id, ''), COALESCE(u.tenant_key, '')
FROM gateway_api_keys k
JOIN gateway_users u ON u.id = k.gateway_user_id
WHERE k.key_prefix = $1
AND k.status = 'active'
AND k.deleted_at IS NULL
AND u.status = 'active'
AND u.deleted_at IS NULL
AND (k.expires_at IS NULL OR k.expires_at > now())`, prefix)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var matched *auth.User
	for rows.Next() {
		var (
			apiKeyID        string
			hash            string
			keyPrefix       string
			keyName         string
			userGroupID     string
			gatewayUserID   string
			username        string
			rolesBytes      []byte
			gatewayTenantID string
			tenantID        string
			tenantKey       string
		)
		if err := rows.Scan(&apiKeyID, &hash, &keyPrefix, &keyName, &userGroupID, &gatewayUserID, &username, &rolesBytes, &gatewayTenantID, &tenantID, &tenantKey); err != nil {
			return nil, err
		}
		// Several keys can share a prefix; only the hash decides.
		if bcrypt.CompareHashAndPassword([]byte(hash), []byte(secret)) != nil {
			continue
		}
		matched = &auth.User{
			ID:              gatewayUserID,
			Username:        username,
			Roles:           decodeStringArray(rolesBytes),
			TenantID:        tenantID,
			GatewayTenantID: gatewayTenantID,
			TenantKey:       tenantKey,
			Source:          "gateway",
			GatewayUserID:   gatewayUserID,
			UserGroupID:     userGroupID,
			APIKeyID:        apiKeyID,
			APIKeyName:      keyName,
			APIKeyPrefix:    keyPrefix,
		}
		break
	}

	// Release the connection held by rows before asking the pool for another
	// one below. Holding one connection while waiting for a second can
	// deadlock when the pool is saturated. Close is safe to call twice, so the
	// deferred Close above stays as a safety net.
	rows.Close()

	if matched == nil {
		if err := rows.Err(); err != nil {
			return nil, err
		}
		return nil, auth.ErrUnauthorized
	}
	if _, err := s.pool.Exec(ctx, `UPDATE gateway_api_keys SET last_used_at = now(), updated_at = now() WHERE id = $1::uuid`, matched.APIKeyID); err != nil {
		return nil, err
	}
	return matched, nil
}
// RegisterLocalUser creates a local ("gateway" source) user and its wallet
// account inside a single transaction and returns the stored user.
//
// The account identifier is the lower-cased, trimmed username, falling back to
// the email. The steps are:
//   - the first gateway user ever registered receives the "manager" role,
//     everyone else "user";
//   - the "default" tenant is created or touched;
//   - the user's group is the tenant's default group, else the active group
//     with key "default", else none;
//   - a non-empty invitation code must match an active, unexpired invitation
//     with remaining uses; its use counter is incremented and its details are
//     recorded in the user's metadata;
//   - a wallet account with currency "resource" is created for the user.
//
// Errors: a plain error when neither username nor email is given,
// ErrWeakPassword when the password is shorter than 8 bytes,
// ErrInvalidInvitation for an unusable invitation code, ErrUserAlreadyExists
// on a unique-constraint violation while inserting the user, and the
// underlying error otherwise.
func (s *Store) RegisterLocalUser(ctx context.Context, input LocalRegisterInput) (GatewayUser, error) {
	account := normalizeAccount(firstNonEmpty(input.Username, input.Email))
	if account == "" {
		return GatewayUser{}, errors.New("username or email is required")
	}
	if len(input.Password) < 8 {
		return GatewayUser{}, ErrWeakPassword
	}
	tenantKey := "default"
	tenantName := "Default Tenant"
	displayName := strings.TrimSpace(input.DisplayName)
	username := strings.TrimSpace(input.Username)
	if username == "" {
		username = account
	}
	email := strings.TrimSpace(strings.ToLower(input.Email))
	invitationCode := strings.TrimSpace(input.InvitationCode)
	// Hash before opening the transaction so the slow bcrypt work does not
	// hold a database connection.
	passwordHash, err := bcrypt.GenerateFromPassword([]byte(input.Password), bcrypt.DefaultCost)
	if err != nil {
		return GatewayUser{}, err
	}
	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return GatewayUser{}, err
	}
	// Rollback after a successful Commit has nothing left to undo, so its
	// result is deliberately not checked.
	defer tx.Rollback(ctx)

	var tenantID string
	userGroupID := ""
	role := "user"
	invitationID := ""
	invitedBy := ""
	var isBootstrapUser bool
	if err := tx.QueryRow(ctx, `
SELECT NOT EXISTS (
SELECT 1 FROM gateway_users WHERE source = 'gateway' AND deleted_at IS NULL
)`).Scan(&isBootstrapUser); err != nil {
		return GatewayUser{}, err
	}
	if isBootstrapUser {
		role = "manager"
	}
	if err := tx.QueryRow(ctx, `
INSERT INTO gateway_tenants (tenant_key, source, external_tenant_id, name)
VALUES ($1, 'gateway', $1, $2)
ON CONFLICT (tenant_key) DO UPDATE SET updated_at=now()
RETURNING id::text`,
		tenantKey, tenantName,
	).Scan(&tenantID); err != nil {
		return GatewayUser{}, err
	}

	// Resolve the default user group. A missing row just means "no group";
	// any other error is returned instead of being swallowed, because a failed
	// statement leaves the transaction unusable for everything that follows.
	if err := tx.QueryRow(ctx, `
SELECT COALESCE(default_user_group_id::text, '')
FROM gateway_tenants
WHERE id = $1::uuid`, tenantID).Scan(&userGroupID); err != nil && !errors.Is(err, pgx.ErrNoRows) {
		return GatewayUser{}, err
	}
	if userGroupID == "" {
		if err := tx.QueryRow(ctx, `
SELECT id::text
FROM gateway_user_groups
WHERE group_key = 'default' AND status = 'active'
LIMIT 1`).Scan(&userGroupID); err != nil && !errors.Is(err, pgx.ErrNoRows) {
			return GatewayUser{}, err
		}
	}

	if invitationCode != "" {
		// FOR UPDATE locks the invitation row until commit so concurrent
		// registrations cannot both pass the max_uses check.
		if err := tx.QueryRow(ctx, `
SELECT i.id::text, COALESCE(i.created_by::text, '')
FROM gateway_invitations i
WHERE lower(i.invite_code) = lower($1)
AND i.status = 'active'
AND (i.expires_at IS NULL OR i.expires_at > now())
AND (i.max_uses IS NULL OR i.used_count < i.max_uses)
FOR UPDATE OF i`,
			invitationCode,
		).Scan(&invitationID, &invitedBy); err != nil {
			if errors.Is(err, pgx.ErrNoRows) {
				return GatewayUser{}, ErrInvalidInvitation
			}
			return GatewayUser{}, err
		}
	}
	rolesJSON, err := json.Marshal([]string{role})
	if err != nil {
		return GatewayUser{}, err
	}
	userMetadataJSON := []byte("{}")
	if invitationID != "" {
		userMetadataJSON, err = json.Marshal(map[string]any{
			"registration": map[string]any{
				"invitationId":   invitationID,
				"invitationCode": invitationCode,
				"invitedBy":      invitedBy,
			},
		})
		if err != nil {
			return GatewayUser{}, err
		}
	}

	var user GatewayUser
	var roles []byte
	var authProfile []byte
	var metadata []byte
	if err := tx.QueryRow(ctx, `
INSERT INTO gateway_users (
user_key, source, external_user_id, username, display_name, email,
password_hash, gateway_tenant_id, tenant_id, tenant_key, default_user_group_id, roles, metadata, status
)
VALUES ($1, 'gateway', $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, $7::uuid, $8, $8, NULLIF($9, '')::uuid, $10::jsonb, $11::jsonb, 'active')
RETURNING id::text, user_key, source, COALESCE(external_user_id, ''), username,
COALESCE(display_name, ''), COALESCE(email, ''), COALESCE(phone, ''), COALESCE(avatar_url, ''),
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata,
status, COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''),
created_at, updated_at`,
		"gateway:"+account, account, username, displayName, email, string(passwordHash), tenantID, tenantKey, userGroupID, string(rolesJSON), string(userMetadataJSON),
	).Scan(
		&user.ID,
		&user.UserKey,
		&user.Source,
		&user.ExternalUserID,
		&user.Username,
		&user.DisplayName,
		&user.Email,
		&user.Phone,
		&user.AvatarURL,
		&user.GatewayTenantID,
		&user.TenantID,
		&user.TenantKey,
		&user.DefaultUserGroupID,
		&roles,
		&authProfile,
		&metadata,
		&user.Status,
		&user.LastLoginAt,
		&user.SyncedAt,
		&user.SourceUpdatedAt,
		&user.CreatedAt,
		&user.UpdatedAt,
	); err != nil {
		if isUniqueViolation(err) {
			return GatewayUser{}, ErrUserAlreadyExists
		}
		return GatewayUser{}, err
	}
	if invitationID != "" {
		if _, err := tx.Exec(ctx, `
UPDATE gateway_invitations
SET used_count = used_count + 1, updated_at = now()
WHERE id = $1::uuid`, invitationID); err != nil {
			return GatewayUser{}, err
		}
	}
	if _, err := tx.Exec(ctx, `
INSERT INTO gateway_wallet_accounts (
gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id, currency
)
VALUES ($1::uuid, $2::uuid, $3, $3, $4, 'resource')
ON CONFLICT (gateway_user_id, currency) DO NOTHING`,
		user.GatewayTenantID, user.ID, user.TenantKey, user.ID,
	); err != nil {
		return GatewayUser{}, err
	}
	if err := tx.Commit(ctx); err != nil {
		return GatewayUser{}, err
	}
	user.Roles = decodeStringArray(roles)
	user.AuthProfile = decodeObject(authProfile)
	user.Metadata = decodeObject(metadata)
	return user, nil
}
// AuthenticateLocalUser checks an account/password pair against the local
// ("gateway" source) users and returns the matching user.
//
// The account is matched, after lower-casing and trimming, against the
// external user id, the username or the email; when several rows match, the
// oldest one is used. Every authentication failure (blank input, unknown
// account, inactive user, missing or mismatching password hash) yields
// ErrInvalidCredentials; other database errors are returned as-is.
func (s *Store) AuthenticateLocalUser(ctx context.Context, input LocalLoginInput) (GatewayUser, error) {
	const query = `
SELECT id::text, user_key, source, COALESCE(external_user_id, ''), username,
COALESCE(display_name, ''), COALESCE(email, ''), COALESCE(phone, ''), COALESCE(avatar_url, ''),
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata,
status, COALESCE(password_hash, ''), COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''),
COALESCE(source_updated_at::text, ''), created_at, updated_at
FROM gateway_users
WHERE source='gateway'
AND deleted_at IS NULL
AND (external_user_id=$1 OR lower(username)=$1 OR lower(COALESCE(email, ''))=$1)
ORDER BY created_at ASC
LIMIT 1`

	account := normalizeAccount(input.Account)
	if input.Password == "" || account == "" {
		return GatewayUser{}, ErrInvalidCredentials
	}

	var (
		found           GatewayUser
		storedHash      string
		rolesJSON       []byte
		authProfileJSON []byte
		metadataJSON    []byte
	)
	row := s.pool.QueryRow(ctx, query, account)
	if err := row.Scan(
		&found.ID,
		&found.UserKey,
		&found.Source,
		&found.ExternalUserID,
		&found.Username,
		&found.DisplayName,
		&found.Email,
		&found.Phone,
		&found.AvatarURL,
		&found.GatewayTenantID,
		&found.TenantID,
		&found.TenantKey,
		&found.DefaultUserGroupID,
		&rolesJSON,
		&authProfileJSON,
		&metadataJSON,
		&found.Status,
		&storedHash,
		&found.LastLoginAt,
		&found.SyncedAt,
		&found.SourceUpdatedAt,
		&found.CreatedAt,
		&found.UpdatedAt,
	); err != nil {
		if IsNotFound(err) {
			return GatewayUser{}, ErrInvalidCredentials
		}
		return GatewayUser{}, err
	}

	if found.Status != "active" || storedHash == "" {
		return GatewayUser{}, ErrInvalidCredentials
	}
	if bcrypt.CompareHashAndPassword([]byte(storedHash), []byte(input.Password)) != nil {
		return GatewayUser{}, ErrInvalidCredentials
	}

	found.Roles = decodeStringArray(rolesJSON)
	found.AuthProfile = decodeObject(authProfileJSON)
	found.Metadata = decodeObject(metadataJSON)

	// Best effort: a failed last-login bookkeeping update must not fail the
	// login itself, so the result is intentionally discarded.
	_, _ = s.pool.Exec(ctx, `UPDATE gateway_users SET last_login_at=now(), updated_at=now() WHERE id=$1`, found.ID)
	return found, nil
}
// ListRateLimitWindows returns the rate-limit counters whose reset time is no
// older than five minutes ago, ordered by window start (newest first), then
// scope type, scope key and metric.
func (s *Store) ListRateLimitWindows(ctx context.Context) ([]RateLimitWindow, error) {
	const query = `
SELECT scope_type, scope_key, metric, window_start, limit_value::float8, used_value::float8,
reserved_value::float8, reset_at, updated_at
FROM gateway_rate_limit_counters
WHERE reset_at >= now() - interval '5 minutes'
ORDER BY window_start DESC, scope_type ASC, scope_key ASC, metric ASC`

	rows, err := s.pool.Query(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	windows := []RateLimitWindow{}
	for rows.Next() {
		var w RateLimitWindow
		scanErr := rows.Scan(
			&w.ScopeType,
			&w.ScopeKey,
			&w.Metric,
			&w.WindowStart,
			&w.LimitValue,
			&w.UsedValue,
			&w.ReservedValue,
			&w.ResetAt,
			&w.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		windows = append(windows, w)
	}
	return windows, rows.Err()
}
// CreateTask inserts a new task in the "queued" state on behalf of user and
// records its initial events, all in one transaction. It returns the stored
// task.
//
// The run mode is derived from input.RunMode and input.Request via
// normalizeRunMode. The result and billings columns start as JSON null and
// finished_at as NULL. user must be non-nil.
//
// Errors come from JSON encoding of the request or event payloads, or from the
// database.
func (s *Store) CreateTask(ctx context.Context, input CreateTaskInput, user *auth.User) (GatewayTask, error) {
	// Encoding errors are returned rather than discarded, so a request that
	// cannot be serialized never reaches the INSERT.
	requestBody, err := json.Marshal(input.Request)
	if err != nil {
		return GatewayTask{}, err
	}
	runMode := normalizeRunMode(input.RunMode, input.Request)
	status := "queued"
	resultBody, err := json.Marshal(map[string]any(nil))
	if err != nil {
		return GatewayTask{}, err
	}
	billingsBody, err := json.Marshal([]any(nil))
	if err != nil {
		return GatewayTask{}, err
	}
	tx, err := s.pool.Begin(ctx)
	if err != nil {
		return GatewayTask{}, err
	}
	// Rollback after a successful Commit has nothing left to undo, so its
	// result is deliberately not checked.
	defer tx.Rollback(ctx)
	task, err := scanGatewayTask(tx.QueryRow(ctx, `
INSERT INTO gateway_tasks (
kind, run_mode, user_id, gateway_user_id, user_source, gateway_tenant_id, tenant_id, tenant_key,
api_key_id, api_key_name, api_key_prefix, user_group_id, user_group_key,
model, requested_model, request, status, result, billings, finished_at
)
VALUES ($1, $2, $3, NULLIF($4, '')::uuid, COALESCE(NULLIF($5, ''), 'gateway'), NULLIF($6, '')::uuid, NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, ''), NULLIF($10, ''), NULLIF($11, ''), NULLIF($12, '')::uuid, NULLIF($13, ''), $14, $14, $15, $16, $17::jsonb, $18::jsonb, CASE WHEN $19 THEN now() ELSE NULL END)
RETURNING `+gatewayTaskColumns,
		input.Kind, runMode, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.APIKeyName, user.APIKeyPrefix, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, status, resultBody, billingsBody, false,
	))
	if err != nil {
		return GatewayTask{}, err
	}
	events := taskEventsForCreate(task.ID, runMode, status, nil)
	for _, event := range events {
		payload, err := json.Marshal(event.Payload)
		if err != nil {
			return GatewayTask{}, err
		}
		if _, err := tx.Exec(ctx, `
INSERT INTO gateway_task_events (task_id, seq, event_type, status, phase, progress, message, payload, simulated)
VALUES ($1::uuid, $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, NULLIF($7, ''), $8::jsonb, $9)`,
			task.ID, event.Seq, event.EventType, event.Status, event.Phase, event.Progress, event.Message, string(payload), event.Simulated,
		); err != nil {
			return GatewayTask{}, err
		}
	}
	if err := tx.Commit(ctx); err != nil {
		return GatewayTask{}, err
	}
	return task, nil
}
// GetTask loads a single task by its id. The row-scan error is returned
// unchanged when the task cannot be read.
func (s *Store) GetTask(ctx context.Context, taskID string) (GatewayTask, error) {
	row := s.pool.QueryRow(ctx, `
SELECT `+gatewayTaskColumns+`
FROM gateway_tasks
WHERE id=$1`, taskID)

	loaded, err := scanGatewayTask(row)
	if err == nil {
		return loaded, nil
	}
	return GatewayTask{}, err
}
// taskScanner is the single-method row abstraction consumed by
// scanGatewayTask, so the same scan code serves both transaction and pool
// query results.
type taskScanner interface {
// Scan copies the current row's columns into dest, in column order.
Scan(dest ...any) error
}
// scanGatewayTask reads one task row from scanner into a GatewayTask.
//
// The JSON columns (request, result, billings, usage, metrics, billing
// summary) are scanned as raw bytes and decoded afterwards; values that fail
// to decode end up nil. On a scan error the zero GatewayTask is returned.
func scanGatewayTask(scanner taskScanner) (GatewayTask, error) {
	var (
		out         GatewayTask
		rawRequest  []byte
		rawResult   []byte
		rawBillings []byte
		rawUsage    []byte
		rawMetrics  []byte
		rawSummary  []byte
	)

	// Destination order must follow the column list the callers select.
	dest := []any{
		&out.ID,
		&out.Kind,
		&out.RunMode,
		&out.UserID,
		&out.GatewayUserID,
		&out.UserSource,
		&out.GatewayTenantID,
		&out.TenantID,
		&out.TenantKey,
		&out.APIKeyID,
		&out.APIKeyName,
		&out.APIKeyPrefix,
		&out.UserGroupID,
		&out.UserGroupKey,
		&out.Model,
		&out.ModelType,
		&out.RequestedModel,
		&out.ResolvedModel,
		&out.RequestID,
		&rawRequest,
		&out.Status,
		&rawResult,
		&rawBillings,
		&rawUsage,
		&rawMetrics,
		&rawSummary,
		&out.FinalChargeAmount,
		&out.ResponseStartedAt,
		&out.ResponseFinishedAt,
		&out.ResponseDurationMS,
		&out.Error,
		&out.CreatedAt,
		&out.UpdatedAt,
		&out.FinishedAt,
	}
	if err := scanner.Scan(dest...); err != nil {
		return GatewayTask{}, err
	}

	out.Request = decodeObject(rawRequest)
	out.Result = decodeObject(rawResult)
	out.Billings = decodeArray(rawBillings)
	out.Usage = decodeObject(rawUsage)
	out.Metrics = decodeObject(rawMetrics)
	out.BillingSummary = decodeObject(rawSummary)
	return out, nil
}
// ListTaskEvents returns every event recorded for the given task, ordered by
// ascending sequence number. NULL status, phase, progress and message columns
// are read as their zero values.
func (s *Store) ListTaskEvents(ctx context.Context, taskID string) ([]TaskEvent, error) {
	const query = `
SELECT id::text, task_id::text, seq, event_type, COALESCE(status, ''), COALESCE(phase, ''),
COALESCE(progress, 0)::float8, COALESCE(message, ''), payload, simulated, created_at
FROM gateway_task_events
WHERE task_id = $1::uuid
ORDER BY seq ASC`

	rows, err := s.pool.Query(ctx, query, taskID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	events := []TaskEvent{}
	for rows.Next() {
		var (
			event      TaskEvent
			rawPayload []byte
		)
		scanErr := rows.Scan(
			&event.ID,
			&event.TaskID,
			&event.Seq,
			&event.EventType,
			&event.Status,
			&event.Phase,
			&event.Progress,
			&event.Message,
			&rawPayload,
			&event.Simulated,
			&event.CreatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		event.Payload = decodeObject(rawPayload)
		events = append(events, event)
	}
	return events, rows.Err()
}
// IsNotFound reports whether err is, or wraps, pgx.ErrNoRows, i.e. a
// single-row query matched nothing. It returns false for a nil error.
func IsNotFound(err error) bool {
	// errors.Is rather than ==, so callers that add context with %w are
	// still recognized.
	return errors.Is(err, pgx.ErrNoRows)
}
// IsUniqueViolation reports whether err is a PostgreSQL error with SQLSTATE
// 23505. It is the exported form of isUniqueViolation.
func IsUniqueViolation(err error) bool {
return isUniqueViolation(err)
}
// isUniqueViolation reports whether err is, or wraps, a *pgconn.PgError whose
// SQLSTATE code is 23505.
func isUniqueViolation(err error) bool {
	var dbErr *pgconn.PgError
	if !errors.As(err, &dbErr) {
		return false
	}
	return dbErr.Code == "23505"
}
// apiKeyScanner is the single-method row abstraction consumed by scanAPIKey
// and scanAPIKeyWithSecret.
type apiKeyScanner interface {
// Scan copies the current row's columns into dest, in column order.
Scan(dest ...any) error
}
// scanAPIKey reads one API key row (17 columns, in the order used by the API
// key queries in this file) from scanner. Scopes and the two policy columns
// are scanned as raw JSON and decoded afterwards. On a scan error the zero
// APIKey is returned.
func scanAPIKey(scanner apiKeyScanner) (APIKey, error) {
	var (
		key          APIKey
		rawScopes    []byte
		rawRateLimit []byte
		rawQuota     []byte
	)
	dest := []any{
		&key.ID,
		&key.GatewayTenantID,
		&key.GatewayUserID,
		&key.TenantID,
		&key.TenantKey,
		&key.UserID,
		&key.KeyPrefix,
		&key.Name,
		&rawScopes,
		&key.UserGroupID,
		&rawRateLimit,
		&rawQuota,
		&key.Status,
		&key.ExpiresAt,
		&key.LastUsedAt,
		&key.CreatedAt,
		&key.UpdatedAt,
	}
	if err := scanner.Scan(dest...); err != nil {
		return APIKey{}, err
	}
	key.Scopes = decodeStringArray(rawScopes)
	key.RateLimitPolicy = decodeObject(rawRateLimit)
	key.QuotaPolicy = decodeObject(rawQuota)
	return key, nil
}
// scanAPIKeyWithSecret reads one API key row like scanAPIKey, plus a trailing
// 18th column that is scanned into *secret. On a scan error the zero APIKey is
// returned.
func scanAPIKeyWithSecret(scanner apiKeyScanner, secret *string) (APIKey, error) {
	var (
		key          APIKey
		rawScopes    []byte
		rawRateLimit []byte
		rawQuota     []byte
	)
	dest := []any{
		&key.ID,
		&key.GatewayTenantID,
		&key.GatewayUserID,
		&key.TenantID,
		&key.TenantKey,
		&key.UserID,
		&key.KeyPrefix,
		&key.Name,
		&rawScopes,
		&key.UserGroupID,
		&rawRateLimit,
		&rawQuota,
		&key.Status,
		&key.ExpiresAt,
		&key.LastUsedAt,
		&key.CreatedAt,
		&key.UpdatedAt,
		secret, // last column: the stored key secret
	}
	if err := scanner.Scan(dest...); err != nil {
		return APIKey{}, err
	}
	key.Scopes = decodeStringArray(rawScopes)
	key.RateLimitPolicy = decodeObject(rawRateLimit)
	key.QuotaPolicy = decodeObject(rawQuota)
	return key, nil
}
// localGatewayUserID resolves the gateway user id for user, or "" when there
// is none. An explicit GatewayUserID wins; otherwise ID is used, but only when
// Source is empty or "gateway". A nil user yields "".
func localGatewayUserID(user *auth.User) string {
	switch {
	case user == nil:
		return ""
	case user.GatewayUserID != "":
		return user.GatewayUserID
	case user.Source == "", user.Source == "gateway":
		return user.ID
	default:
		return ""
	}
}
// generateAPIKeySecret returns a new API key secret: the literal "sk-gw-"
// followed by 32 bytes from crypto/rand in unpadded URL-safe base64.
func generateAPIKeySecret() (string, error) {
	var entropy [32]byte
	if _, err := rand.Read(entropy[:]); err != nil {
		return "", err
	}
	encoded := base64.RawURLEncoding.EncodeToString(entropy[:])
	return "sk-gw-" + encoded, nil
}
// apiKeyPrefix returns the lookup prefix of an API key secret: the first 18
// bytes of the trimmed secret, or the whole trimmed secret when it is shorter.
// Secrets that do not start with "sk-gw-" yield "".
func apiKeyPrefix(secret string) string {
	const (
		scheme    = "sk-gw-"
		prefixLen = 18
	)
	trimmed := strings.TrimSpace(secret)
	if !strings.HasPrefix(trimmed, scheme) {
		return ""
	}
	if len(trimmed) > prefixLen {
		return trimmed[:prefixLen]
	}
	return trimmed
}
// normalizeRunMode maps a requested run mode to "simulation" or "production".
//
// input is used when it is non-blank. Otherwise the mode is taken from the
// request: the string "runMode", then the string "mode" if still unset, and a
// true boolean "simulation" or "testMode" forces simulation. The values
// "simulation", "test", "dry-run" and "dry_run" (case-insensitive, trimmed)
// mean simulation; everything else means production.
func normalizeRunMode(input string, request map[string]any) string {
	clean := func(raw string) string {
		return strings.ToLower(strings.TrimSpace(raw))
	}

	mode := clean(input)
	if mode == "" && request != nil {
		if raw, ok := request["runMode"].(string); ok {
			mode = clean(raw)
		}
		if mode == "" {
			if raw, ok := request["mode"].(string); ok {
				mode = clean(raw)
			}
		}
		// Boolean switches override whatever the string fields said.
		for _, flag := range []string{"simulation", "testMode"} {
			if enabled, ok := request[flag].(bool); ok && enabled {
				mode = "simulation"
			}
		}
	}

	switch mode {
	case "simulation", "test", "dry-run", "dry_run":
		return "simulation"
	default:
		return "production"
	}
}
// simulationResult builds the canned response body for a simulated task of
// the given kind. Chat completions, image generations and video generations
// each get a fixed payload; any other kind gets a generic acknowledgement.
// model is echoed back in the payload.
func simulationResult(kind string, model string) map[string]any {
	out := map[string]any{"model": model}
	switch kind {
	case "chat.completions":
		reply := map[string]any{"role": "assistant", "content": "simulation response"}
		out["id"] = "chatcmpl-simulated"
		out["object"] = "chat.completion"
		out["choices"] = []any{map[string]any{"index": 0, "finish_reason": "stop", "message": reply}}
		out["usage"] = map[string]any{"prompt_tokens": 12, "completion_tokens": 8, "total_tokens": 20}
	case "images.generations":
		out["id"] = "img-simulated"
		out["data"] = []any{map[string]any{"url": "/static/simulation/image.png", "revised_prompt": "simulation image"}}
	case "videos.generations":
		out["id"] = "video-simulated"
		out["data"] = []any{map[string]any{"url": "/static/simulation/video.mp4", "duration": 5}}
	default:
		out["id"] = "task-simulated"
		out["kind"] = kind
		out["ok"] = true
	}
	return out
}
// simulationBillings builds the single zero-amount billing line attached to a
// simulated task. The resource type and unit depend on kind (chat, image,
// video, or a generic "task"/"item" fallback); quantity is 1, amount is 0 and
// the currency is "resource".
func simulationBillings(kind string, model string) []any {
	var resourceType, unit string
	switch kind {
	case "chat.completions":
		resourceType, unit = "text_total", "1k_tokens"
	case "images.generations":
		resourceType, unit = "image", "image"
	case "videos.generations":
		resourceType, unit = "video", "second"
	default:
		resourceType, unit = "task", "item"
	}

	line := map[string]any{
		"model":        model,
		"resourceType": resourceType,
		"unit":         unit,
		"quantity":     1,
		"amount":       0,
		"currency":     "resource",
		"simulated":    true,
	}
	return []any{line}
}
// taskEventsForCreate returns the events recorded when a task is created: a
// single "task.accepted" event with sequence 1 in the "queued" phase. The
// event is flagged as simulated when runMode is "simulation". The status and
// result parameters are currently not used.
func taskEventsForCreate(taskID string, runMode string, status string, result map[string]any) []TaskEvent {
	accepted := TaskEvent{
		TaskID:    taskID,
		Seq:       1,
		EventType: "task.accepted",
		Status:    "queued",
		Phase:     "queued",
		Progress:  0,
		Message:   "task accepted",
		Payload:   map[string]any{"taskId": taskID},
		Simulated: runMode == "simulation",
	}
	return []TaskEvent{accepted}
}
// decodeObject parses bytes as a JSON object. It returns nil for empty input,
// for input that is not valid JSON, and for JSON that is not an object.
func decodeObject(bytes []byte) map[string]any {
	var decoded map[string]any
	if len(bytes) == 0 || json.Unmarshal(bytes, &decoded) != nil {
		return nil
	}
	return decoded
}
// maskCredentialsPreview decodes a JSON credentials object and returns a copy
// in which every value has been passed through maskCredentialValue. It
// returns nil when the input does not decode to a non-empty object.
func maskCredentialsPreview(bytes []byte) map[string]any {
	raw := decodeObject(bytes)
	if len(raw) == 0 {
		return nil
	}
	masked := make(map[string]any, len(raw))
	for name := range raw {
		masked[name] = maskCredentialValue(raw[name])
	}
	return masked
}
// maskCredentialValue masks a decoded JSON value: strings go through
// maskSecret, objects and arrays are copied with each element masked
// recursively, and every other value (numbers, booleans, nil) is returned
// unchanged.
func maskCredentialValue(value any) any {
	if text, ok := value.(string); ok {
		return maskSecret(text)
	}
	if object, ok := value.(map[string]any); ok {
		masked := make(map[string]any, len(object))
		for name, nested := range object {
			masked[name] = maskCredentialValue(nested)
		}
		return masked
	}
	if list, ok := value.([]any); ok {
		masked := make([]any, len(list))
		for i := range list {
			masked[i] = maskCredentialValue(list[i])
		}
		return masked
	}
	return value
}
// maskSecret hides the middle of a secret for display. Surrounding whitespace
// is trimmed first. Values of up to six characters are fully replaced by '*';
// longer values keep their first and last three characters and have every
// character in between replaced by one '*'. A blank value yields "".
//
// Lengths and cut points are measured in Unicode code points rather than
// bytes, so multi-byte characters are never split and each hidden character
// maps to exactly one '*'.
func maskSecret(value string) string {
	const visible = 3 // characters kept at each end

	chars := []rune(strings.TrimSpace(value))
	count := len(chars)
	switch {
	case count == 0:
		return ""
	case count <= 2*visible:
		return strings.Repeat("*", count)
	}
	head := string(chars[:visible])
	tail := string(chars[count-visible:])
	return head + strings.Repeat("*", count-2*visible) + tail
}
// decodeArray parses bytes as a JSON array. It returns nil for empty input,
// for input that is not valid JSON, and for JSON that is not an array.
func decodeArray(bytes []byte) []any {
	var decoded []any
	if len(bytes) == 0 || json.Unmarshal(bytes, &decoded) != nil {
		return nil
	}
	return decoded
}
// decodeStringArray parses bytes as a JSON array of strings. It returns nil
// for empty input and for anything that does not decode into []string.
func decodeStringArray(bytes []byte) []string {
	var decoded []string
	if len(bytes) == 0 || json.Unmarshal(bytes, &decoded) != nil {
		return nil
	}
	return decoded
}
// firstNonEmpty returns the first argument that contains something other than
// whitespace, exactly as passed in (it is not trimmed), or "" when there is
// none.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if candidate := values[i]; len(strings.TrimSpace(candidate)) > 0 {
			return candidate
		}
	}
	return ""
}
// normalizeAccount canonicalizes an account identifier by trimming
// surrounding whitespace and lower-casing the rest.
func normalizeAccount(value string) string {
	trimmed := strings.TrimSpace(value)
	return strings.ToLower(trimmed)
}
// normalizeKey turns free text into a lower-case, dash-separated key.
//
// Letters and digits are kept. Runs of '-', '_', '.' and whitespace collapse
// into a single '-'. All other characters are dropped without affecting the
// collapsing. Leading and trailing dashes are removed, and an empty result
// becomes "default".
func normalizeKey(value string) string {
	var key strings.Builder
	prevDash := false
	for _, r := range strings.ToLower(strings.TrimSpace(value)) {
		if unicode.IsLetter(r) || unicode.IsDigit(r) {
			key.WriteRune(r)
			prevDash = false
			continue
		}
		isSeparator := r == '-' || r == '_' || r == '.' || unicode.IsSpace(r)
		// Never start with a dash and never emit two in a row.
		if isSeparator && !prevDash && key.Len() > 0 {
			key.WriteByte('-')
			prevDash = true
		}
	}
	if trimmed := strings.Trim(key.String(), "-"); trimmed != "" {
		return trimmed
	}
	return "default"
}