diff --git a/.env.example b/.env.example index 053e244..0eedfa5 100644 --- a/.env.example +++ b/.env.example @@ -34,5 +34,5 @@ TASK_PROGRESS_CALLBACK_URL=http://localhost:3000/internal/platform/task-progress TASK_PROGRESS_CALLBACK_TIMEOUT_MS=5000 TASK_PROGRESS_CALLBACK_MAX_ATTEMPTS=10 -CORS_ALLOWED_ORIGIN=http://localhost:5178 +CORS_ALLOWED_ORIGIN=http://localhost:5178,http://127.0.0.1:5178 VITE_GATEWAY_API_BASE_URL=http://localhost:8088 diff --git a/README.md b/README.md index f99b091..88e5b40 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,13 @@ pnpm dev - PostgreSQL: 目标版本 18,默认使用宿主机 `localhost:5432` 上的 `easyai-pgvector` 实例,并使用独立库 `easyai_ai_gateway` - 身份模式: 默认 `IDENTITY_MODE=hybrid`,可同时测试 Gateway 本地账号注册登录、可选邀请码和 `server-main` JWT / API Key 对接。 +`pnpm dev` 会先创建数据库并执行 migration,然后并行启动: + +- `api:dev`:通过 `scripts/go-watch.mjs` 运行 Go API,监听 `.go`、`go.mod`、`go.sum` 变化并自动重启后端进程;watcher 会按进程组终止旧的 `go run` 和其子进程,避免热更新时残留进程占用 API 端口。 +- `web:dev`:Vite React dev server。 + +后端热更新可通过 `GO_WATCH_SHUTDOWN_GRACE_MS` 和 `GO_WATCH_RESTART_DELAY_MS` 调整旧进程退出等待时间与重启间隔。 + 默认 EasyAI 部署里,`easyai-pgvector` 在容器网络内的连接串是: ```dotenv diff --git a/apps/api/internal/auth/auth.go b/apps/api/internal/auth/auth.go index ff9bbde..d4be656 100644 --- a/apps/api/internal/auth/auth.go +++ b/apps/api/internal/auth/auth.go @@ -52,6 +52,7 @@ type Authenticator struct { ServerMainBaseURL string ServerMainInternalToken string HTTPClient *http.Client + LocalAPIKeyVerifier func(ctx context.Context, apiKey string) (*User, error) } func New(jwtSecret string, serverMainBaseURL string, internalToken string) *Authenticator { @@ -169,6 +170,18 @@ func (a *Authenticator) SignJWT(user *User, ttl time.Duration) (string, error) { } func (a *Authenticator) verifyAPIKey(ctx context.Context, apiKey string) (*User, error) { + if a.LocalAPIKeyVerifier != nil { + user, err := a.LocalAPIKeyVerifier(ctx, apiKey) + if err == nil { + return user, nil + } + if !errors.Is(err, ErrUnauthorized) { + return nil, err + } + if strings.HasPrefix(apiKey, "sk-gw-") { + return nil, ErrUnauthorized + } + } if a.ServerMainBaseURL == "" || a.ServerMainInternalToken == "" { return nil, ErrUnauthorized } diff --git a/apps/api/internal/config/config.go b/apps/api/internal/config/config.go index 967c22b..8902d76 100644 --- a/apps/api/internal/config/config.go +++ b/apps/api/internal/config/config.go @@ -41,7 +41,7 @@ func Load() Config { ), TaskProgressCallbackTimeoutMS: env("TASK_PROGRESS_CALLBACK_TIMEOUT_MS", "5000"), TaskProgressCallbackMaxAttempts: env("TASK_PROGRESS_CALLBACK_MAX_ATTEMPTS", "10"), - CORSAllowedOrigin: env("CORS_ALLOWED_ORIGIN", "http://localhost:5178"), + CORSAllowedOrigin: env("CORS_ALLOWED_ORIGIN", "http://localhost:5178,http://127.0.0.1:5178"), LogLevel: logLevel(env("LOG_LEVEL", "info")), } } diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go new file mode 100644 index 0000000..3c16083 --- /dev/null +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -0,0 +1,298 @@ +package httpapi + +import ( + "bytes" + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "runtime" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/config" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" + "github.com/jackc/pgx/v5/pgxpool" +) + +func TestCoreLocalFlow(t *testing.T) { + databaseURL := strings.TrimSpace(os.Getenv("AI_GATEWAY_TEST_DATABASE_URL")) + if databaseURL == "" { + t.Skip("set AI_GATEWAY_TEST_DATABASE_URL to run the PostgreSQL integration flow") + } + ctx := context.Background() + applyMigration(t, ctx, databaseURL) + + db, err := store.Connect(ctx, databaseURL) + if err != nil { + t.Fatalf("connect store: %v", err) + } + defer db.Close() + + handler := NewServer(config.Config{ + AppEnv: "test", + HTTPAddr: ":0", + DatabaseURL: databaseURL, + IdentityMode: "hybrid", + JWTSecret: "test-secret", + CORSAllowedOrigin: "*", + }, db, slog.New(slog.NewTextHandler(io.Discard, nil))) + server := httptest.NewServer(handler) + defer server.Close() + + suffix := time.Now().UnixNano() + suffixText := strconv.FormatInt(suffix, 10) + username := "smoke_admin_" + suffixText + password := "password123" + + var registerResponse struct { + AccessToken string `json:"accessToken"` + } + doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/register", "", map[string]any{ + "username": username, + "email": username + "@example.com", + "password": password, + "tenantKey": "manual-" + suffixText, + "tenantName": "Manual Tenant", + }, http.StatusCreated, ®isterResponse) + if registerResponse.AccessToken == "" { + t.Fatal("register did not return access token") + } + + var duplicateResponse map[string]any + doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/register", "", map[string]any{ + "username": username, + "email": username + "@example.com", + "password": password, + }, http.StatusConflict, &duplicateResponse) + if errorBody, ok := duplicateResponse["error"].(map[string]any); !ok || errorBody["message"] != "user already exists" { + t.Fatalf("unexpected duplicate response: %+v", duplicateResponse) + } + + var loginResponse struct { + AccessToken string `json:"accessToken"` + } + doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/login", "", map[string]any{ + "account": username, + "password": password, + }, http.StatusOK, &loginResponse) + if loginResponse.AccessToken == "" { + t.Fatal("login did not return access token") + } + + var apiKeyResponse struct { + Secret string `json:"secret"` + APIKey struct { + ID string `json:"id"` + KeyPrefix string `json:"keyPrefix"` + Status string `json:"status"` + } `json:"apiKey"` + } + doJSON(t, server.URL, http.MethodPost, "/api/v1/api-keys", loginResponse.AccessToken, map[string]any{ + "name": "smoke key", + "scopes": []string{"chat", "image", "video"}, + }, http.StatusCreated, &apiKeyResponse) + if !strings.HasPrefix(apiKeyResponse.Secret, "sk-gw-") || apiKeyResponse.APIKey.Status != "active" { + t.Fatalf("unexpected api key response: %+v", apiKeyResponse) + } + + var me map[string]any + doJSON(t, server.URL, http.MethodGet, "/api/v1/me", apiKeyResponse.Secret, nil, http.StatusOK, &me) + if me["apiKeyId"] == "" { + t.Fatalf("api key auth did not expose apiKeyId: %+v", me) + } + if me["tenantKey"] != "default" { + t.Fatalf("register should ignore public tenant fields and use default tenant: %+v", me) + } + + testPool, err := pgxpool.New(ctx, databaseURL) + if err != nil { + t.Fatalf("connect test pool: %v", err) + } + defer testPool.Close() + + inviteCode := "INVITE-" + suffixText + if _, err := testPool.Exec(ctx, ` +INSERT INTO gateway_invitations (invite_code, max_uses, metadata) +VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil { + t.Fatalf("insert invitation: %v", err) + } + invitedUsername := "smoke_invited_" + suffixText + var invitedRegisterResponse struct { + AccessToken string `json:"accessToken"` + } + doJSON(t, server.URL, http.MethodPost, "/api/v1/auth/register", "", map[string]any{ + "username": invitedUsername, + "email": invitedUsername + "@example.com", + "password": password, + "tenantKey": "manual-invited-" + suffixText, + "tenantName": "Manual Invited Tenant", + "invitationCode": inviteCode, + }, http.StatusCreated, &invitedRegisterResponse) + var invitedMe map[string]any + doJSON(t, server.URL, http.MethodGet, "/api/v1/me", invitedRegisterResponse.AccessToken, nil, http.StatusOK, &invitedMe) + if invitedMe["tenantKey"] != "default" { + t.Fatalf("invitation should not change tenant context: %+v", invitedMe) + } + var usedCount int + if err := testPool.QueryRow(ctx, `SELECT used_count FROM gateway_invitations WHERE invite_code = $1`, inviteCode).Scan(&usedCount); err != nil { + t.Fatalf("read invitation used_count: %v", err) + } + if usedCount != 1 { + t.Fatalf("invitation used_count = %d, want 1", usedCount) + } + var userMetadata []byte + if err := testPool.QueryRow(ctx, `SELECT metadata FROM gateway_users WHERE username = $1`, invitedUsername).Scan(&userMetadata); err != nil { + t.Fatalf("read invited user metadata: %v", err) + } + var metadata map[string]any + if err := json.Unmarshal(userMetadata, &metadata); err != nil { + t.Fatalf("decode invited user metadata: %v", err) + } + registration, ok := metadata["registration"].(map[string]any) + if !ok || registration["invitationCode"] != inviteCode { + t.Fatalf("invitation relationship was not recorded: %+v", metadata) + } + + var platform struct { + ID string `json:"id"` + Provider string `json:"provider"` + PlatformKey string `json:"platformKey"` + Status string `json:"status"` + } + doJSON(t, server.URL, http.MethodPost, "/api/v1/platforms", loginResponse.AccessToken, map[string]any{ + "provider": "openai", + "platformKey": "openai-smoke-" + suffixText, + "name": "OpenAI Smoke", + "baseUrl": "https://api.openai.com/v1", + "authType": "bearer", + "credentials": map[string]any{"mode": "simulation"}, + "config": map[string]any{"testMode": true}, + }, http.StatusCreated, &platform) + if platform.ID == "" || platform.Status != "enabled" { + t.Fatalf("unexpected platform response: %+v", platform) + } + + var taskResponse struct { + Task struct { + ID string `json:"id"` + Status string `json:"status"` + RunMode string `json:"runMode"` + Result map[string]any `json:"result"` + } `json:"task"` + } + doJSON(t, server.URL, http.MethodPost, "/api/v1/chat/completions", apiKeyResponse.Secret, map[string]any{ + "model": "gpt-4o-mini", + "runMode": "simulation", + "simulation": true, + "messages": []map[string]any{{"role": "user", "content": "ping"}}, + }, http.StatusAccepted, &taskResponse) + if taskResponse.Task.ID == "" || taskResponse.Task.Status != "succeeded" || taskResponse.Task.RunMode != "simulation" { + t.Fatalf("unexpected task response: %+v", taskResponse.Task) + } + + var taskDetail struct { + ID string `json:"id"` + Status string `json:"status"` + Result map[string]any `json:"result"` + } + doJSON(t, server.URL, http.MethodGet, "/api/v1/tasks/"+taskResponse.Task.ID, apiKeyResponse.Secret, nil, http.StatusOK, &taskDetail) + if taskDetail.Status != "succeeded" || taskDetail.Result["id"] == "" { + t.Fatalf("unexpected task detail: %+v", taskDetail) + } + + req, err := http.NewRequest(http.MethodGet, server.URL+"/api/v1/tasks/"+taskResponse.Task.ID+"/events", nil) + if err != nil { + t.Fatalf("build events request: %v", err) + } + req.Header.Set("Authorization", "Bearer "+apiKeyResponse.Secret) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("events request: %v", err) + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK || !bytes.Contains(body, []byte("task.completed")) { + t.Fatalf("unexpected events response status=%d body=%s", resp.StatusCode, string(body)) + } +} + +func TestOriginAllowedSupportsCommaSeparatedOrigins(t *testing.T) { + allowed := "http://localhost:5178, http://127.0.0.1:5178" + if !originAllowed("http://localhost:5178", allowed) { + t.Fatal("localhost origin should be allowed") + } + if !originAllowed("http://127.0.0.1:5178", allowed) { + t.Fatal("127.0.0.1 origin should be allowed") + } + if originAllowed("http://127.0.0.1:5179", allowed) { + t.Fatal("unexpected origin should not be allowed") + } +} + +func applyMigration(t *testing.T, ctx context.Context, databaseURL string) { + t.Helper() + _, filename, _, _ := runtime.Caller(0) + migrationFiles, err := filepath.Glob(filepath.Join(filepath.Dir(filename), "..", "..", "migrations", "*.sql")) + if err != nil { + t.Fatalf("read migration files: %v", err) + } + sort.Strings(migrationFiles) + pool, err := pgxpool.New(ctx, databaseURL) + if err != nil { + t.Fatalf("connect migration db: %v", err) + } + defer pool.Close() + for _, migrationPath := range migrationFiles { + migration, err := os.ReadFile(migrationPath) + if err != nil { + t.Fatalf("read migration %s: %v", filepath.Base(migrationPath), err) + } + if _, err := pool.Exec(ctx, string(migration)); err != nil { + t.Fatalf("apply migration %s: %v", filepath.Base(migrationPath), err) + } + } +} + +func doJSON(t *testing.T, baseURL string, method string, path string, token string, payload any, expectedStatus int, out any) { + t.Helper() + var body io.Reader + if payload != nil { + raw, err := json.Marshal(payload) + if err != nil { + t.Fatalf("marshal payload: %v", err) + } + body = bytes.NewReader(raw) + } + req, err := http.NewRequest(method, baseURL+path, body) + if err != nil { + t.Fatalf("build request: %v", err) + } + if payload != nil { + req.Header.Set("Content-Type", "application/json") + } + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("%s %s: %v", method, path, err) + } + defer resp.Body.Close() + raw, _ := io.ReadAll(resp.Body) + if resp.StatusCode != expectedStatus { + t.Fatalf("%s %s status=%d want=%d body=%s", method, path, resp.StatusCode, expectedStatus, string(raw)) + } + if out != nil && len(raw) > 0 { + if err := json.Unmarshal(raw, out); err != nil { + t.Fatalf("decode %s %s response: %v body=%s", method, path, err, string(raw)) + } + } +} diff --git a/apps/api/internal/httpapi/handlers.go b/apps/api/internal/httpapi/handlers.go index 3993649..a979690 100644 --- a/apps/api/internal/httpapi/handlers.go +++ b/apps/api/internal/httpapi/handlers.go @@ -54,8 +54,12 @@ func (s *Server) register(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, err.Error()) return } + if errors.Is(err, store.ErrUserAlreadyExists) { + writeError(w, http.StatusConflict, err.Error()) + return + } s.logger.Error("register local user failed", "error", err) - writeError(w, http.StatusConflict, "user already exists or tenant is unavailable") + writeError(w, http.StatusInternalServerError, "register local user failed") return } s.writeAuthResponse(w, http.StatusCreated, user) @@ -230,6 +234,56 @@ func (s *Server) listUserGroups(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"items": items}) } +func (s *Server) listAPIKeys(w http.ResponseWriter, r *http.Request) { + user, _ := auth.UserFromContext(r.Context()) + items, err := s.store.ListAPIKeys(r.Context(), user) + if err != nil { + s.logger.Error("list api keys failed", "error", err) + writeError(w, http.StatusInternalServerError, "list api keys failed") + return + } + writeJSON(w, http.StatusOK, map[string]any{"items": items}) +} + +func (s *Server) createAPIKey(w http.ResponseWriter, r *http.Request) { + user, _ := auth.UserFromContext(r.Context()) + var input store.CreateAPIKeyInput + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + created, err := s.store.CreateAPIKey(r.Context(), input, user) + if err != nil { + if errors.Is(err, store.ErrLocalUserRequired) { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + s.logger.Error("create api key failed", "error", err) + writeError(w, http.StatusInternalServerError, "create api key failed") + return + } + writeJSON(w, http.StatusCreated, created) +} + +func (s *Server) disableAPIKey(w http.ResponseWriter, r *http.Request) { + user, _ := auth.UserFromContext(r.Context()) + item, err := s.store.DisableAPIKey(r.Context(), r.PathValue("apiKeyID"), user) + if err == nil { + writeJSON(w, http.StatusOK, item) + return + } + if errors.Is(err, store.ErrLocalUserRequired) { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + if store.IsNotFound(err) { + writeError(w, http.StatusNotFound, "api key not found") + return + } + s.logger.Error("disable api key failed", "error", err) + writeError(w, http.StatusInternalServerError, "disable api key failed") +} + func (s *Server) estimatePricing(w http.ResponseWriter, r *http.Request) { var body map[string]any if err := json.NewDecoder(r.Body).Decode(&body); err != nil { @@ -275,6 +329,7 @@ func (s *Server) createTask(kind string) http.Handler { task, err := s.store.CreateTask(r.Context(), store.CreateTaskInput{ Kind: kind, Model: model, + RunMode: runModeFromRequest(body), Request: body, }, user) if err != nil { @@ -322,24 +377,37 @@ func (s *Server) taskEvents(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - sendSSE(w, "task.accepted", map[string]any{ - "taskId": task.ID, - "status": task.Status, - }) - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } - - timer := time.NewTimer(250 * time.Millisecond) - defer timer.Stop() - - select { - case <-r.Context().Done(): + events, err := s.store.ListTaskEvents(r.Context(), task.ID) + if err != nil { + s.logger.Error("list task events failed", "error", err) return - case <-timer.C: - sendSSE(w, "task.placeholder", map[string]any{ - "taskId": task.ID, - "message": "runtime worker is not wired yet", + } + for _, event := range events { + sendSSE(w, event.EventType, event) + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + } + if len(events) == 0 { + sendSSE(w, "task.accepted", map[string]any{ + "taskId": task.ID, + "status": task.Status, }) } } + +func runModeFromRequest(body map[string]any) string { + if value, ok := body["runMode"].(string); ok { + return value + } + if value, ok := body["mode"].(string); ok { + return value + } + if value, ok := body["simulation"].(bool); ok && value { + return "simulation" + } + if value, ok := body["testMode"].(bool); ok && value { + return "simulation" + } + return "" +} diff --git a/apps/api/internal/httpapi/server.go b/apps/api/internal/httpapi/server.go index 36dbc47..e90c257 100644 --- a/apps/api/internal/httpapi/server.go +++ b/apps/api/internal/httpapi/server.go @@ -24,6 +24,7 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han auth: auth.New(cfg.JWTSecret, cfg.ServerMainBaseURL, cfg.ServerMainInternalToken), logger: logger, } + server.auth.LocalAPIKeyVerifier = db.VerifyLocalAPIKey mux := http.NewServeMux() mux.HandleFunc("GET /healthz", server.health) @@ -37,6 +38,9 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han mux.Handle("GET /api/v1/tenants", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listTenants))) mux.Handle("GET /api/v1/users", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listUsers))) mux.Handle("GET /api/v1/user-groups", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listUserGroups))) + mux.Handle("GET /api/v1/api-keys", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listAPIKeys))) + mux.Handle("POST /api/v1/api-keys", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.createAPIKey))) + mux.Handle("PATCH /api/v1/api-keys/{apiKeyID}/disable", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.disableAPIKey))) mux.Handle("GET /api/v1/pricing/rules", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listPricingRules))) mux.Handle("POST /api/v1/pricing/estimate", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.estimatePricing))) mux.Handle("GET /api/v1/platforms", server.auth.Require(auth.PermissionPower, http.HandlerFunc(server.listPlatforms))) @@ -55,7 +59,7 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han func (s *Server) cors(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { origin := r.Header.Get("Origin") - if origin != "" && (s.cfg.CORSAllowedOrigin == "*" || strings.EqualFold(origin, s.cfg.CORSAllowedOrigin)) { + if origin != "" && originAllowed(origin, s.cfg.CORSAllowedOrigin) { w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Vary", "Origin") w.Header().Set("Access-Control-Allow-Credentials", "true") @@ -70,6 +74,16 @@ func (s *Server) cors(next http.Handler) http.Handler { }) } +func originAllowed(origin string, allowed string) bool { + for _, item := range strings.Split(allowed, ",") { + item = strings.TrimSpace(item) + if item == "*" || strings.EqualFold(origin, item) { + return true + } + } + return false +} + func (s *Server) recover(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer func() { diff --git a/apps/api/internal/store/postgres.go b/apps/api/internal/store/postgres.go index 4b35d9e..e4cc284 100644 --- a/apps/api/internal/store/postgres.go +++ b/apps/api/internal/store/postgres.go @@ -2,6 +2,8 @@ package store import ( "context" + "crypto/rand" + "encoding/base64" "encoding/json" "errors" "strings" @@ -10,6 +12,7 @@ import ( "github.com/easyai/easyai-ai-gateway/apps/api/internal/auth" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/crypto/bcrypt" ) @@ -21,6 +24,8 @@ type Store struct { var ( ErrInvalidCredentials = errors.New("invalid account or password") ErrInvalidInvitation = errors.New("invalid or expired invitation code") + ErrLocalUserRequired = errors.New("local gateway user is required") + ErrUserAlreadyExists = errors.New("user already exists") ErrWeakPassword = errors.New("password must be at least 8 characters") ) @@ -73,6 +78,37 @@ type CreatePlatformInput struct { Priority int `json:"priority"` } +type CreateAPIKeyInput struct { + Name string `json:"name"` + Scopes []string `json:"scopes"` + ExpiresAt string `json:"expiresAt"` +} + +type APIKey struct { + ID string `json:"id"` + GatewayTenantID string `json:"gatewayTenantId,omitempty"` + GatewayUserID string `json:"gatewayUserId"` + TenantID string `json:"tenantId,omitempty"` + TenantKey string `json:"tenantKey,omitempty"` + UserID string `json:"userId,omitempty"` + KeyPrefix string `json:"keyPrefix"` + Name string `json:"name"` + Scopes []string `json:"scopes,omitempty"` + UserGroupID string `json:"userGroupId,omitempty"` + RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` + QuotaPolicy map[string]any `json:"quotaPolicy,omitempty"` + Status string `json:"status"` + ExpiresAt string `json:"expiresAt,omitempty"` + LastUsedAt string `json:"lastUsedAt,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type CreatedAPIKey struct { + APIKey APIKey `json:"apiKey"` + Secret string `json:"secret"` +} + type PlatformModel struct { ID string `json:"id"` PlatformID string `json:"platformId"` @@ -161,8 +197,6 @@ type LocalRegisterInput struct { Email string `json:"email"` Password string `json:"password"` DisplayName string `json:"displayName"` - TenantKey string `json:"tenantKey"` - TenantName string `json:"tenantName"` InvitationCode string `json:"invitationCode"` } @@ -228,12 +262,14 @@ type RateLimitWindow struct { type CreateTaskInput struct { Kind string `json:"kind"` Model string `json:"model"` + RunMode string `json:"runMode"` Request map[string]any `json:"request"` } type GatewayTask struct { ID string `json:"id"` Kind string `json:"kind"` + RunMode string `json:"runMode"` UserID string `json:"userId"` GatewayUserID string `json:"gatewayUserId,omitempty"` UserSource string `json:"userSource,omitempty"` @@ -252,6 +288,20 @@ type GatewayTask struct { UpdatedAt time.Time `json:"updatedAt"` } +type TaskEvent struct { + ID string `json:"id"` + TaskID string `json:"taskId"` + Seq int64 `json:"seq"` + EventType string `json:"eventType"` + Status string `json:"status,omitempty"` + Phase string `json:"phase,omitempty"` + Progress float64 `json:"progress,omitempty"` + Message string `json:"message,omitempty"` + Payload map[string]any `json:"payload,omitempty"` + Simulated bool `json:"simulated"` + CreatedAt time.Time `json:"createdAt"` +} + func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) { rows, err := s.pool.Query(ctx, ` SELECT id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_type, status, priority, @@ -656,6 +706,216 @@ ORDER BY priority ASC, group_key ASC`) return items, rows.Err() } +func (s *Store) ListAPIKeys(ctx context.Context, user *auth.User) ([]APIKey, error) { + gatewayUserID := localGatewayUserID(user) + if gatewayUserID == "" { + return []APIKey{}, nil + } + rows, err := s.pool.Query(ctx, ` +SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, + COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), + key_prefix, name, scopes, COALESCE(user_group_id::text, ''), + rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''), + COALESCE(last_used_at::text, ''), created_at, updated_at +FROM gateway_api_keys +WHERE gateway_user_id = $1::uuid AND deleted_at IS NULL +ORDER BY created_at DESC`, gatewayUserID) + if err != nil { + return nil, err + } + defer rows.Close() + + items := make([]APIKey, 0) + for rows.Next() { + item, err := scanAPIKey(rows) + if err != nil { + return nil, err + } + items = append(items, item) + } + return items, rows.Err() +} + +func (s *Store) CreateAPIKey(ctx context.Context, input CreateAPIKeyInput, user *auth.User) (CreatedAPIKey, error) { + gatewayUserID := localGatewayUserID(user) + if gatewayUserID == "" { + return CreatedAPIKey{}, ErrLocalUserRequired + } + name := strings.TrimSpace(input.Name) + if name == "" { + name = "Default API Key" + } + scopes := input.Scopes + if len(scopes) == 0 { + scopes = []string{"chat", "image", "video"} + } + secret, err := generateAPIKeySecret() + if err != nil { + return CreatedAPIKey{}, err + } + keyHash, err := bcrypt.GenerateFromPassword([]byte(secret), bcrypt.DefaultCost) + if err != nil { + return CreatedAPIKey{}, err + } + scopesJSON, err := json.Marshal(scopes) + if err != nil { + return CreatedAPIKey{}, err + } + + var item APIKey + var scopesBytes []byte + var rateLimitPolicy []byte + var quotaPolicy []byte + err = s.pool.QueryRow(ctx, ` +INSERT INTO gateway_api_keys ( + gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id, + key_prefix, key_hash, name, scopes, expires_at +) +VALUES (NULLIF($1, '')::uuid, $2::uuid, NULLIF($3, ''), NULLIF($4, ''), NULLIF($5, ''), + $6, $7, $8, $9::jsonb, NULLIF($10, '')::timestamptz) +RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, + COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), + key_prefix, name, scopes, COALESCE(user_group_id::text, ''), + rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''), + COALESCE(last_used_at::text, ''), created_at, updated_at`, + user.GatewayTenantID, gatewayUserID, user.TenantID, user.TenantKey, user.ID, + apiKeyPrefix(secret), string(keyHash), name, string(scopesJSON), strings.TrimSpace(input.ExpiresAt), + ).Scan( + &item.ID, + &item.GatewayTenantID, + &item.GatewayUserID, + &item.TenantID, + &item.TenantKey, + &item.UserID, + &item.KeyPrefix, + &item.Name, + &scopesBytes, + &item.UserGroupID, + &rateLimitPolicy, + "aPolicy, + &item.Status, + &item.ExpiresAt, + &item.LastUsedAt, + &item.CreatedAt, + &item.UpdatedAt, + ) + if err != nil { + return CreatedAPIKey{}, err + } + item.Scopes = decodeStringArray(scopesBytes) + item.RateLimitPolicy = decodeObject(rateLimitPolicy) + item.QuotaPolicy = decodeObject(quotaPolicy) + return CreatedAPIKey{APIKey: item, Secret: secret}, nil +} + +func (s *Store) DisableAPIKey(ctx context.Context, apiKeyID string, user *auth.User) (APIKey, error) { + gatewayUserID := localGatewayUserID(user) + if gatewayUserID == "" { + return APIKey{}, ErrLocalUserRequired + } + var item APIKey + var scopesBytes []byte + var rateLimitPolicy []byte + var quotaPolicy []byte + err := s.pool.QueryRow(ctx, ` +UPDATE gateway_api_keys +SET status = 'disabled', updated_at = now() +WHERE id = $1::uuid AND gateway_user_id = $2::uuid AND deleted_at IS NULL +RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, + COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), + key_prefix, name, scopes, COALESCE(user_group_id::text, ''), + rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''), + COALESCE(last_used_at::text, ''), created_at, updated_at`, + apiKeyID, gatewayUserID, + ).Scan( + &item.ID, + &item.GatewayTenantID, + &item.GatewayUserID, + &item.TenantID, + &item.TenantKey, + &item.UserID, + &item.KeyPrefix, + &item.Name, + &scopesBytes, + &item.UserGroupID, + &rateLimitPolicy, + "aPolicy, + &item.Status, + &item.ExpiresAt, + &item.LastUsedAt, + &item.CreatedAt, + &item.UpdatedAt, + ) + if err != nil { + return APIKey{}, err + } + item.Scopes = decodeStringArray(scopesBytes) + item.RateLimitPolicy = decodeObject(rateLimitPolicy) + item.QuotaPolicy = decodeObject(quotaPolicy) + return item, nil +} + +func (s *Store) VerifyLocalAPIKey(ctx context.Context, secret string) (*auth.User, error) { + prefix := apiKeyPrefix(secret) + if prefix == "" { + return nil, auth.ErrUnauthorized + } + rows, err := s.pool.Query(ctx, ` +SELECT k.id::text, k.key_hash, COALESCE(k.user_group_id::text, ''), + u.id::text, u.username, u.roles, COALESCE(u.gateway_tenant_id::text, ''), + COALESCE(u.tenant_id, ''), COALESCE(u.tenant_key, '') +FROM gateway_api_keys k +JOIN gateway_users u ON u.id = k.gateway_user_id +WHERE k.key_prefix = $1 + AND k.status = 'active' + AND k.deleted_at IS NULL + AND u.status = 'active' + AND u.deleted_at IS NULL + AND (k.expires_at IS NULL OR k.expires_at > now())`, prefix) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var apiKeyID string + var hash string + var userGroupID string + var gatewayUserID string + var username string + var rolesBytes []byte + var gatewayTenantID string + var tenantID string + var tenantKey string + if err := rows.Scan(&apiKeyID, &hash, &userGroupID, &gatewayUserID, &username, &rolesBytes, &gatewayTenantID, &tenantID, &tenantKey); err != nil { + return nil, err + } + if bcrypt.CompareHashAndPassword([]byte(hash), []byte(secret)) != nil { + continue + } + if _, err := s.pool.Exec(ctx, `UPDATE gateway_api_keys SET last_used_at = now(), updated_at = now() WHERE id = $1::uuid`, apiKeyID); err != nil { + return nil, err + } + return &auth.User{ + ID: gatewayUserID, + Username: username, + Roles: decodeStringArray(rolesBytes), + TenantID: tenantID, + GatewayTenantID: gatewayTenantID, + TenantKey: tenantKey, + Source: "gateway", + GatewayUserID: gatewayUserID, + UserGroupID: userGroupID, + APIKeyID: apiKeyID, + APIKeyName: prefix, + }, nil + } + if err := rows.Err(); err != nil { + return nil, err + } + return nil, auth.ErrUnauthorized +} + func (s *Store) RegisterLocalUser(ctx context.Context, input LocalRegisterInput) (GatewayUser, error) { account := normalizeAccount(firstNonEmpty(input.Username, input.Email)) if account == "" { @@ -664,14 +924,8 @@ func (s *Store) RegisterLocalUser(ctx context.Context, input LocalRegisterInput) if len(input.Password) < 8 { return GatewayUser{}, ErrWeakPassword } - tenantKey := normalizeKey(input.TenantKey) - if tenantKey == "" { - tenantKey = "personal-" + normalizeKey(account) - } - tenantName := strings.TrimSpace(input.TenantName) - if tenantName == "" { - tenantName = tenantKey - } + tenantKey := "default" + tenantName := "Default Tenant" displayName := strings.TrimSpace(input.DisplayName) username := strings.TrimSpace(input.Username) if username == "" { @@ -695,61 +949,79 @@ func (s *Store) RegisterLocalUser(ctx context.Context, input LocalRegisterInput) userGroupID := "" role := "user" invitationID := "" + invitedBy := "" + var isBootstrapUser bool + if err := tx.QueryRow(ctx, ` +SELECT NOT EXISTS ( + SELECT 1 FROM gateway_users WHERE source = 'gateway' AND deleted_at IS NULL +)`).Scan(&isBootstrapUser); err != nil { + return GatewayUser{}, err + } + if isBootstrapUser { + role = "admin" + } + if err := tx.QueryRow(ctx, ` + INSERT INTO gateway_tenants (tenant_key, source, external_tenant_id, name) + VALUES ($1, 'gateway', $1, $2) + ON CONFLICT (tenant_key) DO UPDATE SET updated_at=now() + RETURNING id::text`, + tenantKey, tenantName, + ).Scan(&tenantID); err != nil { + return GatewayUser{}, err + } if invitationCode != "" { if err := tx.QueryRow(ctx, ` -SELECT i.id::text, - i.tenant_id::text, - t.tenant_key, - t.name, - COALESCE(i.user_group_id::text, t.default_user_group_id::text, ''), - COALESCE(NULLIF(i.role, ''), 'user') -FROM gateway_tenant_invitations i -JOIN gateway_tenants t ON t.id = i.tenant_id +SELECT i.id::text, COALESCE(i.created_by::text, '') +FROM gateway_invitations i WHERE lower(i.invite_code) = lower($1) AND i.status = 'active' - AND t.status = 'active' AND (i.expires_at IS NULL OR i.expires_at > now()) AND (i.max_uses IS NULL OR i.used_count < i.max_uses) FOR UPDATE OF i`, invitationCode, - ).Scan(&invitationID, &tenantID, &tenantKey, &tenantName, &userGroupID, &role); err != nil { + ).Scan(&invitationID, &invitedBy); err != nil { if errors.Is(err, pgx.ErrNoRows) { return GatewayUser{}, ErrInvalidInvitation } return GatewayUser{}, err } - } else if err := tx.QueryRow(ctx, ` - INSERT INTO gateway_tenants (tenant_key, source, external_tenant_id, name) - VALUES ($1, 'gateway', $1, $2) - ON CONFLICT (tenant_key) DO UPDATE SET updated_at=now() - RETURNING id::text`, - tenantKey, tenantName, - ).Scan(&tenantID); err != nil { - return GatewayUser{}, err } rolesJSON, err := json.Marshal([]string{role}) if err != nil { return GatewayUser{}, err } + userMetadataJSON := []byte("{}") + if invitationID != "" { + userMetadataJSON, err = json.Marshal(map[string]any{ + "registration": map[string]any{ + "invitationId": invitationID, + "invitationCode": invitationCode, + "invitedBy": invitedBy, + }, + }) + if err != nil { + return GatewayUser{}, err + } + } var user GatewayUser var roles []byte var authProfile []byte var metadata []byte if err := tx.QueryRow(ctx, ` - INSERT INTO gateway_users ( - user_key, source, external_user_id, username, display_name, email, - password_hash, gateway_tenant_id, tenant_id, tenant_key, default_user_group_id, roles, status - ) - VALUES ($1, 'gateway', $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, $7::uuid, $8, $8, NULLIF($9, '')::uuid, $10::jsonb, 'active') - RETURNING id::text, user_key, source, COALESCE(external_user_id, ''), username, - COALESCE(display_name, ''), COALESCE(email, ''), COALESCE(phone, ''), COALESCE(avatar_url, ''), - COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), - COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata, - status, COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''), - created_at, updated_at`, - "gateway:"+account, account, username, displayName, email, string(passwordHash), tenantID, tenantKey, userGroupID, string(rolesJSON), + INSERT INTO gateway_users ( + user_key, source, external_user_id, username, display_name, email, + password_hash, gateway_tenant_id, tenant_id, tenant_key, default_user_group_id, roles, metadata, status + ) + VALUES ($1, 'gateway', $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, $7::uuid, $8, $8, NULLIF($9, '')::uuid, $10::jsonb, $11::jsonb, 'active') + RETURNING id::text, user_key, source, COALESCE(external_user_id, ''), username, + COALESCE(display_name, ''), COALESCE(email, ''), COALESCE(phone, ''), COALESCE(avatar_url, ''), + COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), + COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata, + status, COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''), + created_at, updated_at`, + "gateway:"+account, account, username, displayName, email, string(passwordHash), tenantID, tenantKey, userGroupID, string(rolesJSON), string(userMetadataJSON), ).Scan( &user.ID, &user.UserKey, @@ -774,33 +1046,28 @@ FOR UPDATE OF i`, &user.CreatedAt, &user.UpdatedAt, ); err != nil { + if isUniqueViolation(err) { + return GatewayUser{}, ErrUserAlreadyExists + } return GatewayUser{}, err } if invitationID != "" { if _, err := tx.Exec(ctx, ` -UPDATE gateway_tenant_invitations +UPDATE gateway_invitations SET used_count = used_count + 1, updated_at = now() WHERE id = $1::uuid`, invitationID); err != nil { return GatewayUser{}, err } } - if userGroupID != "" { - metadata, err := json.Marshal(map[string]any{ - "source": "registration", - "invitationId": invitationID, - }) - if err != nil { - return GatewayUser{}, err - } - if _, err := tx.Exec(ctx, ` -INSERT INTO gateway_user_group_memberships (group_id, principal_type, principal_id, source, metadata) -VALUES ($1::uuid, 'user', $2, 'gateway', $3::jsonb) -ON CONFLICT (group_id, principal_type, principal_id) -DO UPDATE SET status = 'active', updated_at = now()`, - userGroupID, user.ID, string(metadata), - ); err != nil { - return GatewayUser{}, err - } + if _, err := tx.Exec(ctx, ` +INSERT INTO gateway_wallet_accounts ( + gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id, currency +) +VALUES ($1::uuid, $2::uuid, $3, $3, $4, 'resource') +ON CONFLICT (gateway_user_id, currency) DO NOTHING`, + user.GatewayTenantID, user.ID, user.TenantKey, user.ID, + ); err != nil { + return GatewayUser{}, err } if err := tx.Commit(ctx); err != nil { return GatewayUser{}, err @@ -913,24 +1180,58 @@ ORDER BY window_start DESC, scope_type ASC, scope_key ASC, metric ASC`) func (s *Store) CreateTask(ctx context.Context, input CreateTaskInput, user *auth.User) (GatewayTask, error) { requestBody, _ := json.Marshal(input.Request) + runMode := normalizeRunMode(input.RunMode, input.Request) + status := "queued" + result := map[string]any(nil) + billings := []any(nil) + finished := false + if runMode == "simulation" { + status = "succeeded" + result = simulationResult(input.Kind, input.Model) + billings = simulationBillings(input.Kind, input.Model) + finished = true + } + resultBody, _ := json.Marshal(result) + billingsBody, _ := json.Marshal(billings) + + tx, err := s.pool.Begin(ctx) + if err != nil { + return GatewayTask{}, err + } + defer tx.Rollback(ctx) + var task GatewayTask var requestBytes []byte var resultBytes []byte var billingsBytes []byte - err := s.pool.QueryRow(ctx, ` -INSERT INTO gateway_tasks ( - kind, user_id, gateway_user_id, user_source, gateway_tenant_id, tenant_id, tenant_key, - api_key_id, user_group_id, user_group_key, model, request, status -) -VALUES ($1, $2, NULLIF($3, '')::uuid, COALESCE(NULLIF($4, ''), 'gateway'), NULLIF($5, '')::uuid, NULLIF($6, ''), NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, '')::uuid, NULLIF($10, ''), $11, $12, 'queued') -RETURNING id::text, kind, user_id, COALESCE(gateway_user_id::text, ''), user_source, - COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), - COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at`, - input.Kind, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, - ).Scan(&task.ID, &task.Kind, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt) + err = tx.QueryRow(ctx, ` + INSERT INTO gateway_tasks ( + kind, run_mode, user_id, gateway_user_id, user_source, gateway_tenant_id, tenant_id, tenant_key, + api_key_id, user_group_id, user_group_key, model, request, status, result, billings, finished_at + ) + VALUES ($1, $2, $3, NULLIF($4, '')::uuid, COALESCE(NULLIF($5, ''), 'gateway'), NULLIF($6, '')::uuid, NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, ''), NULLIF($10, '')::uuid, NULLIF($11, ''), $12, $13, $14, $15::jsonb, $16::jsonb, CASE WHEN $17 THEN now() ELSE NULL END) + RETURNING id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source, + COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), + COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at`, + input.Kind, runMode, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, status, resultBody, billingsBody, finished, + ).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt) if err != nil { return GatewayTask{}, err } + events := taskEventsForCreate(task.ID, runMode, status, result) + for _, event := range events { + payload, _ := json.Marshal(event.Payload) + if _, err := tx.Exec(ctx, ` +INSERT INTO gateway_task_events (task_id, seq, event_type, status, phase, progress, message, payload, simulated) +VALUES ($1::uuid, $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, NULLIF($7, ''), $8::jsonb, $9)`, + task.ID, event.Seq, event.EventType, event.Status, event.Phase, event.Progress, event.Message, string(payload), event.Simulated, + ); err != nil { + return GatewayTask{}, err + } + } + if err := tx.Commit(ctx); err != nil { + return GatewayTask{}, err + } task.Request = decodeObject(requestBytes) task.Result = decodeObject(resultBytes) task.Billings = decodeArray(billingsBytes) @@ -943,12 +1244,12 @@ func (s *Store) GetTask(ctx context.Context, taskID string) (GatewayTask, error) var resultBytes []byte var billingsBytes []byte err := s.pool.QueryRow(ctx, ` -SELECT id::text, kind, user_id, COALESCE(gateway_user_id::text, ''), user_source, +SELECT id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source, COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at -FROM gateway_tasks -WHERE id=$1`, taskID, - ).Scan(&task.ID, &task.Kind, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt) + FROM gateway_tasks + WHERE id=$1`, taskID, + ).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt) if err != nil { return GatewayTask{}, err } @@ -958,10 +1259,239 @@ WHERE id=$1`, taskID, return task, nil } +func (s *Store) ListTaskEvents(ctx context.Context, taskID string) ([]TaskEvent, error) { + rows, err := s.pool.Query(ctx, ` +SELECT id::text, task_id::text, seq, event_type, COALESCE(status, ''), COALESCE(phase, ''), + COALESCE(progress, 0)::float8, COALESCE(message, ''), payload, simulated, created_at +FROM gateway_task_events +WHERE task_id = $1::uuid +ORDER BY seq ASC`, taskID) + if err != nil { + return nil, err + } + defer rows.Close() + + items := make([]TaskEvent, 0) + for rows.Next() { + var item TaskEvent + var payload []byte + if err := rows.Scan( + &item.ID, + &item.TaskID, + &item.Seq, + &item.EventType, + &item.Status, + &item.Phase, + &item.Progress, + &item.Message, + &payload, + &item.Simulated, + &item.CreatedAt, + ); err != nil { + return nil, err + } + item.Payload = decodeObject(payload) + items = append(items, item) + } + return items, rows.Err() +} + func IsNotFound(err error) bool { return err == pgx.ErrNoRows } +func isUniqueViolation(err error) bool { + var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && pgErr.Code == "23505" +} + +type apiKeyScanner interface { + Scan(dest ...any) error +} + +func scanAPIKey(scanner apiKeyScanner) (APIKey, error) { + var item APIKey + var scopesBytes []byte + var rateLimitPolicy []byte + var quotaPolicy []byte + if err := scanner.Scan( + &item.ID, + &item.GatewayTenantID, + &item.GatewayUserID, + &item.TenantID, + &item.TenantKey, + &item.UserID, + &item.KeyPrefix, + &item.Name, + &scopesBytes, + &item.UserGroupID, + &rateLimitPolicy, + "aPolicy, + &item.Status, + &item.ExpiresAt, + &item.LastUsedAt, + &item.CreatedAt, + &item.UpdatedAt, + ); err != nil { + return APIKey{}, err + } + item.Scopes = decodeStringArray(scopesBytes) + item.RateLimitPolicy = decodeObject(rateLimitPolicy) + item.QuotaPolicy = decodeObject(quotaPolicy) + return item, nil +} + +func localGatewayUserID(user *auth.User) string { + if user == nil { + return "" + } + if user.GatewayUserID != "" { + return user.GatewayUserID + } + if user.Source == "" || user.Source == "gateway" { + return user.ID + } + return "" +} + +func generateAPIKeySecret() (string, error) { + bytes := make([]byte, 32) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return "sk-gw-" + base64.RawURLEncoding.EncodeToString(bytes), nil +} + +func apiKeyPrefix(secret string) string { + secret = strings.TrimSpace(secret) + if !strings.HasPrefix(secret, "sk-gw-") { + return "" + } + if len(secret) <= 18 { + return secret + } + return secret[:18] +} + +func normalizeRunMode(input string, request map[string]any) string { + mode := strings.ToLower(strings.TrimSpace(input)) + if mode == "" && request != nil { + if raw, ok := request["runMode"].(string); ok { + mode = strings.ToLower(strings.TrimSpace(raw)) + } + if raw, ok := request["mode"].(string); ok && mode == "" { + mode = strings.ToLower(strings.TrimSpace(raw)) + } + if raw, ok := request["simulation"].(bool); ok && raw { + mode = "simulation" + } + if raw, ok := request["testMode"].(bool); ok && raw { + mode = "simulation" + } + } + if mode == "test" || mode == "dry-run" || mode == "dry_run" { + return "simulation" + } + if mode == "simulation" { + return "simulation" + } + return "production" +} + +func simulationResult(kind string, model string) map[string]any { + switch kind { + case "chat.completions": + return map[string]any{ + "id": "chatcmpl-simulated", + "object": "chat.completion", + "model": model, + "choices": []any{map[string]any{"index": 0, "finish_reason": "stop", "message": map[string]any{"role": "assistant", "content": "simulation response"}}}, + "usage": map[string]any{"prompt_tokens": 12, "completion_tokens": 8, "total_tokens": 20}, + } + case "images.generations": + return map[string]any{ + "id": "img-simulated", + "model": model, + "data": []any{map[string]any{"url": "/static/simulation/image.png", "revised_prompt": "simulation image"}}, + } + case "videos.generations": + return map[string]any{ + "id": "video-simulated", + "model": model, + "data": []any{map[string]any{"url": "/static/simulation/video.mp4", "duration": 5}}, + } + default: + return map[string]any{"id": "task-simulated", "model": model, "kind": kind, "ok": true} + } +} + +func simulationBillings(kind string, model string) []any { + resourceType := "task" + unit := "item" + if kind == "chat.completions" { + resourceType = "text_total" + unit = "1k_tokens" + } + if kind == "images.generations" { + resourceType = "image" + unit = "image" + } + if kind == "videos.generations" { + resourceType = "video" + unit = "second" + } + return []any{map[string]any{ + "model": model, + "resourceType": resourceType, + "unit": unit, + "quantity": 1, + "amount": 0, + "currency": "resource", + "simulated": true, + }} +} + +func taskEventsForCreate(taskID string, runMode string, status string, result map[string]any) []TaskEvent { + events := []TaskEvent{{ + TaskID: taskID, + Seq: 1, + EventType: "task.accepted", + Status: "queued", + Phase: "queued", + Progress: 0, + Message: "task accepted", + Payload: map[string]any{"taskId": taskID}, + Simulated: runMode == "simulation", + }} + if runMode != "simulation" { + return events + } + return append(events, + TaskEvent{ + TaskID: taskID, + Seq: 2, + EventType: "task.progress", + Status: "running", + Phase: "simulation", + Progress: 0.5, + Message: "simulation client running", + Payload: map[string]any{"taskId": taskID}, + Simulated: true, + }, + TaskEvent{ + TaskID: taskID, + Seq: 3, + EventType: "task.completed", + Status: status, + Phase: "completed", + Progress: 1, + Message: "simulation completed", + Payload: map[string]any{"taskId": taskID, "result": result}, + Simulated: true, + }, + ) +} + func decodeObject(bytes []byte) map[string]any { if len(bytes) == 0 { return nil diff --git a/apps/api/migrations/0001_init.sql b/apps/api/migrations/0001_init.sql index 8d96768..eeb8c7c 100644 --- a/apps/api/migrations/0001_init.sql +++ b/apps/api/migrations/0001_init.sql @@ -209,12 +209,9 @@ CREATE INDEX IF NOT EXISTS idx_user_group_membership_principal CREATE INDEX IF NOT EXISTS idx_user_group_membership_effective ON gateway_user_group_memberships(effective_from, effective_to); -CREATE TABLE IF NOT EXISTS gateway_tenant_invitations ( +CREATE TABLE IF NOT EXISTS gateway_invitations ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), - tenant_id uuid NOT NULL REFERENCES gateway_tenants(id) ON DELETE CASCADE, invite_code text NOT NULL UNIQUE, - role text NOT NULL DEFAULT 'user', - user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL, max_uses integer, used_count integer NOT NULL DEFAULT 0, expires_at timestamptz, @@ -225,11 +222,11 @@ CREATE TABLE IF NOT EXISTS gateway_tenant_invitations ( updated_at timestamptz NOT NULL DEFAULT now() ); -CREATE INDEX IF NOT EXISTS idx_gateway_invitations_tenant - ON gateway_tenant_invitations(tenant_id, status); +CREATE INDEX IF NOT EXISTS idx_gateway_invitations_status + ON gateway_invitations(status, created_at DESC); CREATE INDEX IF NOT EXISTS idx_gateway_invitations_expiry - ON gateway_tenant_invitations(expires_at); + ON gateway_invitations(expires_at); CREATE TABLE IF NOT EXISTS gateway_api_keys ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), diff --git a/apps/api/migrations/0002_invitation_relationship_only.sql b/apps/api/migrations/0002_invitation_relationship_only.sql new file mode 100644 index 0000000..b5adbc9 --- /dev/null +++ b/apps/api/migrations/0002_invitation_relationship_only.sql @@ -0,0 +1,634 @@ +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE TABLE IF NOT EXISTS model_catalog_providers ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + provider_key text NOT NULL UNIQUE, + display_name text NOT NULL, + provider_type text NOT NULL DEFAULT 'openai_compatible', + capability_schema jsonb NOT NULL DEFAULT '{}'::jsonb, + default_rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'active', + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS base_model_catalog ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + provider_id uuid REFERENCES model_catalog_providers(id) ON DELETE SET NULL, + provider_key text NOT NULL, + canonical_model_key text NOT NULL UNIQUE, + provider_model_name text NOT NULL, + model_type text NOT NULL, + display_name text NOT NULL, + capabilities jsonb NOT NULL DEFAULT '{}'::jsonb, + base_billing_config jsonb NOT NULL DEFAULT '{}'::jsonb, + default_rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + pricing_version integer NOT NULL DEFAULT 1, + status text NOT NULL DEFAULT 'active', + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS integration_platforms ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + provider text NOT NULL, + platform_key text NOT NULL UNIQUE DEFAULT ('platform_' || replace(gen_random_uuid()::text, '-', '')), + name text NOT NULL, + base_url text, + auth_type text NOT NULL DEFAULT 'bearer', + credentials jsonb NOT NULL DEFAULT '{}'::jsonb, + config jsonb NOT NULL DEFAULT '{}'::jsonb, + visibility_scope text NOT NULL DEFAULT 'global', + tenant_id text, + tenant_key text, + default_pricing_mode text NOT NULL DEFAULT 'inherit_discount', + default_discount_factor numeric NOT NULL DEFAULT 1, + retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + priority integer NOT NULL DEFAULT 100, + dynamic_priority integer, + status text NOT NULL DEFAULT 'enabled', + disabled_reason text, + cooldown_until timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + deleted_at timestamptz +); + +ALTER TABLE IF EXISTS integration_platforms + ADD COLUMN IF NOT EXISTS platform_key text, + ADD COLUMN IF NOT EXISTS visibility_scope text NOT NULL DEFAULT 'global', + ADD COLUMN IF NOT EXISTS tenant_id text, + ADD COLUMN IF NOT EXISTS tenant_key text, + ADD COLUMN IF NOT EXISTS default_pricing_mode text NOT NULL DEFAULT 'inherit_discount', + ADD COLUMN IF NOT EXISTS default_discount_factor numeric NOT NULL DEFAULT 1, + ADD COLUMN IF NOT EXISTS retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS dynamic_priority integer, + ADD COLUMN IF NOT EXISTS disabled_reason text, + ADD COLUMN IF NOT EXISTS cooldown_until timestamptz, + ADD COLUMN IF NOT EXISTS deleted_at timestamptz; + +UPDATE integration_platforms +SET platform_key = 'platform_' || replace(id::text, '-', '') +WHERE platform_key IS NULL OR platform_key = ''; + +ALTER TABLE IF EXISTS integration_platforms + ALTER COLUMN platform_key SET DEFAULT ('platform_' || replace(gen_random_uuid()::text, '-', '')), + ALTER COLUMN platform_key SET NOT NULL; + +CREATE TABLE IF NOT EXISTS model_pricing_rules ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + scope_type text NOT NULL, + scope_id uuid, + resource_type text NOT NULL, + unit text NOT NULL, + base_price numeric NOT NULL, + currency text NOT NULL DEFAULT 'resource', + base_weight jsonb NOT NULL DEFAULT '{}'::jsonb, + dynamic_weight jsonb NOT NULL DEFAULT '{}'::jsonb, + effective_from timestamptz, + effective_to timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS gateway_user_groups ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + group_key text NOT NULL UNIQUE, + name text NOT NULL, + description text, + source text NOT NULL DEFAULT 'gateway', + priority integer NOT NULL DEFAULT 100, + recharge_discount_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + billing_discount_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + quota_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'active', + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS gateway_tenants ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + tenant_key text NOT NULL UNIQUE, + source text NOT NULL DEFAULT 'gateway', + external_tenant_id text, + name text NOT NULL, + description text, + default_user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL, + plan_key text, + billing_profile jsonb NOT NULL DEFAULT '{}'::jsonb, + rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + auth_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'active', + synced_at timestamptz, + source_updated_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + deleted_at timestamptz, + UNIQUE(source, external_tenant_id) +); + +CREATE TABLE IF NOT EXISTS gateway_users ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + user_key text NOT NULL UNIQUE, + source text NOT NULL DEFAULT 'gateway', + external_user_id text, + username text NOT NULL, + display_name text, + email text, + phone text, + avatar_url text, + password_hash text, + gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + tenant_id text, + tenant_key text, + default_user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL, + roles jsonb NOT NULL DEFAULT '[]'::jsonb, + auth_profile jsonb NOT NULL DEFAULT '{}'::jsonb, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'active', + last_login_at timestamptz, + synced_at timestamptz, + source_updated_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + deleted_at timestamptz, + UNIQUE(source, external_user_id) +); + +CREATE TABLE IF NOT EXISTS gateway_user_group_memberships ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + group_id uuid NOT NULL REFERENCES gateway_user_groups(id) ON DELETE CASCADE, + principal_type text NOT NULL, + principal_id text NOT NULL, + source text NOT NULL DEFAULT 'gateway', + priority integer NOT NULL DEFAULT 100, + effective_from timestamptz, + effective_to timestamptz, + status text NOT NULL DEFAULT 'active', + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(group_id, principal_type, principal_id) +); + +DO $$ +BEGIN + IF to_regclass('public.gateway_tenant_invitations') IS NOT NULL + AND to_regclass('public.gateway_invitations') IS NULL THEN + ALTER TABLE gateway_tenant_invitations RENAME TO gateway_invitations; + END IF; +END $$; + +DROP INDEX IF EXISTS idx_gateway_invitations_tenant; + +CREATE TABLE IF NOT EXISTS gateway_invitations ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + invite_code text NOT NULL UNIQUE, + max_uses integer, + used_count integer NOT NULL DEFAULT 0, + expires_at timestamptz, + status text NOT NULL DEFAULT 'active', + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_by uuid REFERENCES gateway_users(id) ON DELETE SET NULL, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +ALTER TABLE IF EXISTS gateway_invitations + DROP COLUMN IF EXISTS tenant_id, + DROP COLUMN IF EXISTS role, + DROP COLUMN IF EXISTS user_group_id; + +CREATE TABLE IF NOT EXISTS gateway_api_keys ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE CASCADE, + tenant_id text, + tenant_key text, + user_id text, + key_prefix text NOT NULL, + key_hash text NOT NULL UNIQUE, + name text NOT NULL, + scopes jsonb NOT NULL DEFAULT '[]'::jsonb, + user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL, + rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + quota_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'active', + expires_at timestamptz, + last_used_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + deleted_at timestamptz +); + +CREATE TABLE IF NOT EXISTS gateway_wallet_accounts ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE CASCADE, + tenant_id text, + tenant_key text, + user_id text, + currency text NOT NULL DEFAULT 'resource', + balance numeric NOT NULL DEFAULT 0, + frozen_balance numeric NOT NULL DEFAULT 0, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'active', + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(gateway_user_id, currency) +); + +CREATE TABLE IF NOT EXISTS gateway_wallet_transactions ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + wallet_account_id uuid REFERENCES gateway_wallet_accounts(id) ON DELETE SET NULL, + gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL, + transaction_type text NOT NULL, + amount numeric NOT NULL, + balance_after numeric NOT NULL, + reference_type text, + reference_id text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS gateway_recharge_orders ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL, + tenant_id text, + tenant_key text, + user_id text, + order_no text NOT NULL UNIQUE, + amount numeric NOT NULL, + bonus_amount numeric NOT NULL DEFAULT 0, + currency text NOT NULL DEFAULT 'resource', + payment_provider text, + payment_payload jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'pending', + paid_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS platform_models ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + platform_id uuid REFERENCES integration_platforms(id) ON DELETE CASCADE, + base_model_id uuid REFERENCES base_model_catalog(id) ON DELETE SET NULL, + model_name text NOT NULL, + model_alias text, + model_type text NOT NULL, + display_name text NOT NULL DEFAULT '', + capability_override jsonb NOT NULL DEFAULT '{}'::jsonb, + capabilities jsonb NOT NULL DEFAULT '{}'::jsonb, + pricing_mode text NOT NULL DEFAULT 'inherit_discount', + discount_factor numeric, + billing_config_override jsonb NOT NULL DEFAULT '{}'::jsonb, + billing_config jsonb NOT NULL DEFAULT '{}'::jsonb, + permission_config jsonb NOT NULL DEFAULT '{}'::jsonb, + retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + enabled boolean NOT NULL DEFAULT true, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(platform_id, model_name, model_type) +); + +ALTER TABLE IF EXISTS platform_models + ADD COLUMN IF NOT EXISTS base_model_id uuid REFERENCES base_model_catalog(id) ON DELETE SET NULL, + ADD COLUMN IF NOT EXISTS model_alias text, + ADD COLUMN IF NOT EXISTS capability_override jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS pricing_mode text NOT NULL DEFAULT 'inherit_discount', + ADD COLUMN IF NOT EXISTS discount_factor numeric, + ADD COLUMN IF NOT EXISTS billing_config_override jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS permission_config jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS retry_policy jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS rate_limit_policy jsonb NOT NULL DEFAULT '{}'::jsonb; + +CREATE TABLE IF NOT EXISTS gateway_tasks ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + external_task_id text, + kind text NOT NULL, + run_mode text NOT NULL DEFAULT 'production', + user_id text NOT NULL, + gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL, + user_source text NOT NULL DEFAULT 'gateway', + gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + tenant_id text, + tenant_key text, + api_key_id text, + user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL, + user_group_key text, + user_group_policy_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb, + model text NOT NULL, + model_type text, + request jsonb NOT NULL DEFAULT '{}'::jsonb, + normalized_request jsonb NOT NULL DEFAULT '{}'::jsonb, + status text NOT NULL DEFAULT 'queued', + queue_key text NOT NULL DEFAULT 'default', + priority integer NOT NULL DEFAULT 100, + idempotency_key text, + remote_task_id text, + remote_task_payload jsonb, + simulation_profile jsonb, + simulation_seed text, + locked_by text, + locked_at timestamptz, + heartbeat_at timestamptz, + next_run_at timestamptz NOT NULL DEFAULT now(), + attempt_count integer NOT NULL DEFAULT 0, + max_attempts integer NOT NULL DEFAULT 1, + result jsonb, + billings jsonb, + error text, + error_code text, + error_message text, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + finished_at timestamptz +); + +ALTER TABLE IF EXISTS gateway_tasks + ADD COLUMN IF NOT EXISTS external_task_id text, + ADD COLUMN IF NOT EXISTS run_mode text NOT NULL DEFAULT 'production', + ADD COLUMN IF NOT EXISTS gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL, + ADD COLUMN IF NOT EXISTS user_source text NOT NULL DEFAULT 'gateway', + ADD COLUMN IF NOT EXISTS gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + ADD COLUMN IF NOT EXISTS tenant_key text, + ADD COLUMN IF NOT EXISTS api_key_id text, + ADD COLUMN IF NOT EXISTS user_group_id uuid REFERENCES gateway_user_groups(id) ON DELETE SET NULL, + ADD COLUMN IF NOT EXISTS user_group_key text, + ADD COLUMN IF NOT EXISTS user_group_policy_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS model_type text, + ADD COLUMN IF NOT EXISTS normalized_request jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS queue_key text NOT NULL DEFAULT 'default', + ADD COLUMN IF NOT EXISTS priority integer NOT NULL DEFAULT 100, + ADD COLUMN IF NOT EXISTS idempotency_key text, + ADD COLUMN IF NOT EXISTS remote_task_id text, + ADD COLUMN IF NOT EXISTS remote_task_payload jsonb, + ADD COLUMN IF NOT EXISTS simulation_profile jsonb, + ADD COLUMN IF NOT EXISTS simulation_seed text, + ADD COLUMN IF NOT EXISTS locked_by text, + ADD COLUMN IF NOT EXISTS locked_at timestamptz, + ADD COLUMN IF NOT EXISTS heartbeat_at timestamptz, + ADD COLUMN IF NOT EXISTS next_run_at timestamptz NOT NULL DEFAULT now(), + ADD COLUMN IF NOT EXISTS attempt_count integer NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS max_attempts integer NOT NULL DEFAULT 1, + ADD COLUMN IF NOT EXISTS error_code text, + ADD COLUMN IF NOT EXISTS error_message text, + ADD COLUMN IF NOT EXISTS finished_at timestamptz; + +CREATE TABLE IF NOT EXISTS gateway_task_attempts ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE, + attempt_no integer NOT NULL, + platform_id uuid REFERENCES integration_platforms(id) ON DELETE SET NULL, + platform_model_id uuid REFERENCES platform_models(id) ON DELETE SET NULL, + client_id text, + queue_key text NOT NULL, + status text NOT NULL, + retryable boolean NOT NULL DEFAULT false, + simulated boolean NOT NULL DEFAULT false, + remote_task_id text, + request_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb, + response_snapshot jsonb, + error_code text, + error_message text, + started_at timestamptz NOT NULL DEFAULT now(), + finished_at timestamptz, + UNIQUE(task_id, attempt_no) +); + +CREATE TABLE IF NOT EXISTS gateway_task_events ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE, + seq bigint NOT NULL, + event_type text NOT NULL, + status text, + phase text, + progress numeric, + message text, + payload jsonb NOT NULL DEFAULT '{}'::jsonb, + simulated boolean NOT NULL DEFAULT false, + created_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(task_id, seq) +); + +CREATE TABLE IF NOT EXISTS gateway_task_callback_outbox ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE, + event_id uuid REFERENCES gateway_task_events(id) ON DELETE SET NULL, + seq bigint NOT NULL, + callback_url text NOT NULL, + payload jsonb NOT NULL, + status text NOT NULL DEFAULT 'pending', + attempts integer NOT NULL DEFAULT 0, + next_attempt_at timestamptz NOT NULL DEFAULT now(), + last_error text, + delivered_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(task_id, seq, callback_url) +); + +CREATE TABLE IF NOT EXISTS runtime_client_states ( + client_id text PRIMARY KEY, + platform_id uuid REFERENCES integration_platforms(id) ON DELETE SET NULL, + provider text NOT NULL, + method_name text NOT NULL, + queue_key text NOT NULL, + running_count integer NOT NULL DEFAULT 0, + waiting_count integer NOT NULL DEFAULT 0, + limiter_ratio numeric NOT NULL DEFAULT 0, + cooldown_until timestamptz, + last_assigned_at timestamptz, + last_error text, + updated_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS gateway_upload_assets ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + task_id uuid REFERENCES gateway_tasks(id) ON DELETE SET NULL, + source text NOT NULL, + server_main_file_id text, + url text NOT NULL, + object_key text, + content_type text, + size bigint, + checksum text, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS gateway_retry_policies ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + scope_type text NOT NULL, + scope_key text NOT NULL, + enabled boolean NOT NULL DEFAULT true, + policy jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(scope_type, scope_key) +); + +CREATE TABLE IF NOT EXISTS gateway_rate_limit_policies ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + scope_type text NOT NULL, + scope_key text NOT NULL, + policy jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(scope_type, scope_key) +); + +CREATE TABLE IF NOT EXISTS gateway_rate_limit_counters ( + scope_type text NOT NULL, + scope_key text NOT NULL, + metric text NOT NULL, + window_start timestamptz NOT NULL, + limit_value numeric NOT NULL, + used_value numeric NOT NULL DEFAULT 0, + reserved_value numeric NOT NULL DEFAULT 0, + reset_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL DEFAULT now(), + PRIMARY KEY(scope_type, scope_key, metric, window_start) +); + +CREATE TABLE IF NOT EXISTS gateway_concurrency_leases ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE, + attempt_id uuid REFERENCES gateway_task_attempts(id) ON DELETE SET NULL, + scope_type text NOT NULL, + scope_key text NOT NULL, + lease_value numeric NOT NULL DEFAULT 1, + acquired_at timestamptz NOT NULL DEFAULT now(), + expires_at timestamptz NOT NULL, + released_at timestamptz +); + +ALTER TABLE IF EXISTS settlement_outbox + ADD COLUMN IF NOT EXISTS event_type text NOT NULL DEFAULT 'task.settlement.requested', + ADD COLUMN IF NOT EXISTS payload jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS status text NOT NULL DEFAULT 'pending', + ADD COLUMN IF NOT EXISTS attempts integer NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS next_attempt_at timestamptz NOT NULL DEFAULT now(), + ADD COLUMN IF NOT EXISTS updated_at timestamptz NOT NULL DEFAULT now(); + +CREATE TABLE IF NOT EXISTS settlement_outbox ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE, + event_type text NOT NULL DEFAULT 'task.settlement.requested', + payload jsonb NOT NULL, + status text NOT NULL DEFAULT 'pending', + attempts integer NOT NULL DEFAULT 0, + next_attempt_at timestamptz NOT NULL DEFAULT now(), + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE(task_id, event_type) +); + +CREATE UNIQUE INDEX IF NOT EXISTS uniq_integration_platforms_platform_key + ON integration_platforms(platform_key); + +CREATE INDEX IF NOT EXISTS idx_model_catalog_provider_status + ON model_catalog_providers(status); +CREATE INDEX IF NOT EXISTS idx_base_model_catalog_provider + ON base_model_catalog(provider_key, model_type, status); +CREATE INDEX IF NOT EXISTS idx_base_model_catalog_capabilities + ON base_model_catalog USING gin(capabilities); +CREATE INDEX IF NOT EXISTS idx_integration_platforms_provider_status + ON integration_platforms(provider, status); +CREATE INDEX IF NOT EXISTS idx_integration_platforms_status_priority + ON integration_platforms(status, priority, dynamic_priority); +CREATE INDEX IF NOT EXISTS idx_integration_platforms_cooldown + ON integration_platforms(cooldown_until); +CREATE INDEX IF NOT EXISTS idx_integration_platforms_tenant_scope + ON integration_platforms(visibility_scope, tenant_id, tenant_key, status); +CREATE INDEX IF NOT EXISTS idx_model_pricing_scope + ON model_pricing_rules(scope_type, scope_id, resource_type); +CREATE INDEX IF NOT EXISTS idx_model_pricing_effective + ON model_pricing_rules(effective_from, effective_to); +CREATE INDEX IF NOT EXISTS idx_gateway_user_groups_status_priority + ON gateway_user_groups(status, priority); +CREATE INDEX IF NOT EXISTS idx_gateway_tenants_source_external + ON gateway_tenants(source, external_tenant_id) + WHERE external_tenant_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_gateway_tenants_status + ON gateway_tenants(status, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_gateway_users_source_external + ON gateway_users(source, external_user_id) + WHERE external_user_id IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_gateway_users_status + ON gateway_users(status, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_gateway_users_tenant + ON gateway_users(tenant_id, tenant_key, status); +CREATE INDEX IF NOT EXISTS idx_user_group_membership_principal + ON gateway_user_group_memberships(principal_type, principal_id, status); +CREATE INDEX IF NOT EXISTS idx_user_group_membership_effective + ON gateway_user_group_memberships(effective_from, effective_to); +CREATE INDEX IF NOT EXISTS idx_gateway_invitations_status + ON gateway_invitations(status, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_gateway_invitations_expiry + ON gateway_invitations(expires_at); +CREATE INDEX IF NOT EXISTS idx_gateway_api_keys_prefix + ON gateway_api_keys(key_prefix, status); +CREATE INDEX IF NOT EXISTS idx_gateway_api_keys_user + ON gateway_api_keys(gateway_user_id, status); +CREATE INDEX IF NOT EXISTS idx_gateway_wallet_accounts_tenant + ON gateway_wallet_accounts(gateway_tenant_id, status); +CREATE INDEX IF NOT EXISTS idx_gateway_wallet_transactions_user + ON gateway_wallet_transactions(gateway_user_id, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_gateway_recharge_orders_user + ON gateway_recharge_orders(gateway_user_id, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_platform_models_base + ON platform_models(base_model_id); +CREATE INDEX IF NOT EXISTS idx_platform_models_lookup + ON platform_models(model_type, model_name, enabled); +CREATE INDEX IF NOT EXISTS idx_platform_models_alias + ON platform_models(model_alias); +CREATE INDEX IF NOT EXISTS idx_platform_models_capabilities + ON platform_models USING gin(capabilities); +CREATE UNIQUE INDEX IF NOT EXISTS uniq_platform_models_model + ON platform_models(platform_id, model_name, model_type); +CREATE INDEX IF NOT EXISTS idx_gateway_tasks_queue + ON gateway_tasks(status, next_run_at, priority, created_at); +CREATE INDEX IF NOT EXISTS idx_gateway_tasks_lease + ON gateway_tasks(status, heartbeat_at); +CREATE INDEX IF NOT EXISTS idx_gateway_tasks_user_created + ON gateway_tasks(user_id, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_gateway_tasks_external + ON gateway_tasks(external_task_id); +CREATE UNIQUE INDEX IF NOT EXISTS uniq_gateway_tasks_idempotency + ON gateway_tasks(user_id, idempotency_key) + WHERE idempotency_key IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_gateway_attempts_task + ON gateway_task_attempts(task_id); +CREATE INDEX IF NOT EXISTS idx_gateway_attempts_client + ON gateway_task_attempts(client_id, started_at DESC); +CREATE INDEX IF NOT EXISTS idx_gateway_events_task_created + ON gateway_task_events(task_id, created_at); +CREATE INDEX IF NOT EXISTS idx_task_callback_outbox_pending + ON gateway_task_callback_outbox(status, next_attempt_at); +CREATE INDEX IF NOT EXISTS idx_task_callback_outbox_task + ON gateway_task_callback_outbox(task_id, seq); +CREATE INDEX IF NOT EXISTS idx_runtime_client_queue + ON runtime_client_states(queue_key, cooldown_until); +CREATE INDEX IF NOT EXISTS idx_runtime_client_platform + ON runtime_client_states(platform_id); +CREATE INDEX IF NOT EXISTS idx_gateway_upload_task + ON gateway_upload_assets(task_id); +CREATE INDEX IF NOT EXISTS idx_gateway_upload_file + ON gateway_upload_assets(server_main_file_id); +CREATE INDEX IF NOT EXISTS idx_concurrency_leases_active + ON gateway_concurrency_leases(scope_type, scope_key, released_at, expires_at); +CREATE INDEX IF NOT EXISTS idx_concurrency_leases_task + ON gateway_concurrency_leases(task_id); +CREATE UNIQUE INDEX IF NOT EXISTS uniq_settlement_outbox_task_event + ON settlement_outbox(task_id, event_type); diff --git a/apps/api/project.json b/apps/api/project.json index daa0fb7..21ed72e 100644 --- a/apps/api/project.json +++ b/apps/api/project.json @@ -9,7 +9,7 @@ "executor": "nx:run-commands", "options": { "cwd": "apps/api", - "command": "go run ./cmd/gateway" + "command": "node ../../scripts/go-watch.mjs -- go run ./cmd/gateway" } }, "migrate": { diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index 404041f..c89daf7 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -2,7 +2,9 @@ import { useEffect, useMemo, useState, type FormEvent } from 'react'; import type { BaseModelCatalogItem, CatalogProvider, + GatewayApiKey, GatewayTenant, + GatewayTask, GatewayUser, IntegrationPlatform, PlatformModel, @@ -11,7 +13,12 @@ import type { UserGroup, } from '@easyai-ai-gateway/contracts'; import { + createApiKey, + createChatTask, + createPlatform, + getTask, getHealth, + listApiKeys, listBaseModels, listCatalogProviders, listModels, @@ -96,8 +103,6 @@ export function App() { email: '', password: '', displayName: '', - tenantKey: '', - tenantName: '', invitationCode: '', }); const [health, setHealth] = useState(null); @@ -110,6 +115,22 @@ export function App() { const [tenants, setTenants] = useState([]); const [users, setUsers] = useState([]); const [userGroups, setUserGroups] = useState([]); + const [apiKeys, setApiKeys] = useState([]); + const [apiKeyForm, setApiKeyForm] = useState({ name: 'Local smoke key' }); + const [apiKeySecret, setApiKeySecret] = useState(''); + const [platformForm, setPlatformForm] = useState({ + provider: 'openai', + platformKey: 'openai-simulation', + name: 'OpenAI Simulation', + baseUrl: 'https://api.openai.com/v1', + }); + const [taskForm, setTaskForm] = useState({ + model: 'gpt-4o-mini', + prompt: '用一句话确认 AI Gateway simulation 链路正常。', + }); + const [taskResult, setTaskResult] = useState(null); + const [coreState, setCoreState] = useState('idle'); + const [coreMessage, setCoreMessage] = useState(''); const [state, setState] = useState('idle'); const [error, setError] = useState(''); @@ -151,6 +172,7 @@ export function App() { tenantResponse, userResponse, userGroupResponse, + apiKeyResponse, ] = await Promise.all([ listPlatforms(nextToken), listModels(nextToken), @@ -161,6 +183,7 @@ export function App() { listTenants(nextToken), listUsers(nextToken), listUserGroups(nextToken), + listApiKeys(nextToken), ]); setPlatforms(platformResponse.items); setModels(modelResponse.items); @@ -171,6 +194,7 @@ export function App() { setTenants(tenantResponse.items); setUsers(userResponse.items); setUserGroups(userGroupResponse.items); + setApiKeys(apiKeyResponse.items); setState('ready'); } catch (err) { setState('error'); @@ -229,6 +253,68 @@ export function App() { setTenants([]); setUsers([]); setUserGroups([]); + setApiKeys([]); + setApiKeySecret(''); + setTaskResult(null); + setCoreMessage(''); + } + + async function submitAPIKey(event: FormEvent) { + event.preventDefault(); + setCoreState('loading'); + setCoreMessage(''); + try { + const response = await createApiKey(token, { name: apiKeyForm.name, scopes: ['chat', 'image', 'video'] }); + setApiKeySecret(response.secret); + setApiKeys((current) => [response.apiKey, ...current.filter((item) => item.id !== response.apiKey.id)]); + setCoreState('ready'); + setCoreMessage('API Key 已创建,secret 仅展示一次。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : '创建 API Key 失败'); + } + } + + async function submitPlatform(event: FormEvent) { + event.preventDefault(); + setCoreState('loading'); + setCoreMessage(''); + try { + const platform = await createPlatform(token, { + ...platformForm, + authType: 'bearer', + credentials: { mode: 'simulation' }, + config: { testMode: true }, + }); + setPlatforms((current) => [platform, ...current.filter((item) => item.id !== platform.id)]); + setCoreState('ready'); + setCoreMessage('平台已创建,当前阶段平台凭证仅全局管理员可配置。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : '创建平台失败'); + } + } + + async function submitTask(event: FormEvent) { + event.preventDefault(); + const credential = apiKeySecret || token; + setCoreState('loading'); + setCoreMessage(''); + try { + const response = await createChatTask(credential, { + model: taskForm.model, + runMode: 'simulation', + simulation: true, + messages: [{ role: 'user', content: taskForm.prompt }], + }); + const detail = await getTask(credential, response.task.id); + setTaskResult(detail); + setCoreState('ready'); + setCoreMessage(apiKeySecret ? '任务已通过本地 API Key 完成 simulation。' : '任务已通过当前 Access Token 完成 simulation。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : '测试任务失败'); + } } return ( @@ -278,6 +364,23 @@ export function App() { + + void; onSubmitExternalToken: (event: FormEvent) => void; @@ -414,22 +513,6 @@ function AuthPanel(props: { placeholder="至少 8 位" /> - -