From c992f1de607632a371f2da7e2f2a35045acb5f1a Mon Sep 17 00:00:00 2001 From: wangbo Date: Mon, 11 May 2026 22:59:26 +0800 Subject: [PATCH] feat: add wallet settlement audit flow --- .../httpapi/billing_admin_handlers.go | 181 ++++++ .../httpapi/core_flow_integration_test.go | 25 + apps/api/internal/httpapi/handlers.go | 2 + apps/api/internal/httpapi/server.go | 2 + apps/api/internal/runner/pricing.go | 12 +- apps/api/internal/runner/service.go | 13 + apps/api/internal/runner/wallet.go | 38 ++ apps/api/internal/store/audit_logs.go | 187 ++++++ apps/api/internal/store/postgres.go | 93 ++- apps/api/internal/store/tx.go | 19 + apps/api/internal/store/wallet.go | 562 ++++++++++++++++++ apps/api/migrations/0025_audit_logs.sql | 32 + apps/web/src/App.tsx | 41 +- apps/web/src/api.ts | 19 + apps/web/src/app-state.ts | 2 + apps/web/src/pages/AdminPage.tsx | 9 +- apps/web/src/pages/admin/AuditLogsPanel.tsx | 93 +++ .../pages/admin/IdentityManagementPanels.tsx | 114 +++- apps/web/src/routing.ts | 1 + apps/web/src/styles/pages.css | 11 +- apps/web/src/types.ts | 1 + packages/contracts/src/index.ts | 39 +- 22 files changed, 1452 insertions(+), 44 deletions(-) create mode 100644 apps/api/internal/httpapi/billing_admin_handlers.go create mode 100644 apps/api/internal/runner/wallet.go create mode 100644 apps/api/internal/store/audit_logs.go create mode 100644 apps/api/internal/store/tx.go create mode 100644 apps/api/internal/store/wallet.go create mode 100644 apps/api/migrations/0025_audit_logs.sql create mode 100644 apps/web/src/pages/admin/AuditLogsPanel.tsx diff --git a/apps/api/internal/httpapi/billing_admin_handlers.go b/apps/api/internal/httpapi/billing_admin_handlers.go new file mode 100644 index 0000000..90f0f60 --- /dev/null +++ b/apps/api/internal/httpapi/billing_admin_handlers.go @@ -0,0 +1,181 @@ +package httpapi + +import ( + "encoding/json" + "errors" + "net" + "net/http" + "strings" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/auth" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" +) + +type walletBalanceRequest struct { + Currency string `json:"currency"` + Balance float64 `json:"balance"` + Reason string `json:"reason"` + IdempotencyKey string `json:"idempotencyKey"` + Metadata map[string]any `json:"metadata"` +} + +func (s *Server) setUserWalletBalance(w http.ResponseWriter, r *http.Request) { + actor, _ := auth.UserFromContext(r.Context()) + var input walletBalanceRequest + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + if input.Balance < 0 { + writeError(w, http.StatusBadRequest, "wallet balance cannot be negative") + return + } + gatewayUserID := strings.TrimSpace(r.PathValue("userID")) + reason := strings.TrimSpace(input.Reason) + if reason == "" { + writeError(w, http.StatusBadRequest, "reason is required") + return + } + + var result store.WalletAdjustmentResult + var auditLog store.AuditLog + err := s.store.InTx(r.Context(), func(tx store.Tx) error { + next, err := s.store.SetUserWalletBalanceTx(r.Context(), tx, store.WalletBalanceAdjustmentInput{ + GatewayUserID: gatewayUserID, + Currency: input.Currency, + Balance: input.Balance, + Reason: reason, + IdempotencyKey: input.IdempotencyKey, + Metadata: input.Metadata, + }) + if err != nil { + return err + } + result = next + record, err := s.store.RecordAuditLogTx(r.Context(), tx, walletAdjustmentAuditInput(r, actor, reason, result)) + if err != nil { + return err + } + auditLog = record + return nil + }) + if err != nil { + switch { + case store.IsNotFound(err): + writeError(w, http.StatusNotFound, "user not found") + case errors.Is(err, store.ErrWalletBalanceUnchanged): + writeError(w, http.StatusBadRequest, "wallet balance is unchanged") + default: + s.logger.Error("set user wallet balance failed", "error", err) + writeError(w, http.StatusInternalServerError, "set user wallet balance failed") + } + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "account": result.Account, + "before": result.Before, + "transaction": result.Transaction, + "auditLog": auditLog, + }) +} + +func (s *Server) listAuditLogs(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + limit, err := positiveQueryInt(query.Get("limit"), 100) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid limit") + return + } + items, err := s.store.ListAuditLogs(r.Context(), store.AuditLogFilter{ + Category: query.Get("category"), + Action: query.Get("action"), + TargetType: query.Get("targetType"), + TargetID: query.Get("targetId"), + Limit: limit, + }) + if err != nil { + s.logger.Error("list audit logs failed", "error", err) + writeError(w, http.StatusInternalServerError, "list audit logs failed") + return + } + writeJSON(w, http.StatusOK, map[string]any{"items": items}) +} + +func walletAdjustmentAuditInput(r *http.Request, actor *auth.User, reason string, result store.WalletAdjustmentResult) store.AuditLogInput { + actorGatewayUserID := "" + actorUserID := "" + actorUsername := "" + actorSource := "" + actorRoles := []string(nil) + if actor != nil { + actorGatewayUserID = uuidText(firstNonEmptyText(actor.GatewayUserID, actor.ID)) + actorUserID = actor.ID + actorUsername = actor.Username + actorSource = actor.Source + actorRoles = actor.Roles + } + return store.AuditLogInput{ + Category: "billing", + Action: "wallet.balance.set", + ActorGatewayUserID: actorGatewayUserID, + ActorUserID: actorUserID, + ActorUsername: actorUsername, + ActorSource: actorSource, + ActorRoles: actorRoles, + TargetType: "gateway_user", + TargetID: result.Account.GatewayUserID, + TargetGatewayUserID: result.Account.GatewayUserID, + TargetGatewayTenantID: result.Account.GatewayTenantID, + RequestIP: requestIP(r), + UserAgent: r.UserAgent(), + BeforeState: map[string]any{ + "walletAccount": result.Before, + }, + AfterState: map[string]any{ + "walletAccount": result.Account, + "transaction": result.Transaction, + }, + Metadata: map[string]any{ + "reason": reason, + "currency": result.Account.Currency, + "transactionId": result.Transaction.ID, + "amount": result.Transaction.Amount, + "direction": result.Transaction.Direction, + }, + } +} + +func requestIP(r *http.Request) string { + if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" { + parts := strings.Split(forwarded, ",") + return strings.TrimSpace(parts[0]) + } + if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" { + return realIP + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil { + return host + } + return strings.TrimSpace(r.RemoteAddr) +} + +func firstNonEmptyText(values ...string) string { + for _, value := range values { + if text := strings.TrimSpace(value); text != "" { + return text + } + } + return "" +} + +func uuidText(value string) string { + value = strings.TrimSpace(value) + if len(value) != 36 { + return "" + } + if value[8] != '-' || value[13] != '-' || value[18] != '-' || value[23] != '-' { + return "" + } + return value +} diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go index bf09f5b..bd914bb 100644 --- a/apps/api/internal/httpapi/core_flow_integration_test.go +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -128,6 +128,31 @@ func TestCoreLocalFlow(t *testing.T) { if err := testPool.QueryRow(ctx, `SELECT id::text FROM gateway_users WHERE username = $1`, username).Scan(&smokeGatewayUserID); err != nil { t.Fatalf("read smoke gateway user id: %v", err) } + var walletAdjustment struct { + Account struct { + Balance float64 `json:"balance"` + } `json:"account"` + AuditLog struct { + Action string `json:"action"` + } `json:"auditLog"` + } + doJSON(t, server.URL, http.MethodPatch, "/api/admin/users/"+smokeGatewayUserID+"/wallet", loginResponse.AccessToken, map[string]any{ + "currency": "resource", + "balance": 1000, + "reason": "seed integration wallet", + }, http.StatusOK, &walletAdjustment) + if !floatNear(walletAdjustment.Account.Balance, 1000) || walletAdjustment.AuditLog.Action != "wallet.balance.set" { + t.Fatalf("wallet adjustment did not update balance and audit log: %+v", walletAdjustment) + } + var auditResponse struct { + Items []struct { + Action string `json:"action"` + } `json:"items"` + } + doJSON(t, server.URL, http.MethodGet, "/api/admin/audit-logs?category=billing&limit=5", loginResponse.AccessToken, nil, http.StatusOK, &auditResponse) + if len(auditResponse.Items) == 0 || auditResponse.Items[0].Action != "wallet.balance.set" { + t.Fatalf("wallet adjustment audit log not found: %+v", auditResponse) + } doJSON(t, server.URL, http.MethodGet, "/api/admin/models", apiKeyResponse.Secret, nil, http.StatusForbidden, nil) var chatOnlyAPIKeyResponse struct { diff --git a/apps/api/internal/httpapi/handlers.go b/apps/api/internal/httpapi/handlers.go index f382a3a..cf8382c 100644 --- a/apps/api/internal/httpapi/handlers.go +++ b/apps/api/internal/httpapi/handlers.go @@ -612,6 +612,8 @@ func statusFromRunError(err error) int { return http.StatusNotFound case errors.Is(err, store.ErrRateLimited): return http.StatusTooManyRequests + case errors.Is(err, store.ErrInsufficientWalletBalance): + return http.StatusPaymentRequired default: return http.StatusBadGateway } diff --git a/apps/api/internal/httpapi/server.go b/apps/api/internal/httpapi/server.go index aab384d..d7cb8bf 100644 --- a/apps/api/internal/httpapi/server.go +++ b/apps/api/internal/httpapi/server.go @@ -56,7 +56,9 @@ func NewServer(cfg config.Config, db *store.Store, logger *slog.Logger) http.Han mux.Handle("GET /api/admin/users", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listUsers))) mux.Handle("POST /api/admin/users", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createGatewayUser))) mux.Handle("PATCH /api/admin/users/{userID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateGatewayUser))) + mux.Handle("PATCH /api/admin/users/{userID}/wallet", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.setUserWalletBalance))) mux.Handle("DELETE /api/admin/users/{userID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.deleteGatewayUser))) + mux.Handle("GET /api/admin/audit-logs", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listAuditLogs))) mux.Handle("GET /api/admin/user-groups", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listUserGroups))) mux.Handle("POST /api/admin/user-groups", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createUserGroup))) mux.Handle("PATCH /api/admin/user-groups/{groupID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateUserGroup))) diff --git a/apps/api/internal/runner/pricing.go b/apps/api/internal/runner/pricing.go index 82976e3..3ced86e 100644 --- a/apps/api/internal/runner/pricing.go +++ b/apps/api/internal/runner/pricing.go @@ -21,6 +21,13 @@ func (s *Service) Estimate(ctx context.Context, kind string, model string, body return EstimateResult{}, err } candidate := candidates[0] + return EstimateResult{ + Items: s.estimatedBillings(ctx, user, kind, body, candidate), + Resolver: "effective-pricing-v1", + }, nil +} + +func (s *Service) estimatedBillings(ctx context.Context, user *auth.User, kind string, body map[string]any, candidate store.RuntimeModelCandidate) []any { usage := clients.Usage{InputTokens: estimateRequestTokens(body), OutputTokens: int(floatFromAny(body["max_tokens"]))} if usage.OutputTokens == 0 { usage.OutputTokens = 64 @@ -31,10 +38,7 @@ func (s *Service) Estimate(ctx context.Context, kind string, model string, body "completion_tokens": usage.OutputTokens, "total_tokens": usage.TotalTokens, }}} - return EstimateResult{ - Items: s.billings(ctx, user, kind, body, candidate, response, true), - Resolver: "effective-pricing-v1", - }, nil + return s.billings(ctx, user, kind, body, candidate, response, true) } func (s *Service) billings(ctx context.Context, user *auth.User, kind string, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, simulated bool) []any { diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index 37a4eea..24c303a 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -67,6 +67,19 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut } return Result{Task: failed, Output: failed.Result}, err } + if len(candidates) > 0 { + estimatedBillings := s.estimatedBillings(ctx, user, task.Kind, body, candidates[0]) + if err := s.ensureWalletBalance(ctx, user, estimatedBillings); err != nil { + if errors.Is(err, store.ErrInsufficientWalletBalance) { + failed, finishErr := s.failTask(ctx, task.ID, "insufficient_balance", err.Error(), task.RunMode == "simulation", err) + if finishErr != nil { + return Result{}, finishErr + } + return Result{Task: failed, Output: failed.Result}, err + } + return Result{}, err + } + } if err := s.store.MarkTaskRunning(ctx, task.ID, modelType, body); err != nil { return Result{}, err } diff --git a/apps/api/internal/runner/wallet.go b/apps/api/internal/runner/wallet.go new file mode 100644 index 0000000..f8b5970 --- /dev/null +++ b/apps/api/internal/runner/wallet.go @@ -0,0 +1,38 @@ +package runner + +import ( + "context" + "fmt" + "strings" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/auth" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" +) + +func (s *Service) ensureWalletBalance(ctx context.Context, user *auth.User, billings []any) error { + amounts := map[string]float64{} + for _, raw := range billings { + line, _ := raw.(map[string]any) + if line == nil { + continue + } + currency := strings.TrimSpace(stringFromAny(line["currency"])) + if currency == "" { + currency = "resource" + } + amounts[currency] = roundPrice(amounts[currency] + floatFromAny(line["amount"])) + } + for currency, amount := range amounts { + if amount <= 0 { + continue + } + availability, err := s.store.WalletAvailability(ctx, user, currency, amount) + if err != nil { + return err + } + if !availability.Enough { + return fmt.Errorf("%w: required %.6f %s, available %.6f", store.ErrInsufficientWalletBalance, amount, currency, availability.AvailableAmount) + } + } + return nil +} diff --git a/apps/api/internal/store/audit_logs.go b/apps/api/internal/store/audit_logs.go new file mode 100644 index 0000000..1770268 --- /dev/null +++ b/apps/api/internal/store/audit_logs.go @@ -0,0 +1,187 @@ +package store + +import ( + "context" + "encoding/json" + "strings" + "time" +) + +type AuditLog struct { + ID string `json:"id"` + Category string `json:"category"` + Action string `json:"action"` + ActorGatewayUserID string `json:"actorGatewayUserId,omitempty"` + ActorUserID string `json:"actorUserId,omitempty"` + ActorUsername string `json:"actorUsername,omitempty"` + ActorSource string `json:"actorSource,omitempty"` + ActorRoles []string `json:"actorRoles,omitempty"` + TargetType string `json:"targetType"` + TargetID string `json:"targetId"` + TargetGatewayUserID string `json:"targetGatewayUserId,omitempty"` + TargetGatewayTenantID string `json:"targetGatewayTenantId,omitempty"` + RequestIP string `json:"requestIp,omitempty"` + UserAgent string `json:"userAgent,omitempty"` + BeforeState map[string]any `json:"beforeState,omitempty"` + AfterState map[string]any `json:"afterState,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` + CreatedAt time.Time `json:"createdAt"` +} + +type AuditLogInput struct { + Category string `json:"category"` + Action string `json:"action"` + ActorGatewayUserID string `json:"actorGatewayUserId"` + ActorUserID string `json:"actorUserId"` + ActorUsername string `json:"actorUsername"` + ActorSource string `json:"actorSource"` + ActorRoles []string `json:"actorRoles"` + TargetType string `json:"targetType"` + TargetID string `json:"targetId"` + TargetGatewayUserID string `json:"targetGatewayUserId"` + TargetGatewayTenantID string `json:"targetGatewayTenantId"` + RequestIP string `json:"requestIp"` + UserAgent string `json:"userAgent"` + BeforeState map[string]any `json:"beforeState"` + AfterState map[string]any `json:"afterState"` + Metadata map[string]any `json:"metadata"` +} + +type AuditLogFilter struct { + Category string + Action string + TargetType string + TargetID string + Limit int +} + +func (s *Store) RecordAuditLog(ctx context.Context, input AuditLogInput) (AuditLog, error) { + return s.RecordAuditLogTx(ctx, s.pool, input) +} + +func (s *Store) RecordAuditLogTx(ctx context.Context, tx Tx, input AuditLogInput) (AuditLog, error) { + input = normalizeAuditLogInput(input) + actorRolesJSON, _ := json.Marshal(input.ActorRoles) + beforeJSON, _ := json.Marshal(emptyObjectIfNil(input.BeforeState)) + afterJSON, _ := json.Marshal(emptyObjectIfNil(input.AfterState)) + metadataJSON, _ := json.Marshal(emptyObjectIfNil(input.Metadata)) + return scanAuditLog(tx.QueryRow(ctx, ` +INSERT INTO gateway_audit_logs ( + category, action, actor_gateway_user_id, actor_user_id, actor_username, actor_source, + actor_roles, target_type, target_id, target_gateway_user_id, target_gateway_tenant_id, + request_ip, user_agent, before_state, after_state, metadata +) +VALUES ( + $1, $2, NULLIF($3, '')::uuid, NULLIF($4, ''), NULLIF($5, ''), NULLIF($6, ''), + $7::jsonb, $8, $9, NULLIF($10, '')::uuid, NULLIF($11, '')::uuid, + NULLIF($12, ''), NULLIF($13, ''), $14::jsonb, $15::jsonb, $16::jsonb +) +RETURNING id::text, category, action, COALESCE(actor_gateway_user_id::text, ''), COALESCE(actor_user_id, ''), + COALESCE(actor_username, ''), COALESCE(actor_source, ''), actor_roles, + target_type, target_id, COALESCE(target_gateway_user_id::text, ''), + COALESCE(target_gateway_tenant_id::text, ''), COALESCE(request_ip, ''), + COALESCE(user_agent, ''), before_state, after_state, metadata, created_at`, + input.Category, + input.Action, + input.ActorGatewayUserID, + input.ActorUserID, + input.ActorUsername, + input.ActorSource, + string(actorRolesJSON), + input.TargetType, + input.TargetID, + input.TargetGatewayUserID, + input.TargetGatewayTenantID, + input.RequestIP, + input.UserAgent, + string(beforeJSON), + string(afterJSON), + string(metadataJSON), + )) +} + +func (s *Store) ListAuditLogs(ctx context.Context, filter AuditLogFilter) ([]AuditLog, error) { + limit := filter.Limit + if limit <= 0 { + limit = 100 + } + if limit > 500 { + limit = 500 + } + rows, err := s.pool.Query(ctx, ` +SELECT id::text, category, action, COALESCE(actor_gateway_user_id::text, ''), COALESCE(actor_user_id, ''), + COALESCE(actor_username, ''), COALESCE(actor_source, ''), actor_roles, + target_type, target_id, COALESCE(target_gateway_user_id::text, ''), + COALESCE(target_gateway_tenant_id::text, ''), COALESCE(request_ip, ''), + COALESCE(user_agent, ''), before_state, after_state, metadata, created_at +FROM gateway_audit_logs +WHERE (NULLIF($1, '') IS NULL OR category = $1) + AND (NULLIF($2, '') IS NULL OR action = $2) + AND (NULLIF($3, '') IS NULL OR target_type = $3) + AND (NULLIF($4, '') IS NULL OR target_id = $4) +ORDER BY created_at DESC +LIMIT $5`, + strings.TrimSpace(filter.Category), + strings.TrimSpace(filter.Action), + strings.TrimSpace(filter.TargetType), + strings.TrimSpace(filter.TargetID), + limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + items := make([]AuditLog, 0) + for rows.Next() { + item, err := scanAuditLog(rows) + if err != nil { + return nil, err + } + items = append(items, item) + } + return items, rows.Err() +} + +func normalizeAuditLogInput(input AuditLogInput) AuditLogInput { + input.Category = firstNonEmpty(strings.TrimSpace(input.Category), "system") + input.Action = strings.TrimSpace(input.Action) + input.TargetType = strings.TrimSpace(input.TargetType) + input.TargetID = strings.TrimSpace(input.TargetID) + input.ActorSource = firstNonEmpty(strings.TrimSpace(input.ActorSource), "gateway") + return input +} + +func scanAuditLog(row scanner) (AuditLog, error) { + var item AuditLog + var actorRoles []byte + var beforeState []byte + var afterState []byte + var metadata []byte + if err := row.Scan( + &item.ID, + &item.Category, + &item.Action, + &item.ActorGatewayUserID, + &item.ActorUserID, + &item.ActorUsername, + &item.ActorSource, + &actorRoles, + &item.TargetType, + &item.TargetID, + &item.TargetGatewayUserID, + &item.TargetGatewayTenantID, + &item.RequestIP, + &item.UserAgent, + &beforeState, + &afterState, + &metadata, + &item.CreatedAt, + ); err != nil { + return AuditLog{}, err + } + item.ActorRoles = decodeStringArray(actorRoles) + item.BeforeState = decodeObject(beforeState) + item.AfterState = decodeObject(afterState) + item.Metadata = decodeObject(metadata) + return item, nil +} diff --git a/apps/api/internal/store/postgres.go b/apps/api/internal/store/postgres.go index a3de2e3..34154ef 100644 --- a/apps/api/internal/store/postgres.go +++ b/apps/api/internal/store/postgres.go @@ -22,13 +22,15 @@ type Store struct { } var ( - ErrInvalidCredentials = errors.New("invalid account or password") - ErrInvalidInvitation = errors.New("invalid or expired invitation code") - ErrAccessRuleResourceDenied = errors.New("access rule resource is not available") - ErrLocalUserRequired = errors.New("local gateway user is required") - ErrProtectedDefault = errors.New("protected default resource cannot be deleted") - ErrUserAlreadyExists = errors.New("user already exists") - ErrWeakPassword = errors.New("password must be at least 8 characters") + ErrInvalidCredentials = errors.New("invalid account or password") + ErrInvalidInvitation = errors.New("invalid or expired invitation code") + ErrAccessRuleResourceDenied = errors.New("access rule resource is not available") + ErrInsufficientWalletBalance = errors.New("insufficient wallet balance") + ErrLocalUserRequired = errors.New("local gateway user is required") + ErrWalletBalanceUnchanged = errors.New("wallet balance unchanged") + ErrProtectedDefault = errors.New("protected default resource cannot be deleted") + ErrUserAlreadyExists = errors.New("user already exists") + ErrWeakPassword = errors.New("password must be at least 8 characters") ) func Connect(ctx context.Context, databaseURL string) (*Store, error) { @@ -298,28 +300,29 @@ type LocalLoginInput struct { } type GatewayUser struct { - ID string `json:"id"` - UserKey string `json:"userKey"` - Source string `json:"source"` - ExternalUserID string `json:"externalUserId,omitempty"` - Username string `json:"username"` - DisplayName string `json:"displayName,omitempty"` - Email string `json:"email,omitempty"` - Phone string `json:"phone,omitempty"` - AvatarURL string `json:"avatarUrl,omitempty"` - GatewayTenantID string `json:"gatewayTenantId,omitempty"` - TenantID string `json:"tenantId,omitempty"` - TenantKey string `json:"tenantKey,omitempty"` - DefaultUserGroupID string `json:"defaultUserGroupId,omitempty"` - Roles []string `json:"roles,omitempty"` - AuthProfile map[string]any `json:"authProfile,omitempty"` - Metadata map[string]any `json:"metadata,omitempty"` - Status string `json:"status"` - LastLoginAt string `json:"lastLoginAt,omitempty"` - SyncedAt string `json:"syncedAt,omitempty"` - SourceUpdatedAt string `json:"sourceUpdatedAt,omitempty"` - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` + ID string `json:"id"` + UserKey string `json:"userKey"` + Source string `json:"source"` + ExternalUserID string `json:"externalUserId,omitempty"` + Username string `json:"username"` + DisplayName string `json:"displayName,omitempty"` + Email string `json:"email,omitempty"` + Phone string `json:"phone,omitempty"` + AvatarURL string `json:"avatarUrl,omitempty"` + GatewayTenantID string `json:"gatewayTenantId,omitempty"` + TenantID string `json:"tenantId,omitempty"` + TenantKey string `json:"tenantKey,omitempty"` + DefaultUserGroupID string `json:"defaultUserGroupId,omitempty"` + Roles []string `json:"roles,omitempty"` + AuthProfile map[string]any `json:"authProfile,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` + WalletAccounts []GatewayWalletAccount `json:"walletAccounts,omitempty"` + Status string `json:"status"` + LastLoginAt string `json:"lastLoginAt,omitempty"` + SyncedAt string `json:"syncedAt,omitempty"` + SourceUpdatedAt string `json:"sourceUpdatedAt,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` } type UserGroup struct { @@ -927,10 +930,31 @@ SELECT id::text, user_key, source, COALESCE(external_user_id, ''), username, COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(default_user_group_id::text, ''), roles, auth_profile, metadata, status, COALESCE(last_login_at::text, ''), COALESCE(synced_at::text, ''), COALESCE(source_updated_at::text, ''), - created_at, updated_at -FROM gateway_users -WHERE deleted_at IS NULL -ORDER BY created_at DESC`) + created_at, updated_at, COALESCE(wallets.wallet_accounts, '[]'::jsonb) +FROM gateway_users u +LEFT JOIN LATERAL ( + SELECT jsonb_agg(jsonb_build_object( + 'id', a.id::text, + 'gatewayTenantId', COALESCE(a.gateway_tenant_id::text, ''), + 'gatewayUserId', a.gateway_user_id::text, + 'tenantId', COALESCE(a.tenant_id, ''), + 'tenantKey', COALESCE(a.tenant_key, ''), + 'userId', COALESCE(a.user_id, ''), + 'currency', a.currency, + 'balance', a.balance::float8, + 'frozenBalance', a.frozen_balance::float8, + 'totalRecharged', a.total_recharged::float8, + 'totalSpent', a.total_spent::float8, + 'status', a.status, + 'metadata', a.metadata, + 'createdAt', a.created_at, + 'updatedAt', a.updated_at + ) ORDER BY a.currency ASC) AS wallet_accounts + FROM gateway_wallet_accounts a + WHERE a.gateway_user_id = u.id +) wallets ON true +WHERE u.deleted_at IS NULL +ORDER BY u.created_at DESC`) if err != nil { return nil, err } @@ -942,6 +966,7 @@ ORDER BY created_at DESC`) var roles []byte var authProfile []byte var metadata []byte + var walletAccounts []byte if err := rows.Scan( &item.ID, &item.UserKey, @@ -965,12 +990,14 @@ ORDER BY created_at DESC`) &item.SourceUpdatedAt, &item.CreatedAt, &item.UpdatedAt, + &walletAccounts, ); err != nil { return nil, err } item.Roles = decodeStringArray(roles) item.AuthProfile = decodeObject(authProfile) item.Metadata = decodeObject(metadata) + item.WalletAccounts = decodeWalletAccounts(walletAccounts) items = append(items, item) } return items, rows.Err() diff --git a/apps/api/internal/store/tx.go b/apps/api/internal/store/tx.go new file mode 100644 index 0000000..d4795c8 --- /dev/null +++ b/apps/api/internal/store/tx.go @@ -0,0 +1,19 @@ +package store + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type Tx interface { + Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row +} + +func (s *Store) InTx(ctx context.Context, fn func(Tx) error) error { + return pgx.BeginFunc(ctx, s.pool, func(tx pgx.Tx) error { + return fn(tx) + }) +} diff --git a/apps/api/internal/store/wallet.go b/apps/api/internal/store/wallet.go new file mode 100644 index 0000000..f5de0fe --- /dev/null +++ b/apps/api/internal/store/wallet.go @@ -0,0 +1,562 @@ +package store + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/auth" + "github.com/jackc/pgx/v5" +) + +type GatewayWalletAccount struct { + ID string `json:"id"` + GatewayTenantID string `json:"gatewayTenantId,omitempty"` + GatewayUserID string `json:"gatewayUserId"` + TenantID string `json:"tenantId,omitempty"` + TenantKey string `json:"tenantKey,omitempty"` + UserID string `json:"userId,omitempty"` + Currency string `json:"currency"` + Balance float64 `json:"balance"` + FrozenBalance float64 `json:"frozenBalance"` + TotalRecharged float64 `json:"totalRecharged"` + TotalSpent float64 `json:"totalSpent"` + Status string `json:"status"` + Metadata map[string]any `json:"metadata,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type GatewayWalletTransaction struct { + ID string `json:"id"` + AccountID string `json:"accountId"` + Currency string `json:"currency,omitempty"` + GatewayTenantID string `json:"gatewayTenantId,omitempty"` + GatewayUserID string `json:"gatewayUserId,omitempty"` + Direction string `json:"direction"` + TransactionType string `json:"transactionType"` + Amount float64 `json:"amount"` + BalanceBefore float64 `json:"balanceBefore"` + BalanceAfter float64 `json:"balanceAfter"` + IdempotencyKey string `json:"idempotencyKey,omitempty"` + ReferenceType string `json:"referenceType,omitempty"` + ReferenceID string `json:"referenceId,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` + CreatedAt time.Time `json:"createdAt"` +} + +type WalletAvailability struct { + Account GatewayWalletAccount `json:"account"` + Currency string `json:"currency"` + RequiredAmount float64 `json:"requiredAmount"` + AvailableAmount float64 `json:"availableAmount"` + Enough bool `json:"enough"` +} + +type WalletSummary struct { + Accounts []GatewayWalletAccount `json:"accounts"` + PrimaryAccount GatewayWalletAccount `json:"primaryAccount"` +} + +type WalletTransactionListFilter struct { + Query string + Direction string + TransactionType string + CreatedFrom *time.Time + CreatedTo *time.Time + Page int + PageSize int +} + +type WalletTransactionListResult struct { + Items []GatewayWalletTransaction + Total int + Page int + PageSize int +} + +type WalletBalanceAdjustmentInput struct { + GatewayUserID string `json:"gatewayUserId"` + Currency string `json:"currency"` + Balance float64 `json:"balance"` + Reason string `json:"reason"` + IdempotencyKey string `json:"idempotencyKey"` + Metadata map[string]any `json:"metadata"` +} + +type WalletAdjustmentResult struct { + Account GatewayWalletAccount `json:"account"` + Before GatewayWalletAccount `json:"before"` + Transaction GatewayWalletTransaction `json:"transaction"` +} + +func (s *Store) WalletAvailability(ctx context.Context, user *auth.User, currency string, requiredAmount float64) (WalletAvailability, error) { + gatewayUserID := localGatewayUserID(user) + if gatewayUserID == "" { + return WalletAvailability{Currency: normalizeWalletCurrency(currency), RequiredAmount: requiredAmount, Enough: true}, nil + } + account, err := s.ensureWalletAccount(ctx, s.pool, gatewayUserID, currency) + if err != nil { + return WalletAvailability{}, err + } + available := roundMoney(account.Balance - account.FrozenBalance) + result := WalletAvailability{ + Account: account, + Currency: account.Currency, + RequiredAmount: roundMoney(requiredAmount), + AvailableAmount: available, + Enough: available+0.000001 >= requiredAmount, + } + if !result.Enough { + return result, fmt.Errorf("%w: required %.6f %s, available %.6f", ErrInsufficientWalletBalance, requiredAmount, account.Currency, available) + } + return result, nil +} + +func (s *Store) GetWalletSummary(ctx context.Context, user *auth.User, currency string) (WalletSummary, error) { + gatewayUserID := localGatewayUserID(user) + if gatewayUserID == "" { + account := GatewayWalletAccount{ + Currency: normalizeWalletCurrency(currency), + Status: "active", + } + return WalletSummary{Accounts: []GatewayWalletAccount{account}, PrimaryAccount: account}, nil + } + primary, err := s.ensureWalletAccount(ctx, s.pool, gatewayUserID, currency) + if err != nil { + return WalletSummary{}, err + } + rows, err := s.pool.Query(ctx, ` +SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, + COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), + currency, balance::float8, frozen_balance::float8, total_recharged::float8, + total_spent::float8, status, metadata, created_at, updated_at +FROM gateway_wallet_accounts +WHERE gateway_user_id = $1::uuid +ORDER BY CASE WHEN currency = $2 THEN 0 WHEN currency = 'resource' THEN 1 ELSE 2 END, currency ASC`, gatewayUserID, primary.Currency) + if err != nil { + return WalletSummary{}, err + } + defer rows.Close() + + accounts := make([]GatewayWalletAccount, 0) + for rows.Next() { + account, err := scanWalletAccount(rows) + if err != nil { + return WalletSummary{}, err + } + accounts = append(accounts, account) + } + if err := rows.Err(); err != nil { + return WalletSummary{}, err + } + if len(accounts) == 0 { + accounts = append(accounts, primary) + } + return WalletSummary{Accounts: accounts, PrimaryAccount: accounts[0]}, nil +} + +func (s *Store) ListWalletTransactions(ctx context.Context, user *auth.User, filter WalletTransactionListFilter) (WalletTransactionListResult, error) { + page := filter.Page + if page <= 0 { + page = 1 + } + pageSize := filter.PageSize + if pageSize <= 0 { + pageSize = 50 + } + if pageSize > 100 { + pageSize = 100 + } + gatewayUserID := localGatewayUserID(user) + if gatewayUserID == "" { + return WalletTransactionListResult{Items: []GatewayWalletTransaction{}, Page: page, PageSize: pageSize}, nil + } + queryPattern := "" + if query := strings.TrimSpace(filter.Query); query != "" { + queryPattern = "%" + query + "%" + } + args := []any{ + gatewayUserID, + queryPattern, + strings.TrimSpace(filter.Direction), + strings.TrimSpace(filter.TransactionType), + nullableTaskListTime(filter.CreatedFrom), + nullableTaskListTime(filter.CreatedTo), + } + whereSQL := ` +WHERE a.gateway_user_id = $1::uuid + AND ( + NULLIF($2, '') IS NULL + OR t.id::text ILIKE $2 + OR COALESCE(t.reference_id, '') ILIKE $2 + OR COALESCE(t.reference_type, '') ILIKE $2 + OR COALESCE(t.idempotency_key, '') ILIKE $2 + OR t.transaction_type ILIKE $2 + OR t.direction ILIKE $2 + OR COALESCE(task.id::text, '') ILIKE $2 + OR COALESCE(task.request_id, '') ILIKE $2 + OR COALESCE(task.kind, '') ILIKE $2 + OR COALESCE(task.model, '') ILIKE $2 + OR COALESCE(task.requested_model, '') ILIKE $2 + OR COALESCE(task.resolved_model, '') ILIKE $2 + OR COALESCE(task.model_type, '') ILIKE $2 + OR COALESCE(task.api_key_id, '') ILIKE $2 + OR COALESCE(task.api_key_name, '') ILIKE $2 + OR COALESCE(task.api_key_prefix, '') ILIKE $2 + OR COALESCE(task.status, '') ILIKE $2 + OR COALESCE(task.billing_summary->>'currency', '') ILIKE $2 + OR COALESCE(task.billing_summary->>'totalAmount', '') ILIKE $2 + OR COALESCE(attempt.client_id, '') ILIKE $2 + OR COALESCE(attempt.request_id, '') ILIKE $2 + OR COALESCE(platform.provider, '') ILIKE $2 + OR COALESCE(platform.platform_key, '') ILIKE $2 + OR COALESCE(platform.name, '') ILIKE $2 + OR COALESCE(platform_model.model_name, '') ILIKE $2 + OR COALESCE(platform_model.provider_model_name, '') ILIKE $2 + OR COALESCE(platform_model.model_alias, '') ILIKE $2 + OR COALESCE(platform_model.display_name, '') ILIKE $2 + OR COALESCE(task.metrics->>'provider', '') ILIKE $2 + OR COALESCE(task.metrics->>'platformName', '') ILIKE $2 + OR COALESCE(task.metrics->>'modelAlias', '') ILIKE $2 + OR COALESCE(task.metrics->>'providerModel', '') ILIKE $2 + ) + AND (NULLIF($3, '') IS NULL OR t.direction = $3) + AND (NULLIF($4, '') IS NULL OR t.transaction_type = $4) + AND ($5::timestamptz IS NULL OR t.created_at >= $5::timestamptz) + AND ($6::timestamptz IS NULL OR t.created_at <= $6::timestamptz)` + var total int + if err := s.pool.QueryRow(ctx, ` +SELECT count(*) + FROM gateway_wallet_transactions t + JOIN gateway_wallet_accounts a ON a.id = t.account_id + LEFT JOIN gateway_tasks task ON t.reference_type = 'gateway_task' AND t.reference_id = task.id::text + LEFT JOIN LATERAL ( + SELECT platform_id, platform_model_id, client_id, request_id + FROM gateway_task_attempts + WHERE task_id = task.id + ORDER BY attempt_no DESC, started_at DESC + LIMIT 1 +) attempt ON true +LEFT JOIN integration_platforms platform ON platform.id = attempt.platform_id +LEFT JOIN platform_models platform_model ON platform_model.id = attempt.platform_model_id +`+whereSQL, args...).Scan(&total); err != nil { + return WalletTransactionListResult{}, err + } + offset := (page - 1) * pageSize + queryArgs := append(args, pageSize, offset) + rows, err := s.pool.Query(ctx, ` +SELECT t.id::text, t.account_id::text, a.currency, COALESCE(t.gateway_tenant_id::text, ''), + COALESCE(t.gateway_user_id::text, ''), t.direction, t.transaction_type, + t.amount::float8, t.balance_before::float8, t.balance_after::float8, + COALESCE(t.idempotency_key, ''), COALESCE(t.reference_type, ''), + COALESCE(t.reference_id, ''), + t.metadata || jsonb_strip_nulls(jsonb_build_object( + 'taskId', task.id::text, + 'kind', task.kind, + 'model', task.model, + 'requestedModel', task.requested_model, + 'resolvedModel', task.resolved_model, + 'modelType', task.model_type, + 'taskStatus', task.status, + 'runMode', task.run_mode, + 'requestId', COALESCE(task.request_id, attempt.request_id), + 'apiKeyId', task.api_key_id, + 'apiKeyName', task.api_key_name, + 'apiKeyPrefix', task.api_key_prefix, + 'provider', COALESCE(platform.provider, task.metrics->>'provider'), + 'platformId', COALESCE(platform.id::text, task.metrics->>'platformId'), + 'platformName', COALESCE(platform.name, task.metrics->>'platformName'), + 'platformKey', platform.platform_key, + 'platformModelId', COALESCE(platform_model.id::text, task.metrics->>'platformModelId'), + 'platformModelName', platform_model.model_name, + 'platformModelAlias', platform_model.model_alias, + 'providerModel', COALESCE(platform_model.provider_model_name, task.metrics->>'providerModel'), + 'clientId', attempt.client_id, + 'usage', CASE WHEN task.id IS NULL THEN NULL ELSE COALESCE(task.usage, attempt.usage, '{}'::jsonb) END, + 'billings', CASE WHEN task.id IS NULL THEN NULL ELSE COALESCE(task.billings, '[]'::jsonb) END, + 'billingSummary', CASE WHEN task.id IS NULL THEN NULL ELSE COALESCE(task.billing_summary, '{}'::jsonb) END, + 'finalChargeAmount', CASE WHEN task.id IS NULL THEN NULL ELSE COALESCE(task.final_charge_amount, 0)::float8 END, + 'responseStartedAt', COALESCE(task.response_started_at::text, attempt.response_started_at::text), + 'responseFinishedAt', COALESCE(task.response_finished_at::text, attempt.response_finished_at::text), + 'responseDurationMs', COALESCE(task.response_duration_ms, attempt.response_duration_ms) + )), t.created_at + FROM gateway_wallet_transactions t + JOIN gateway_wallet_accounts a ON a.id = t.account_id + LEFT JOIN gateway_tasks task ON t.reference_type = 'gateway_task' AND t.reference_id = task.id::text + LEFT JOIN LATERAL ( + SELECT platform_id, platform_model_id, client_id, request_id, usage, response_started_at, + response_finished_at, response_duration_ms + FROM gateway_task_attempts + WHERE task_id = task.id + ORDER BY attempt_no DESC, started_at DESC + LIMIT 1 +) attempt ON true +LEFT JOIN integration_platforms platform ON platform.id = attempt.platform_id +LEFT JOIN platform_models platform_model ON platform_model.id = attempt.platform_model_id +`+whereSQL+` +ORDER BY t.created_at DESC, t.id DESC +LIMIT $7 OFFSET $8`, queryArgs...) + if err != nil { + return WalletTransactionListResult{}, err + } + defer rows.Close() + + items := make([]GatewayWalletTransaction, 0) + for rows.Next() { + item, err := scanWalletTransactionWithCurrency(rows) + if err != nil { + return WalletTransactionListResult{}, err + } + items = append(items, item) + } + if err := rows.Err(); err != nil { + return WalletTransactionListResult{}, err + } + return WalletTransactionListResult{Items: items, Total: total, Page: page, PageSize: pageSize}, nil +} + +func (s *Store) SetUserWalletBalance(ctx context.Context, input WalletBalanceAdjustmentInput) (WalletAdjustmentResult, error) { + var result WalletAdjustmentResult + err := s.InTx(ctx, func(tx Tx) error { + next, err := s.SetUserWalletBalanceTx(ctx, tx, input) + if err != nil { + return err + } + result = next + return nil + }) + return result, err +} + +func (s *Store) SetUserWalletBalanceTx(ctx context.Context, tx Tx, input WalletBalanceAdjustmentInput) (WalletAdjustmentResult, error) { + input.GatewayUserID = strings.TrimSpace(input.GatewayUserID) + if input.GatewayUserID == "" { + return WalletAdjustmentResult{}, ErrLocalUserRequired + } + if input.Balance < 0 { + return WalletAdjustmentResult{}, fmt.Errorf("wallet balance cannot be negative") + } + account, err := s.ensureWalletAccount(ctx, tx, input.GatewayUserID, input.Currency) + if err != nil { + return WalletAdjustmentResult{}, err + } + var locked GatewayWalletAccount + locked, err = scanWalletAccount(tx.QueryRow(ctx, ` +SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, + COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), + currency, balance::float8, frozen_balance::float8, total_recharged::float8, + total_spent::float8, status, metadata, created_at, updated_at +FROM gateway_wallet_accounts +WHERE id = $1::uuid +FOR UPDATE`, account.ID)) + if err != nil { + return WalletAdjustmentResult{}, err + } + before := locked + nextBalance := roundMoney(input.Balance) + delta := roundMoney(nextBalance - locked.Balance) + if delta == 0 { + return WalletAdjustmentResult{}, ErrWalletBalanceUnchanged + } + direction := "credit" + amount := delta + if delta < 0 { + direction = "debit" + amount = -delta + } + reason := strings.TrimSpace(input.Reason) + if reason == "" { + reason = "后台余额调整" + } + if _, err := tx.Exec(ctx, ` +UPDATE gateway_wallet_accounts +SET balance = $2, + total_recharged = total_recharged + CASE WHEN $3 = 'credit' THEN $4 ELSE 0 END, + updated_at = now() +WHERE id = $1::uuid`, + locked.ID, + nextBalance, + direction, + amount, + ); err != nil { + return WalletAdjustmentResult{}, err + } + metadata := mergeObjects(input.Metadata, map[string]any{ + "reason": reason, + "previousBalance": roundMoney(before.Balance), + "targetBalance": nextBalance, + }) + metadataJSON, _ := json.Marshal(emptyObjectIfNil(metadata)) + transaction, err := scanWalletTransaction(tx.QueryRow(ctx, ` +INSERT INTO gateway_wallet_transactions ( + account_id, gateway_tenant_id, gateway_user_id, direction, transaction_type, + amount, balance_before, balance_after, idempotency_key, reference_type, reference_id, metadata +) +VALUES ( + $1::uuid, NULLIF($2, '')::uuid, $3::uuid, $4, 'admin_adjust', + $5, $6, $7, NULLIF($8, ''), 'gateway_user', $9, $10::jsonb +) +RETURNING id::text, account_id::text, COALESCE(gateway_tenant_id::text, ''), COALESCE(gateway_user_id::text, ''), + direction, transaction_type, amount::float8, balance_before::float8, balance_after::float8, + COALESCE(idempotency_key, ''), COALESCE(reference_type, ''), COALESCE(reference_id, ''), + metadata, created_at`, + locked.ID, + locked.GatewayTenantID, + locked.GatewayUserID, + direction, + amount, + roundMoney(before.Balance), + nextBalance, + strings.TrimSpace(input.IdempotencyKey), + locked.GatewayUserID, + string(metadataJSON), + )) + if err != nil { + return WalletAdjustmentResult{}, err + } + locked.Balance = nextBalance + if direction == "credit" { + locked.TotalRecharged = roundMoney(locked.TotalRecharged + amount) + } + locked.UpdatedAt = time.Now() + return WalletAdjustmentResult{Account: locked, Before: before, Transaction: transaction}, nil +} + +func (s *Store) ensureWalletAccount(ctx context.Context, q Tx, gatewayUserID string, currency string) (GatewayWalletAccount, error) { + currency = normalizeWalletCurrency(currency) + if _, err := q.Exec(ctx, ` +INSERT INTO gateway_wallet_accounts ( + gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id, currency +) +SELECT gateway_tenant_id, id, NULLIF(tenant_id, ''), NULLIF(tenant_key, ''), + COALESCE(NULLIF(external_user_id, ''), id::text), $2 +FROM gateway_users +WHERE id = $1::uuid + AND deleted_at IS NULL +ON CONFLICT (gateway_user_id, currency) DO UPDATE +SET gateway_tenant_id = COALESCE(gateway_wallet_accounts.gateway_tenant_id, EXCLUDED.gateway_tenant_id), + tenant_id = COALESCE(NULLIF(gateway_wallet_accounts.tenant_id, ''), EXCLUDED.tenant_id), + tenant_key = COALESCE(NULLIF(gateway_wallet_accounts.tenant_key, ''), EXCLUDED.tenant_key), + user_id = COALESCE(NULLIF(gateway_wallet_accounts.user_id, ''), EXCLUDED.user_id), + updated_at = now() +WHERE gateway_wallet_accounts.gateway_tenant_id IS NULL + OR COALESCE(gateway_wallet_accounts.tenant_id, '') = '' + OR COALESCE(gateway_wallet_accounts.tenant_key, '') = '' + OR COALESCE(gateway_wallet_accounts.user_id, '') = ''`, gatewayUserID, currency); err != nil { + return GatewayWalletAccount{}, err + } + account, err := scanWalletAccount(q.QueryRow(ctx, ` +SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, + COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), + currency, balance::float8, frozen_balance::float8, total_recharged::float8, + total_spent::float8, status, metadata, created_at, updated_at +FROM gateway_wallet_accounts +WHERE gateway_user_id = $1::uuid + AND currency = $2`, gatewayUserID, currency)) + if err != nil { + if err == pgx.ErrNoRows { + return GatewayWalletAccount{}, pgx.ErrNoRows + } + return GatewayWalletAccount{}, err + } + return account, nil +} + +func normalizeWalletCurrency(currency string) string { + currency = strings.TrimSpace(currency) + if currency == "" { + return "resource" + } + return currency +} + +func scanWalletAccount(row scanner) (GatewayWalletAccount, error) { + var item GatewayWalletAccount + var metadata []byte + if err := row.Scan( + &item.ID, + &item.GatewayTenantID, + &item.GatewayUserID, + &item.TenantID, + &item.TenantKey, + &item.UserID, + &item.Currency, + &item.Balance, + &item.FrozenBalance, + &item.TotalRecharged, + &item.TotalSpent, + &item.Status, + &metadata, + &item.CreatedAt, + &item.UpdatedAt, + ); err != nil { + return GatewayWalletAccount{}, err + } + item.Metadata = decodeObject(metadata) + return item, nil +} + +func scanWalletTransaction(row scanner) (GatewayWalletTransaction, error) { + var item GatewayWalletTransaction + var metadata []byte + if err := row.Scan( + &item.ID, + &item.AccountID, + &item.GatewayTenantID, + &item.GatewayUserID, + &item.Direction, + &item.TransactionType, + &item.Amount, + &item.BalanceBefore, + &item.BalanceAfter, + &item.IdempotencyKey, + &item.ReferenceType, + &item.ReferenceID, + &metadata, + &item.CreatedAt, + ); err != nil { + return GatewayWalletTransaction{}, err + } + item.Metadata = decodeObject(metadata) + return item, nil +} + +func scanWalletTransactionWithCurrency(row scanner) (GatewayWalletTransaction, error) { + var item GatewayWalletTransaction + var metadata []byte + if err := row.Scan( + &item.ID, + &item.AccountID, + &item.Currency, + &item.GatewayTenantID, + &item.GatewayUserID, + &item.Direction, + &item.TransactionType, + &item.Amount, + &item.BalanceBefore, + &item.BalanceAfter, + &item.IdempotencyKey, + &item.ReferenceType, + &item.ReferenceID, + &metadata, + &item.CreatedAt, + ); err != nil { + return GatewayWalletTransaction{}, err + } + item.Metadata = decodeObject(metadata) + return item, nil +} + +func decodeWalletAccounts(data []byte) []GatewayWalletAccount { + if len(data) == 0 { + return nil + } + var items []GatewayWalletAccount + if err := json.Unmarshal(data, &items); err != nil { + return nil + } + return items +} diff --git a/apps/api/migrations/0025_audit_logs.sql b/apps/api/migrations/0025_audit_logs.sql new file mode 100644 index 0000000..3ac5dff --- /dev/null +++ b/apps/api/migrations/0025_audit_logs.sql @@ -0,0 +1,32 @@ +CREATE TABLE IF NOT EXISTS gateway_audit_logs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + category text NOT NULL DEFAULT 'system', + action text NOT NULL, + actor_gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL, + actor_user_id text, + actor_username text, + actor_source text, + actor_roles jsonb NOT NULL DEFAULT '[]'::jsonb, + target_type text NOT NULL, + target_id text NOT NULL, + target_gateway_user_id uuid REFERENCES gateway_users(id) ON DELETE SET NULL, + target_gateway_tenant_id uuid REFERENCES gateway_tenants(id) ON DELETE SET NULL, + request_ip text, + user_agent text, + before_state jsonb NOT NULL DEFAULT '{}'::jsonb, + after_state jsonb NOT NULL DEFAULT '{}'::jsonb, + metadata jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_gateway_audit_logs_category_created + ON gateway_audit_logs(category, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_gateway_audit_logs_action_created + ON gateway_audit_logs(action, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_gateway_audit_logs_target + ON gateway_audit_logs(target_type, target_id, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_gateway_audit_logs_actor + ON gateway_audit_logs(actor_gateway_user_id, created_at DESC); diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index e99c33d..7fdb737 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -6,6 +6,7 @@ import type { GatewayAccessRule, GatewayAccessRuleUpsertRequest, GatewayApiKey, + GatewayAuditLog, GatewayTenantUpsertRequest, GatewayTask, GatewayUserUpsertRequest, @@ -20,6 +21,7 @@ import type { RuntimePolicySet, UserGroupUpsertRequest, UserGroup, + WalletBalanceAdjustmentRequest, } from '@easyai-ai-gateway/contracts'; import { batchAccessRules, @@ -38,6 +40,7 @@ import { deleteUserGroup, getHealth, getTask, + listAuditLogs, listAccessRules, listApiKeyAccessRules, listApiKeys, @@ -61,6 +64,7 @@ import { loginLocalAccount, registerLocalAccount, replacePlatformModels, + setUserWalletBalance, type HealthResponse, updateAccessRule, updateGatewayUser, @@ -130,6 +134,7 @@ type DataKey = | 'userGroups' | 'tasks' | 'accessRules' + | 'auditLogs' | 'apiKeys'; export function App() { @@ -160,6 +165,7 @@ export function App() { const [pricingRuleSets, setPricingRuleSets] = useState([]); const [runtimePolicySets, setRuntimePolicySets] = useState([]); const [accessRules, setAccessRules] = useState([]); + const [auditLogs, setAuditLogs] = useState([]); const [rateLimitWindows, setRateLimitWindows] = useState([]); const [tenants, setTenants] = useState([]); const [users, setUsers] = useState([]); @@ -238,6 +244,7 @@ export function App() { const data = useMemo(() => ({ accessRules, + auditLogs, apiKeys, baseModels, modelCatalog, @@ -253,7 +260,7 @@ export function App() { tenants, userGroups, users, - }), [accessRules, apiKeys, baseModels, modelCatalog, models, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runtimePolicySets, taskResult, tasks, tenants, userGroups, users]); + }), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, models, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runtimePolicySets, taskResult, tasks, tenants, userGroups, users]); async function refresh(nextToken = token) { await ensureRouteData(nextToken, true); @@ -371,6 +378,9 @@ export function App() { ? listApiKeyAccessRules(nextToken) : listAccessRules(nextToken))).items); return; + case 'auditLogs': + setAuditLogs((await listAuditLogs(nextToken)).items); + return; case 'apiKeys': setApiKeys((await listApiKeys(nextToken)).items); } @@ -527,6 +537,31 @@ export function App() { } } + async function saveUserWalletBalance(userId: string, input: WalletBalanceAdjustmentRequest) { + setCoreState('loading'); + setCoreMessage(''); + try { + const response = await setUserWalletBalance(token, userId, input); + setUsers((current) => current.map((user) => user.id === userId + ? { + ...user, + walletAccounts: [ + response.account, + ...(user.walletAccounts ?? []).filter((account) => account.id !== response.account.id), + ], + } + : user)); + setAuditLogs((current) => [response.auditLog, ...current.filter((item) => item.id !== response.auditLog.id)]); + invalidateDataKeys('auditLogs'); + setCoreState('ready'); + setCoreMessage('用户余额已更新,审计日志已记录。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : '更新用户余额失败'); + throw err; + } + } + async function removeUser(userId: string) { setCoreState('loading'); setCoreMessage(''); @@ -697,6 +732,7 @@ export function App() { setPricingRuleSets([]); setRuntimePolicySets([]); setAccessRules([]); + setAuditLogs([]); setRateLimitWindows([]); setTenants([]); setUsers([]); @@ -866,6 +902,7 @@ export function App() { onSaveAccessRule={saveAccessRule} onSaveTenant={saveTenant} onSaveUser={saveUser} + onSetUserWalletBalance={saveUserWalletBalance} onSaveUserGroup={saveUserGroup} onSectionChange={navigateAdminSection} /> @@ -1015,6 +1052,8 @@ function dataKeysForRoute( return ['users', 'tenants', 'userGroups']; case 'userGroups': return ['userGroups']; + case 'auditLogs': + return ['auditLogs']; case 'accessRules': return ['accessRules', 'userGroups', 'platforms', 'models']; default: diff --git a/apps/web/src/api.ts b/apps/web/src/api.ts index 64578d6..acaf269 100644 --- a/apps/web/src/api.ts +++ b/apps/web/src/api.ts @@ -9,6 +9,7 @@ import type { GatewayAccessRule, GatewayAccessRuleUpsertRequest, GatewayApiKey, + GatewayAuditLog, GatewayTenant, GatewayTenantUpsertRequest, GatewayTask, @@ -27,6 +28,8 @@ import type { RuntimePolicySetUpsertRequest, UserGroup, UserGroupUpsertRequest, + WalletAdjustmentResponse, + WalletBalanceAdjustmentRequest, } from '@easyai-ai-gateway/contracts'; import type { PlatformCreateInput, PlatformModelBindingInput, WorkspaceTaskQuery } from './types'; @@ -287,6 +290,18 @@ export async function updateGatewayUser(token: string, userId: string, input: Ga }); } +export async function setUserWalletBalance( + token: string, + userId: string, + input: WalletBalanceAdjustmentRequest, +): Promise { + return request(`/api/admin/users/${userId}/wallet`, { + body: input, + method: 'PATCH', + token, + }); +} + export async function deleteGatewayUser(token: string, userId: string): Promise { await request(`/api/admin/users/${userId}`, { method: 'DELETE', @@ -294,6 +309,10 @@ export async function deleteGatewayUser(token: string, userId: string): Promise< }); } +export async function listAuditLogs(token: string): Promise> { + return request>('/api/admin/audit-logs', { token }); +} + export async function listUserGroups(token: string): Promise> { return request>('/api/admin/user-groups', { token }); } diff --git a/apps/web/src/app-state.ts b/apps/web/src/app-state.ts index 3c0028e..a5811bb 100644 --- a/apps/web/src/app-state.ts +++ b/apps/web/src/app-state.ts @@ -3,6 +3,7 @@ import type { CatalogProvider, GatewayAccessRule, GatewayApiKey, + GatewayAuditLog, GatewayTask, GatewayTenant, GatewayUser, @@ -18,6 +19,7 @@ import type { export interface ConsoleData { accessRules: GatewayAccessRule[]; + auditLogs: GatewayAuditLog[]; apiKeys: GatewayApiKey[]; baseModels: BaseModelCatalogItem[]; modelCatalog: ModelCatalogResponse; diff --git a/apps/web/src/pages/AdminPage.tsx b/apps/web/src/pages/AdminPage.tsx index cdd4c12..3925950 100644 --- a/apps/web/src/pages/AdminPage.tsx +++ b/apps/web/src/pages/AdminPage.tsx @@ -1,5 +1,5 @@ import type { ReactNode } from 'react'; -import { Boxes, Building2, Gauge, KeyRound, Route, ServerCog, ShieldCheck, UsersRound, Workflow } from 'lucide-react'; +import { Boxes, Building2, Gauge, History, KeyRound, Route, ServerCog, ShieldCheck, UsersRound, Workflow } from 'lucide-react'; import type { BaseModelUpsertRequest, CatalogProviderUpsertRequest, @@ -10,6 +10,7 @@ import type { PricingRuleSetUpsertRequest, RuntimePolicySetUpsertRequest, UserGroupUpsertRequest, + WalletBalanceAdjustmentRequest, } from '@easyai-ai-gateway/contracts'; import type { ConsoleData, StatItem } from '../app-state'; import { EntityTable } from '../components/EntityTable'; @@ -17,6 +18,7 @@ import { StatGrid } from '../components/StatGrid'; import { Badge, Card, CardContent, CardHeader, CardTitle, Tabs } from '../components/ui'; import type { AdminSection, LoadState, PlatformWithModelsInput } from '../types'; import { AccessRulesPanel } from './admin/AccessRulesPanel'; +import { AuditLogsPanel } from './admin/AuditLogsPanel'; import { BaseModelCatalogPanel } from './admin/BaseModelCatalogPanel'; import { TenantsPanel, UserGroupsPanel, UsersPanel } from './admin/IdentityManagementPanels'; import { PlatformManagementPanel } from './admin/PlatformManagementPanel'; @@ -35,6 +37,7 @@ const tabs = [ { value: 'users', label: '用户', icon: }, { value: 'userGroups', label: '用户组', icon: }, { value: 'accessRules', label: '模型权限', icon: }, + { value: 'auditLogs', label: '审计日志', icon: }, ] satisfies Array<{ value: AdminSection; label: string; icon: ReactNode }>; export function AdminPage(props: { @@ -63,6 +66,7 @@ export function AdminPage(props: { onSaveAccessRule: (input: GatewayAccessRuleUpsertRequest, ruleId?: string) => Promise; onSaveTenant: (input: GatewayTenantUpsertRequest, tenantId?: string) => Promise; onSaveUser: (input: GatewayUserUpsertRequest, userId?: string) => Promise; + onSetUserWalletBalance: (userId: string, input: WalletBalanceAdjustmentRequest) => Promise; onSaveUserGroup: (input: UserGroupUpsertRequest, groupId?: string) => Promise; onSectionChange: (value: AdminSection) => void; }) { @@ -141,6 +145,7 @@ export function AdminPage(props: { {props.section === 'tenants' && } {props.section === 'users' && } {props.section === 'userGroups' && } + {props.section === 'auditLogs' && } @@ -156,6 +161,7 @@ function identityPanelProps(props: { onDeleteUserGroup: (groupId: string) => Promise; onSaveTenant: (input: GatewayTenantUpsertRequest, tenantId?: string) => Promise; onSaveUser: (input: GatewayUserUpsertRequest, userId?: string) => Promise; + onSetUserWalletBalance: (userId: string, input: WalletBalanceAdjustmentRequest) => Promise; onSaveUserGroup: (input: UserGroupUpsertRequest, groupId?: string) => Promise; }) { return { @@ -167,6 +173,7 @@ function identityPanelProps(props: { onDeleteUserGroup: props.onDeleteUserGroup, onSaveTenant: props.onSaveTenant, onSaveUser: props.onSaveUser, + onSetUserWalletBalance: props.onSetUserWalletBalance, onSaveUserGroup: props.onSaveUserGroup, }; } diff --git a/apps/web/src/pages/admin/AuditLogsPanel.tsx b/apps/web/src/pages/admin/AuditLogsPanel.tsx new file mode 100644 index 0000000..7bc6cb7 --- /dev/null +++ b/apps/web/src/pages/admin/AuditLogsPanel.tsx @@ -0,0 +1,93 @@ +import { History, ShieldCheck } from 'lucide-react'; +import type { GatewayAuditLog } from '@easyai-ai-gateway/contracts'; +import { Badge, Card, CardContent, CardHeader, CardTitle, Table, TableCell, TableHead, TableRow } from '../../components/ui'; + +export function AuditLogsPanel(props: { auditLogs: GatewayAuditLog[]; message?: string }) { + return ( +
+ + +
+
+
+ 审计日志 +

敏感管理动作独立记录,便于追踪操作者、对象和前后状态。

+
+ {props.auditLogs.length} +
+
+ {props.message &&

{props.message}

} +
+ {props.auditLogs.length ? ( + + + 动作 + 操作者 + 目标 + 摘要 + 时间 + + {props.auditLogs.map((item) => ( + + + + {actionLabel(item.action)} + {item.category} + + + {item.actorUsername || item.actorUserId || '系统'} + + + {targetLabel(item.targetType)} + {shortId(item.targetId)} + + + {auditSummary(item)} + {formatDateTime(item.createdAt)} + + ))} +
+ ) : ( + + + + 暂无审计日志 + 后台余额调整后会在这里留下独立记录。 + + + )} +
+ ); +} + +function actionLabel(action: string) { + if (action === 'wallet.balance.set') return '余额调整'; + return action; +} + +function targetLabel(targetType: string) { + if (targetType === 'gateway_user') return '用户'; + return targetType; +} + +function auditSummary(item: GatewayAuditLog) { + const metadata = item.metadata ?? {}; + const direction = typeof metadata.direction === 'string' ? metadata.direction : ''; + const amount = typeof metadata.amount === 'number' ? metadata.amount : undefined; + const currency = typeof metadata.currency === 'string' ? metadata.currency : 'resource'; + const reason = typeof metadata.reason === 'string' ? metadata.reason : ''; + const prefix = amount === undefined ? '' : `${direction === 'debit' ? '-' : '+'}${formatBalance(amount)} ${currency}`; + return [prefix, reason].filter(Boolean).join(' · ') || '已记录'; +} + +function formatDateTime(value: string) { + return value ? new Date(value).toLocaleString() : '-'; +} + +function formatBalance(value: number) { + return new Intl.NumberFormat('zh-CN', { maximumFractionDigits: 6 }).format(value); +} + +function shortId(value: string) { + return value.length > 12 ? `${value.slice(0, 8)}...${value.slice(-4)}` : value; +} diff --git a/apps/web/src/pages/admin/IdentityManagementPanels.tsx b/apps/web/src/pages/admin/IdentityManagementPanels.tsx index 11e8884..a686f8d 100644 --- a/apps/web/src/pages/admin/IdentityManagementPanels.tsx +++ b/apps/web/src/pages/admin/IdentityManagementPanels.tsx @@ -1,12 +1,14 @@ import { useMemo, useState, type FormEvent, type ReactNode } from 'react'; -import { Building2, KeyRound, Pencil, Plus, RotateCcw, Trash2, UserRound, UsersRound } from 'lucide-react'; +import { Building2, CircleDollarSign, KeyRound, Pencil, Plus, RotateCcw, Trash2, UserRound, UsersRound } from 'lucide-react'; import type { GatewayTenant, GatewayTenantUpsertRequest, GatewayUser, GatewayUserUpsertRequest, + GatewayWalletAccount, UserGroup, UserGroupUpsertRequest, + WalletBalanceAdjustmentRequest, } from '@easyai-ai-gateway/contracts'; import { Badge, @@ -64,6 +66,12 @@ type UserForm = { status: string; }; +type WalletForm = { + currency: string; + balance: string; + reason: string; +}; + type UserGroupForm = { groupKey: string; name: string; @@ -215,6 +223,8 @@ export function UsersPanel(props: IdentityPanelProps) { const [form, setForm] = useState(() => defaultUserForm(props.data.tenants[0])); const [localError, setLocalError] = useState(''); const [pendingDeleteUser, setPendingDeleteUser] = useState(null); + const [walletUser, setWalletUser] = useState(null); + const [walletForm, setWalletForm] = useState(() => defaultWalletForm()); const tenantById = useMemo(() => new Map(props.data.tenants.map((tenant) => [tenant.id, tenant])), [props.data.tenants]); @@ -238,6 +248,22 @@ export function UsersPanel(props: IdentityPanelProps) { setLocalError(''); } + function openWalletDialog(user: GatewayUser) { + const wallet = primaryWallet(user); + setLocalError(''); + setWalletUser(user); + setWalletForm({ + currency: wallet?.currency ?? 'resource', + balance: String(wallet?.balance ?? 0), + reason: '', + }); + } + + function closeWalletDialog() { + setWalletUser(null); + setWalletForm(defaultWalletForm()); + } + async function submit(event: FormEvent) { event.preventDefault(); setLocalError(''); @@ -258,6 +284,32 @@ export function UsersPanel(props: IdentityPanelProps) { } } + async function submitWallet(event: FormEvent) { + event.preventDefault(); + setLocalError(''); + if (!walletUser) return; + const balance = Number(walletForm.balance); + if (!Number.isFinite(balance) || balance < 0) { + setLocalError('余额必须是非负数字'); + return; + } + if (!walletForm.reason.trim()) { + setLocalError('请填写调整原因'); + return; + } + try { + await props.onSetUserWalletBalance(walletUser.id, { + currency: walletForm.currency, + balance, + reason: walletForm.reason.trim(), + idempotencyKey: newIdempotencyKey(), + }); + closeWalletDialog(); + } catch (err) { + setLocalError(err instanceof Error ? err.message : '更新余额失败'); + } + } + function selectTenant(gatewayTenantId: string) { const tenant = tenantById.get(gatewayTenantId); setForm({ ...form, gatewayTenantId, tenantKey: tenant?.tenantKey ?? form.tenantKey }); @@ -281,6 +333,7 @@ export function UsersPanel(props: IdentityPanelProps) { 角色 租户 用户组 + 余额 来源 状态 操作 @@ -291,9 +344,10 @@ export function UsersPanel(props: IdentityPanelProps) { {roleLabel(user.roles)} {tenantName(props.data.tenants, user.gatewayTenantId, user.tenantKey)} {groupName(props.data.userGroups, user.defaultUserGroupId)} + {walletSummary(user)} {user.source} {user.status} - editUser(user)} onDelete={() => setPendingDeleteUser(user)} /> + editUser(user)} onDelete={() => setPendingDeleteUser(user)} onWallet={() => openWalletDialog(user)} /> ))} @@ -343,6 +397,20 @@ export function UsersPanel(props: IdentityPanelProps) { onCancel={() => setPendingDeleteUser(null)} onConfirm={() => pendingDeleteUser ? deleteUser(pendingDeleteUser) : undefined} /> + } + open={Boolean(walletUser)} + title={`修改余额${walletUser ? ` · ${walletUser.displayName || walletUser.username}` : ''}`} + onClose={closeWalletDialog} + onSubmit={submitWallet} + > + + +