feat: add ai gateway local core flow

This commit is contained in:
wangbo 2026-05-09 16:51:28 +08:00
parent 5b20f017eb
commit c0335bd5d0
20 changed files with 2332 additions and 156 deletions

View File

@ -34,5 +34,5 @@ TASK_PROGRESS_CALLBACK_URL=http://localhost:3000/internal/platform/task-progress
TASK_PROGRESS_CALLBACK_TIMEOUT_MS=5000
TASK_PROGRESS_CALLBACK_MAX_ATTEMPTS=10
CORS_ALLOWED_ORIGIN=http://localhost:5178
CORS_ALLOWED_ORIGIN=http://localhost:5178,http://127.0.0.1:5178
VITE_GATEWAY_API_BASE_URL=http://localhost:8088

View File

@ -36,6 +36,13 @@ pnpm dev
- PostgreSQL: 目标版本 18默认使用宿主机 `localhost:5432` 上的 `easyai-pgvector` 实例,并使用独立库 `easyai_ai_gateway`
- 身份模式: 默认 `IDENTITY_MODE=hybrid`,可同时测试 Gateway 本地账号注册登录、可选邀请码和 `server-main` JWT / API Key 对接。
`pnpm dev` 会先创建数据库并执行 migration然后并行启动
- `api:dev`:通过 `scripts/go-watch.mjs` 运行 Go API监听 `.go`、`go.mod`、`go.sum` 变化并自动重启后端进程watcher 会按进程组终止旧的 `go run` 和其子进程,避免热更新时残留进程占用 API 端口。
- `web:dev`Vite React dev server。
后端热更新可通过 `GO_WATCH_SHUTDOWN_GRACE_MS``GO_WATCH_RESTART_DELAY_MS` 调整旧进程退出等待时间与重启间隔。
默认 EasyAI 部署里,`easyai-pgvector` 在容器网络内的连接串是:
```dotenv

View File

@ -52,6 +52,7 @@ type Authenticator struct {
ServerMainBaseURL string
ServerMainInternalToken string
HTTPClient *http.Client
LocalAPIKeyVerifier func(ctx context.Context, apiKey string) (*User, error)
}
func New(jwtSecret string, serverMainBaseURL string, internalToken string) *Authenticator {
@ -169,6 +170,18 @@ func (a *Authenticator) SignJWT(user *User, ttl time.Duration) (string, error) {
}
func (a *Authenticator) verifyAPIKey(ctx context.Context, apiKey string) (*User, error) {
if a.LocalAPIKeyVerifier != nil {
user, err := a.LocalAPIKeyVerifier(ctx, apiKey)
if err == nil {
return user, nil
}
if !errors.Is(err, ErrUnauthorized) {
return nil, err
}
if strings.HasPrefix(apiKey, "sk-gw-") {
return nil, ErrUnauthorized
}
}
if a.ServerMainBaseURL == "" || a.ServerMainInternalToken == "" {
return nil, ErrUnauthorized
}

View File

@ -41,7 +41,7 @@ func Load() Config {
),
TaskProgressCallbackTimeoutMS: env("TASK_PROGRESS_CALLBACK_TIMEOUT_MS", "5000"),
TaskProgressCallbackMaxAttempts: env("TASK_PROGRESS_CALLBACK_MAX_ATTEMPTS", "10"),
CORSAllowedOrigin: env("CORS_ALLOWED_ORIGIN", "http://localhost:5178"),
CORSAllowedOrigin: env("CORS_ALLOWED_ORIGIN", "http://localhost:5178,http://127.0.0.1:5178"),
LogLevel: logLevel(env("LOG_LEVEL", "info")),
}
}

View File

@ -0,0 +1,298 @@
package httpapi
import (
"bytes"
"context"
"encoding/json"
"io"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"testing"
"time"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/config"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
"github.com/jackc/pgx/v5/pgxpool"
)
func TestCoreLocalFlow(t *testing.T) {
databaseURL := strings.TrimSpace(os.Getenv("AI_GATEWAY_TEST_DATABASE_URL"))
if databaseURL == "" {
t.Skip("set AI_GATEWAY_TEST_DATABASE_URL to run the PostgreSQL integration flow")
}
ctx := context.Background()
applyMigration(t, ctx, databaseURL)
db, err := store.Connect(ctx, databaseURL)
if err != nil {
t.Fatalf("connect store: %v", err)
}
defer db.Close()
handler := NewServer(config.Config{
AppEnv: "test",
HTTPAddr: ":0",
DatabaseURL: databaseURL,
IdentityMode: "hybrid",
JWTSecret: "test-secret",
CORSAllowedOrigin: "*",
}, db, slog.New(slog.NewTextHandler(io.Discard, nil)))
server := httptest.NewServer(handler)
defer server.Close()
suffix := time.Now().UnixNano()
suffixText := strconv.FormatInt(suffix, 10)
username := "smoke_admin_" + suffixText
password := "password123"
var registerResponse struct {
AccessToken string `json:"accessToken"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/register", "", map[string]any{
"username": username,
"email": username + "@example.com",
"password": password,
"tenantKey": "manual-" + suffixText,
"tenantName": "Manual Tenant",
}, http.StatusCreated, &registerResponse)
if registerResponse.AccessToken == "" {
t.Fatal("register did not return access token")
}
var duplicateResponse map[string]any
doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/register", "", map[string]any{
"username": username,
"email": username + "@example.com",
"password": password,
}, http.StatusConflict, &duplicateResponse)
if errorBody, ok := duplicateResponse["error"].(map[string]any); !ok || errorBody["message"] != "user already exists" {
t.Fatalf("unexpected duplicate response: %+v", duplicateResponse)
}
var loginResponse struct {
AccessToken string `json:"accessToken"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/login", "", map[string]any{
"account": username,
"password": password,
}, http.StatusOK, &loginResponse)
if loginResponse.AccessToken == "" {
t.Fatal("login did not return access token")
}
var apiKeyResponse struct {
Secret string `json:"secret"`
APIKey struct {
ID string `json:"id"`
KeyPrefix string `json:"keyPrefix"`
Status string `json:"status"`
} `json:"apiKey"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/api-keys", loginResponse.AccessToken, map[string]any{
"name": "smoke key",
"scopes": []string{"chat", "image", "video"},
}, http.StatusCreated, &apiKeyResponse)
if !strings.HasPrefix(apiKeyResponse.Secret, "sk-gw-") || apiKeyResponse.APIKey.Status != "active" {
t.Fatalf("unexpected api key response: %+v", apiKeyResponse)
}
var me map[string]any
doJSON(t, server.URL, http.MethodGet, "/api/v1/me", apiKeyResponse.Secret, nil, http.StatusOK, &me)
if me["apiKeyId"] == "" {
t.Fatalf("api key auth did not expose apiKeyId: %+v", me)
}
if me["tenantKey"] != "default" {
t.Fatalf("register should ignore public tenant fields and use default tenant: %+v", me)
}
testPool, err := pgxpool.New(ctx, databaseURL)
if err != nil {
t.Fatalf("connect test pool: %v", err)
}
defer testPool.Close()
inviteCode := "INVITE-" + suffixText
if _, err := testPool.Exec(ctx, `
INSERT INTO gateway_invitations (invite_code, max_uses, metadata)
VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil {
t.Fatalf("insert invitation: %v", err)
}
invitedUsername := "smoke_invited_" + suffixText
var invitedRegisterResponse struct {
AccessToken string `json:"accessToken"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/register", "", map[string]any{
"username": invitedUsername,
"email": invitedUsername + "@example.com",
"password": password,
"tenantKey": "manual-invited-" + suffixText,
"tenantName": "Manual Invited Tenant",
"invitationCode": inviteCode,
}, http.StatusCreated, &invitedRegisterResponse)
var invitedMe map[string]any
doJSON(t, server.URL, http.MethodGet, "/api/v1/me", invitedRegisterResponse.AccessToken, nil, http.StatusOK, &invitedMe)
if invitedMe["tenantKey"] != "default" {
t.Fatalf("invitation should not change tenant context: %+v", invitedMe)
}
var usedCount int
if err := testPool.QueryRow(ctx, `SELECT used_count FROM gateway_invitations WHERE invite_code = $1`, inviteCode).Scan(&usedCount); err != nil {
t.Fatalf("read invitation used_count: %v", err)
}
if usedCount != 1 {
t.Fatalf("invitation used_count = %d, want 1", usedCount)
}
var userMetadata []byte
if err := testPool.QueryRow(ctx, `SELECT metadata FROM gateway_users WHERE username = $1`, invitedUsername).Scan(&userMetadata); err != nil {
t.Fatalf("read invited user metadata: %v", err)
}
var metadata map[string]any
if err := json.Unmarshal(userMetadata, &metadata); err != nil {
t.Fatalf("decode invited user metadata: %v", err)
}
registration, ok := metadata["registration"].(map[string]any)
if !ok || registration["invitationCode"] != inviteCode {
t.Fatalf("invitation relationship was not recorded: %+v", metadata)
}
var platform struct {
ID string `json:"id"`
Provider string `json:"provider"`
PlatformKey string `json:"platformKey"`
Status string `json:"status"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/platforms", loginResponse.AccessToken, map[string]any{
"provider": "openai",
"platformKey": "openai-smoke-" + suffixText,
"name": "OpenAI Smoke",
"baseUrl": "https://api.openai.com/v1",
"authType": "bearer",
"credentials": map[string]any{"mode": "simulation"},
"config": map[string]any{"testMode": true},
}, http.StatusCreated, &platform)
if platform.ID == "" || platform.Status != "enabled" {
t.Fatalf("unexpected platform response: %+v", platform)
}
var taskResponse struct {
Task struct {
ID string `json:"id"`
Status string `json:"status"`
RunMode string `json:"runMode"`
Result map[string]any `json:"result"`
} `json:"task"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/chat/completions", apiKeyResponse.Secret, map[string]any{
"model": "gpt-4o-mini",
"runMode": "simulation",
"simulation": true,
"messages": []map[string]any{{"role": "user", "content": "ping"}},
}, http.StatusAccepted, &taskResponse)
if taskResponse.Task.ID == "" || taskResponse.Task.Status != "succeeded" || taskResponse.Task.RunMode != "simulation" {
t.Fatalf("unexpected task response: %+v", taskResponse.Task)
}
var taskDetail struct {
ID string `json:"id"`
Status string `json:"status"`
Result map[string]any `json:"result"`
}
doJSON(t, server.URL, http.MethodGet, "/api/v1/tasks/"+taskResponse.Task.ID, apiKeyResponse.Secret, nil, http.StatusOK, &taskDetail)
if taskDetail.Status != "succeeded" || taskDetail.Result["id"] == "" {
t.Fatalf("unexpected task detail: %+v", taskDetail)
}
req, err := http.NewRequest(http.MethodGet, server.URL+"/api/v1/tasks/"+taskResponse.Task.ID+"/events", nil)
if err != nil {
t.Fatalf("build events request: %v", err)
}
req.Header.Set("Authorization", "Bearer "+apiKeyResponse.Secret)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("events request: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK || !bytes.Contains(body, []byte("task.completed")) {
t.Fatalf("unexpected events response status=%d body=%s", resp.StatusCode, string(body))
}
}
func TestOriginAllowedSupportsCommaSeparatedOrigins(t *testing.T) {
allowed := "http://localhost:5178, http://127.0.0.1:5178"
if !originAllowed("http://localhost:5178", allowed) {
t.Fatal("localhost origin should be allowed")
}
if !originAllowed("http://127.0.0.1:5178", allowed) {
t.Fatal("127.0.0.1 origin should be allowed")
}
if originAllowed("http://127.0.0.1:5179", allowed) {
t.Fatal("unexpected origin should not be allowed")
}
}
func applyMigration(t *testing.T, ctx context.Context, databaseURL string) {
t.Helper()
_, filename, _, _ := runtime.Caller(0)
migrationFiles, err := filepath.Glob(filepath.Join(filepath.Dir(filename), "..", "..", "migrations", "*.sql"))
if err != nil {
t.Fatalf("read migration files: %v", err)
}
sort.Strings(migrationFiles)
pool, err := pgxpool.New(ctx, databaseURL)
if err != nil {
t.Fatalf("connect migration db: %v", err)
}
defer pool.Close()
for _, migrationPath := range migrationFiles {
migration, err := os.ReadFile(migrationPath)
if err != nil {
t.Fatalf("read migration %s: %v", filepath.Base(migrationPath), err)
}
if _, err := pool.Exec(ctx, string(migration)); err != nil {
t.Fatalf("apply migration %s: %v", filepath.Base(migrationPath), err)
}
}
}
func doJSON(t *testing.T, baseURL string, method string, path string, token string, payload any, expectedStatus int, out any) {
t.Helper()
var body io.Reader
if payload != nil {
raw, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal payload: %v", err)
}
body = bytes.NewReader(raw)
}
req, err := http.NewRequest(method, baseURL+path, body)
if err != nil {
t.Fatalf("build request: %v", err)
}
if payload != nil {
req.Header.Set("Content-Type", "application/json")
}
if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("%s %s: %v", method, path, err)
}
defer resp.Body.Close()
raw, _ := io.ReadAll(resp.Body)
if resp.StatusCode != expectedStatus {
t.Fatalf("%s %s status=%d want=%d body=%s", method, path, resp.StatusCode, expectedStatus, string(raw))
}
if out != nil && len(raw) > 0 {
if err := json.Unmarshal(raw, out); err != nil {
t.Fatalf("decode %s %s response: %v body=%s", method, path, err, string(raw))
}
}
}

View File

@ -54,8 +54,12 @@ func (s *Server) register(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, err.Error())
return
}
if errors.Is(err, store.ErrUserAlreadyExists) {
writeError(w, http.StatusConflict, err.Error())
return
}
s.logger.Error("register local user failed", "error", err)
writeError(w, http.StatusConflict, "user already exists or tenant is unavailable")
writeError(w, http.StatusInternalServerError, "register local user failed")
return
}
s.writeAuthResponse(w, http.StatusCreated, user)
@ -230,6 +234,56 @@ func (s *Server) listUserGroups(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{"items": items})
}
func (s *Server) listAPIKeys(w http.ResponseWriter, r *http.Request) {
user, _ := auth.UserFromContext(r.Context())
items, err := s.store.ListAPIKeys(r.Context(), user)
if err != nil {
s.logger.Error("list api keys failed", "error", err)
writeError(w, http.StatusInternalServerError, "list api keys failed")
return
}
writeJSON(w, http.StatusOK, map[string]any{"items": items})
}
func (s *Server) createAPIKey(w http.ResponseWriter, r *http.Request) {
user, _ := auth.UserFromContext(r.Context())
var input store.CreateAPIKeyInput
if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
writeError(w, http.StatusBadRequest, "invalid json body")
return
}
created, err := s.store.CreateAPIKey(r.Context(), input, user)
if err != nil {
if errors.Is(err, store.ErrLocalUserRequired) {
writeError(w, http.StatusBadRequest, err.Error())
return
}
s.logger.Error("create api key failed", "error", err)
writeError(w, http.StatusInternalServerError, "create api key failed")
return
}
writeJSON(w, http.StatusCreated, created)
}
func (s *Server) disableAPIKey(w http.ResponseWriter, r *http.Request) {
user, _ := auth.UserFromContext(r.Context())
item, err := s.store.DisableAPIKey(r.Context(), r.PathValue("apiKeyID"), user)
if err == nil {
writeJSON(w, http.StatusOK, item)
return
}
if errors.Is(err, store.ErrLocalUserRequired) {
writeError(w, http.StatusBadRequest, err.Error())
return
}
if store.IsNotFound(err) {
writeError(w, http.StatusNotFound, "api key not found")
return
}
s.logger.Error("disable api key failed", "error", err)
writeError(w, http.StatusInternalServerError, "disable api key failed")
}
func (s *Server) estimatePricing(w http.ResponseWriter, r *http.Request) {
var body map[string]any
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
@ -275,6 +329,7 @@ func (s *Server) createTask(kind string) http.Handler {
task, err := s.store.CreateTask(r.Context(), store.CreateTaskInput{
Kind: kind,
Model: model,
RunMode: runModeFromRequest(body),
Request: body,
}, user)
if err != nil {
@ -322,24 +377,37 @@ func (s *Server) taskEvents(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
sendSSE(w, "task.accepted", map[string]any{
"taskId": task.ID,
"status": task.Status,
})
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
timer := time.NewTimer(250 * time.Millisecond)
defer timer.Stop()
select {
case <-r.Context().Done():
events, err := s.store.ListTaskEvents(r.Context(), task.ID)
if err != nil {
s.logger.Error("list task events failed", "error", err)
return
case <-timer.C:
sendSSE(w, "task.placeholder", map[string]any{
"taskId": task.ID,
"message": "runtime worker is not wired yet",
}
for _, event := range events {
sendSSE(w, event.EventType, event)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
if len(events) == 0 {
sendSSE(w, "task.accepted", map[string]any{
"taskId": task.ID,
"status": task.Status,
})
}
}
func runModeFromRequest(body map[string]any) string {
if value, ok := body["runMode"].(string); ok {
return value
}
if value, ok := body["mode"].(string); ok {
return value
}
if value, ok := body["simulation"].(bool); ok && value {
return "simulation"
}
if value, ok := body["testMode"].(bool); ok && value {
return "simulation"
}
return ""
}

View File

@ -24,6 +24,7 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han
auth: auth.New(cfg.JWTSecret, cfg.ServerMainBaseURL, cfg.ServerMainInternalToken),
logger: logger,
}
server.auth.LocalAPIKeyVerifier = db.VerifyLocalAPIKey
mux := http.NewServeMux()
mux.HandleFunc("GET /healthz", server.health)
@ -37,6 +38,9 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han
mux.Handle("GET /api/v1/tenants", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listTenants)))
mux.Handle("GET /api/v1/users", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listUsers)))
mux.Handle("GET /api/v1/user-groups", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listUserGroups)))
mux.Handle("GET /api/v1/api-keys", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listAPIKeys)))
mux.Handle("POST /api/v1/api-keys", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.createAPIKey)))
mux.Handle("PATCH /api/v1/api-keys/{apiKeyID}/disable", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.disableAPIKey)))
mux.Handle("GET /api/v1/pricing/rules", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listPricingRules)))
mux.Handle("POST /api/v1/pricing/estimate", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.estimatePricing)))
mux.Handle("GET /api/v1/platforms", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listPlatforms)))
@ -55,7 +59,7 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han
func (s *Server) cors(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
if origin != "" && (s.cfg.CORSAllowedOrigin == "*" || strings.EqualFold(origin, s.cfg.CORSAllowedOrigin)) {
if origin != "" && originAllowed(origin, s.cfg.CORSAllowedOrigin) {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Vary", "Origin")
w.Header().Set("Access-Control-Allow-Credentials", "true")
@ -70,6 +74,16 @@ func (s *Server) cors(next http.Handler) http.Handler {
})
}
func originAllowed(origin string, allowed string) bool {
for _, item := range strings.Split(allowed, ",") {
item = strings.TrimSpace(item)
if item == "*" || strings.EqualFold(origin, item) {
return true
}
}
return false
}
func (s *Server) recover(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer func() {

View File

@ -2,6 +2,8 @@ package store
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"strings"
@ -10,6 +12,7 @@ import (
"github.com/easyai/easyai-ai-gateway/apps/api/internal/auth"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/crypto/bcrypt"
)
@ -21,6 +24,8 @@ type Store struct {
var (
ErrInvalidCredentials = errors.New("invalid account or password")
ErrInvalidInvitation = errors.New("invalid or expired invitation code")
ErrLocalUserRequired = errors.New("local gateway user is required")
ErrUserAlreadyExists = errors.New("user already exists")
ErrWeakPassword = errors.New("password must be at least 8 characters")
)
@ -73,6 +78,37 @@ type CreatePlatformInput struct {
Priority int `json:"priority"`
}
type CreateAPIKeyInput struct {
Name string `json:"name"`
Scopes []string `json:"scopes"`
ExpiresAt string `json:"expiresAt"`
}
type APIKey struct {
ID string `json:"id"`
GatewayTenantID string `json:"gatewayTenantId,omitempty"`
GatewayUserID string `json:"gatewayUserId"`
TenantID string `json:"tenantId,omitempty"`
TenantKey string `json:"tenantKey,omitempty"`
UserID string `json:"userId,omitempty"`
KeyPrefix string `json:"keyPrefix"`
Name string `json:"name"`
Scopes []string `json:"scopes,omitempty"`
UserGroupID string `json:"userGroupId,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
QuotaPolicy map[string]any `json:"quotaPolicy,omitempty"`
Status string `json:"status"`
ExpiresAt string `json:"expiresAt,omitempty"`
LastUsedAt string `json:"lastUsedAt,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type CreatedAPIKey struct {
APIKey APIKey `json:"apiKey"`
Secret string `json:"secret"`
}
type PlatformModel struct {
ID string `json:"id"`
PlatformID string `json:"platformId"`
@ -161,8 +197,6 @@ type LocalRegisterInput struct {
Email string `json:"email"`
Password string `json:"password"`
DisplayName string `json:"displayName"`
TenantKey string `json:"tenantKey"`
TenantName string `json:"tenantName"`
InvitationCode string `json:"invitationCode"`
}
@ -228,12 +262,14 @@ type RateLimitWindow struct {
type CreateTaskInput struct {
Kind string `json:"kind"`
Model string `json:"model"`
RunMode string `json:"runMode"`
Request map[string]any `json:"request"`
}
type GatewayTask struct {
ID string `json:"id"`
Kind string `json:"kind"`
RunMode string `json:"runMode"`
UserID string `json:"userId"`
GatewayUserID string `json:"gatewayUserId,omitempty"`
UserSource string `json:"userSource,omitempty"`
@ -252,6 +288,20 @@ type GatewayTask struct {
UpdatedAt time.Time `json:"updatedAt"`
}
type TaskEvent struct {
ID string `json:"id"`
TaskID string `json:"taskId"`
Seq int64 `json:"seq"`
EventType string `json:"eventType"`
Status string `json:"status,omitempty"`
Phase string `json:"phase,omitempty"`
Progress float64 `json:"progress,omitempty"`
Message string `json:"message,omitempty"`
Payload map[string]any `json:"payload,omitempty"`
Simulated bool `json:"simulated"`
CreatedAt time.Time `json:"createdAt"`
}
func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) {
rows, err := s.pool.Query(ctx, `
SELECT id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_type, status, priority,
@ -656,6 +706,216 @@ ORDER BY priority ASC, group_key ASC`)
return items, rows.Err()
}
func (s *Store) ListAPIKeys(ctx context.Context, user *auth.User) ([]APIKey, error) {
gatewayUserID := localGatewayUserID(user)
if gatewayUserID == "" {
return []APIKey{}, nil
}
rows, err := s.pool.Query(ctx, `
SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at
FROM gateway_api_keys
WHERE gateway_user_id = $1::uuid AND deleted_at IS NULL
ORDER BY created_at DESC`, gatewayUserID)
if err != nil {
return nil, err
}
defer rows.Close()
items := make([]APIKey, 0)
for rows.Next() {
item, err := scanAPIKey(rows)
if err != nil {
return nil, err
}
items = append(items, item)
}
return items, rows.Err()
}
func (s *Store) CreateAPIKey(ctx context.Context, input CreateAPIKeyInput, user *auth.User) (CreatedAPIKey, error) {
gatewayUserID := localGatewayUserID(user)
if gatewayUserID == "" {
return CreatedAPIKey{}, ErrLocalUserRequired
}
name := strings.TrimSpace(input.Name)
if name == "" {
name = "Default API Key"
}
scopes := input.Scopes
if len(scopes) == 0 {
scopes = []string{"chat", "image", "video"}
}
secret, err := generateAPIKeySecret()
if err != nil {
return CreatedAPIKey{}, err
}
keyHash, err := bcrypt.GenerateFromPassword([]byte(secret), bcrypt.DefaultCost)
if err != nil {
return CreatedAPIKey{}, err
}
scopesJSON, err := json.Marshal(scopes)
if err != nil {
return CreatedAPIKey{}, err
}
var item APIKey
var scopesBytes []byte
var rateLimitPolicy []byte
var quotaPolicy []byte
err = s.pool.QueryRow(ctx, `
INSERT INTO gateway_api_keys (
gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id,
key_prefix, key_hash, name, scopes, expires_at
)
VALUES (NULLIF($1, '')::uuid, $2::uuid, NULLIF($3, ''), NULLIF($4, ''), NULLIF($5, ''),
$6, $7, $8, $9::jsonb, NULLIF($10, '')::timestamptz)
RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at`,
user.GatewayTenantID, gatewayUserID, user.TenantID, user.TenantKey, user.ID,
apiKeyPrefix(secret), string(keyHash), name, string(scopesJSON), strings.TrimSpace(input.ExpiresAt),
).Scan(
&item.ID,
&item.GatewayTenantID,
&item.GatewayUserID,
&item.TenantID,
&item.TenantKey,
&item.UserID,
&item.KeyPrefix,
&item.Name,
&scopesBytes,
&item.UserGroupID,
&rateLimitPolicy,
&quotaPolicy,
&item.Status,
&item.ExpiresAt,
&item.LastUsedAt,
&item.CreatedAt,
&item.UpdatedAt,
)
if err != nil {
return CreatedAPIKey{}, err
}
item.Scopes = decodeStringArray(scopesBytes)
item.RateLimitPolicy = decodeObject(rateLimitPolicy)
item.QuotaPolicy = decodeObject(quotaPolicy)
return CreatedAPIKey{APIKey: item, Secret: secret}, nil
}
func (s *Store) DisableAPIKey(ctx context.Context, apiKeyID string, user *auth.User) (APIKey, error) {
gatewayUserID := localGatewayUserID(user)
if gatewayUserID == "" {
return APIKey{}, ErrLocalUserRequired
}
var item APIKey
var scopesBytes []byte
var rateLimitPolicy []byte
var quotaPolicy []byte
err := s.pool.QueryRow(ctx, `
UPDATE gateway_api_keys
SET status = 'disabled', updated_at = now()
WHERE id = $1::uuid AND gateway_user_id = $2::uuid AND deleted_at IS NULL
RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at`,
apiKeyID, gatewayUserID,
).Scan(
&item.ID,
&item.GatewayTenantID,
&item.GatewayUserID,
&item.TenantID,
&item.TenantKey,
&item.UserID,
&item.KeyPrefix,
&item.Name,
&scopesBytes,
&item.UserGroupID,
&rateLimitPolicy,
&quotaPolicy,
&item.Status,
&item.ExpiresAt,
&item.LastUsedAt,
&item.CreatedAt,
&item.UpdatedAt,
)
if err != nil {
return APIKey{}, err
}
item.Scopes = decodeStringArray(scopesBytes)
item.RateLimitPolicy = decodeObject(rateLimitPolicy)
item.QuotaPolicy = decodeObject(quotaPolicy)
return item, nil
}
func (s *Store) VerifyLocalAPIKey(ctx context.Context, secret string) (*auth.User, error) {
prefix := apiKeyPrefix(secret)
if prefix == "" {
return nil, auth.ErrUnauthorized
}
rows, err := s.pool.Query(ctx, `
SELECT k.id::text, k.key_hash, COALESCE(k.user_group_id::text, ''),
u.id::text, u.username, u.roles, COALESCE(u.gateway_tenant_id::text, ''),
COALESCE(u.tenant_id, ''), COALESCE(u.tenant_key, '')
FROM gateway_api_keys k
JOIN gateway_users u ON u.id = k.gateway_user_id
WHERE k.key_prefix = $1
AND k.status = 'active'
AND k.deleted_at IS NULL
AND u.status = 'active'
AND u.deleted_at IS NULL
AND (k.expires_at IS NULL OR k.expires_at > now())`, prefix)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var apiKeyID string
var hash string
var userGroupID string
var gatewayUserID string
var username string
var rolesBytes []byte
var gatewayTenantID string
var tenantID string
var tenantKey string
if err := rows.Scan(&apiKeyID, &hash, &userGroupID, &gatewayUserID, &username, &rolesBytes, &gatewayTenantID, &tenantID, &tenantKey); err != nil {
return nil, err
}
if bcrypt.CompareHashAndPassword([]byte(hash), []byte(secret)) != nil {
continue
}
if _, err := s.pool.Exec(ctx, `UPDATE gateway_api_keys SET last_used_at = now(), updated_at = now() WHERE id = $1::uuid`, apiKeyID); err != nil {
return nil, err
}
return &auth.User{
ID: gatewayUserID,
Username: username,
Roles: decodeStringArray(rolesBytes),
TenantID: tenantID,
GatewayTenantID: gatewayTenantID,
TenantKey: tenantKey,
Source: "gateway",
GatewayUserID: gatewayUserID,
UserGroupID: userGroupID,
APIKeyID: apiKeyID,
APIKeyName: prefix,
}, nil
}
if err := rows.Err(); err != nil {
return nil, err
}
return nil, auth.ErrUnauthorized
}
func (s *Store) RegisterLocalUser(ctx context.Context, input LocalRegisterInput) (GatewayUser, error) {
account := normalizeAccount(firstNonEmpty(input.Username, input.Email))
if account == "" {
@ -664,14 +924,8 @@ func (s *Store) RegisterLocalUser(ctx context.Context, input LocalRegisterInput)
if len(input.Password) < 8 {
return GatewayUser{}, ErrWeakPassword
}
tenantKey := normalizeKey(input.TenantKey)
if tenantKey == "" {
tenantKey = "personal-" + normalizeKey(account)
}
tenantName := strings.TrimSpace(input.TenantName)
if tenantName == "" {
tenantName = tenantKey
}
tenantKey := "default"
tenantName := "Default Tenant"
displayName := strings.TrimSpace(input.DisplayName)
username := strings.TrimSpace(input.Username)
if username == "" {
@ -695,61 +949,79 @@ func (s *Store) RegisterLocalUser(ctx context.Context, input LocalRegisterInput)
userGroupID := ""
role := "user"
invitationID := ""
invitedBy := ""
var isBootstrapUser bool
if err := tx.QueryRow(ctx, `
SELECT NOT EXISTS (
SELECT 1 FROM gateway_users WHERE source = 'gateway' AND deleted_at IS NULL
)`).Scan(&isBootstrapUser); err != nil {
return GatewayUser{}, err
}
if isBootstrapUser {
role = "admin"
}
if err := tx.QueryRow(ctx, `
INSERT INTO gateway_tenants (tenant_key, source, external_tenant_id, name)
VALUES ($1, 'gateway', $1, $2)
ON CONFLICT (tenant_key) DO UPDATE SET updated_at=now()
RETURNING id::text`,
tenantKey, tenantName,
).Scan(&tenantID); err != nil {
return GatewayUser{}, err
}
if invitationCode != "" {
if err := tx.QueryRow(ctx, `
SELECT i.id::text,
i.tenant_id::text,
t.tenant_key,
t.name,
COALESCE(i.user_group_id::text, t.default_user_group_id::text, ''),
COALESCE(NULLIF(i.role, ''), 'user')
FROM gateway_tenant_invitations i
JOIN gateway_tenants t ON t.id = i.tenant_id
SELECT i.id::text, COALESCE(i.created_by::text, '')
FROM gateway_invitations i
WHERE lower(i.invite_code) = lower($1)
AND i.status = 'active'
AND t.status = 'active'
AND (i.expires_at IS NULL OR i.expires_at > now())
AND (i.max_uses IS NULL OR i.used_count < i.max_uses)
FOR UPDATE OF i`,
invitationCode,
).Scan(&invitationID, &tenantID, &tenantKey, &tenantName, &userGroupID, &role); err != nil {
).Scan(&invitationID, &invitedBy); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return GatewayUser{}, ErrInvalidInvitation
}
return GatewayUser{}, err
}
} else if err := tx.QueryRow(ctx, `
INSERT INTO gateway_tenants (tenant_key, source, external_tenant_id, name)
VALUES ($1, 'gateway', $1, $2)
ON CONFLICT (tenant_key) DO UPDATE SET updated_at=now()
RETURNING id::text`,
tenantKey, tenantName,
).Scan(&tenantID); err != nil {
return GatewayUser{}, err
}
rolesJSON, err := json.Marshal([]string{role})
if err != nil {
return GatewayUser{}, err
}
userMetadataJSON := []byte("{}")
if invitationID != "" {
userMetadataJSON, err = json.Marshal(map[string]any{
"registration": map[string]any{
"invitationId": invitationID,
"invitationCode": invitationCode,
"invitedBy": invitedBy,
},
})
if err != nil {
return GatewayUser{}, err
}
}
var user GatewayUser
var roles []byte
var authProfile []byte
var metadata []byte
if err := tx.QueryRow(ctx, `
INSERT INTO gateway_users (
user_key, source, external_user_id, username, display_name, email,
password_hash, gateway_tenant_id, tenant_id, tenant_key, default_user_group_id, roles, status
)
VALUES ($1, 'gateway', $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, $7::uuid, $8, $8, NULLIF($9, '')::uuid, $10::jsonb, 'active')
RETURNING id::text, user_key, source, COALESCE(external_user_id, ''), username,
COALESCE(display_name, ''), COALESCE(email, ''), COALESCE(phone, ''), COALESCE(avatar_url, ''),
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata,
status, COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''),
created_at, updated_at`,
"gateway:"+account, account, username, displayName, email, string(passwordHash), tenantID, tenantKey, userGroupID, string(rolesJSON),
INSERT INTO gateway_users (
user_key, source, external_user_id, username, display_name, email,
password_hash, gateway_tenant_id, tenant_id, tenant_key, default_user_group_id, roles, metadata, status
)
VALUES ($1, 'gateway', $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, $7::uuid, $8, $8, NULLIF($9, '')::uuid, $10::jsonb, $11::jsonb, 'active')
RETURNING id::text, user_key, source, COALESCE(external_user_id, ''), username,
COALESCE(display_name, ''), COALESCE(email, ''), COALESCE(phone, ''), COALESCE(avatar_url, ''),
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata,
status, COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''),
created_at, updated_at`,
"gateway:"+account, account, username, displayName, email, string(passwordHash), tenantID, tenantKey, userGroupID, string(rolesJSON), string(userMetadataJSON),
).Scan(
&user.ID,
&user.UserKey,
@ -774,33 +1046,28 @@ FOR UPDATE OF i`,
&user.CreatedAt,
&user.UpdatedAt,
); err != nil {
if isUniqueViolation(err) {
return GatewayUser{}, ErrUserAlreadyExists
}
return GatewayUser{}, err
}
if invitationID != "" {
if _, err := tx.Exec(ctx, `
UPDATE gateway_tenant_invitations
UPDATE gateway_invitations
SET used_count = used_count + 1, updated_at = now()
WHERE id = $1::uuid`, invitationID); err != nil {
return GatewayUser{}, err
}
}
if userGroupID != "" {
metadata, err := json.Marshal(map[string]any{
"source": "registration",
"invitationId": invitationID,
})
if err != nil {
return GatewayUser{}, err
}
if _, err := tx.Exec(ctx, `
INSERT INTO gateway_user_group_memberships (group_id, principal_type, principal_id, source, metadata)
VALUES ($1::uuid, 'user', $2, 'gateway', $3::jsonb)
ON CONFLICT (group_id, principal_type, principal_id)
DO UPDATE SET status = 'active', updated_at = now()`,
userGroupID, user.ID, string(metadata),
); err != nil {
return GatewayUser{}, err
}
if _, err := tx.Exec(ctx, `
INSERT INTO gateway_wallet_accounts (
gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id, currency
)
VALUES ($1::uuid, $2::uuid, $3, $3, $4, 'resource')
ON CONFLICT (gateway_user_id, currency) DO NOTHING`,
user.GatewayTenantID, user.ID, user.TenantKey, user.ID,
); err != nil {
return GatewayUser{}, err
}
if err := tx.Commit(ctx); err != nil {
return GatewayUser{}, err
@ -913,24 +1180,58 @@ ORDER BY window_start DESC, scope_type ASC, scope_key ASC, metric ASC`)
func (s *Store) CreateTask(ctx context.Context, input CreateTaskInput, user *auth.User) (GatewayTask, error) {
requestBody, _ := json.Marshal(input.Request)
runMode := normalizeRunMode(input.RunMode, input.Request)
status := "queued"
result := map[string]any(nil)
billings := []any(nil)
finished := false
if runMode == "simulation" {
status = "succeeded"
result = simulationResult(input.Kind, input.Model)
billings = simulationBillings(input.Kind, input.Model)
finished = true
}
resultBody, _ := json.Marshal(result)
billingsBody, _ := json.Marshal(billings)
tx, err := s.pool.Begin(ctx)
if err != nil {
return GatewayTask{}, err
}
defer tx.Rollback(ctx)
var task GatewayTask
var requestBytes []byte
var resultBytes []byte
var billingsBytes []byte
err := s.pool.QueryRow(ctx, `
INSERT INTO gateway_tasks (
kind, user_id, gateway_user_id, user_source, gateway_tenant_id, tenant_id, tenant_key,
api_key_id, user_group_id, user_group_key, model, request, status
)
VALUES ($1, $2, NULLIF($3, '')::uuid, COALESCE(NULLIF($4, ''), 'gateway'), NULLIF($5, '')::uuid, NULLIF($6, ''), NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, '')::uuid, NULLIF($10, ''), $11, $12, 'queued')
RETURNING id::text, kind, user_id, COALESCE(gateway_user_id::text, ''), user_source,
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at`,
input.Kind, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.UserGroupID, user.UserGroupKey, input.Model, requestBody,
).Scan(&task.ID, &task.Kind, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt)
err = tx.QueryRow(ctx, `
INSERT INTO gateway_tasks (
kind, run_mode, user_id, gateway_user_id, user_source, gateway_tenant_id, tenant_id, tenant_key,
api_key_id, user_group_id, user_group_key, model, request, status, result, billings, finished_at
)
VALUES ($1, $2, $3, NULLIF($4, '')::uuid, COALESCE(NULLIF($5, ''), 'gateway'), NULLIF($6, '')::uuid, NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, ''), NULLIF($10, '')::uuid, NULLIF($11, ''), $12, $13, $14, $15::jsonb, $16::jsonb, CASE WHEN $17 THEN now() ELSE NULL END)
RETURNING id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source,
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at`,
input.Kind, runMode, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, status, resultBody, billingsBody, finished,
).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt)
if err != nil {
return GatewayTask{}, err
}
events := taskEventsForCreate(task.ID, runMode, status, result)
for _, event := range events {
payload, _ := json.Marshal(event.Payload)
if _, err := tx.Exec(ctx, `
INSERT INTO gateway_task_events (task_id, seq, event_type, status, phase, progress, message, payload, simulated)
VALUES ($1::uuid, $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, NULLIF($7, ''), $8::jsonb, $9)`,
task.ID, event.Seq, event.EventType, event.Status, event.Phase, event.Progress, event.Message, string(payload), event.Simulated,
); err != nil {
return GatewayTask{}, err
}
}
if err := tx.Commit(ctx); err != nil {
return GatewayTask{}, err
}
task.Request = decodeObject(requestBytes)
task.Result = decodeObject(resultBytes)
task.Billings = decodeArray(billingsBytes)
@ -943,12 +1244,12 @@ func (s *Store) GetTask(ctx context.Context, taskID string) (GatewayTask, error)
var resultBytes []byte
var billingsBytes []byte
err := s.pool.QueryRow(ctx, `
SELECT id::text, kind, user_id, COALESCE(gateway_user_id::text, ''), user_source,
SELECT id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source,
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at
FROM gateway_tasks
WHERE id=$1`, taskID,
).Scan(&task.ID, &task.Kind, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt)
FROM gateway_tasks
WHERE id=$1`, taskID,
).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt)
if err != nil {
return GatewayTask{}, err
}
@ -958,10 +1259,239 @@ WHERE id=$1`, taskID,
return task, nil
}
func (s *Store) ListTaskEvents(ctx context.Context, taskID string) ([]TaskEvent, error) {
rows, err := s.pool.Query(ctx, `
SELECT id::text, task_id::text, seq, event_type, COALESCE(status, ''), COALESCE(phase, ''),
COALESCE(progress, 0)::float8, COALESCE(message, ''), payload, simulated, created_at
FROM gateway_task_events
WHERE task_id = $1::uuid
ORDER BY seq ASC`, taskID)
if err != nil {
return nil, err
}
defer rows.Close()
items := make([]TaskEvent, 0)
for rows.Next() {
var item TaskEvent
var payload []byte
if err := rows.Scan(
&item.ID,
&item.TaskID,
&item.Seq,
&item.EventType,
&item.Status,
&item.Phase,
&item.Progress,
&item.Message,
&payload,
&item.Simulated,
&item.CreatedAt,
); err != nil {
return nil, err
}
item.Payload = decodeObject(payload)
items = append(items, item)
}
return items, rows.Err()
}
func IsNotFound(err error) bool {
return err == pgx.ErrNoRows
}
func isUniqueViolation(err error) bool {
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) && pgErr.Code == "23505"
}
type apiKeyScanner interface {
Scan(dest ...any) error
}
func scanAPIKey(scanner apiKeyScanner) (APIKey, error) {
var item APIKey
var scopesBytes []byte
var rateLimitPolicy []byte
var quotaPolicy []byte
if err := scanner.Scan(
&item.ID,
&item.GatewayTenantID,
&item.GatewayUserID,
&item.TenantID,
&item.TenantKey,
&item.UserID,
&item.KeyPrefix,
&item.Name,
&scopesBytes,
&item.UserGroupID,
&rateLimitPolicy,
&quotaPolicy,
&item.Status,
&item.ExpiresAt,
&item.LastUsedAt,
&item.CreatedAt,
&item.UpdatedAt,
); err != nil {
return APIKey{}, err
}
item.Scopes = decodeStringArray(scopesBytes)
item.RateLimitPolicy = decodeObject(rateLimitPolicy)
item.QuotaPolicy = decodeObject(quotaPolicy)
return item, nil
}
func localGatewayUserID(user *auth.User) string {
if user == nil {
return ""
}
if user.GatewayUserID != "" {
return user.GatewayUserID
}
if user.Source == "" || user.Source == "gateway" {
return user.ID
}
return ""
}
func generateAPIKeySecret() (string, error) {
bytes := make([]byte, 32)
if _, err := rand.Read(bytes); err != nil {
return "", err
}
return "sk-gw-" + base64.RawURLEncoding.EncodeToString(bytes), nil
}
func apiKeyPrefix(secret string) string {
secret = strings.TrimSpace(secret)
if !strings.HasPrefix(secret, "sk-gw-") {
return ""
}
if len(secret) <= 18 {
return secret
}
return secret[:18]
}
func normalizeRunMode(input string, request map[string]any) string {
mode := strings.ToLower(strings.TrimSpace(input))
if mode == "" && request != nil {
if raw, ok := request["runMode"].(string); ok {
mode = strings.ToLower(strings.TrimSpace(raw))
}
if raw, ok := request["mode"].(string); ok && mode == "" {
mode = strings.ToLower(strings.TrimSpace(raw))
}
if raw, ok := request["simulation"].(bool); ok && raw {
mode = "simulation"
}
if raw, ok := request["testMode"].(bool); ok && raw {
mode = "simulation"
}
}
if mode == "test" || mode == "dry-run" || mode == "dry_run" {
return "simulation"
}
if mode == "simulation" {
return "simulation"
}
return "production"
}
func simulationResult(kind string, model string) map[string]any {
switch kind {
case "chat.completions":
return map[string]any{
"id": "chatcmpl-simulated",
"object": "chat.completion",
"model": model,
"choices": []any{map[string]any{"index": 0, "finish_reason": "stop", "message": map[string]any{"role": "assistant", "content": "simulation response"}}},
"usage": map[string]any{"prompt_tokens": 12, "completion_tokens": 8, "total_tokens": 20},
}
case "images.generations":
return map[string]any{
"id": "img-simulated",
"model": model,
"data": []any{map[string]any{"url": "/static/simulation/image.png", "revised_prompt": "simulation image"}},
}
case "videos.generations":
return map[string]any{
"id": "video-simulated",
"model": model,
"data": []any{map[string]any{"url": "/static/simulation/video.mp4", "duration": 5}},
}
default:
return map[string]any{"id": "task-simulated", "model": model, "kind": kind, "ok": true}
}
}
func simulationBillings(kind string, model string) []any {
resourceType := "task"
unit := "item"
if kind == "chat.completions" {
resourceType = "text_total"
unit = "1k_tokens"
}
if kind == "images.generations" {
resourceType = "image"
unit = "image"
}
if kind == "videos.generations" {
resourceType = "video"
unit = "second"
}
return []any{map[string]any{
"model": model,
"resourceType": resourceType,
"unit": unit,
"quantity": 1,
"amount": 0,
"currency": "resource",
"simulated": true,
}}
}
func taskEventsForCreate(taskID string, runMode string, status string, result map[string]any) []TaskEvent {
events := []TaskEvent{{
TaskID: taskID,
Seq: 1,
EventType: "task.accepted",
Status: "queued",
Phase: "queued",
Progress: 0,
Message: "task accepted",
Payload: map[string]any{"taskId": taskID},
Simulated: runMode == "simulation",
}}
if runMode != "simulation" {
return events
}
return append(events,
TaskEvent{
TaskID: taskID,
Seq: 2,
EventType: "task.progress",
Status: "running",
Phase: "simulation",
Progress: 0.5,
Message: "simulation client running",
Payload: map[string]any{"taskId": taskID},
Simulated: true,
},
TaskEvent{
TaskID: taskID,
Seq: 3,
EventType: "task.completed",
Status: status,
Phase: "completed",
Progress: 1,
Message: "simulation completed",
Payload: map[string]any{"taskId": taskID, "result": result},
Simulated: true,
},
)
}
func decodeObject(bytes []byte) map[string]any {
if len(bytes) == 0 {
return nil

View File

@ -209,12 +209,9 @@ CREATE INDEX IF NOT EXISTS idx_user_group_membership_principal
CREATE INDEX IF NOT EXISTS idx_user_group_membership_effective
ON gateway_user_group_memberships(effective_from, effective_to);
CREATE TABLE IF NOT EXISTS gateway_tenant_invitations (
CREATE TABLE IF NOT EXISTS gateway_invitations (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id uuid NOT NULL REFERENCES gateway_tenants(id) ON DELETE CASCADE,
invite_code text NOT NULL UNIQUE,
role text NOT NULL DEFAULT 'user',
user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL,
max_uses integer,
used_count integer NOT NULL DEFAULT 0,
expires_at timestamptz,
@ -225,11 +222,11 @@ CREATE TABLE IF NOT EXISTS gateway_tenant_invitations (
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_gateway_invitations_tenant
ON gateway_tenant_invitations(tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_gateway_invitations_status
ON gateway_invitations(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_invitations_expiry
ON gateway_tenant_invitations(expires_at);
ON gateway_invitations(expires_at);
CREATE TABLE IF NOT EXISTS gateway_api_keys (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),

View File

@ -0,0 +1,634 @@
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE IF NOT EXISTS model_catalog_providers (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
provider_key text NOT NULL UNIQUE,
display_name text NOT NULL,
provider_type text NOT NULL DEFAULT 'openai_compatible',
capability_schema jsonb NOT NULL DEFAULT '{}'::jsonb,
default_rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS base_model_catalog (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
provider_id uuid REFERENCES model_catalog_providers(id) ON DELETE SET NULL,
provider_key text NOT NULL,
canonical_model_key text NOT NULL UNIQUE,
provider_model_name text NOT NULL,
model_type text NOT NULL,
display_name text NOT NULL,
capabilities jsonb NOT NULL DEFAULT '{}'::jsonb,
base_billing_config jsonb NOT NULL DEFAULT '{}'::jsonb,
default_rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
pricing_version integer NOT NULL DEFAULT 1,
status text NOT NULL DEFAULT 'active',
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS integration_platforms (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
provider text NOT NULL,
platform_key text NOT NULL UNIQUE DEFAULT ('platform_' || replace(gen_random_uuid()::text, '-', '')),
name text NOT NULL,
base_url text,
auth_type text NOT NULL DEFAULT 'bearer',
credentials jsonb NOT NULL DEFAULT '{}'::jsonb,
config jsonb NOT NULL DEFAULT '{}'::jsonb,
visibility_scope text NOT NULL DEFAULT 'global',
tenant_id text,
tenant_key text,
default_pricing_mode text NOT NULL DEFAULT 'inherit_discount',
default_discount_factor numeric NOT NULL DEFAULT 1,
retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
priority integer NOT NULL DEFAULT 100,
dynamic_priority integer,
status text NOT NULL DEFAULT 'enabled',
disabled_reason text,
cooldown_until timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
deleted_at timestamptz
);
ALTER TABLE IF EXISTS integration_platforms
ADD COLUMN IF NOT EXISTS platform_key text,
ADD COLUMN IF NOT EXISTS visibility_scope text NOT NULL DEFAULT 'global',
ADD COLUMN IF NOT EXISTS tenant_id text,
ADD COLUMN IF NOT EXISTS tenant_key text,
ADD COLUMN IF NOT EXISTS default_pricing_mode text NOT NULL DEFAULT 'inherit_discount',
ADD COLUMN IF NOT EXISTS default_discount_factor numeric NOT NULL DEFAULT 1,
ADD COLUMN IF NOT EXISTS retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS dynamic_priority integer,
ADD COLUMN IF NOT EXISTS disabled_reason text,
ADD COLUMN IF NOT EXISTS cooldown_until timestamptz,
ADD COLUMN IF NOT EXISTS deleted_at timestamptz;
UPDATE integration_platforms
SET platform_key = 'platform_' || replace(id::text, '-', '')
WHERE platform_key IS NULL OR platform_key = '';
ALTER TABLE IF EXISTS integration_platforms
ALTER COLUMN platform_key SET DEFAULT ('platform_' || replace(gen_random_uuid()::text, '-', '')),
ALTER COLUMN platform_key SET NOT NULL;
CREATE TABLE IF NOT EXISTS model_pricing_rules (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
scope_type text NOT NULL,
scope_id uuid,
resource_type text NOT NULL,
unit text NOT NULL,
base_price numeric NOT NULL,
currency text NOT NULL DEFAULT 'resource',
base_weight jsonb NOT NULL DEFAULT '{}'::jsonb,
dynamic_weight jsonb NOT NULL DEFAULT '{}'::jsonb,
effective_from timestamptz,
effective_to timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS gateway_user_groups (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
group_key text NOT NULL UNIQUE,
name text NOT NULL,
description text,
source text NOT NULL DEFAULT 'gateway',
priority integer NOT NULL DEFAULT 100,
recharge_discount_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
billing_discount_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
quota_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS gateway_tenants (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_key text NOT NULL UNIQUE,
source text NOT NULL DEFAULT 'gateway',
external_tenant_id text,
name text NOT NULL,
description text,
default_user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL,
plan_key text,
billing_profile jsonb NOT NULL DEFAULT '{}'::jsonb,
rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
auth_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
synced_at timestamptz,
source_updated_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
deleted_at timestamptz,
UNIQUE(source, external_tenant_id)
);
CREATE TABLE IF NOT EXISTS gateway_users (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
user_key text NOT NULL UNIQUE,
source text NOT NULL DEFAULT 'gateway',
external_user_id text,
username text NOT NULL,
display_name text,
email text,
phone text,
avatar_url text,
password_hash text,
gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL,
tenant_id text,
tenant_key text,
default_user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL,
roles jsonb NOT NULL DEFAULT '[]'::jsonb,
auth_profile jsonb NOT NULL DEFAULT '{}'::jsonb,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
last_login_at timestamptz,
synced_at timestamptz,
source_updated_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
deleted_at timestamptz,
UNIQUE(source, external_user_id)
);
CREATE TABLE IF NOT EXISTS gateway_user_group_memberships (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
group_id uuid NOT NULL REFERENCES gateway_user_groups(id) ON DELETE CASCADE,
principal_type text NOT NULL,
principal_id text NOT NULL,
source text NOT NULL DEFAULT 'gateway',
priority integer NOT NULL DEFAULT 100,
effective_from timestamptz,
effective_to timestamptz,
status text NOT NULL DEFAULT 'active',
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(group_id, principal_type, principal_id)
);
DO $$
BEGIN
IF to_regclass('public.gateway_tenant_invitations') IS NOT NULL
AND to_regclass('public.gateway_invitations') IS NULL THEN
ALTER TABLE gateway_tenant_invitations RENAME TO gateway_invitations;
END IF;
END $$;
DROP INDEX IF EXISTS idx_gateway_invitations_tenant;
CREATE TABLE IF NOT EXISTS gateway_invitations (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
invite_code text NOT NULL UNIQUE,
max_uses integer,
used_count integer NOT NULL DEFAULT 0,
expires_at timestamptz,
status text NOT NULL DEFAULT 'active',
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_by uuid REFERENCES gateway_users(id) ON DELETE SET NULL,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
ALTER TABLE IF EXISTS gateway_invitations
DROP COLUMN IF EXISTS tenant_id,
DROP COLUMN IF EXISTS role,
DROP COLUMN IF EXISTS user_group_id;
CREATE TABLE IF NOT EXISTS gateway_api_keys (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL,
gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE CASCADE,
tenant_id text,
tenant_key text,
user_id text,
key_prefix text NOT NULL,
key_hash text NOT NULL UNIQUE,
name text NOT NULL,
scopes jsonb NOT NULL DEFAULT '[]'::jsonb,
user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL,
rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
quota_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
expires_at timestamptz,
last_used_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
deleted_at timestamptz
);
CREATE TABLE IF NOT EXISTS gateway_wallet_accounts (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL,
gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE CASCADE,
tenant_id text,
tenant_key text,
user_id text,
currency text NOT NULL DEFAULT 'resource',
balance numeric NOT NULL DEFAULT 0,
frozen_balance numeric NOT NULL DEFAULT 0,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'active',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(gateway_user_id, currency)
);
CREATE TABLE IF NOT EXISTS gateway_wallet_transactions (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
wallet_account_id uuid REFERENCES gateway_wallet_accounts(id) ON DELETE SET NULL,
gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL,
gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL,
transaction_type text NOT NULL,
amount numeric NOT NULL,
balance_after numeric NOT NULL,
reference_type text,
reference_id text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS gateway_recharge_orders (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL,
gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL,
tenant_id text,
tenant_key text,
user_id text,
order_no text NOT NULL UNIQUE,
amount numeric NOT NULL,
bonus_amount numeric NOT NULL DEFAULT 0,
currency text NOT NULL DEFAULT 'resource',
payment_provider text,
payment_payload jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'pending',
paid_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS platform_models (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
platform_id uuid REFERENCES integration_platforms(id) ON DELETE CASCADE,
base_model_id uuid REFERENCES base_model_catalog(id) ON DELETE SET NULL,
model_name text NOT NULL,
model_alias text,
model_type text NOT NULL,
display_name text NOT NULL DEFAULT '',
capability_override jsonb NOT NULL DEFAULT '{}'::jsonb,
capabilities jsonb NOT NULL DEFAULT '{}'::jsonb,
pricing_mode text NOT NULL DEFAULT 'inherit_discount',
discount_factor numeric,
billing_config_override jsonb NOT NULL DEFAULT '{}'::jsonb,
billing_config jsonb NOT NULL DEFAULT '{}'::jsonb,
permission_config jsonb NOT NULL DEFAULT '{}'::jsonb,
retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
enabled boolean NOT NULL DEFAULT true,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(platform_id, model_name, model_type)
);
ALTER TABLE IF EXISTS platform_models
ADD COLUMN IF NOT EXISTS base_model_id uuid REFERENCES base_model_catalog(id) ON DELETE SET NULL,
ADD COLUMN IF NOT EXISTS model_alias text,
ADD COLUMN IF NOT EXISTS capability_override jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS pricing_mode text NOT NULL DEFAULT 'inherit_discount',
ADD COLUMN IF NOT EXISTS discount_factor numeric,
ADD COLUMN IF NOT EXISTS billing_config_override jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS permission_config jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb;
CREATE TABLE IF NOT EXISTS gateway_tasks (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
external_task_id text,
kind text NOT NULL,
run_mode text NOT NULL DEFAULT 'production',
user_id text NOT NULL,
gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL,
user_source text NOT NULL DEFAULT 'gateway',
gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL,
tenant_id text,
tenant_key text,
api_key_id text,
user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL,
user_group_key text,
user_group_policy_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
model text NOT NULL,
model_type text,
request jsonb NOT NULL DEFAULT '{}'::jsonb,
normalized_request jsonb NOT NULL DEFAULT '{}'::jsonb,
status text NOT NULL DEFAULT 'queued',
queue_key text NOT NULL DEFAULT 'default',
priority integer NOT NULL DEFAULT 100,
idempotency_key text,
remote_task_id text,
remote_task_payload jsonb,
simulation_profile jsonb,
simulation_seed text,
locked_by text,
locked_at timestamptz,
heartbeat_at timestamptz,
next_run_at timestamptz NOT NULL DEFAULT now(),
attempt_count integer NOT NULL DEFAULT 0,
max_attempts integer NOT NULL DEFAULT 1,
result jsonb,
billings jsonb,
error text,
error_code text,
error_message text,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
finished_at timestamptz
);
ALTER TABLE IF EXISTS gateway_tasks
ADD COLUMN IF NOT EXISTS external_task_id text,
ADD COLUMN IF NOT EXISTS run_mode text NOT NULL DEFAULT 'production',
ADD COLUMN IF NOT EXISTS gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL,
ADD COLUMN IF NOT EXISTS user_source text NOT NULL DEFAULT 'gateway',
ADD COLUMN IF NOT EXISTS gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL,
ADD COLUMN IF NOT EXISTS tenant_key text,
ADD COLUMN IF NOT EXISTS api_key_id text,
ADD COLUMN IF NOT EXISTS user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL,
ADD COLUMN IF NOT EXISTS user_group_key text,
ADD COLUMN IF NOT EXISTS user_group_policy_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS model_type text,
ADD COLUMN IF NOT EXISTS normalized_request jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS queue_key text NOT NULL DEFAULT 'default',
ADD COLUMN IF NOT EXISTS priority integer NOT NULL DEFAULT 100,
ADD COLUMN IF NOT EXISTS idempotency_key text,
ADD COLUMN IF NOT EXISTS remote_task_id text,
ADD COLUMN IF NOT EXISTS remote_task_payload jsonb,
ADD COLUMN IF NOT EXISTS simulation_profile jsonb,
ADD COLUMN IF NOT EXISTS simulation_seed text,
ADD COLUMN IF NOT EXISTS locked_by text,
ADD COLUMN IF NOT EXISTS locked_at timestamptz,
ADD COLUMN IF NOT EXISTS heartbeat_at timestamptz,
ADD COLUMN IF NOT EXISTS next_run_at timestamptz NOT NULL DEFAULT now(),
ADD COLUMN IF NOT EXISTS attempt_count integer NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS max_attempts integer NOT NULL DEFAULT 1,
ADD COLUMN IF NOT EXISTS error_code text,
ADD COLUMN IF NOT EXISTS error_message text,
ADD COLUMN IF NOT EXISTS finished_at timestamptz;
CREATE TABLE IF NOT EXISTS gateway_task_attempts (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE,
attempt_no integer NOT NULL,
platform_id uuid REFERENCES integration_platforms(id) ON DELETE SET NULL,
platform_model_id uuid REFERENCES platform_models(id) ON DELETE SET NULL,
client_id text,
queue_key text NOT NULL,
status text NOT NULL,
retryable boolean NOT NULL DEFAULT false,
simulated boolean NOT NULL DEFAULT false,
remote_task_id text,
request_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
response_snapshot jsonb,
error_code text,
error_message text,
started_at timestamptz NOT NULL DEFAULT now(),
finished_at timestamptz,
UNIQUE(task_id, attempt_no)
);
CREATE TABLE IF NOT EXISTS gateway_task_events (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE,
seq bigint NOT NULL,
event_type text NOT NULL,
status text,
phase text,
progress numeric,
message text,
payload jsonb NOT NULL DEFAULT '{}'::jsonb,
simulated boolean NOT NULL DEFAULT false,
created_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(task_id, seq)
);
CREATE TABLE IF NOT EXISTS gateway_task_callback_outbox (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE,
event_id uuid REFERENCES gateway_task_events(id) ON DELETE SET NULL,
seq bigint NOT NULL,
callback_url text NOT NULL,
payload jsonb NOT NULL,
status text NOT NULL DEFAULT 'pending',
attempts integer NOT NULL DEFAULT 0,
next_attempt_at timestamptz NOT NULL DEFAULT now(),
last_error text,
delivered_at timestamptz,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(task_id, seq, callback_url)
);
CREATE TABLE IF NOT EXISTS runtime_client_states (
client_id text PRIMARY KEY,
platform_id uuid REFERENCES integration_platforms(id) ON DELETE SET NULL,
provider text NOT NULL,
method_name text NOT NULL,
queue_key text NOT NULL,
running_count integer NOT NULL DEFAULT 0,
waiting_count integer NOT NULL DEFAULT 0,
limiter_ratio numeric NOT NULL DEFAULT 0,
cooldown_until timestamptz,
last_assigned_at timestamptz,
last_error text,
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS gateway_upload_assets (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
task_id uuid REFERENCES gateway_tasks(id) ON DELETE SET NULL,
source text NOT NULL,
server_main_file_id text,
url text NOT NULL,
object_key text,
content_type text,
size bigint,
checksum text,
metadata jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS gateway_retry_policies (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
scope_type text NOT NULL,
scope_key text NOT NULL,
enabled boolean NOT NULL DEFAULT true,
policy jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(scope_type, scope_key)
);
CREATE TABLE IF NOT EXISTS gateway_rate_limit_policies (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
scope_type text NOT NULL,
scope_key text NOT NULL,
policy jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(scope_type, scope_key)
);
CREATE TABLE IF NOT EXISTS gateway_rate_limit_counters (
scope_type text NOT NULL,
scope_key text NOT NULL,
metric text NOT NULL,
window_start timestamptz NOT NULL,
limit_value numeric NOT NULL,
used_value numeric NOT NULL DEFAULT 0,
reserved_value numeric NOT NULL DEFAULT 0,
reset_at timestamptz NOT NULL,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY(scope_type, scope_key, metric, window_start)
);
CREATE TABLE IF NOT EXISTS gateway_concurrency_leases (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE,
attempt_id uuid REFERENCES gateway_task_attempts(id) ON DELETE SET NULL,
scope_type text NOT NULL,
scope_key text NOT NULL,
lease_value numeric NOT NULL DEFAULT 1,
acquired_at timestamptz NOT NULL DEFAULT now(),
expires_at timestamptz NOT NULL,
released_at timestamptz
);
ALTER TABLE IF EXISTS settlement_outbox
ADD COLUMN IF NOT EXISTS event_type text NOT NULL DEFAULT 'task.settlement.requested',
ADD COLUMN IF NOT EXISTS payload jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS status text NOT NULL DEFAULT 'pending',
ADD COLUMN IF NOT EXISTS attempts integer NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS next_attempt_at timestamptz NOT NULL DEFAULT now(),
ADD COLUMN IF NOT EXISTS updated_at timestamptz NOT NULL DEFAULT now();
CREATE TABLE IF NOT EXISTS settlement_outbox (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE,
event_type text NOT NULL DEFAULT 'task.settlement.requested',
payload jsonb NOT NULL,
status text NOT NULL DEFAULT 'pending',
attempts integer NOT NULL DEFAULT 0,
next_attempt_at timestamptz NOT NULL DEFAULT now(),
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
UNIQUE(task_id, event_type)
);
CREATE UNIQUE INDEX IF NOT EXISTS uniq_integration_platforms_platform_key
ON integration_platforms(platform_key);
CREATE INDEX IF NOT EXISTS idx_model_catalog_provider_status
ON model_catalog_providers(status);
CREATE INDEX IF NOT EXISTS idx_base_model_catalog_provider
ON base_model_catalog(provider_key, model_type, status);
CREATE INDEX IF NOT EXISTS idx_base_model_catalog_capabilities
ON base_model_catalog USING gin(capabilities);
CREATE INDEX IF NOT EXISTS idx_integration_platforms_provider_status
ON integration_platforms(provider, status);
CREATE INDEX IF NOT EXISTS idx_integration_platforms_status_priority
ON integration_platforms(status, priority, dynamic_priority);
CREATE INDEX IF NOT EXISTS idx_integration_platforms_cooldown
ON integration_platforms(cooldown_until);
CREATE INDEX IF NOT EXISTS idx_integration_platforms_tenant_scope
ON integration_platforms(visibility_scope, tenant_id, tenant_key, status);
CREATE INDEX IF NOT EXISTS idx_model_pricing_scope
ON model_pricing_rules(scope_type, scope_id, resource_type);
CREATE INDEX IF NOT EXISTS idx_model_pricing_effective
ON model_pricing_rules(effective_from, effective_to);
CREATE INDEX IF NOT EXISTS idx_gateway_user_groups_status_priority
ON gateway_user_groups(status, priority);
CREATE INDEX IF NOT EXISTS idx_gateway_tenants_source_external
ON gateway_tenants(source, external_tenant_id)
WHERE external_tenant_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_gateway_tenants_status
ON gateway_tenants(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_users_source_external
ON gateway_users(source, external_user_id)
WHERE external_user_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_gateway_users_status
ON gateway_users(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_users_tenant
ON gateway_users(tenant_id, tenant_key, status);
CREATE INDEX IF NOT EXISTS idx_user_group_membership_principal
ON gateway_user_group_memberships(principal_type, principal_id, status);
CREATE INDEX IF NOT EXISTS idx_user_group_membership_effective
ON gateway_user_group_memberships(effective_from, effective_to);
CREATE INDEX IF NOT EXISTS idx_gateway_invitations_status
ON gateway_invitations(status, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_invitations_expiry
ON gateway_invitations(expires_at);
CREATE INDEX IF NOT EXISTS idx_gateway_api_keys_prefix
ON gateway_api_keys(key_prefix, status);
CREATE INDEX IF NOT EXISTS idx_gateway_api_keys_user
ON gateway_api_keys(gateway_user_id, status);
CREATE INDEX IF NOT EXISTS idx_gateway_wallet_accounts_tenant
ON gateway_wallet_accounts(gateway_tenant_id, status);
CREATE INDEX IF NOT EXISTS idx_gateway_wallet_transactions_user
ON gateway_wallet_transactions(gateway_user_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_recharge_orders_user
ON gateway_recharge_orders(gateway_user_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_platform_models_base
ON platform_models(base_model_id);
CREATE INDEX IF NOT EXISTS idx_platform_models_lookup
ON platform_models(model_type, model_name, enabled);
CREATE INDEX IF NOT EXISTS idx_platform_models_alias
ON platform_models(model_alias);
CREATE INDEX IF NOT EXISTS idx_platform_models_capabilities
ON platform_models USING gin(capabilities);
CREATE UNIQUE INDEX IF NOT EXISTS uniq_platform_models_model
ON platform_models(platform_id, model_name, model_type);
CREATE INDEX IF NOT EXISTS idx_gateway_tasks_queue
ON gateway_tasks(status, next_run_at, priority, created_at);
CREATE INDEX IF NOT EXISTS idx_gateway_tasks_lease
ON gateway_tasks(status, heartbeat_at);
CREATE INDEX IF NOT EXISTS idx_gateway_tasks_user_created
ON gateway_tasks(user_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_tasks_external
ON gateway_tasks(external_task_id);
CREATE UNIQUE INDEX IF NOT EXISTS uniq_gateway_tasks_idempotency
ON gateway_tasks(user_id, idempotency_key)
WHERE idempotency_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_gateway_attempts_task
ON gateway_task_attempts(task_id);
CREATE INDEX IF NOT EXISTS idx_gateway_attempts_client
ON gateway_task_attempts(client_id, started_at DESC);
CREATE INDEX IF NOT EXISTS idx_gateway_events_task_created
ON gateway_task_events(task_id, created_at);
CREATE INDEX IF NOT EXISTS idx_task_callback_outbox_pending
ON gateway_task_callback_outbox(status, next_attempt_at);
CREATE INDEX IF NOT EXISTS idx_task_callback_outbox_task
ON gateway_task_callback_outbox(task_id, seq);
CREATE INDEX IF NOT EXISTS idx_runtime_client_queue
ON runtime_client_states(queue_key, cooldown_until);
CREATE INDEX IF NOT EXISTS idx_runtime_client_platform
ON runtime_client_states(platform_id);
CREATE INDEX IF NOT EXISTS idx_gateway_upload_task
ON gateway_upload_assets(task_id);
CREATE INDEX IF NOT EXISTS idx_gateway_upload_file
ON gateway_upload_assets(server_main_file_id);
CREATE INDEX IF NOT EXISTS idx_concurrency_leases_active
ON gateway_concurrency_leases(scope_type, scope_key, released_at, expires_at);
CREATE INDEX IF NOT EXISTS idx_concurrency_leases_task
ON gateway_concurrency_leases(task_id);
CREATE UNIQUE INDEX IF NOT EXISTS uniq_settlement_outbox_task_event
ON settlement_outbox(task_id, event_type);

View File

@ -9,7 +9,7 @@
"executor": "nx:run-commands",
"options": {
"cwd": "apps/api",
"command": "go run ./cmd/gateway"
"command": "node ../../scripts/go-watch.mjs -- go run ./cmd/gateway"
}
},
"migrate": {

View File

@ -2,7 +2,9 @@ import { useEffect, useMemo, useState, type FormEvent } from 'react';
import type {
BaseModelCatalogItem,
CatalogProvider,
GatewayApiKey,
GatewayTenant,
GatewayTask,
GatewayUser,
IntegrationPlatform,
PlatformModel,
@ -11,7 +13,12 @@ import type {
UserGroup,
} from '@easyai-ai-gateway/contracts';
import {
createApiKey,
createChatTask,
createPlatform,
getTask,
getHealth,
listApiKeys,
listBaseModels,
listCatalogProviders,
listModels,
@ -96,8 +103,6 @@ export function App() {
email: '',
password: '',
displayName: '',
tenantKey: '',
tenantName: '',
invitationCode: '',
});
const [health, setHealth] = useState<HealthResponse | null>(null);
@ -110,6 +115,22 @@ export function App() {
const [tenants, setTenants] = useState<GatewayTenant[]>([]);
const [users, setUsers] = useState<GatewayUser[]>([]);
const [userGroups, setUserGroups] = useState<UserGroup[]>([]);
const [apiKeys, setApiKeys] = useState<GatewayApiKey[]>([]);
const [apiKeyForm, setApiKeyForm] = useState({ name: 'Local smoke key' });
const [apiKeySecret, setApiKeySecret] = useState('');
const [platformForm, setPlatformForm] = useState({
provider: 'openai',
platformKey: 'openai-simulation',
name: 'OpenAI Simulation',
baseUrl: 'https://api.openai.com/v1',
});
const [taskForm, setTaskForm] = useState({
model: 'gpt-4o-mini',
prompt: '用一句话确认 AI Gateway simulation 链路正常。',
});
const [taskResult, setTaskResult] = useState<GatewayTask | null>(null);
const [coreState, setCoreState] = useState<LoadState>('idle');
const [coreMessage, setCoreMessage] = useState('');
const [state, setState] = useState<LoadState>('idle');
const [error, setError] = useState('');
@ -151,6 +172,7 @@ export function App() {
tenantResponse,
userResponse,
userGroupResponse,
apiKeyResponse,
] = await Promise.all([
listPlatforms(nextToken),
listModels(nextToken),
@ -161,6 +183,7 @@ export function App() {
listTenants(nextToken),
listUsers(nextToken),
listUserGroups(nextToken),
listApiKeys(nextToken),
]);
setPlatforms(platformResponse.items);
setModels(modelResponse.items);
@ -171,6 +194,7 @@ export function App() {
setTenants(tenantResponse.items);
setUsers(userResponse.items);
setUserGroups(userGroupResponse.items);
setApiKeys(apiKeyResponse.items);
setState('ready');
} catch (err) {
setState('error');
@ -229,6 +253,68 @@ export function App() {
setTenants([]);
setUsers([]);
setUserGroups([]);
setApiKeys([]);
setApiKeySecret('');
setTaskResult(null);
setCoreMessage('');
}
async function submitAPIKey(event: FormEvent<HTMLFormElement>) {
event.preventDefault();
setCoreState('loading');
setCoreMessage('');
try {
const response = await createApiKey(token, { name: apiKeyForm.name, scopes: ['chat', 'image', 'video'] });
setApiKeySecret(response.secret);
setApiKeys((current) => [response.apiKey, ...current.filter((item) => item.id !== response.apiKey.id)]);
setCoreState('ready');
setCoreMessage('API Key 已创建secret 仅展示一次。');
} catch (err) {
setCoreState('error');
setCoreMessage(err instanceof Error ? err.message : '创建 API Key 失败');
}
}
async function submitPlatform(event: FormEvent<HTMLFormElement>) {
event.preventDefault();
setCoreState('loading');
setCoreMessage('');
try {
const platform = await createPlatform(token, {
...platformForm,
authType: 'bearer',
credentials: { mode: 'simulation' },
config: { testMode: true },
});
setPlatforms((current) => [platform, ...current.filter((item) => item.id !== platform.id)]);
setCoreState('ready');
setCoreMessage('平台已创建,当前阶段平台凭证仅全局管理员可配置。');
} catch (err) {
setCoreState('error');
setCoreMessage(err instanceof Error ? err.message : '创建平台失败');
}
}
async function submitTask(event: FormEvent<HTMLFormElement>) {
event.preventDefault();
const credential = apiKeySecret || token;
setCoreState('loading');
setCoreMessage('');
try {
const response = await createChatTask(credential, {
model: taskForm.model,
runMode: 'simulation',
simulation: true,
messages: [{ role: 'user', content: taskForm.prompt }],
});
const detail = await getTask(credential, response.task.id);
setTaskResult(detail);
setCoreState('ready');
setCoreMessage(apiKeySecret ? '任务已通过本地 API Key 完成 simulation。' : '任务已通过当前 Access Token 完成 simulation。');
} catch (err) {
setCoreState('error');
setCoreMessage(err instanceof Error ? err.message : '测试任务失败');
}
}
return (
@ -278,6 +364,23 @@ export function App() {
</button>
</section>
<CoreFlowPanel
apiKeyForm={apiKeyForm}
apiKeys={apiKeys}
apiKeySecret={apiKeySecret}
coreMessage={coreMessage}
coreState={coreState}
platformForm={platformForm}
taskForm={taskForm}
taskResult={taskResult}
onAPIKeyFormChange={setApiKeyForm}
onPlatformFormChange={setPlatformForm}
onSubmitAPIKey={submitAPIKey}
onSubmitPlatform={submitPlatform}
onSubmitTask={submitTask}
onTaskFormChange={setTaskForm}
/>
<Dashboard
baseModels={baseModels}
models={models}
@ -302,8 +405,6 @@ function AuthPanel(props: {
email: string;
password: string;
displayName: string;
tenantKey: string;
tenantName: string;
invitationCode: string;
};
state: LoadState;
@ -315,8 +416,6 @@ function AuthPanel(props: {
email: string;
password: string;
displayName: string;
tenantKey: string;
tenantName: string;
invitationCode: string;
}) => void;
onSubmitExternalToken: (event: FormEvent<HTMLFormElement>) => void;
@ -414,22 +513,6 @@ function AuthPanel(props: {
placeholder="至少 8 位"
/>
</label>
<label>
<span> Key</span>
<input
value={props.registerForm.tenantKey}
onChange={(event) => props.onRegisterChange({ ...props.registerForm, tenantKey: event.target.value })}
placeholder="team-a"
/>
</label>
<label>
<span></span>
<input
value={props.registerForm.tenantName}
onChange={(event) => props.onRegisterChange({ ...props.registerForm, tenantName: event.target.value })}
placeholder="Team A"
/>
</label>
<label>
<span></span>
<input
@ -464,6 +547,98 @@ function AuthPanel(props: {
);
}
function CoreFlowPanel(props: {
apiKeyForm: { name: string };
apiKeys: GatewayApiKey[];
apiKeySecret: string;
coreMessage: string;
coreState: LoadState;
platformForm: { provider: string; platformKey: string; name: string; baseUrl: string };
taskForm: { model: string; prompt: string };
taskResult: GatewayTask | null;
onAPIKeyFormChange: (value: { name: string }) => void;
onPlatformFormChange: (value: { provider: string; platformKey: string; name: string; baseUrl: string }) => void;
onSubmitAPIKey: (event: FormEvent<HTMLFormElement>) => void;
onSubmitPlatform: (event: FormEvent<HTMLFormElement>) => void;
onSubmitTask: (event: FormEvent<HTMLFormElement>) => void;
onTaskFormChange: (value: { model: string; prompt: string }) => void;
}) {
return (
<section className="corePanel" aria-label="核心链路验证">
<div className="sectionHeader">
<div>
<p className="eyebrow">Smoke Flow</p>
<h2></h2>
</div>
<span>{props.coreState === 'loading' ? '运行中' : '本地闭环'}</span>
</div>
<div className="coreGrid">
<form className="inlineForm" onSubmit={props.onSubmitAPIKey}>
<h3>1. API Key</h3>
<label>
<span></span>
<input value={props.apiKeyForm.name} onChange={(event) => props.onAPIKeyFormChange({ name: event.target.value })} />
</label>
<button type="submit" disabled={props.coreState === 'loading'}>
API Key
</button>
<p className="formHint"> {props.apiKeys.length} Key</p>
{props.apiKeySecret && <code className="secretBox">{props.apiKeySecret}</code>}
</form>
<form className="inlineForm" onSubmit={props.onSubmitPlatform}>
<h3>2. </h3>
<label>
<span>Provider</span>
<input value={props.platformForm.provider} onChange={(event) => props.onPlatformFormChange({ ...props.platformForm, provider: event.target.value })} />
</label>
<label>
<span> Key</span>
<input value={props.platformForm.platformKey} onChange={(event) => props.onPlatformFormChange({ ...props.platformForm, platformKey: event.target.value })} />
</label>
<label>
<span></span>
<input value={props.platformForm.name} onChange={(event) => props.onPlatformFormChange({ ...props.platformForm, name: event.target.value })} />
</label>
<label>
<span>Base URL</span>
<input value={props.platformForm.baseUrl} onChange={(event) => props.onPlatformFormChange({ ...props.platformForm, baseUrl: event.target.value })} />
</label>
<button type="submit" disabled={props.coreState === 'loading'}>
</button>
</form>
<form className="inlineForm" onSubmit={props.onSubmitTask}>
<h3>3. Simulation </h3>
<label>
<span></span>
<input value={props.taskForm.model} onChange={(event) => props.onTaskFormChange({ ...props.taskForm, model: event.target.value })} />
</label>
<label>
<span>Prompt</span>
<input value={props.taskForm.prompt} onChange={(event) => props.onTaskFormChange({ ...props.taskForm, prompt: event.target.value })} />
</label>
<button type="submit" disabled={props.coreState === 'loading'}>
</button>
{props.taskResult && (
<div className="resultBox">
<div>
<span className="statusPill">{props.taskResult.status}</span>
<strong>{props.taskResult.model}</strong>
</div>
<pre>{JSON.stringify(props.taskResult.result ?? {}, null, 2)}</pre>
</div>
)}
</form>
</div>
{props.coreMessage && <p className="coreMessage" data-error={props.coreState === 'error'}>{props.coreMessage}</p>}
</section>
);
}
function Dashboard(props: {
baseModels: BaseModelCatalogItem[];
models: PlatformModel[];

View File

@ -2,7 +2,10 @@ import type {
AuthResponse,
BaseModelCatalogItem,
CatalogProvider,
CreatedGatewayApiKey,
GatewayApiKey,
GatewayTenant,
GatewayTask,
GatewayUser,
IntegrationPlatform,
ListResponse,
@ -30,8 +33,6 @@ export async function registerLocalAccount(input: {
email?: string;
password: string;
displayName?: string;
tenantKey?: string;
tenantName?: string;
invitationCode?: string;
}): Promise<AuthResponse> {
return request<AuthResponse>('/api/v1/auth/register', {
@ -81,6 +82,58 @@ export async function listUserGroups(token: string): Promise<ListResponse<UserGr
return request<ListResponse<UserGroup>>('/api/v1/user-groups', { token });
}
export async function listApiKeys(token: string): Promise<ListResponse<GatewayApiKey>> {
return request<ListResponse<GatewayApiKey>>('/api/v1/api-keys', { token });
}
export async function createApiKey(
token: string,
input: { name: string; scopes?: string[] },
): Promise<CreatedGatewayApiKey> {
return request<CreatedGatewayApiKey>('/api/v1/api-keys', {
body: input,
method: 'POST',
token,
});
}
export async function createPlatform(
token: string,
input: {
provider: string;
platformKey?: string;
name: string;
baseUrl?: string;
authType?: string;
credentials?: Record<string, unknown>;
config?: Record<string, unknown>;
defaultPricingMode?: string;
defaultDiscountFactor?: number;
priority?: number;
},
): Promise<IntegrationPlatform> {
return request<IntegrationPlatform>('/api/v1/platforms', {
body: input,
method: 'POST',
token,
});
}
export async function createChatTask(
token: string,
input: { model: string; messages: Array<Record<string, unknown>>; runMode?: string; simulation?: boolean },
): Promise<{ task: GatewayTask; next: Record<string, string> }> {
return request<{ task: GatewayTask; next: Record<string, string> }>('/api/v1/chat/completions', {
body: input,
method: 'POST',
token,
});
}
export async function getTask(token: string, taskId: string): Promise<GatewayTask> {
return request<GatewayTask>(`/api/v1/tasks/${taskId}`, { token });
}
export async function listRateLimitWindows(token: string): Promise<ListResponse<RateLimitWindow>> {
return request<ListResponse<RateLimitWindow>>('/api/v1/runtime/rate-limit-windows', { token });
}
@ -103,7 +156,19 @@ async function request<T>(
});
if (!response.ok) {
const body = await response.text();
throw new Error(body || `Request failed: ${response.status}`);
throw new Error(parseErrorMessage(body) || `Request failed: ${response.status}`);
}
return response.json() as Promise<T>;
}
function parseErrorMessage(body: string) {
if (!body) {
return '';
}
try {
const parsed = JSON.parse(body) as { error?: { message?: string } };
return parsed.error?.message ?? body;
} catch {
return body;
}
}

View File

@ -244,6 +244,116 @@ button:disabled {
background: #ffffff;
}
.corePanel {
padding: 18px;
margin-bottom: 18px;
border: 1px solid #dde3ee;
border-radius: 8px;
background: #ffffff;
}
.coreGrid {
display: grid;
grid-template-columns: repeat(3, minmax(0, 1fr));
gap: 12px;
}
.inlineForm {
display: flex;
min-height: 260px;
flex-direction: column;
gap: 12px;
padding: 14px;
border: 1px solid #e4eaf3;
border-radius: 8px;
background: #fbfcff;
}
.inlineForm label {
display: grid;
gap: 7px;
color: #4a5568;
font-size: 13px;
font-weight: 700;
}
.inlineForm input {
width: 100%;
min-height: 40px;
padding: 0 11px;
border: 1px solid #cbd5e1;
border-radius: 6px;
color: #172033;
outline: none;
}
.inlineForm button {
margin-top: auto;
}
.formHint,
.coreMessage {
color: #667085;
font-size: 13px;
line-height: 1.5;
}
.coreMessage {
margin-top: 12px;
font-weight: 700;
}
.coreMessage[data-error="true"] {
color: #9b2c2c;
}
.secretBox {
display: block;
overflow: hidden;
padding: 10px;
border: 1px solid #d8e0ec;
border-radius: 6px;
background: #ffffff;
color: #214e8a;
font-size: 12px;
text-overflow: ellipsis;
white-space: nowrap;
}
.resultBox {
display: grid;
gap: 10px;
padding: 10px;
border: 1px solid #d8e0ec;
border-radius: 6px;
background: #ffffff;
}
.resultBox > div {
display: flex;
align-items: center;
justify-content: space-between;
gap: 10px;
}
.resultBox pre {
overflow: auto;
max-height: 150px;
margin: 0;
color: #2d3748;
font-size: 12px;
line-height: 1.45;
}
.statusPill {
padding: 4px 8px;
border-radius: 999px;
background: #e8f5ef;
color: #1b8a5a;
font-size: 12px;
font-weight: 800;
}
.sectionHeader {
display: flex;
align-items: center;
@ -476,11 +586,12 @@ button:disabled {
flex-direction: column;
}
.metrics,
.split,
.moduleGrid,
.detailGrid {
grid-template-columns: 1fr;
.metrics,
.split,
.coreGrid,
.moduleGrid,
.detailGrid {
grid-template-columns: 1fr;
}
.row {

View File

@ -153,19 +153,21 @@ Gateway 本地登录首期只签发短期 access token不做 refresh token
| Method | Path | Permission | 说明 |
| --- | --- | --- | --- |
| `POST` | `/api/v1/auth/register` | `public` | 注册 Gateway 本地账号,创建或加入租户,返回 access token |
| `POST` | `/api/v1/auth/register` | `public` | 注册 Gateway 本地账号,进入默认租户;邀请码仅记录邀请关系,返回 access token |
| `POST` | `/api/v1/auth/login` | `public` | 本地账号登录,返回 access token |
注册字段:
- `username` / `email`:至少一个必填。
- `password`:至少 8 位,落库为 bcrypt hash。
- `tenantKey` / `tenantName`:可选;未传时创建个人租户。
- `displayName`:可选。
- `invitationCode`:可选;填写后只校验并记录邀请关系,不改变租户、用户组或角色。
安全边界:
- 普通注册首期只给 `user` 角色。
- 允许注册时填写已有 `tenantKey` 只是脚手架行为;正式上线前需要租户邀请、域名校验或管理员审批。
- 普通注册界面不展示、不提交租户 Key 和租户名称,单租户场景下后端创建或复用 `tenant_key=default` 的默认租户。
- 普通注册首期默认给 `user` 角色;首次初始化用户可作为本地闭环的全局管理员引导账号。
- 多租户加入后续通过域名校验、管理员审批或 `server-main` 同步等受控入口完成,不由普通注册邀请码决定。
- `server-main` 模式可关闭本地注册登录,只保留外部 token 入口。
### 5.2 权限等级
@ -315,8 +317,8 @@ AI Gateway 的外部接口必须以“兼容现有 EasyAI 路由、请求 DTO、
| `GET` | `/api/v1/tenants` | `power` | 网关租户列表,支持本地租户和 server-main 同步租户 |
| `GET` | `/api/v1/tenants/:id` | `power` | 租户详情、来源、策略和同步状态 |
| `POST` | `/api/v1/tenants/sync` | `power` | 从 `server-main` 拉取或接收租户同步 |
| `GET` | `/api/v1/tenant-invitations` | `power` | 本地租户邀请码列表 |
| `POST` | `/api/v1/tenant-invitations` | `power` | 创建本地注册邀请码 |
| `GET` | `/api/v1/invitations` | `power` | 本地注册邀请码列表 |
| `POST` | `/api/v1/invitations` | `power` | 创建本地注册邀请码 |
| `GET` | `/api/v1/users` | `power` | 网关用户列表,支持本地用户和 server-main 同步用户 |
| `GET` | `/api/v1/users/:id` | `power` | 用户详情、来源、角色、用户组和同步状态 |
| `POST` | `/api/v1/users/sync` | `power` | 从 `server-main` 拉取或接收用户同步 |
@ -375,8 +377,8 @@ Gateway 需要支持三种身份运行模式,默认配置为 `IDENTITY_MODE=hy
本地注册规则:
- 普通注册默认开放,`invitationCode` 可选;不填邀请码时创建或复用 `tenantKey` 对应的 Gateway 本地租户,未填 `tenantKey` 时生成个人租户
- 填写邀请码时必须命中 `gateway_tenant_invitations` 的有效记录,注册用户加入邀请码指定租户,并可绑定邀请码指定用户组
- 普通注册默认开放,`invitationCode` 可选;不填邀请码时创建或复用 Gateway 本地默认租户 `tenant_key=default`,不允许普通用户在注册界面填写租户 Key 或租户名称
- 填写邀请码时必须命中 `gateway_invitations` 的有效记录;邀请码只用于标识邀请关系、更新使用次数和写入用户注册 metadata不参与租户、用户组或角色分配
- 平台凭证、provider 凭证和全局模型配置暂时只允许全局管理员维护;租户侧先只做模型使用、用量查看和策略展示。
身份解析流程:
@ -783,17 +785,14 @@ Gateway 需要支持三种身份运行模式,默认配置为 `IDENTITY_MODE=hy
- 接入 `server-main` 时可使用 `principal_type='user' + external_user_id`,并通过 `gateway_users.source='server-main'` 反查同步副本。
- 多个组同时命中时,先按 `gateway_user_group_memberships.priority`,再按 `gateway_user_groups.priority` 合并策略。
#### 7.9.8 `gateway_tenant_invitations`
#### 7.9.8 `gateway_invitations`
本地注册邀请码。邀请码不是普通注册的必填项,但填写后必须可校验,并决定用户加入哪个租户和用户组
本地注册邀请码。邀请码不是普通注册的必填项;填写后必须可校验,但当前阶段只标识邀请关系,不决定用户加入哪个租户、用户组或角色
| 字段 | 类型 | 约束 | 说明 |
| --- | --- | --- | --- |
| `id` | `uuid` | PK | 邀请 ID |
| `tenant_id` | `uuid` | FK | 加入的 Gateway 租户 |
| `invite_code` | `text` | unique, not null | 注册邀请码 |
| `role` | `text` | not null, default `user` | 注册后角色,默认普通用户 |
| `user_group_id` | `uuid` | FK nullable | 注册后默认用户组 |
| `max_uses` | `int` | nullable | 最大使用次数 |
| `used_count` | `int` | not null | 已使用次数 |
| `expires_at` | `timestamptz` | nullable | 过期时间 |
@ -1838,7 +1837,7 @@ type ServerMainUploadClient interface {
| 页面 | 路由 | 权限 | 说明 |
| --- | --- | --- | --- |
| 登录 | `/login` | `public` | 用户名/邮箱 + 密码登录 Gateway 本地账号 |
| 注册 | `/register` | `public` | 创建 Gateway 本地账号;可填写租户 key / 租户名称 / 邀请码,邀请码不是必填 |
| 注册 | `/register` | `public` | 创建 Gateway 本地账号;可填写邀请码,租户字段不出现在普通注册界面 |
| 外部 Token 入口 | `/login?mode=external` | `public` | 粘贴 `server-main` access token用于接入模式和开发调试 |
登录成功后统一获得 Gateway 可校验的 access token本地账号 token 中包含 `source=gateway`、`gatewayUserId`、`gatewayTenantId`、`tenantKey`。外部 token 保留 `source=server-main` 或主服务返回的 claim。
@ -2127,6 +2126,6 @@ apps/web/src/
已确认决策:
1. 默认身份模式就是 `hybrid`,用于同时支持本地账号和 `server-main` token / API Key。
2. 普通注册支持填写邀请码,但邀请码不是必填项;填写后按邀请码加入指定租户和用户组
2. 普通注册支持填写邀请码,但邀请码不是必填项;填写后只记录邀请关系,不决定租户、用户组或角色
3. 先做本地闭环:独立模式下本地 API Key、余额、充值订单和钱包流水由 Gateway 承接。
4. 当前阶段仅允许全局管理员配置 provider / platform 凭证和全局模型,不开放租户管理员自助配置凭证。

View File

@ -15,7 +15,7 @@
### 1.1 模型库
- 建立 `model_catalog_providers`、`base_model_catalog`、`model_pricing_rules` 的首批数据。
- 建立 `gateway_tenants`、`gateway_users`、`gateway_user_groups`、`gateway_tenant_invitations` 和用户组策略,支持独立租户/用户、可选邀请码注册、`server-main` 同步租户/用户、不同用户组的充值折扣、调用折扣和并发/限流策略。
- 建立 `gateway_tenants`、`gateway_users`、`gateway_user_groups`、`gateway_invitations` 和用户组策略,支持独立租户/用户、可选邀请码注册关系记录、`server-main` 同步租户/用户、不同用户组的充值折扣、调用折扣和并发/限流策略。
- 建立独立模式本地闭环表:`gateway_api_keys`、`gateway_wallet_accounts`、`gateway_wallet_transactions`、`gateway_recharge_orders`。
- 导入 OpenAI、Gemini 的基准 provider、基准模型、能力 schema、默认限流模板。
- 支持全局模型配置:模型类型、上下文、多模态能力、图片输入/输出能力、stream 支持、价格规则。

View File

@ -30,8 +30,6 @@ export interface LocalRegisterRequest {
email?: string;
password: string;
displayName?: string;
tenantKey?: string;
tenantName?: string;
invitationCode?: string;
}
@ -42,10 +40,7 @@ export interface LocalLoginRequest {
export interface GatewayInvitation {
id: string;
tenantId: string;
inviteCode: string;
role: 'user' | 'creator' | 'operator' | 'admin' | string;
userGroupId?: string;
maxUses?: number;
usedCount: number;
expiresAt?: string;
@ -229,6 +224,11 @@ export interface GatewayApiKey {
updatedAt: string;
}
export interface CreatedGatewayApiKey {
apiKey: GatewayApiKey;
secret: string;
}
export interface GatewayWalletAccount {
id: string;
gatewayTenantId?: string;
@ -341,6 +341,7 @@ export interface RateLimitWindow {
export interface GatewayTask {
id: string;
kind: string;
runMode?: 'production' | 'simulation' | string;
userId: string;
gatewayUserId?: string;
userSource?: 'gateway' | 'server-main' | 'sync' | string;
@ -360,6 +361,20 @@ export interface GatewayTask {
updatedAt: string;
}
export interface GatewayTaskEvent {
id: string;
taskId: string;
seq: number;
eventType: string;
status?: string;
phase?: string;
progress?: number;
message?: string;
payload?: Record<string, unknown>;
simulated: boolean;
createdAt: string;
}
export interface ListResponse<T> {
items: T[];
}

View File

@ -1,7 +1,15 @@
#!/usr/bin/env bash
set -euo pipefail
CONTAINER="${AI_GATEWAY_PG_CONTAINER:-easyai-pgvector}"
if [[ -n "${AI_GATEWAY_PG_CONTAINER:-}" ]]; then
CONTAINER="$AI_GATEWAY_PG_CONTAINER"
elif docker inspect easyai-pgvector >/dev/null 2>&1; then
CONTAINER="easyai-pgvector"
elif docker inspect postgres >/dev/null 2>&1; then
CONTAINER="postgres"
else
CONTAINER="easyai-pgvector"
fi
PGUSER="${AI_GATEWAY_PG_USER:-easyai}"
DB_NAME="${AI_GATEWAY_DATABASE_NAME:-easyai_ai_gateway}"

View File

@ -1,10 +1,26 @@
#!/usr/bin/env bash
set -euo pipefail
export AI_GATEWAY_PG_CONTAINER="${AI_GATEWAY_PG_CONTAINER:-easyai-pgvector}"
if [[ -z "${AI_GATEWAY_PG_CONTAINER:-}" ]]; then
if docker inspect easyai-pgvector >/dev/null 2>&1; then
export AI_GATEWAY_PG_CONTAINER="easyai-pgvector"
elif docker inspect postgres >/dev/null 2>&1; then
export AI_GATEWAY_PG_CONTAINER="postgres"
else
export AI_GATEWAY_PG_CONTAINER="easyai-pgvector"
fi
fi
export AI_GATEWAY_PG_USER="${AI_GATEWAY_PG_USER:-easyai}"
if [[ -z "${AI_GATEWAY_PG_PASSWORD:-}" ]] && docker inspect "$AI_GATEWAY_PG_CONTAINER" >/dev/null 2>&1; then
AI_GATEWAY_PG_PASSWORD="$(
docker inspect "$AI_GATEWAY_PG_CONTAINER" --format '{{range .Config.Env}}{{println .}}{{end}}' \
| awk -F= '$1 == "POSTGRES_PASSWORD" {print $2; exit}'
)"
export AI_GATEWAY_PG_PASSWORD
fi
export AI_GATEWAY_PG_PASSWORD="${AI_GATEWAY_PG_PASSWORD:-easyai2025}"
export AI_GATEWAY_DATABASE_NAME="${AI_GATEWAY_DATABASE_NAME:-easyai_ai_gateway}"
export AI_GATEWAY_DATABASE_URL="${AI_GATEWAY_DATABASE_URL:-postgresql://easyai:easyai2025@localhost:5432/easyai_ai_gateway?sslmode=disable}"
export AI_GATEWAY_DATABASE_URL="${AI_GATEWAY_DATABASE_URL:-postgresql://${AI_GATEWAY_PG_USER}:${AI_GATEWAY_PG_PASSWORD}@localhost:5432/${AI_GATEWAY_DATABASE_NAME}?sslmode=disable}"
echo "[ai-gateway] using database: ${AI_GATEWAY_DATABASE_URL}"

226
scripts/go-watch.mjs Normal file
View File

@ -0,0 +1,226 @@
#!/usr/bin/env node
import { spawn } from 'node:child_process';
import { readdir, stat } from 'node:fs/promises';
import path from 'node:path';
import process from 'node:process';
const watchRoot = process.env.GO_WATCH_ROOT || process.cwd();
const intervalMs = Number(process.env.GO_WATCH_INTERVAL_MS || 700);
const debounceMs = Number(process.env.GO_WATCH_DEBOUNCE_MS || 250);
const shutdownGraceMs = Number(process.env.GO_WATCH_SHUTDOWN_GRACE_MS || 2500);
const restartDelayMs = Number(process.env.GO_WATCH_RESTART_DELAY_MS || 200);
const commandIndex = process.argv.indexOf('--');
const command = commandIndex >= 0 ? process.argv.slice(commandIndex + 1) : ['go', 'run', './cmd/gateway'];
const ignoredDirs = new Set([
'.git',
'.nx',
'coverage',
'dist',
'node_modules',
'tmp',
'vendor',
]);
const watchedExtensions = new Set(['.go', '.mod', '.sum']);
if (!command.length) {
console.error('[go-watch] missing command after --');
process.exit(1);
}
let child = null;
let knownSignature = '';
let restarting = false;
let restartQueued = false;
let pendingRestart = null;
let shuttingDown = false;
let stopped = false;
function log(message) {
console.log(`[go-watch] ${message}`);
}
function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
async function collectFiles(dir, files = []) {
let entries = [];
try {
entries = await readdir(dir, { withFileTypes: true });
} catch {
return files;
}
for (const entry of entries) {
if (entry.name.startsWith('.') && entry.name !== '.env') {
continue;
}
if (entry.isDirectory()) {
if (!ignoredDirs.has(entry.name)) {
await collectFiles(path.join(dir, entry.name), files);
}
continue;
}
if (watchedExtensions.has(path.extname(entry.name))) {
files.push(path.join(dir, entry.name));
}
}
return files;
}
async function signature() {
const files = await collectFiles(watchRoot);
files.sort();
const parts = [];
for (const file of files) {
try {
const info = await stat(file);
parts.push(`${file}:${info.mtimeMs}:${info.size}`);
} catch {
parts.push(`${file}:deleted`);
}
}
return parts.join('\n');
}
function start() {
if (stopped) {
return;
}
log(`starting: ${command.join(' ')}`);
child = spawn(command[0], command.slice(1), {
cwd: watchRoot,
detached: process.platform !== 'win32',
env: process.env,
stdio: 'inherit',
});
child.on('exit', (code, signal) => {
if (stopped || restarting) {
return;
}
log(`process exited code=${code ?? 'null'} signal=${signal ?? 'null'}`);
});
}
function signalProcessTree(processToStop, signal) {
if (!processToStop || processToStop.pid === undefined) {
return;
}
try {
if (process.platform === 'win32') {
processToStop.kill(signal);
} else {
process.kill(-processToStop.pid, signal);
}
} catch (error) {
if (error?.code !== 'ESRCH') {
console.error(`[go-watch] failed to send ${signal}`, error);
}
}
}
function stopProcessTree(processToStop) {
if (!processToStop || processToStop.exitCode !== null || processToStop.signalCode !== null) {
return Promise.resolve();
}
return new Promise((resolve) => {
let done = false;
const finish = () => {
if (done) {
return;
}
done = true;
clearTimeout(forceTimer);
clearTimeout(resolveTimer);
resolve();
};
const forceTimer = setTimeout(() => {
signalProcessTree(processToStop, 'SIGKILL');
}, shutdownGraceMs);
const resolveTimer = setTimeout(finish, shutdownGraceMs + 500);
forceTimer.unref();
resolveTimer.unref();
processToStop.once('exit', finish);
signalProcessTree(processToStop, 'SIGTERM');
});
}
async function restartNow() {
if (restarting) {
restartQueued = true;
return;
}
restarting = true;
log('change detected, restarting');
const previous = child;
child = null;
await stopProcessTree(previous);
if (restartDelayMs > 0) {
await sleep(restartDelayMs);
}
restarting = false;
if (!stopped) {
start();
}
if (restartQueued && !stopped) {
restartQueued = false;
restart();
}
}
function restart() {
if (pendingRestart) {
clearTimeout(pendingRestart);
}
pendingRestart = setTimeout(() => {
pendingRestart = null;
void restartNow();
}, debounceMs);
}
async function poll() {
try {
const nextSignature = await signature();
if (!knownSignature) {
knownSignature = nextSignature;
} else if (nextSignature !== knownSignature) {
knownSignature = nextSignature;
restart();
}
} catch (error) {
console.error('[go-watch] scan failed', error);
}
}
async function main() {
log(`watching ${watchRoot}`);
knownSignature = await signature();
start();
const timer = setInterval(poll, intervalMs);
const shutdown = async (exitCode) => {
if (shuttingDown) {
return;
}
shuttingDown = true;
stopped = true;
clearInterval(timer);
if (pendingRestart) {
clearTimeout(pendingRestart);
}
await stopProcessTree(child);
process.exit(exitCode);
};
process.on('SIGINT', () => {
void shutdown(130);
});
process.on('SIGTERM', () => {
void shutdown(143);
});
}
main().catch((error) => {
console.error('[go-watch] fatal', error);
process.exit(1);
});