From 0d0d0b91151395a8480f5187f108a0b4188098e2 Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 13 May 2026 12:29:00 +0800 Subject: [PATCH 01/15] fix: normalize video duration steps --- apps/api/internal/runner/param_processor.go | 58 +++++++++--- .../internal/runner/param_processor_test.go | 94 +++++++++++++++++++ 2 files changed, 137 insertions(+), 15 deletions(-) diff --git a/apps/api/internal/runner/param_processor.go b/apps/api/internal/runner/param_processor.go index 26cdfcd..621a574 100644 --- a/apps/api/internal/runner/param_processor.go +++ b/apps/api/internal/runner/param_processor.go @@ -3,6 +3,7 @@ package runner import ( "fmt" "math" + "sort" "strconv" "strings" @@ -518,7 +519,7 @@ func (durationProcessor) Process(params map[string]any, modelType string, contex resolution := firstNonEmptyString(stringFromAny(params["resolution"]), context.resolution) modeKey := videoModeKey(params) if options := scopedNumberList(capability["duration_options"], resolution, modeKey); len(options) > 0 { - normalized := closestNumber(duration, options) + normalized := nextAllowedNumber(duration, options) params["duration"] = normalized syncDurationSeconds(params) if normalized != duration { @@ -528,7 +529,7 @@ func (durationProcessor) Process(params map[string]any, modelType string, contex "duration", duration, normalized, - "duration 不在模型固定时长选项内,已调整为最近的允许值。", + "duration 不在模型固定时长选项内,已向上调整为允许值。", capabilityPath(modelType, "duration_options"), capability["duration_options"], ) @@ -555,6 +556,23 @@ func (durationProcessor) Process(params map[string]any, modelType string, contex }, ) } + return true + } + step := durationStep(capability["duration_step"], resolution, modeKey) + normalized := normalizeDurationByStep(duration, step) + params["duration"] = normalized + syncDurationSeconds(params) + if normalized != duration { + context.recordChange( + "DurationProcessor", + "adjust", + "duration", + duration, + normalized, + "duration 不符合模型时长步进,已按步进向上归一。", + capabilityPath(modelType, "duration_step"), + capability["duration_step"], + ) } return true } @@ -1044,28 +1062,38 @@ func durationStep(value any, scopes ...string) float64 { } func normalizeDurationByRange(target float64, minValue float64, maxValue float64, step float64) float64 { - clamped := math.Min(math.Max(target, minValue), maxValue) - if step <= 0 { - return clamped + if minValue > maxValue { + minValue, maxValue = maxValue, minValue } - snapped := math.Round((clamped-minValue)/step)*step + minValue + if step <= 0 { + step = 1 + } + clamped := math.Min(math.Max(target, minValue), maxValue) + snapped := math.Ceil(((clamped-minValue)/step)-1e-9)*step + minValue + snapped = math.Min(math.Max(snapped, minValue), maxValue) return math.Round(snapped*1_000_000) / 1_000_000 } -func closestNumber(target float64, values []float64) float64 { +func normalizeDurationByStep(target float64, step float64) float64 { + if step <= 0 { + step = 1 + } + snapped := math.Ceil((target/step)-1e-9) * step + return math.Round(snapped*1_000_000) / 1_000_000 +} + +func nextAllowedNumber(target float64, values []float64) float64 { if len(values) == 0 { return target } - closest := values[0] - minDiff := math.Abs(target - closest) - for _, value := range values[1:] { - diff := math.Abs(target - value) - if diff < minDiff { - minDiff = diff - closest = value + sorted := append([]float64(nil), values...) + sort.Float64s(sorted) + for _, value := range sorted { + if value >= target || math.Abs(value-target) < 1e-9 { + return value } } - return closest + return sorted[len(sorted)-1] } func videoModeKey(params map[string]any) string { diff --git a/apps/api/internal/runner/param_processor_test.go b/apps/api/internal/runner/param_processor_test.go index 1804ea3..34ab9f8 100644 --- a/apps/api/internal/runner/param_processor_test.go +++ b/apps/api/internal/runner/param_processor_test.go @@ -180,6 +180,100 @@ func TestParamProcessorVideoCapabilitiesNormalizeAndFilter(t *testing.T) { } } +func TestParamProcessorDurationRangeRoundsFractionalSecondsUp(t *testing.T) { + body := map[string]any{ + "model": "Seedance", + "prompt": "animate it", + "duration": 5.5, + } + candidate := store.RuntimeModelCandidate{ + ModelType: "video_generate", + Capabilities: map[string]any{ + "video_generate": map[string]any{ + "duration_range": []any{3, 12}, + }, + }, + } + + result := preprocessRequestWithLog("videos.generations", body, candidate) + if result.Body["duration"] != float64(6) && result.Body["duration"] != 6 { + t.Fatalf("fractional duration should be rounded up to default 1s step, got %+v", result.Body["duration"]) + } +} + +func TestParamProcessorDurationWithoutRangeStillRoundsUp(t *testing.T) { + body := map[string]any{ + "model": "Seedance", + "prompt": "animate it", + "duration": 5.2, + } + candidate := store.RuntimeModelCandidate{ + ModelType: "video_generate", + Capabilities: map[string]any{ + "video_generate": map[string]any{}, + }, + } + + result := preprocessRequestWithLog("videos.generations", body, candidate) + if result.Body["duration"] != float64(6) && result.Body["duration"] != 6 { + t.Fatalf("duration should default to a 1s upward step without range, got %+v", result.Body["duration"]) + } +} + +func TestParamProcessorDurationRangeUsesStepCeilingAndRange(t *testing.T) { + body := map[string]any{ + "model": "Seedance", + "prompt": "animate it", + "duration": 6.1, + "duration_seconds": 6.1, + } + candidate := store.RuntimeModelCandidate{ + ModelType: "image_to_video", + Capabilities: map[string]any{ + "image_to_video": map[string]any{ + "duration_range": []any{5, 10}, + "duration_step": 2, + }, + }, + } + + result := preprocessRequestWithLog("videos.generations", body, candidate) + if result.Body["duration"] != float64(7) && result.Body["duration"] != 7 { + t.Fatalf("duration should be rounded up by configured step, got %+v", result.Body["duration"]) + } + if result.Body["duration_seconds"] != result.Body["duration"] { + t.Fatalf("duration_seconds should sync with normalized duration, got %+v", result.Body) + } + + body["duration"] = 10.1 + body["duration_seconds"] = 10.1 + result = preprocessRequestWithLog("videos.generations", body, candidate) + if result.Body["duration"] != float64(10) && result.Body["duration"] != 10 { + t.Fatalf("duration should be capped by range max, got %+v", result.Body["duration"]) + } +} + +func TestParamProcessorDurationOptionsChooseNextAllowedValue(t *testing.T) { + body := map[string]any{ + "model": "Seedance", + "prompt": "animate it", + "duration": 8.1, + } + candidate := store.RuntimeModelCandidate{ + ModelType: "image_to_video", + Capabilities: map[string]any{ + "image_to_video": map[string]any{ + "duration_options": []any{4, 8, 12}, + }, + }, + } + + result := preprocessRequestWithLog("videos.generations", body, candidate) + if result.Body["duration"] != float64(12) && result.Body["duration"] != 12 { + t.Fatalf("duration should use next allowed option, got %+v", result.Body["duration"]) + } +} + func TestParamProcessorVideoGenerateLogsFirstFrameRemoval(t *testing.T) { body := map[string]any{ "model": "Seedance T2V", From fc5dfd6bc544ff63e41ca852a01c5ae96bea0596 Mon Sep 17 00:00:00 2001 From: wangbo Date: Wed, 13 May 2026 20:23:45 +0800 Subject: [PATCH 02/15] feat: add file storage settings and uploads --- .../internal/httpapi/file_upload_handlers.go | 58 + apps/api/internal/httpapi/server.go | 8 + .../httpapi/system_settings_handlers.go | 150 +++ apps/api/internal/runner/service.go | 2 +- apps/api/internal/runner/upload.go | 1040 ++++++++++++++++- apps/api/internal/runner/upload_test.go | 187 +++ .../internal/store/file_storage_channels.go | 499 ++++++++ .../migrations/0036_file_storage_channels.sql | 85 ++ .../migrations/0037_file_storage_settings.sql | 13 + apps/web/src/App.tsx | 77 +- apps/web/src/api.ts | 131 ++- apps/web/src/app-state.ts | 4 + apps/web/src/pages/AdminPage.tsx | 20 +- apps/web/src/pages/ApiDocsPage.tsx | 20 +- apps/web/src/pages/PlaygroundPage.tsx | 489 +++++++- .../src/pages/admin/SystemSettingsPanel.tsx | 444 +++++++ apps/web/src/routing.ts | 1 + apps/web/src/styles/pages.css | 120 ++ apps/web/src/styles/playground.css | 82 ++ apps/web/src/types.ts | 3 +- packages/contracts/src/index.ts | 40 + 21 files changed, 3401 insertions(+), 72 deletions(-) create mode 100644 apps/api/internal/httpapi/file_upload_handlers.go create mode 100644 apps/api/internal/httpapi/system_settings_handlers.go create mode 100644 apps/api/internal/runner/upload_test.go create mode 100644 apps/api/internal/store/file_storage_channels.go create mode 100644 apps/api/migrations/0036_file_storage_channels.sql create mode 100644 apps/api/migrations/0037_file_storage_settings.sql create mode 100644 apps/web/src/pages/admin/SystemSettingsPanel.tsx diff --git a/apps/api/internal/httpapi/file_upload_handlers.go b/apps/api/internal/httpapi/file_upload_handlers.go new file mode 100644 index 0000000..68db37a --- /dev/null +++ b/apps/api/internal/httpapi/file_upload_handlers.go @@ -0,0 +1,58 @@ +package httpapi + +import ( + "io" + "net/http" + "strings" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/runner" +) + +const maxGatewayUploadBytes = 256 << 20 + +func (s *Server) uploadFile(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, maxGatewayUploadBytes) + if err := r.ParseMultipartForm(32 << 20); err != nil { + writeError(w, http.StatusBadRequest, "invalid multipart upload") + return + } + file, header, err := r.FormFile("file") + if err != nil { + writeError(w, http.StatusBadRequest, "file is required") + return + } + defer file.Close() + payload, err := io.ReadAll(file) + if err != nil { + writeError(w, http.StatusBadRequest, "read upload file failed") + return + } + contentType := strings.TrimSpace(header.Header.Get("Content-Type")) + if contentType == "" && len(payload) > 0 { + contentType = http.DetectContentType(payload) + } + upload, err := s.runner.UploadFile(r.Context(), runner.FileUploadPayload{ + Bytes: payload, + ContentType: contentType, + FileName: header.Filename, + Source: firstNonEmptyFormValue(r, "source", "ai-gateway-openapi"), + }) + if err != nil { + s.logger.Error("upload file failed", "error", err) + status := http.StatusBadGateway + if clients.ErrorCode(err) == "upload_no_channel" { + status = http.StatusServiceUnavailable + } + writeError(w, status, err.Error()) + return + } + writeJSON(w, http.StatusOK, upload) +} + +func firstNonEmptyFormValue(r *http.Request, key string, fallback string) string { + if value := strings.TrimSpace(r.FormValue(key)); value != "" { + return value + } + return fallback +} diff --git a/apps/api/internal/httpapi/server.go b/apps/api/internal/httpapi/server.go index 5cff51b..25b86b5 100644 --- a/apps/api/internal/httpapi/server.go +++ b/apps/api/internal/httpapi/server.go @@ -102,6 +102,12 @@ func NewServerWithContext(ctx context.Context, cfg config.Config, db *store.Stor mux.Handle("GET /api/admin/runtime/runner-policy", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.getRunnerPolicy))) mux.Handle("PATCH /api/admin/runtime/runner-policy", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateRunnerPolicy))) mux.Handle("GET /api/admin/config/network-proxy", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.getNetworkProxyConfig))) + mux.Handle("GET /api/admin/system/file-storage/settings", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.getFileStorageSettings))) + mux.Handle("PATCH /api/admin/system/file-storage/settings", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateFileStorageSettings))) + mux.Handle("GET /api/admin/system/file-storage/channels", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listFileStorageChannels))) + mux.Handle("POST /api/admin/system/file-storage/channels", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createFileStorageChannel))) + mux.Handle("PATCH /api/admin/system/file-storage/channels/{channelID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updateFileStorageChannel))) + mux.Handle("DELETE /api/admin/system/file-storage/channels/{channelID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.deleteFileStorageChannel))) mux.Handle("GET /api/admin/platforms", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listPlatforms))) mux.Handle("POST /api/admin/platforms", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.createPlatform))) mux.Handle("PATCH /api/admin/platforms/{platformID}", server.requireAdmin(auth.PermissionManager, http.HandlerFunc(server.updatePlatform))) @@ -123,6 +129,7 @@ func NewServerWithContext(ctx context.Context, cfg config.Config, db *store.Stor mux.Handle("POST /api/v1/images/generations", server.auth.Require(auth.PermissionBasic, server.createTask("images.generations", false))) mux.Handle("POST /api/v1/images/edits", server.auth.Require(auth.PermissionBasic, server.createTask("images.edits", false))) mux.Handle("POST /api/v1/videos/generations", server.auth.Require(auth.PermissionBasic, server.createTask("videos.generations", false))) + mux.Handle("POST /api/v1/files/upload", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.uploadFile))) mux.Handle("GET /api/v1/tasks", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listTasks))) mux.Handle("GET /api/v1/tasks/{taskID}", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.getTask))) mux.Handle("GET /api/v1/tasks/{taskID}/param-preprocessing", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.taskParamPreprocessing))) @@ -135,6 +142,7 @@ func NewServerWithContext(ctx context.Context, cfg config.Config, db *store.Stor mux.Handle("POST /v1/images/generations", server.auth.Require(auth.PermissionBasic, server.createTask("images.generations", true))) mux.Handle("POST /images/edits", server.auth.Require(auth.PermissionBasic, server.createTask("images.edits", true))) mux.Handle("POST /v1/images/edits", server.auth.Require(auth.PermissionBasic, server.createTask("images.edits", true))) + mux.Handle("POST /v1/files/upload", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.uploadFile))) return server.recover(server.cors(mux)) } diff --git a/apps/api/internal/httpapi/system_settings_handlers.go b/apps/api/internal/httpapi/system_settings_handlers.go new file mode 100644 index 0000000..649315d --- /dev/null +++ b/apps/api/internal/httpapi/system_settings_handlers.go @@ -0,0 +1,150 @@ +package httpapi + +import ( + "encoding/json" + "net/http" + "strings" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" +) + +func (s *Server) listFileStorageChannels(w http.ResponseWriter, r *http.Request) { + items, err := s.store.ListFileStorageChannels(r.Context()) + if err != nil { + s.logger.Error("list file storage channels failed", "error", err) + writeError(w, http.StatusInternalServerError, "list file storage channels failed") + return + } + writeJSON(w, http.StatusOK, map[string]any{"items": items}) +} + +func (s *Server) getFileStorageSettings(w http.ResponseWriter, r *http.Request) { + settings, err := s.store.GetFileStorageSettings(r.Context()) + if err != nil { + if store.IsUndefinedDatabaseObject(err) { + writeJSON(w, http.StatusOK, store.DefaultFileStorageSettings()) + return + } + s.logger.Error("get file storage settings failed", "error", err) + writeError(w, http.StatusInternalServerError, "get file storage settings failed") + return + } + writeJSON(w, http.StatusOK, settings) +} + +func (s *Server) updateFileStorageSettings(w http.ResponseWriter, r *http.Request) { + var input store.FileStorageSettingsInput + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + settings, err := s.store.UpdateFileStorageSettings(r.Context(), input) + if err != nil { + s.logger.Error("update file storage settings failed", "error", err) + writeError(w, http.StatusInternalServerError, "update file storage settings failed") + return + } + writeJSON(w, http.StatusOK, settings) +} + +func (s *Server) createFileStorageChannel(w http.ResponseWriter, r *http.Request) { + var input store.FileStorageChannelInput + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + if message := validateFileStorageChannelInput(input, nil); message != "" { + writeError(w, http.StatusBadRequest, message) + return + } + item, err := s.store.CreateFileStorageChannel(r.Context(), input) + if err != nil { + if store.IsUniqueViolation(err) { + writeError(w, http.StatusConflict, "file storage channel key already exists") + return + } + s.logger.Error("create file storage channel failed", "error", err) + writeError(w, http.StatusInternalServerError, "create file storage channel failed") + return + } + writeJSON(w, http.StatusCreated, item) +} + +func (s *Server) updateFileStorageChannel(w http.ResponseWriter, r *http.Request) { + var input store.FileStorageChannelInput + if err := json.NewDecoder(r.Body).Decode(&input); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + existing, err := s.store.GetFileStorageChannel(r.Context(), r.PathValue("channelID")) + if err != nil { + if store.IsNotFound(err) { + writeError(w, http.StatusNotFound, "file storage channel not found") + return + } + s.logger.Error("get file storage channel failed", "error", err) + writeError(w, http.StatusInternalServerError, "get file storage channel failed") + return + } + if message := validateFileStorageChannelInput(input, &existing); message != "" { + writeError(w, http.StatusBadRequest, message) + return + } + item, err := s.store.UpdateFileStorageChannel(r.Context(), r.PathValue("channelID"), input) + if err != nil { + if store.IsNotFound(err) { + writeError(w, http.StatusNotFound, "file storage channel not found") + return + } + if store.IsUniqueViolation(err) { + writeError(w, http.StatusConflict, "file storage channel key already exists") + return + } + s.logger.Error("update file storage channel failed", "error", err) + writeError(w, http.StatusInternalServerError, "update file storage channel failed") + return + } + writeJSON(w, http.StatusOK, item) +} + +func (s *Server) deleteFileStorageChannel(w http.ResponseWriter, r *http.Request) { + if err := s.store.DeleteFileStorageChannel(r.Context(), r.PathValue("channelID")); err != nil { + if store.IsNotFound(err) { + writeError(w, http.StatusNotFound, "file storage channel not found") + return + } + s.logger.Error("delete file storage channel failed", "error", err) + writeError(w, http.StatusInternalServerError, "delete file storage channel failed") + return + } + w.WriteHeader(http.StatusNoContent) +} + +func validateFileStorageChannelInput(input store.FileStorageChannelInput, existing *store.FileStorageChannel) string { + provider := strings.ToLower(strings.TrimSpace(input.Provider)) + if provider == "" { + provider = "server_main_openapi" + } + status := strings.ToLower(strings.TrimSpace(input.Status)) + if status == "" { + status = "disabled" + } + if strings.TrimSpace(input.ChannelKey) == "" || strings.TrimSpace(input.Name) == "" { + return "channelKey and name are required" + } + if status != "enabled" && status != "disabled" { + return "status must be enabled or disabled" + } + if provider == "server_main_openapi" { + hasAPIKey := false + if input.APIKey != nil { + hasAPIKey = strings.TrimSpace(*input.APIKey) != "" + } else if existing != nil { + hasAPIKey = strings.TrimSpace(existing.APIKey) != "" + } + if status == "enabled" && !hasAPIKey { + return "server-main OpenAPI channel requires API key before enabling" + } + } + return "" +} diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index ade0144..288e169 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -481,7 +481,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user s.applyCandidateFailurePolicies(ctx, task.ID, candidate, err, simulated) return clients.Response{}, err } - uploadedResult, err := s.uploadGeneratedAssets(ctx, response.Result) + uploadedResult, err := s.uploadGeneratedAssets(ctx, task.ID, task.Kind, response.Result) if err != nil { metrics := mergeMetrics(taskMetrics(task, user, body, candidate, response, simulated), parameterPreprocessingMetrics(preprocessing), map[string]any{ "error": err.Error(), diff --git a/apps/api/internal/runner/upload.go b/apps/api/internal/runner/upload.go index 2972137..e5d0d6f 100644 --- a/apps/api/internal/runner/upload.go +++ b/apps/api/internal/runner/upload.go @@ -3,24 +3,108 @@ package runner import ( "bytes" "context" + "crypto/rand" "encoding/base64" + "encoding/hex" "encoding/json" "fmt" + "io" "mime/multipart" "net/http" + "net/textproto" + "net/url" "strings" + "time" "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) -func (s *Service) uploadGeneratedAssets(ctx context.Context, result map[string]any) (map[string]any, error) { - if s.cfg.ServerMainBaseURL == "" || s.cfg.ServerMainInternalToken == "" { - return result, nil +const defaultServerMainOpenAPIUploadURL = "http://127.0.0.1:3001/v1/files/upload" +const maxGeneratedAssetFetchBytes = 256 << 20 + +type FileUploadPayload struct { + ContentType string + FileName string + Scene string + Source string + Bytes []byte +} + +type generatedAssetUploadPolicy struct { + UploadInlineMedia bool + UploadURLMedia bool +} + +type generatedAssetDecision struct { + Inline *generatedInlineAsset + URL *generatedURLAsset + StripKeys []string +} + +type generatedInlineAsset struct { + Bytes []byte + ContentType string + Kind string + SourceKey string +} + +type generatedURLAsset struct { + URL string + ContentType string + Kind string + SourceKey string +} + +func defaultGeneratedAssetUploadPolicy() generatedAssetUploadPolicy { + return generatedAssetUploadPolicy{ + UploadInlineMedia: true, + UploadURLMedia: false, } +} + +func (s *Service) uploadGeneratedAssets(ctx context.Context, taskID string, taskKind string, result map[string]any) (map[string]any, error) { data, _ := result["data"].([]any) if len(data) == 0 { return result, nil } + policy, err := s.generatedAssetUploadPolicy(ctx) + if err != nil { + return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true} + } + decisions := make([]generatedAssetDecision, len(data)) + needsUpload := false + changed := false + for index, rawItem := range data { + item, _ := rawItem.(map[string]any) + if item == nil { + continue + } + decision, err := generatedAssetDecisionForItem(taskKind, item, policy) + if err != nil { + return nil, err + } + decisions[index] = decision + if decision.Inline != nil || decision.URL != nil { + needsUpload = true + } + if len(decision.StripKeys) > 0 { + changed = true + } + } + if !needsUpload && !changed { + return result, nil + } + var channels []store.FileStorageChannel + if needsUpload { + channels, err = s.activeFileStorageChannels(ctx, store.FileStorageSceneImageResult) + if err != nil { + return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true} + } + if len(channels) == 0 { + return nil, &clients.ClientError{Code: "upload_no_channel", Message: "no enabled file storage channel for generated media results", Retryable: false} + } + } next := map[string]any{} for key, value := range result { next[key] = value @@ -32,24 +116,57 @@ func (s *Service) uploadGeneratedAssets(ctx context.Context, result map[string]a nextData = append(nextData, rawItem) continue } - b64 := stringFromMap(item, "b64_json") - if b64 == "" { - nextData = append(nextData, rawItem) - continue - } - upload, err := s.uploadBase64Image(ctx, b64, index) - if err != nil { - return nil, err - } + decision := decisions[index] merged := map[string]any{} for key, value := range item { - if key != "b64_json" { - merged[key] = value - } + merged[key] = value } - merged["upload"] = upload - if urlValue, ok := upload["url"].(string); ok && urlValue != "" { - merged["url"] = urlValue + for _, key := range decision.StripKeys { + delete(merged, key) + } + if decision.Inline != nil || decision.URL != nil { + var upload map[string]any + var sourceKey string + var strategy string + var kind string + var contentType string + var err error + if decision.Inline != nil { + upload, contentType, kind, err = s.uploadGeneratedAsset(ctx, taskID, decision.Inline, index, channels) + sourceKey = decision.Inline.SourceKey + strategy = "upload_inline_media" + } else { + upload, contentType, kind, err = s.uploadGeneratedURLAsset(ctx, taskID, decision.URL, index, channels) + sourceKey = decision.URL.SourceKey + strategy = "upload_url_media" + } + if err != nil { + return nil, err + } + merged["upload"] = upload + merged["assetStorage"] = map[string]any{ + "scene": store.FileStorageSceneImageResult, + "source": sourceKey, + "strategy": strategy, + } + if contentType != "" { + merged["assetStorage"].(map[string]any)["contentType"] = contentType + } + if urlValue := stringFromAny(upload["url"]); urlValue != "" { + merged["url"] = urlValue + if kind == "video" { + merged["video_url"] = urlValue + } + if kind == "image" { + merged["image_url"] = urlValue + } + } + if kind != "" && stringFromAny(merged["type"]) == "" { + merged["type"] = kind + } + if contentType != "" && stringFromAny(merged["mime_type"]) == "" { + merged["mime_type"] = contentType + } } nextData = append(nextData, merged) } @@ -57,43 +174,313 @@ func (s *Service) uploadGeneratedAssets(ctx context.Context, result map[string]a return next, nil } -func (s *Service) uploadBase64Image(ctx context.Context, b64 string, index int) (map[string]any, error) { - payload, err := base64.StdEncoding.DecodeString(stripDataURLPrefix(b64)) +func (s *Service) generatedAssetUploadPolicy(ctx context.Context) (generatedAssetUploadPolicy, error) { + settings, err := s.store.GetFileStorageSettings(ctx) if err != nil { - return nil, &clients.ClientError{Code: "upload_decode_failed", Message: err.Error(), Retryable: false} + if store.IsUndefinedDatabaseObject(err) { + return defaultGeneratedAssetUploadPolicy(), nil + } + return generatedAssetUploadPolicy{}, err + } + return generatedAssetUploadPolicyFromName(settings.ResultUploadPolicy), nil +} + +func generatedAssetUploadPolicyFromName(policyName string) generatedAssetUploadPolicy { + policyName = store.NormalizeFileStorageResultUploadPolicy(policyName) + switch policyName { + case store.FileStorageResultUploadPolicyUploadAll: + return generatedAssetUploadPolicy{UploadInlineMedia: true, UploadURLMedia: true} + case store.FileStorageResultUploadPolicyUploadNone: + return generatedAssetUploadPolicy{UploadInlineMedia: false, UploadURLMedia: false} + default: + return defaultGeneratedAssetUploadPolicy() + } +} + +func (s *Service) uploadGeneratedAsset(ctx context.Context, taskID string, asset *generatedInlineAsset, index int, channels []store.FileStorageChannel) (map[string]any, string, string, error) { + contentType := resolvedGeneratedAssetContentType(asset.ContentType, asset.Kind, asset.Bytes) + kind := generatedAssetKindFromContentType(asset.Kind, contentType) + upload, err := s.uploadFileWithFailover(ctx, FileUploadPayload{ + Bytes: asset.Bytes, + ContentType: contentType, + FileName: generatedAssetFileName(taskID, index, contentType, kind), + Scene: store.FileStorageSceneImageResult, + Source: "ai-gateway", + }, channels) + return upload, contentType, kind, err +} + +func (s *Service) uploadGeneratedURLAsset(ctx context.Context, taskID string, asset *generatedURLAsset, index int, channels []store.FileStorageChannel) (map[string]any, string, string, error) { + payload, contentType, err := s.readGeneratedURLAsset(ctx, asset) + if err != nil { + return nil, "", "", err + } + contentType = resolvedGeneratedAssetContentType(firstNonEmptyString(contentType, asset.ContentType), asset.Kind, payload) + kind := generatedAssetKindFromContentType(asset.Kind, contentType) + upload, err := s.uploadFileWithFailover(ctx, FileUploadPayload{ + Bytes: payload, + ContentType: contentType, + FileName: generatedAssetFileName(taskID, index, contentType, kind), + Scene: store.FileStorageSceneImageResult, + Source: "ai-gateway", + }, channels) + return upload, contentType, kind, err +} + +func (s *Service) readGeneratedURLAsset(ctx context.Context, asset *generatedURLAsset) ([]byte, string, error) { + fetchURL, err := s.generatedAssetFetchURL(asset.URL) + if err != nil { + return nil, "", err + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, fetchURL, nil) + if err != nil { + return nil, "", err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, "", &clients.ClientError{Code: "upload_source_fetch_failed", Message: err.Error(), Retryable: true} + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + message := strings.TrimSpace(string(body)) + if message == "" { + message = "generated media source fetch failed" + } + return nil, "", &clients.ClientError{ + Code: "upload_source_fetch_failed", + Message: message, + StatusCode: resp.StatusCode, + Retryable: clients.HTTPRetryable(resp.StatusCode), + } + } + payload, err := io.ReadAll(io.LimitReader(resp.Body, maxGeneratedAssetFetchBytes+1)) + if err != nil { + return nil, "", &clients.ClientError{Code: "upload_source_read_failed", Message: err.Error(), StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} + } + if len(payload) > maxGeneratedAssetFetchBytes { + return nil, "", &clients.ClientError{Code: "upload_source_too_large", Message: "generated media source exceeds upload fetch limit", StatusCode: resp.StatusCode, Retryable: false} + } + contentType := firstNonEmptyString(resp.Header.Get("Content-Type"), asset.ContentType) + return payload, strings.TrimSpace(strings.Split(contentType, ";")[0]), nil +} + +func (s *Service) generatedAssetFetchURL(raw string) (string, error) { + value := strings.TrimSpace(raw) + if value == "" { + return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: "generated media source URL is empty", Retryable: false} + } + parsed, err := url.Parse(value) + if err != nil { + return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: err.Error(), Retryable: false} + } + if parsed.IsAbs() { + if parsed.Scheme == "http" || parsed.Scheme == "https" { + return value, nil + } + return "", &clients.ClientError{Code: "upload_source_unsupported_url", Message: "unsupported generated media source URL scheme: " + parsed.Scheme, Retryable: false} + } + if strings.HasPrefix(value, "/") { + baseURL := generatedAssetLocalBaseURL(s.cfg.HTTPAddr) + if baseURL == "" { + return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: "generated media source uses a relative URL without a local HTTP address", Retryable: false} + } + return baseURL + value, nil + } + return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: "generated media source URL must be absolute or root-relative", Retryable: false} +} + +func generatedAssetLocalBaseURL(httpAddr string) string { + addr := strings.TrimSpace(httpAddr) + if addr == "" { + return "http://127.0.0.1:8088" + } + if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") { + return strings.TrimRight(addr, "/") + } + if strings.HasPrefix(addr, ":") { + return "http://127.0.0.1" + addr + } + if strings.Contains(addr, "://") { + return "" + } + return "http://" + strings.TrimRight(addr, "/") +} + +func (s *Service) UploadFile(ctx context.Context, payload FileUploadPayload) (map[string]any, error) { + if strings.TrimSpace(payload.Scene) == "" { + payload.Scene = store.FileStorageSceneUpload + } + channels, err := s.activeFileStorageChannels(ctx, payload.Scene) + if err != nil { + return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true} + } + if len(channels) == 0 { + return nil, &clients.ClientError{Code: "upload_no_channel", Message: "no enabled file storage channel", Retryable: false} + } + return s.uploadFileWithFailover(ctx, payload, channels) +} + +func (s *Service) activeFileStorageChannels(ctx context.Context, scene string) ([]store.FileStorageChannel, error) { + channels, err := s.store.ListEnabledFileStorageChannelsForScene(ctx, scene) + if err != nil && !store.IsUndefinedDatabaseObject(err) { + return nil, err + } + if len(channels) > 0 { + return channels, nil + } + fallback := s.fallbackFileStorageChannel() + if fallback == nil { + return nil, nil + } + if !fileStorageChannelSupportsScene(*fallback, scene) { + return nil, nil + } + return []store.FileStorageChannel{*fallback}, nil +} + +func (s *Service) fallbackFileStorageChannel() *store.FileStorageChannel { + baseURL := strings.TrimRight(strings.TrimSpace(s.cfg.ServerMainBaseURL), "/") + apiKey := strings.TrimSpace(s.cfg.ServerMainInternalToken) + if baseURL == "" || apiKey == "" { + return nil + } + return &store.FileStorageChannel{ + ChannelKey: "server-main-env-fallback", + Name: "server-main env fallback", + Provider: "server_main_openapi", + UploadURL: baseURL + "/v1/files/upload", + APIKey: apiKey, + Scenes: []string{store.FileStorageSceneUpload, store.FileStorageSceneImageResult}, + RetryPolicy: defaultUploadRetryPolicy(), + Priority: 100, + Status: "enabled", + } +} + +func (s *Service) uploadFileWithFailover(ctx context.Context, payload FileUploadPayload, channels []store.FileStorageChannel) (map[string]any, error) { + var lastErr error + for _, channel := range channels { + upload, err := s.uploadWithChannelRetries(ctx, payload, channel) + if err == nil { + _ = s.store.MarkFileStorageChannelSuccess(context.WithoutCancel(ctx), channel.ID) + return upload, nil + } + lastErr = err + _ = s.store.MarkFileStorageChannelFailure(context.WithoutCancel(ctx), channel.ID, err.Error()) + } + if lastErr != nil { + return nil, lastErr + } + return nil, &clients.ClientError{Code: "upload_no_channel", Message: "no enabled file storage channel", Retryable: false} +} + +func fileStorageChannelSupportsScene(channel store.FileStorageChannel, scene string) bool { + scene = strings.TrimSpace(scene) + if scene == "" { + return true + } + scenes := channel.Scenes + if len(scenes) == 0 { + scenes = []string{store.FileStorageSceneUpload, store.FileStorageSceneImageResult} + } + for _, item := range scenes { + if strings.TrimSpace(item) == scene { + return true + } + } + return false +} + +func (s *Service) uploadWithChannelRetries(ctx context.Context, payload FileUploadPayload, channel store.FileStorageChannel) (map[string]any, error) { + maxRetries, delays := uploadRetrySchedule(channel.RetryPolicy) + var lastErr error + for attempt := 0; attempt <= maxRetries; attempt++ { + upload, err := s.uploadOnce(ctx, payload, channel) + if err == nil { + return upload, nil + } + lastErr = err + if attempt >= maxRetries || !clients.IsRetryable(err) { + break + } + delay := retryDelayForAttempt(attempt, delays) + if err := sleepWithContext(ctx, delay); err != nil { + return nil, err + } + } + return nil, lastErr +} + +func (s *Service) uploadOnce(ctx context.Context, payload FileUploadPayload, channel store.FileStorageChannel) (map[string]any, error) { + if strings.ToLower(strings.TrimSpace(channel.Provider)) != "server_main_openapi" { + return nil, &clients.ClientError{Code: "upload_unsupported_channel", Message: "unsupported file storage channel: " + channel.Provider, Retryable: false} + } + uploadURL := strings.TrimSpace(channel.UploadURL) + if uploadURL == "" { + uploadURL = defaultServerMainOpenAPIUploadURL + } + apiKey := strings.TrimSpace(channel.APIKey) + if apiKey == "" { + return nil, &clients.ClientError{Code: "missing_credentials", Message: "file storage channel API key is required", Retryable: false} } var body bytes.Buffer writer := multipart.NewWriter(&body) - fileWriter, err := writer.CreateFormFile("file", fmt.Sprintf("gateway-result-%d.png", index+1)) + fileWriter, err := createUploadFormFile(writer, "file", firstNonEmptyString(payload.FileName, "upload.bin"), payload.ContentType) if err != nil { return nil, err } - if _, err := fileWriter.Write(payload); err != nil { + if _, err := fileWriter.Write(payload.Bytes); err != nil { return nil, err } - _ = writer.WriteField("source", "ai-gateway") + _ = writer.WriteField("source", firstNonEmptyString(payload.Source, "ai-gateway")) + _ = writer.WriteField("scene", firstNonEmptyString(payload.Scene, store.FileStorageSceneUpload)) if err := writer.Close(); err != nil { return nil, err } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(s.cfg.ServerMainBaseURL, "/")+"/v1/files/upload", &body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, &body) if err != nil { return nil, err } req.Header.Set("Content-Type", writer.FormDataContentType()) - req.Header.Set("Authorization", "Bearer "+s.cfg.ServerMainInternalToken) + if payload.ContentType != "" { + req.Header.Set("X-Upload-Content-Type", payload.ContentType) + } + req.Header.Set("Authorization", "Bearer "+apiKey) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, &clients.ClientError{Code: "upload_network", Message: err.Error(), Retryable: true} } defer resp.Body.Close() - var decoded map[string]any - if err := json.NewDecoder(resp.Body).Decode(&decoded); err != nil { - return nil, &clients.ClientError{Code: "upload_invalid_response", Message: err.Error(), Retryable: false} + responseBody, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, &clients.ClientError{Code: "upload_read_failed", Message: readErr.Error(), StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} } if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return nil, &clients.ClientError{Code: "upload_failed", Message: "server-main upload failed", StatusCode: resp.StatusCode, Retryable: resp.StatusCode >= 500} + message := strings.TrimSpace(string(responseBody)) + if message == "" { + message = "file upload failed" + } + return nil, &clients.ClientError{Code: "upload_failed", Message: message, StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} } - return decoded, nil + var decoded map[string]any + if err := json.Unmarshal(responseBody, &decoded); err != nil { + return nil, &clients.ClientError{Code: "upload_invalid_response", Message: err.Error(), Retryable: false} + } + return normalizeUploadResponse(decoded, channel), nil +} + +func createUploadFormFile(writer *multipart.Writer, fieldName string, fileName string, contentType string) (io.Writer, error) { + header := make(textproto.MIMEHeader) + header.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"; filename="%s"`, escapeMultipartValue(fieldName), escapeMultipartValue(fileName))) + if strings.TrimSpace(contentType) != "" { + header.Set("Content-Type", strings.TrimSpace(contentType)) + } + return writer.CreatePart(header) +} + +func escapeMultipartValue(value string) string { + return strings.NewReplacer("\\", "\\\\", `"`, "\\\"").Replace(value) } func stripDataURLPrefix(value string) string { @@ -102,3 +489,594 @@ func stripDataURLPrefix(value string) string { } return value } + +func generatedAssetDecisionForItem(taskKind string, item map[string]any, policy generatedAssetUploadPolicy) (generatedAssetDecision, error) { + decision := generatedAssetDecision{} + urlKey, mediaURL := mediaURLSourceFromItem(item) + if mediaURL != "" { + if !policy.UploadURLMedia { + decision.StripKeys = inlineMediaKeys(item) + return decision, nil + } + contentType := mediaContentTypeFromItem(item) + kind := mediaKindForAsset(taskKind, item, urlKey, contentType) + decision.URL = &generatedURLAsset{ + URL: mediaURL, + ContentType: contentType, + Kind: kind, + SourceKey: urlKey, + } + decision.StripKeys = uniqueStringList(append(mediaURLKeys(item), inlineMediaKeys(item)...)) + return decision, nil + } + if !policy.UploadInlineMedia { + return decision, nil + } + asset, keys, err := inlineAssetFromItem(taskKind, item) + if err != nil { + return decision, err + } + if asset == nil { + return decision, nil + } + decision.Inline = asset + decision.StripKeys = keys + return decision, nil +} + +func inlineAssetFromItem(taskKind string, item map[string]any) (*generatedInlineAsset, []string, error) { + for _, key := range inlineMediaCandidateKeys() { + value, ok := item[key] + if !ok || value == nil { + continue + } + strictBase64 := inlineMediaKeyIsStrictBase64(key) + payload, contentType, ok, err := inlineMediaPayload(value, strictBase64) + if err != nil { + return nil, nil, err + } + if !ok { + continue + } + contentType = firstNonEmptyString(contentType, mediaContentTypeFromItem(item), defaultContentTypeForGeneratedAsset(mediaKindForAsset(taskKind, item, key, contentType))) + kind := mediaKindForAsset(taskKind, item, key, contentType) + return &generatedInlineAsset{ + Bytes: payload, + ContentType: contentType, + Kind: kind, + SourceKey: key, + }, inlineMediaKeys(item), nil + } + return nil, nil, nil +} + +func inlineMediaPayload(value any, strictBase64 bool) ([]byte, string, bool, error) { + switch typed := value.(type) { + case []byte: + if len(typed) == 0 { + return nil, "", false, nil + } + payload := make([]byte, len(typed)) + copy(payload, typed) + return payload, "", true, nil + case []any: + payload, ok := bytesFromNumberArray(typed) + return payload, "", ok, nil + case map[string]any: + if data, ok := typed["data"].([]any); ok { + payload, ok := bytesFromNumberArray(data) + return payload, firstNonEmptyString(stringFromAny(typed["mime_type"]), stringFromAny(typed["mimeType"])), ok, nil + } + if data, ok := typed["data"].([]byte); ok && len(data) > 0 { + payload := make([]byte, len(data)) + copy(payload, data) + return payload, firstNonEmptyString(stringFromAny(typed["mime_type"]), stringFromAny(typed["mimeType"])), true, nil + } + return nil, "", false, nil + case string: + return inlineMediaPayloadFromString(typed, strictBase64) + default: + return nil, "", false, nil + } +} + +func inlineMediaPayloadFromString(value string, strictBase64 bool) ([]byte, string, bool, error) { + raw := strings.TrimSpace(value) + if raw == "" || mediaURLString(raw) { + return nil, "", false, nil + } + if strings.HasPrefix(strings.ToLower(raw), "data:") { + contentType, encoded, ok, err := parseBase64DataURL(raw) + if err != nil || !ok { + return nil, "", false, err + } + payload, err := decodeBase64Payload(encoded) + if err != nil { + return nil, "", false, &clients.ClientError{Code: "upload_decode_failed", Message: err.Error(), Retryable: false} + } + return payload, contentType, true, nil + } + if !strictBase64 && len(raw) < 64 { + return nil, "", false, nil + } + payload, err := decodeBase64Payload(raw) + if err != nil { + if strictBase64 { + return nil, "", false, &clients.ClientError{Code: "upload_decode_failed", Message: err.Error(), Retryable: false} + } + return nil, "", false, nil + } + return payload, "", true, nil +} + +func parseBase64DataURL(value string) (string, string, bool, error) { + prefix, payload, ok := strings.Cut(value, ",") + if !ok { + return "", "", false, &clients.ClientError{Code: "upload_decode_failed", Message: "invalid data URL media payload", Retryable: false} + } + meta := strings.TrimPrefix(prefix, "data:") + meta = strings.TrimPrefix(meta, "DATA:") + parts := strings.Split(meta, ";") + contentType := strings.TrimSpace(parts[0]) + isBase64 := false + for _, part := range parts[1:] { + if strings.EqualFold(strings.TrimSpace(part), "base64") { + isBase64 = true + break + } + } + if !isBase64 { + return "", "", false, &clients.ClientError{Code: "upload_decode_failed", Message: "data URL media payload is not base64 encoded", Retryable: false} + } + return contentType, payload, true, nil +} + +func decodeBase64Payload(value string) ([]byte, error) { + normalized := strings.Map(func(r rune) rune { + switch r { + case '\n', '\r', '\t', ' ': + return -1 + default: + return r + } + }, stripDataURLPrefix(value)) + encodings := []*base64.Encoding{ + base64.StdEncoding, + base64.RawStdEncoding, + base64.URLEncoding, + base64.RawURLEncoding, + } + var lastErr error + for _, encoding := range encodings { + payload, err := encoding.DecodeString(normalized) + if err == nil && len(payload) > 0 { + return payload, nil + } + if err != nil { + lastErr = err + } + } + if lastErr == nil { + lastErr = fmt.Errorf("empty base64 payload") + } + return nil, lastErr +} + +func bytesFromNumberArray(values []any) ([]byte, bool) { + if len(values) == 0 { + return nil, false + } + payload := make([]byte, 0, len(values)) + for _, value := range values { + next, ok := byteFromAny(value) + if !ok { + return nil, false + } + payload = append(payload, next) + } + return payload, true +} + +func byteFromAny(value any) (byte, bool) { + switch typed := value.(type) { + case byte: + return typed, true + case int: + if typed >= 0 && typed <= 255 { + return byte(typed), true + } + case int64: + if typed >= 0 && typed <= 255 { + return byte(typed), true + } + case float64: + asInt := int(typed) + if typed == float64(asInt) && asInt >= 0 && asInt <= 255 { + return byte(asInt), true + } + } + return 0, false +} + +func inlineMediaKeys(item map[string]any) []string { + keys := []string{} + for _, key := range inlineMediaCandidateKeys() { + value, ok := item[key] + if !ok || value == nil { + continue + } + strictBase64 := inlineMediaKeyIsStrictBase64(key) + if strictBase64 && stringFromAny(value) != "" { + keys = append(keys, key) + continue + } + if _, _, ok, _ := inlineMediaPayload(value, strictBase64); ok { + keys = append(keys, key) + } + } + return uniqueStringList(keys) +} + +func inlineMediaCandidateKeys() []string { + return []string{ + "b64_json", + "image_base64", + "image_b64", + "video_base64", + "video_b64", + "base64", + "b64", + "url", + "image_url", + "imageUrl", + "video_url", + "videoUrl", + "output_url", + "outputUrl", + "output_video_url", + "outputVideoUrl", + "image", + "video", + "image_buffer", + "image_bytes", + "video_buffer", + "video_bytes", + "buffer", + "bytes", + "data", + } +} + +func inlineMediaKeyIsStrictBase64(key string) bool { + lower := strings.ToLower(key) + return lower == "b64_json" || lower == "base64" || lower == "b64" || strings.Contains(lower, "base64") || strings.Contains(lower, "_b64") +} + +func mediaURLSourceFromItem(item map[string]any) (string, string) { + for _, key := range mediaURLCandidateKeys() { + value := stringFromAny(item[key]) + if value != "" && mediaURLString(value) { + return key, value + } + } + return "", "" +} + +func mediaURLKeys(item map[string]any) []string { + keys := []string{} + for _, key := range mediaURLCandidateKeys() { + value := stringFromAny(item[key]) + if value != "" && mediaURLString(value) { + keys = append(keys, key) + } + } + return uniqueStringList(keys) +} + +func mediaURLCandidateKeys() []string { + return []string{"url", "image_url", "imageUrl", "video_url", "videoUrl", "output_url", "outputUrl", "output_video_url", "outputVideoUrl", "download_url", "downloadUrl", "file_url", "fileUrl"} +} + +func mediaURLString(value string) bool { + raw := strings.TrimSpace(value) + if raw == "" { + return false + } + lower := strings.ToLower(raw) + if strings.HasPrefix(lower, "data:") { + return false + } + return strings.HasPrefix(lower, "http://") || + strings.HasPrefix(lower, "https://") || + strings.HasPrefix(lower, "/") || + strings.Contains(lower, "://") +} + +func mediaContentTypeFromItem(item map[string]any) string { + return firstNonEmptyString( + stringFromAny(item["mime_type"]), + stringFromAny(item["mimeType"]), + stringFromAny(item["content_type"]), + stringFromAny(item["contentType"]), + ) +} + +func mediaKindForAsset(taskKind string, item map[string]any, sourceKey string, contentType string) string { + contentType = strings.ToLower(strings.TrimSpace(contentType)) + if strings.HasPrefix(contentType, "image/") { + return "image" + } + if strings.HasPrefix(contentType, "video/") { + return "video" + } + if strings.HasPrefix(contentType, "audio/") { + return "audio" + } + itemType := strings.ToLower(strings.TrimSpace(stringFromAny(item["type"]))) + if strings.Contains(itemType, "video") { + return "video" + } + if strings.Contains(itemType, "image") { + return "image" + } + key := strings.ToLower(sourceKey) + if strings.Contains(key, "video") { + return "video" + } + if strings.Contains(key, "image") { + return "image" + } + kind := strings.ToLower(strings.TrimSpace(taskKind)) + if strings.Contains(kind, "video") { + return "video" + } + if strings.Contains(kind, "image") { + return "image" + } + return "image" +} + +func defaultContentTypeForGeneratedAsset(kind string) string { + switch strings.ToLower(strings.TrimSpace(kind)) { + case "video": + return "video/mp4" + case "audio": + return "audio/mpeg" + default: + return "image/png" + } +} + +func resolvedGeneratedAssetContentType(declared string, kind string, payload []byte) string { + declared = normalizeGeneratedContentType(declared) + detected := detectGeneratedAssetContentType(payload) + if generatedContentTypeIsMedia(detected) { + return detected + } + if generatedContentTypeIsMedia(declared) { + return declared + } + return defaultContentTypeForGeneratedAsset(kind) +} + +func detectGeneratedAssetContentType(payload []byte) string { + if len(payload) == 0 { + return "" + } + return normalizeGeneratedContentType(http.DetectContentType(payload)) +} + +func normalizeGeneratedContentType(contentType string) string { + return strings.ToLower(strings.TrimSpace(strings.Split(contentType, ";")[0])) +} + +func generatedContentTypeIsMedia(contentType string) bool { + return strings.HasPrefix(contentType, "image/") || + strings.HasPrefix(contentType, "video/") || + strings.HasPrefix(contentType, "audio/") +} + +func generatedAssetKindFromContentType(fallback string, contentType string) string { + contentType = normalizeGeneratedContentType(contentType) + if strings.HasPrefix(contentType, "image/") { + return "image" + } + if strings.HasPrefix(contentType, "video/") { + return "video" + } + if strings.HasPrefix(contentType, "audio/") { + return "audio" + } + fallback = strings.ToLower(strings.TrimSpace(fallback)) + if fallback != "" { + return fallback + } + return "image" +} + +func generatedAssetFileName(taskID string, index int, contentType string, kind string) string { + token := sanitizeGeneratedAssetNamePart(taskID) + if token == "" { + token = fmt.Sprintf("%d", time.Now().UTC().UnixNano()) + } + if len(token) > 32 { + token = token[:32] + } + return fmt.Sprintf("gateway-result-%s-%02d-%s%s", token, index+1, randomHexSuffix(6), fileExtensionForContentType(contentType, kind)) +} + +func sanitizeGeneratedAssetNamePart(value string) string { + value = strings.ToLower(strings.TrimSpace(value)) + var builder strings.Builder + for _, item := range value { + if (item >= 'a' && item <= 'z') || (item >= '0' && item <= '9') || item == '-' || item == '_' { + builder.WriteRune(item) + } + } + return strings.Trim(builder.String(), "-_") +} + +func randomHexSuffix(byteCount int) string { + if byteCount <= 0 { + byteCount = 6 + } + payload := make([]byte, byteCount) + if _, err := rand.Read(payload); err == nil { + return hex.EncodeToString(payload) + } + return fmt.Sprintf("%d", time.Now().UTC().UnixNano()) +} + +func fileExtensionForContentType(contentType string, kind string) string { + normalized := strings.ToLower(strings.TrimSpace(strings.Split(contentType, ";")[0])) + switch normalized { + case "image/jpeg", "image/jpg": + return ".jpg" + case "image/webp": + return ".webp" + case "image/gif": + return ".gif" + case "image/avif": + return ".avif" + case "image/bmp": + return ".bmp" + case "image/svg+xml": + return ".svg" + case "video/webm": + return ".webm" + case "video/quicktime": + return ".mov" + case "video/mp4": + return ".mp4" + case "audio/wav", "audio/x-wav": + return ".wav" + case "audio/ogg": + return ".ogg" + case "audio/mpeg", "audio/mp3": + return ".mp3" + case "image/png": + return ".png" + } + if strings.EqualFold(kind, "video") { + return ".mp4" + } + if strings.EqualFold(kind, "audio") { + return ".mp3" + } + return ".png" +} + +func uniqueStringList(values []string) []string { + seen := map[string]bool{} + out := make([]string, 0, len(values)) + for _, value := range values { + value = strings.TrimSpace(value) + if value == "" || seen[value] { + continue + } + seen[value] = true + out = append(out, value) + } + return out +} + +func uploadRetrySchedule(policy map[string]any) (int, []time.Duration) { + if policy == nil { + policy = defaultUploadRetryPolicy() + } + if enabled, ok := policy["enabled"].(bool); ok && !enabled { + return 0, nil + } + maxRetries := intFromPolicy(policy, "maxRetries") + if maxRetries <= 0 { + maxRetries = 3 + } + delays := uploadRetryDelays(policy["backoffSeconds"]) + if len(delays) == 0 { + delays = []time.Duration{60 * time.Second, 120 * time.Second, 180 * time.Second} + } + return maxRetries, delays +} + +func uploadRetryDelays(value any) []time.Duration { + items, ok := value.([]any) + if !ok { + return nil + } + delays := make([]time.Duration, 0, len(items)) + for _, item := range items { + seconds := int(floatFromAny(item)) + if seconds > 0 { + delays = append(delays, time.Duration(seconds)*time.Second) + } + } + return delays +} + +func retryDelayForAttempt(attempt int, delays []time.Duration) time.Duration { + if attempt < len(delays) { + return delays[attempt] + } + return delays[len(delays)-1] +} + +func sleepWithContext(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + +func defaultUploadRetryPolicy() map[string]any { + return map[string]any{ + "enabled": true, + "maxRetries": 3, + "backoffSeconds": []any{60, 120, 180}, + "strategy": "exponential", + } +} + +func normalizeUploadResponse(decoded map[string]any, channel store.FileStorageChannel) map[string]any { + if decoded == nil { + decoded = map[string]any{} + } + if stringFromAny(decoded["url"]) == "" { + if urlValue := uploadResponseURL(decoded); urlValue != "" { + decoded["url"] = urlValue + } + } + decoded["storageChannel"] = map[string]any{ + "id": channel.ID, + "channelKey": channel.ChannelKey, + "name": channel.Name, + "provider": channel.Provider, + } + return decoded +} + +func uploadResponseURL(decoded map[string]any) string { + for _, key := range []string{"url", "fileUrl", "file_url"} { + if value := stringFromAny(decoded[key]); value != "" { + return value + } + } + for _, key := range []string{"data", "file", "result"} { + if nested, ok := decoded[key].(map[string]any); ok { + if value := uploadResponseURL(nested); value != "" { + return value + } + } + if items, ok := decoded[key].([]any); ok && len(items) > 0 { + if nested, ok := items[0].(map[string]any); ok { + if value := uploadResponseURL(nested); value != "" { + return value + } + } + } + } + return "" +} diff --git a/apps/api/internal/runner/upload_test.go b/apps/api/internal/runner/upload_test.go new file mode 100644 index 0000000..d183c29 --- /dev/null +++ b/apps/api/internal/runner/upload_test.go @@ -0,0 +1,187 @@ +package runner + +import ( + "encoding/base64" + "strings" + "testing" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" +) + +func TestGeneratedAssetDecisionSkipsURLResultAndStripsInlinePayload(t *testing.T) { + item := map[string]any{ + "b64_json": base64.StdEncoding.EncodeToString([]byte("inline image")), + "url": "https://cdn.example.com/generated.png", + } + + decision, err := generatedAssetDecisionForItem("images.generations", item, defaultGeneratedAssetUploadPolicy()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if decision.Inline != nil { + t.Fatalf("URL media should not be uploaded by the default policy") + } + if !containsString(decision.StripKeys, "b64_json") { + t.Fatalf("inline payload should be stripped when URL is already available: %+v", decision.StripKeys) + } +} + +func TestGeneratedAssetDecisionUploadsInlineImageBase64(t *testing.T) { + item := map[string]any{ + "b64_json": base64.StdEncoding.EncodeToString([]byte("inline image")), + "mime_type": "image/jpeg", + } + + decision, err := generatedAssetDecisionForItem("images.generations", item, defaultGeneratedAssetUploadPolicy()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if decision.Inline == nil { + t.Fatalf("expected inline image to be uploaded") + } + if decision.Inline.Kind != "image" || decision.Inline.ContentType != "image/jpeg" { + t.Fatalf("unexpected inline image metadata: %+v", decision.Inline) + } + if !containsString(decision.StripKeys, "b64_json") { + t.Fatalf("uploaded inline payload should be stripped: %+v", decision.StripKeys) + } +} + +func TestGeneratedAssetDecisionUploadsInlineVideoBuffer(t *testing.T) { + item := map[string]any{ + "type": "video", + "video_buffer": []any{float64(0), float64(1), float64(2), float64(3)}, + } + + decision, err := generatedAssetDecisionForItem("videos.generations", item, defaultGeneratedAssetUploadPolicy()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if decision.Inline == nil { + t.Fatalf("expected inline video buffer to be uploaded") + } + if decision.Inline.Kind != "video" || decision.Inline.ContentType != "video/mp4" { + t.Fatalf("unexpected inline video metadata: %+v", decision.Inline) + } + if !containsString(decision.StripKeys, "video_buffer") { + t.Fatalf("uploaded video buffer should be stripped: %+v", decision.StripKeys) + } +} + +func TestGeneratedAssetDecisionUploadsDataURL(t *testing.T) { + item := map[string]any{ + "url": "data:image/webp;base64," + base64.StdEncoding.EncodeToString([]byte("inline webp")), + } + + decision, err := generatedAssetDecisionForItem("images.generations", item, defaultGeneratedAssetUploadPolicy()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if decision.Inline == nil { + t.Fatalf("expected data URL to be uploaded") + } + if decision.Inline.SourceKey != "url" || decision.Inline.ContentType != "image/webp" { + t.Fatalf("unexpected data URL metadata: %+v", decision.Inline) + } + if !containsString(decision.StripKeys, "url") { + t.Fatalf("uploaded data URL field should be stripped: %+v", decision.StripKeys) + } +} + +func TestGeneratedAssetDecisionUploadsURLWhenPolicyUploadAll(t *testing.T) { + item := map[string]any{ + "type": "video", + "video_url": "https://cdn.example.com/generated.mp4", + } + + decision, err := generatedAssetDecisionForItem("videos.generations", item, generatedAssetUploadPolicy{UploadInlineMedia: true, UploadURLMedia: true}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if decision.URL == nil { + t.Fatalf("expected URL media to be uploaded") + } + if decision.URL.Kind != "video" || decision.URL.SourceKey != "video_url" { + t.Fatalf("unexpected URL media metadata: %+v", decision.URL) + } + if !containsString(decision.StripKeys, "video_url") { + t.Fatalf("uploaded URL field should be stripped: %+v", decision.StripKeys) + } +} + +func TestGeneratedAssetDecisionSkipsAllWhenPolicyUploadNone(t *testing.T) { + item := map[string]any{ + "b64_json": base64.StdEncoding.EncodeToString([]byte("inline image")), + } + + decision, err := generatedAssetDecisionForItem("images.generations", item, generatedAssetUploadPolicy{UploadInlineMedia: false, UploadURLMedia: false}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if decision.Inline != nil || decision.URL != nil || len(decision.StripKeys) != 0 { + t.Fatalf("upload_none should keep the result unchanged: %+v", decision) + } +} + +func TestGeneratedAssetUploadPolicyFromName(t *testing.T) { + tests := []struct { + name string + policyName string + want generatedAssetUploadPolicy + }{ + { + name: "default", + policyName: store.FileStorageResultUploadPolicyDefault, + want: generatedAssetUploadPolicy{UploadInlineMedia: true, UploadURLMedia: false}, + }, + { + name: "upload all", + policyName: store.FileStorageResultUploadPolicyUploadAll, + want: generatedAssetUploadPolicy{UploadInlineMedia: true, UploadURLMedia: true}, + }, + { + name: "upload none", + policyName: store.FileStorageResultUploadPolicyUploadNone, + want: generatedAssetUploadPolicy{UploadInlineMedia: false, UploadURLMedia: false}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := generatedAssetUploadPolicyFromName(tt.policyName) + if got != tt.want { + t.Fatalf("unexpected policy: got %+v, want %+v", got, tt.want) + } + }) + } +} + +func TestResolvedGeneratedAssetContentTypePrefersDetectedMedia(t *testing.T) { + pngPayload := []byte{0x89, 'P', 'N', 'G', 0x0d, 0x0a, 0x1a, 0x0a, 0, 0, 0, 0} + + contentType := resolvedGeneratedAssetContentType("image/jpeg", "image", pngPayload) + if contentType != "image/png" { + t.Fatalf("expected detected PNG content type, got %s", contentType) + } + if extension := fileExtensionForContentType(contentType, "image"); extension != ".png" { + t.Fatalf("expected PNG extension, got %s", extension) + } +} + +func TestResolvedGeneratedAssetContentTypeKeepsDeclaredMediaWhenDetectionIsGeneric(t *testing.T) { + contentType := resolvedGeneratedAssetContentType("image/webp", "image", []byte("not enough media bytes")) + if contentType != "image/webp" { + t.Fatalf("expected declared webp content type, got %s", contentType) + } +} + +func TestGeneratedAssetFileNameIsUniqueAndTyped(t *testing.T) { + first := generatedAssetFileName("663e19cd4fa9d8078385c7c9", 0, "image/png", "image") + second := generatedAssetFileName("663e19cd4fa9d8078385c7c9", 0, "image/png", "image") + if first == second { + t.Fatalf("expected generated file names to be unique, both were %s", first) + } + if !strings.HasPrefix(first, "gateway-result-663e19cd4fa9d8078385c7c9-01-") || !strings.HasSuffix(first, ".png") { + t.Fatalf("unexpected generated file name: %s", first) + } +} diff --git a/apps/api/internal/store/file_storage_channels.go b/apps/api/internal/store/file_storage_channels.go new file mode 100644 index 0000000..7bcba18 --- /dev/null +++ b/apps/api/internal/store/file_storage_channels.go @@ -0,0 +1,499 @@ +package store + +import ( + "context" + "encoding/json" + "strings" + "time" + + "github.com/jackc/pgx/v5" +) + +const defaultServerMainUploadURL = "http://127.0.0.1:3001/v1/files/upload" + +const ( + FileStorageSceneUpload = "upload" + FileStorageSceneImageResult = "image_result" +) + +const ( + FileStorageResultUploadPolicyDefault = "default" + FileStorageResultUploadPolicyUploadAll = "upload_all" + FileStorageResultUploadPolicyUploadNone = "upload_none" +) + +const SystemSettingFileStorage = "file_storage" + +const fileStorageChannelColumns = ` +id::text, channel_key, name, provider, COALESCE(upload_url, ''), credentials, +config, retry_policy, priority, status, COALESCE(last_error, ''), +COALESCE(last_failed_at::text, ''), COALESCE(last_succeeded_at::text, ''), +created_at, updated_at` + +type FileStorageChannel struct { + ID string `json:"id"` + ChannelKey string `json:"channelKey"` + Name string `json:"name"` + Provider string `json:"provider"` + UploadURL string `json:"uploadUrl,omitempty"` + APIKey string `json:"-"` + CredentialsPreview map[string]any `json:"credentialsPreview,omitempty"` + Scenes []string `json:"scenes,omitempty"` + Config map[string]any `json:"config,omitempty"` + RetryPolicy map[string]any `json:"retryPolicy,omitempty"` + Priority int `json:"priority"` + Status string `json:"status"` + LastError string `json:"lastError,omitempty"` + LastFailedAt string `json:"lastFailedAt,omitempty"` + LastSucceededAt string `json:"lastSucceededAt,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + +type FileStorageChannelInput struct { + ChannelKey string `json:"channelKey"` + Name string `json:"name"` + Provider string `json:"provider"` + UploadURL string `json:"uploadUrl"` + APIKey *string `json:"apiKey"` + Scenes []string `json:"scenes"` + Config map[string]any `json:"config"` + RetryPolicy map[string]any `json:"retryPolicy"` + Priority int `json:"priority"` + Status string `json:"status"` +} + +type FileStorageSettings struct { + ResultUploadPolicy string `json:"resultUploadPolicy"` +} + +type FileStorageSettingsInput struct { + ResultUploadPolicy string `json:"resultUploadPolicy"` +} + +type fileStorageChannelScanner interface { + Scan(dest ...any) error +} + +func (s *Store) ListFileStorageChannels(ctx context.Context) ([]FileStorageChannel, error) { + rows, err := s.pool.Query(ctx, ` +SELECT `+fileStorageChannelColumns+` +FROM file_storage_channels +WHERE deleted_at IS NULL +ORDER BY priority ASC, created_at ASC`) + if err != nil { + return nil, err + } + defer rows.Close() + items := make([]FileStorageChannel, 0) + for rows.Next() { + item, err := scanFileStorageChannel(rows) + if err != nil { + return nil, err + } + items = append(items, item) + } + return items, rows.Err() +} + +func (s *Store) ListEnabledFileStorageChannels(ctx context.Context) ([]FileStorageChannel, error) { + return s.listEnabledFileStorageChannels(ctx, "") +} + +func (s *Store) ListEnabledFileStorageChannelsForScene(ctx context.Context, scene string) ([]FileStorageChannel, error) { + return s.listEnabledFileStorageChannels(ctx, normalizeFileStorageScene(scene)) +} + +func (s *Store) listEnabledFileStorageChannels(ctx context.Context, scene string) ([]FileStorageChannel, error) { + rows, err := s.pool.Query(ctx, ` +SELECT `+fileStorageChannelColumns+` +FROM file_storage_channels +WHERE deleted_at IS NULL + AND status = 'enabled' + AND ( + $1 = '' + OR NOT (config ? 'scenes') + OR jsonb_typeof(config->'scenes') <> 'array' + OR (config->'scenes') ? $1 + ) +ORDER BY priority ASC, created_at ASC`, scene) + if err != nil { + return nil, err + } + defer rows.Close() + items := make([]FileStorageChannel, 0) + for rows.Next() { + item, err := scanFileStorageChannel(rows) + if err != nil { + return nil, err + } + items = append(items, item) + } + return items, rows.Err() +} + +func (s *Store) GetFileStorageChannel(ctx context.Context, id string) (FileStorageChannel, error) { + return scanFileStorageChannel(s.pool.QueryRow(ctx, ` +SELECT `+fileStorageChannelColumns+` +FROM file_storage_channels +WHERE id = $1::uuid + AND deleted_at IS NULL`, id)) +} + +func (s *Store) CreateFileStorageChannel(ctx context.Context, input FileStorageChannelInput) (FileStorageChannel, error) { + input = normalizeFileStorageChannelInput(input) + credentials, _ := json.Marshal(credentialsFromFileStorageInput(input)) + config, _ := json.Marshal(configFromFileStorageInput(input)) + retryPolicy, _ := json.Marshal(defaultFileStorageRetryPolicyIfEmpty(input.RetryPolicy)) + + return scanFileStorageChannel(s.pool.QueryRow(ctx, ` +INSERT INTO file_storage_channels ( + channel_key, name, provider, upload_url, credentials, config, retry_policy, priority, status +) +VALUES ($1, $2, $3, NULLIF($4, ''), $5, $6, $7, $8, $9) +RETURNING `+fileStorageChannelColumns, + input.ChannelKey, + input.Name, + input.Provider, + input.UploadURL, + credentials, + config, + retryPolicy, + input.Priority, + input.Status, + )) +} + +func (s *Store) UpdateFileStorageChannel(ctx context.Context, id string, input FileStorageChannelInput) (FileStorageChannel, error) { + input = normalizeFileStorageChannelInput(input) + replaceCredentials := input.APIKey != nil + credentials, _ := json.Marshal(credentialsFromFileStorageInput(input)) + config, _ := json.Marshal(configFromFileStorageInput(input)) + retryPolicy, _ := json.Marshal(defaultFileStorageRetryPolicyIfEmpty(input.RetryPolicy)) + + return scanFileStorageChannel(s.pool.QueryRow(ctx, ` +UPDATE file_storage_channels +SET channel_key = $2, + name = $3, + provider = $4, + upload_url = NULLIF($5, ''), + credentials = CASE WHEN $6::boolean THEN $7 ELSE credentials END, + config = $8, + retry_policy = $9, + priority = $10, + status = $11, + updated_at = now() +WHERE id = $1::uuid + AND deleted_at IS NULL +RETURNING `+fileStorageChannelColumns, + id, + input.ChannelKey, + input.Name, + input.Provider, + input.UploadURL, + replaceCredentials, + credentials, + config, + retryPolicy, + input.Priority, + input.Status, + )) +} + +func (s *Store) DeleteFileStorageChannel(ctx context.Context, id string) error { + result, err := s.pool.Exec(ctx, ` +UPDATE file_storage_channels +SET deleted_at = now(), + status = 'disabled', + updated_at = now() +WHERE id = $1::uuid + AND deleted_at IS NULL`, id) + if err != nil { + return err + } + if result.RowsAffected() == 0 { + return pgx.ErrNoRows + } + return nil +} + +func (s *Store) MarkFileStorageChannelFailure(ctx context.Context, id string, message string) error { + if strings.TrimSpace(id) == "" { + return nil + } + _, err := s.pool.Exec(ctx, ` +UPDATE file_storage_channels +SET last_error = NULLIF($2, ''), + last_failed_at = now(), + updated_at = now() +WHERE id = $1::uuid + AND deleted_at IS NULL`, id, strings.TrimSpace(message)) + return err +} + +func (s *Store) MarkFileStorageChannelSuccess(ctx context.Context, id string) error { + if strings.TrimSpace(id) == "" { + return nil + } + _, err := s.pool.Exec(ctx, ` +UPDATE file_storage_channels +SET last_error = NULL, + last_succeeded_at = now(), + updated_at = now() +WHERE id = $1::uuid + AND deleted_at IS NULL`, id) + return err +} + +func scanFileStorageChannel(scanner fileStorageChannelScanner) (FileStorageChannel, error) { + var item FileStorageChannel + var credentials []byte + var config []byte + var retryPolicy []byte + if err := scanner.Scan( + &item.ID, + &item.ChannelKey, + &item.Name, + &item.Provider, + &item.UploadURL, + &credentials, + &config, + &retryPolicy, + &item.Priority, + &item.Status, + &item.LastError, + &item.LastFailedAt, + &item.LastSucceededAt, + &item.CreatedAt, + &item.UpdatedAt, + ); err != nil { + return FileStorageChannel{}, err + } + credentialObject := decodeObject(credentials) + item.APIKey = stringFromObject(credentialObject, "apiKey") + item.CredentialsPreview = maskCredentialsPreview(credentials) + configObject := decodeObject(config) + item.Scenes = fileStorageScenesFromConfig(configObject) + item.Config = fileStorageConfigWithoutManagedFields(configObject) + item.RetryPolicy = decodeObject(retryPolicy) + return item, nil +} + +func normalizeFileStorageChannelInput(input FileStorageChannelInput) FileStorageChannelInput { + input.ChannelKey = strings.TrimSpace(input.ChannelKey) + input.Name = strings.TrimSpace(input.Name) + input.Provider = strings.ToLower(strings.TrimSpace(input.Provider)) + input.UploadURL = strings.TrimSpace(input.UploadURL) + if input.APIKey != nil { + apiKey := strings.TrimSpace(*input.APIKey) + input.APIKey = &apiKey + } + input.Scenes = normalizeFileStorageScenes(input.Scenes) + input.Status = strings.ToLower(strings.TrimSpace(input.Status)) + if input.Provider == "" { + input.Provider = "server_main_openapi" + } + if input.Provider == "server_main_openapi" && input.UploadURL == "" { + input.UploadURL = defaultServerMainUploadURL + } + if input.Status == "" { + input.Status = "disabled" + } + if input.Priority <= 0 { + input.Priority = 100 + } + return input +} + +func credentialsFromFileStorageInput(input FileStorageChannelInput) map[string]any { + apiKey := fileStorageInputAPIKey(input) + if apiKey == "" { + return map[string]any{} + } + return map[string]any{"apiKey": apiKey} +} + +func fileStorageInputAPIKey(input FileStorageChannelInput) string { + if input.APIKey == nil { + return "" + } + return strings.TrimSpace(*input.APIKey) +} + +func configFromFileStorageInput(input FileStorageChannelInput) map[string]any { + config := map[string]any{} + for key, value := range emptyObjectIfNil(input.Config) { + config[key] = value + } + config["scenes"] = normalizeFileStorageScenes(input.Scenes) + return config +} + +func fileStorageConfigWithoutManagedFields(config map[string]any) map[string]any { + out := map[string]any{} + for key, value := range config { + if key == "scenes" || key == "resultUploadPolicy" { + continue + } + out[key] = value + } + if len(out) == 0 { + return nil + } + return out +} + +func DefaultFileStorageSettings() FileStorageSettings { + return FileStorageSettings{ResultUploadPolicy: FileStorageResultUploadPolicyDefault} +} + +func (s *Store) GetFileStorageSettings(ctx context.Context) (FileStorageSettings, error) { + var value []byte + err := s.pool.QueryRow(ctx, ` +SELECT value +FROM system_settings +WHERE setting_key = $1`, SystemSettingFileStorage).Scan(&value) + if err != nil { + if IsNotFound(err) { + return DefaultFileStorageSettings(), nil + } + return FileStorageSettings{}, err + } + return fileStorageSettingsFromValue(decodeObject(value)), nil +} + +func (s *Store) UpdateFileStorageSettings(ctx context.Context, input FileStorageSettingsInput) (FileStorageSettings, error) { + settings := FileStorageSettings{ResultUploadPolicy: NormalizeFileStorageResultUploadPolicy(input.ResultUploadPolicy)} + value, _ := json.Marshal(settings) + var saved []byte + err := s.upsertFileStorageSettings(ctx, value, &saved) + if err != nil && IsUndefinedDatabaseObject(err) { + if ensureErr := s.ensureSystemSettingsTable(ctx); ensureErr != nil { + return FileStorageSettings{}, ensureErr + } + err = s.upsertFileStorageSettings(ctx, value, &saved) + } + if err != nil { + return FileStorageSettings{}, err + } + return fileStorageSettingsFromValue(decodeObject(saved)), nil +} + +func (s *Store) upsertFileStorageSettings(ctx context.Context, value []byte, saved *[]byte) error { + return s.pool.QueryRow(ctx, ` +INSERT INTO system_settings (setting_key, value) +VALUES ($1, $2) +ON CONFLICT (setting_key) +DO UPDATE SET value = EXCLUDED.value, updated_at = now() +RETURNING value`, SystemSettingFileStorage, value).Scan(saved) +} + +func (s *Store) ensureSystemSettingsTable(ctx context.Context) error { + _, err := s.pool.Exec(ctx, ` +CREATE TABLE IF NOT EXISTS system_settings ( + setting_key text PRIMARY KEY, + value jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +)`) + return err +} + +func fileStorageSettingsFromValue(value map[string]any) FileStorageSettings { + settings := DefaultFileStorageSettings() + if value == nil { + return settings + } + settings.ResultUploadPolicy = NormalizeFileStorageResultUploadPolicy(stringFromAny(value["resultUploadPolicy"])) + return settings +} + +func NormalizeFileStorageResultUploadPolicy(policy string) string { + normalized := strings.ToLower(strings.TrimSpace(policy)) + normalized = strings.ReplaceAll(normalized, "-", "_") + switch normalized { + case "", "default", "non_link_only", "inline_only", "nonlink_only", "non_link": + return FileStorageResultUploadPolicyDefault + case "upload_all", "all", "always", "all_upload": + return FileStorageResultUploadPolicyUploadAll + case "upload_none", "none", "never", "disabled", "no_upload", "skip", "skip_all": + return FileStorageResultUploadPolicyUploadNone + default: + return FileStorageResultUploadPolicyDefault + } +} + +func fileStorageScenesFromConfig(config map[string]any) []string { + if config == nil { + return defaultFileStorageScenes() + } + raw, ok := config["scenes"] + if !ok { + return defaultFileStorageScenes() + } + items, ok := raw.([]any) + if !ok { + return defaultFileStorageScenes() + } + scenes := make([]string, 0, len(items)) + for _, item := range items { + if value, ok := item.(string); ok { + scenes = append(scenes, value) + } + } + return normalizeFileStorageScenes(scenes) +} + +func normalizeFileStorageScenes(scenes []string) []string { + seen := map[string]bool{} + out := make([]string, 0, len(scenes)) + for _, item := range scenes { + scene := normalizeFileStorageScene(item) + if scene == "" || seen[scene] { + continue + } + seen[scene] = true + out = append(out, scene) + } + if len(out) == 0 { + return defaultFileStorageScenes() + } + return out +} + +func normalizeFileStorageScene(scene string) string { + return strings.ToLower(strings.TrimSpace(scene)) +} + +func defaultFileStorageScenes() []string { + return []string{FileStorageSceneUpload, FileStorageSceneImageResult} +} + +func defaultFileStorageRetryPolicyIfEmpty(policy map[string]any) map[string]any { + if len(policy) > 0 { + return policy + } + return map[string]any{ + "enabled": true, + "maxRetries": 3, + "backoffSeconds": []any{60, 120, 180}, + "strategy": "exponential", + } +} + +func stringFromObject(value map[string]any, key string) string { + if value == nil { + return "" + } + raw, _ := value[key].(string) + return strings.TrimSpace(raw) +} + +func stringFromAny(value any) string { + switch typed := value.(type) { + case string: + return strings.TrimSpace(typed) + default: + return "" + } +} diff --git a/apps/api/migrations/0036_file_storage_channels.sql b/apps/api/migrations/0036_file_storage_channels.sql new file mode 100644 index 0000000..85c7466 --- /dev/null +++ b/apps/api/migrations/0036_file_storage_channels.sql @@ -0,0 +1,85 @@ +CREATE TABLE IF NOT EXISTS system_settings ( + setting_key text PRIMARY KEY, + value jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +INSERT INTO system_settings (setting_key, value) +VALUES ( + 'file_storage', + '{"resultUploadPolicy": "default"}'::jsonb +) +ON CONFLICT (setting_key) DO NOTHING; + +CREATE TABLE IF NOT EXISTS file_storage_channels ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + channel_key text NOT NULL UNIQUE, + name text NOT NULL, + provider text NOT NULL DEFAULT 'server_main_openapi', + upload_url text, + credentials jsonb NOT NULL DEFAULT '{}'::jsonb, + config jsonb NOT NULL DEFAULT '{}'::jsonb, + retry_policy jsonb NOT NULL DEFAULT '{ + "enabled": true, + "maxRetries": 3, + "backoffSeconds": [60, 120, 180], + "strategy": "exponential" + }'::jsonb, + priority integer NOT NULL DEFAULT 100, + status text NOT NULL DEFAULT 'disabled', + last_error text, + last_failed_at timestamptz, + last_succeeded_at timestamptz, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now(), + deleted_at timestamptz +); + +ALTER TABLE IF EXISTS file_storage_channels + ADD COLUMN IF NOT EXISTS upload_url text, + ADD COLUMN IF NOT EXISTS config jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS retry_policy jsonb NOT NULL DEFAULT '{ + "enabled": true, + "maxRetries": 3, + "backoffSeconds": [60, 120, 180], + "strategy": "exponential" + }'::jsonb, + ADD COLUMN IF NOT EXISTS priority integer NOT NULL DEFAULT 100, + ADD COLUMN IF NOT EXISTS last_error text, + ADD COLUMN IF NOT EXISTS last_failed_at timestamptz, + ADD COLUMN IF NOT EXISTS last_succeeded_at timestamptz, + ADD COLUMN IF NOT EXISTS deleted_at timestamptz; + +CREATE INDEX IF NOT EXISTS idx_file_storage_channels_active + ON file_storage_channels (status, priority, created_at) + WHERE deleted_at IS NULL; + +INSERT INTO file_storage_channels ( + channel_key, + name, + provider, + upload_url, + credentials, + config, + retry_policy, + priority, + status +) +VALUES ( + 'server-main-openapi', + 'server-main OpenAPI', + 'server_main_openapi', + 'http://127.0.0.1:3001/v1/files/upload', + '{}'::jsonb, + '{"scenes": ["upload", "image_result"]}'::jsonb, + '{ + "enabled": true, + "maxRetries": 3, + "backoffSeconds": [60, 120, 180], + "strategy": "exponential" + }'::jsonb, + 100, + 'disabled' +) +ON CONFLICT (channel_key) DO NOTHING; diff --git a/apps/api/migrations/0037_file_storage_settings.sql b/apps/api/migrations/0037_file_storage_settings.sql new file mode 100644 index 0000000..2d2e92a --- /dev/null +++ b/apps/api/migrations/0037_file_storage_settings.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS system_settings ( + setting_key text PRIMARY KEY, + value jsonb NOT NULL DEFAULT '{}'::jsonb, + created_at timestamptz NOT NULL DEFAULT now(), + updated_at timestamptz NOT NULL DEFAULT now() +); + +INSERT INTO system_settings (setting_key, value) +VALUES ( + 'file_storage', + '{"resultUploadPolicy": "default"}'::jsonb +) +ON CONFLICT (setting_key) DO NOTHING; diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index 2d508af..11cc3fb 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -2,6 +2,10 @@ import { useEffect, useMemo, useRef, useState, type FormEvent } from 'react'; import type { BaseModelCatalogItem, CatalogProvider, + FileStorageChannel, + FileStorageSettings, + FileStorageSettingsUpdateRequest, + FileStorageChannelUpsertRequest, GatewayAccessRuleBatchRequest, GatewayAccessRule, GatewayAccessRuleUpsertRequest, @@ -34,17 +38,21 @@ import { batchApiKeyAccessRules, createAccessRule, createApiKey, + createFileStorageChannel, createGatewayUser, createPlatform, createTenant, createUserGroup, deleteAccessRule, deleteApiKey, + deleteFileStorageChannel, deleteGatewayUser, deletePlatform, deleteTenant, deleteUserGroup, getHealth, + listFileStorageChannels, + getFileStorageSettings, getNetworkProxyConfig, getRunnerPolicy, getWalletSummary, @@ -78,6 +86,8 @@ import { setUserWalletBalance, type HealthResponse, updateAccessRule, + updateFileStorageChannel, + updateFileStorageSettings, updateGatewayUser, updatePlatform, updatePlatformDynamicPriority, @@ -135,6 +145,8 @@ type DataKey = | 'playgroundModels' | 'modelCatalog' | 'networkProxyConfig' + | 'fileStorageChannels' + | 'fileStorageSettings' | 'platforms' | 'models' | 'providers' @@ -179,6 +191,8 @@ export function App() { }); const [playgroundModels, setPlaygroundModels] = useState([]); const [networkProxyConfig, setNetworkProxyConfig] = useState(null); + const [fileStorageChannels, setFileStorageChannels] = useState([]); + const [fileStorageSettings, setFileStorageSettings] = useState(null); const [providers, setProviders] = useState([]); const [baseModels, setBaseModels] = useState([]); const [pricingRules, setPricingRules] = useState([]); @@ -296,6 +310,8 @@ export function App() { auditLogs, apiKeys, baseModels, + fileStorageChannels, + fileStorageSettings, modelCatalog, models, networkProxyConfig, @@ -315,7 +331,7 @@ export function App() { users, walletAccounts, walletTransactions, - }), [accessRules, apiKeys, auditLogs, baseModels, modelCatalog, modelRateLimits, modelRateLimitsUpdatedAt, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runnerPolicy, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]); + }), [accessRules, apiKeys, auditLogs, baseModels, fileStorageChannels, fileStorageSettings, modelCatalog, modelRateLimits, modelRateLimitsUpdatedAt, models, networkProxyConfig, platforms, pricingRuleSets, pricingRules, providers, rateLimitWindows, runnerPolicy, runtimePolicySets, taskResult, tasks, tenants, userGroups, users, walletAccounts, walletTransactions]); async function refresh(nextToken = token) { await ensureRouteData(nextToken, true); @@ -388,6 +404,12 @@ export function App() { case 'networkProxyConfig': setNetworkProxyConfig(await getNetworkProxyConfig(nextToken)); return; + case 'fileStorageChannels': + setFileStorageChannels((await listFileStorageChannels(nextToken)).items); + return; + case 'fileStorageSettings': + setFileStorageSettings(await getFileStorageSettings(nextToken)); + return; case 'playgroundModels': setPlaygroundModels((await listPlayableModels(nextToken)).items); return; @@ -818,6 +840,53 @@ export function App() { } } + async function saveFileStorageChannel(input: FileStorageChannelUpsertRequest, channelId?: string) { + setCoreState('loading'); + setCoreMessage(''); + try { + const item = channelId + ? await updateFileStorageChannel(token, channelId, input) + : await createFileStorageChannel(token, input); + setFileStorageChannels((current) => [item, ...current.filter((channel) => channel.id !== item.id)]); + setCoreState('ready'); + setCoreMessage(channelId ? '文件存储渠道已更新。' : '文件存储渠道已新增。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : channelId ? '更新文件存储渠道失败' : '新增文件存储渠道失败'); + throw err; + } + } + + async function saveFileStorageSettings(input: FileStorageSettingsUpdateRequest) { + setCoreState('loading'); + setCoreMessage(''); + try { + const settings = await updateFileStorageSettings(token, input); + setFileStorageSettings(settings); + setCoreState('ready'); + setCoreMessage('文件存储全局策略已更新。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : '更新文件存储全局策略失败'); + throw err; + } + } + + async function removeFileStorageChannel(channelId: string) { + setCoreState('loading'); + setCoreMessage(''); + try { + await deleteFileStorageChannel(token, channelId); + setFileStorageChannels((current) => current.filter((channel) => channel.id !== channelId)); + setCoreState('ready'); + setCoreMessage('文件存储渠道已删除。'); + } catch (err) { + setCoreState('error'); + setCoreMessage(err instanceof Error ? err.message : '删除文件存储渠道失败'); + throw err; + } + } + async function batchSaveAPIKeyAccessRules(input: GatewayAccessRuleBatchRequest) { setCoreState('loading'); setCoreMessage(''); @@ -867,6 +936,7 @@ export function App() { setModelCatalog({ items: [], filters: { capabilities: [], providers: [] }, summary: { modelCount: 0, sourceCount: 0 } }); setPlaygroundModels([]); setNetworkProxyConfig(null); + setFileStorageChannels([]); setProviders([]); setBaseModels([]); setPricingRules([]); @@ -1039,6 +1109,7 @@ export function App() { onDeletePricingRuleSet={removePricingRuleSet} onDeleteRuntimePolicySet={removeRuntimePolicySet} onDeleteAccessRule={removeAccessRule} + onDeleteFileStorageChannel={removeFileStorageChannel} onDeleteTenant={removeTenant} onDeleteUser={removeUser} onDeleteUserGroup={removeUserGroup} @@ -1054,6 +1125,8 @@ export function App() { onSaveRuntimePolicySet={saveRuntimePolicySet} onBatchAccessRules={batchSaveAccessRules} onSaveAccessRule={saveAccessRule} + onSaveFileStorageChannel={saveFileStorageChannel} + onSaveFileStorageSettings={saveFileStorageSettings} onSaveTenant={saveTenant} onSaveUser={saveUser} onSetUserWalletBalance={saveUserWalletBalance} @@ -1267,6 +1340,8 @@ function dataKeysForRoute( return ['auditLogs']; case 'accessRules': return ['accessRules', 'userGroups', 'platforms', 'models']; + case 'systemSettings': + return ['fileStorageSettings', 'fileStorageChannels']; default: return []; } diff --git a/apps/web/src/api.ts b/apps/web/src/api.ts index eeb407c..74e8f8f 100644 --- a/apps/web/src/api.ts +++ b/apps/web/src/api.ts @@ -5,6 +5,10 @@ import type { CatalogProvider, CatalogProviderUpsertRequest, CreatedGatewayApiKey, + FileStorageChannel, + FileStorageSettings, + FileStorageSettingsUpdateRequest, + FileStorageChannelUpsertRequest, GatewayAccessRuleBatchRequest, GatewayAccessRule, GatewayAccessRuleUpsertRequest, @@ -601,10 +605,17 @@ export async function createImageGenerationTask( model: string; prompt: string; aspect_ratio?: string; + content?: Array>; count?: number; height?: number; + image?: string | string[]; + image_url?: string | string[]; + image_urls?: string[]; + images?: string[]; n?: number; quality?: string; + referenceImage?: string | string[]; + reference_image?: string | string[]; resolution?: string; runMode?: string; simulation?: boolean; @@ -622,7 +633,26 @@ export async function createImageGenerationTask( export async function createImageEditTask( token: string, - input: { model: string; prompt: string; image?: string; mask?: string; runMode?: string; simulation?: boolean }, + input: { + model: string; + prompt: string; + aspect_ratio?: string; + content?: Array>; + count?: number; + height?: number; + image?: string | string[]; + image_url?: string | string[]; + image_urls?: string[]; + images?: string[]; + mask?: string; + n?: number; + quality?: string; + resolution?: string; + runMode?: string; + simulation?: boolean; + size?: string; + width?: number; + }, ): Promise<{ task: GatewayTask; next: Record }> { return request<{ task: GatewayTask; next: Record }>('/api/v1/images/edits', { body: input, @@ -636,6 +666,9 @@ export async function createVideoGenerationTask( token: string, input: { audio?: boolean; + audioUrl?: string | string[]; + audio_url?: string | string[]; + content?: Array>; model: string; prompt: string; aspect_ratio?: string; @@ -643,12 +676,24 @@ export async function createVideoGenerationTask( duration?: number; duration_seconds?: number; height?: number; + image?: string | string[]; + imageUrl?: string | string[]; + image_url?: string | string[]; + imageUrls?: string[]; + image_urls?: string[]; n?: number; output_audio?: boolean; + referenceAudio?: string | string[]; + referenceVideo?: string | string[]; + reference_audio?: string | string[]; + reference_image?: string | string[]; + reference_video?: string | string[]; resolution?: string; runMode?: string; simulation?: boolean; size?: string; + videoUrl?: string | string[]; + video_url?: string | string[]; width?: number; }, ): Promise<{ task: GatewayTask; next: Record }> { @@ -660,6 +705,41 @@ export async function createVideoGenerationTask( }); } +export interface GatewayFileUploadResponse extends Record { + fileUrl?: string; + file_url?: string; + url?: string; +} + +export async function uploadFileToStorage( + token: string, + file: File, + source = 'ai-gateway-playground', +): Promise { + const form = new FormData(); + form.append('file', file); + form.append('source', source); + + const response = await fetch(`${API_BASE}/v1/files/upload`, { + body: form, + headers: { + Authorization: `Bearer ${token}`, + }, + method: 'POST', + }); + const body = await response.text(); + if (!response.ok) { + throw new GatewayApiError(parseErrorDetails(body, response.status, `Request failed: ${response.status}`)); + } + if (!body) return {}; + try { + const parsed = JSON.parse(body) as unknown; + return recordFromUnknown(parsed) ? (parsed as GatewayFileUploadResponse) : {}; + } catch { + return { url: body }; + } +} + export async function estimatePricing( token: string, input: Record, @@ -758,6 +838,55 @@ export async function getNetworkProxyConfig(token: string): Promise('/api/admin/config/network-proxy', { token }); } +export async function listFileStorageChannels(token: string): Promise> { + return request>('/api/admin/system/file-storage/channels', { token }); +} + +export async function getFileStorageSettings(token: string): Promise { + return request('/api/admin/system/file-storage/settings', { token }); +} + +export async function updateFileStorageSettings( + token: string, + input: FileStorageSettingsUpdateRequest, +): Promise { + return request('/api/admin/system/file-storage/settings', { + body: input, + method: 'PATCH', + token, + }); +} + +export async function createFileStorageChannel( + token: string, + input: FileStorageChannelUpsertRequest, +): Promise { + return request('/api/admin/system/file-storage/channels', { + body: input, + method: 'POST', + token, + }); +} + +export async function updateFileStorageChannel( + token: string, + channelId: string, + input: FileStorageChannelUpsertRequest, +): Promise { + return request(`/api/admin/system/file-storage/channels/${channelId}`, { + body: input, + method: 'PATCH', + token, + }); +} + +export async function deleteFileStorageChannel(token: string, channelId: string): Promise { + await request(`/api/admin/system/file-storage/channels/${channelId}`, { + method: 'DELETE', + token, + }); +} + async function request( path: string, options: { token?: string; auth?: boolean; method?: string; body?: unknown; headers?: Record } = {}, diff --git a/apps/web/src/app-state.ts b/apps/web/src/app-state.ts index 83a567f..6b5e8bb 100644 --- a/apps/web/src/app-state.ts +++ b/apps/web/src/app-state.ts @@ -1,6 +1,8 @@ import type { BaseModelCatalogItem, CatalogProvider, + FileStorageChannel, + FileStorageSettings, GatewayAccessRule, GatewayApiKey, GatewayAuditLog, @@ -27,6 +29,8 @@ export interface ConsoleData { auditLogs: GatewayAuditLog[]; apiKeys: GatewayApiKey[]; baseModels: BaseModelCatalogItem[]; + fileStorageChannels: FileStorageChannel[]; + fileStorageSettings: FileStorageSettings | null; modelCatalog: ModelCatalogResponse; models: PlatformModel[]; networkProxyConfig: GatewayNetworkProxyConfig | null; diff --git a/apps/web/src/pages/AdminPage.tsx b/apps/web/src/pages/AdminPage.tsx index 0866033..3df5d1b 100644 --- a/apps/web/src/pages/AdminPage.tsx +++ b/apps/web/src/pages/AdminPage.tsx @@ -1,8 +1,10 @@ import type { ReactNode } from 'react'; -import { Boxes, Building2, Gauge, History, KeyRound, Route, ServerCog, ShieldCheck, UsersRound, Workflow } from 'lucide-react'; +import { Boxes, Building2, Gauge, History, KeyRound, Route, ServerCog, Settings, ShieldCheck, UsersRound, Workflow } from 'lucide-react'; import type { BaseModelUpsertRequest, CatalogProviderUpsertRequest, + FileStorageChannelUpsertRequest, + FileStorageSettingsUpdateRequest, GatewayAccessRuleBatchRequest, GatewayAccessRuleUpsertRequest, GatewayTenantUpsertRequest, @@ -29,6 +31,7 @@ import { PricingRulesPanel } from './admin/PricingRulesPanel'; import { ProviderManagementPanel } from './admin/ProviderManagementPanel'; import { RealtimeLoadPanel } from './admin/RealtimeLoadPanel'; import { RuntimePoliciesPanel } from './admin/RuntimePoliciesPanel'; +import { SystemSettingsPanel } from './admin/SystemSettingsPanel'; const tabs = [ { value: 'overview', label: '总览', icon: }, @@ -42,6 +45,7 @@ const tabs = [ { value: 'users', label: '用户', icon: }, { value: 'userGroups', label: '用户组', icon: }, { value: 'accessRules', label: '模型权限', icon: }, + { value: 'systemSettings', label: '系统设置', icon: }, { value: 'auditLogs', label: '审计日志', icon: }, ] satisfies Array<{ value: AdminSection; label: string; icon: ReactNode }>; @@ -57,6 +61,7 @@ export function AdminPage(props: { onDeletePricingRuleSet: (ruleSetId: string) => Promise; onDeleteRuntimePolicySet: (policySetId: string) => Promise; onDeleteAccessRule: (ruleId: string) => Promise; + onDeleteFileStorageChannel: (channelId: string) => Promise; onDeleteTenant: (tenantId: string) => Promise; onDeleteUser: (userId: string) => Promise; onDeleteUserGroup: (groupId: string) => Promise; @@ -72,6 +77,8 @@ export function AdminPage(props: { onSaveRunnerPolicy: (input: GatewayRunnerPolicyUpsertRequest) => Promise; onSaveRuntimePolicySet: (input: RuntimePolicySetUpsertRequest, policySetId?: string) => Promise; onSaveAccessRule: (input: GatewayAccessRuleUpsertRequest, ruleId?: string) => Promise; + onSaveFileStorageChannel: (input: FileStorageChannelUpsertRequest, channelId?: string) => Promise; + onSaveFileStorageSettings: (input: FileStorageSettingsUpdateRequest) => Promise; onSaveTenant: (input: GatewayTenantUpsertRequest, tenantId?: string) => Promise; onSaveUser: (input: GatewayUserUpsertRequest, userId?: string) => Promise; onSetUserWalletBalance: (userId: string, input: WalletBalanceAdjustmentRequest) => Promise; @@ -172,6 +179,17 @@ export function AdminPage(props: { {props.section === 'users' && } {props.section === 'userGroups' && } {props.section === 'auditLogs' && } + {props.section === 'systemSettings' && ( + + )} diff --git a/apps/web/src/pages/ApiDocsPage.tsx b/apps/web/src/pages/ApiDocsPage.tsx index 9fb423b..ed99f39 100644 --- a/apps/web/src/pages/ApiDocsPage.tsx +++ b/apps/web/src/pages/ApiDocsPage.tsx @@ -34,6 +34,7 @@ export function ApiDocsPage(props: { onTaskFormChange: (value: TaskForm) => void; }) { const current = docs.find((item) => item.key === props.activeDocSection) ?? docs[0]; + const isFileDoc = current.key === 'files'; const bodyExample = useMemo(() => requestBodyExample(props.taskForm), [props.taskForm]); function handleSubmit(event: FormEvent) { @@ -87,7 +88,7 @@ export function ApiDocsPage(props: {

Header 参数

- + @@ -97,10 +98,19 @@ export function ApiDocsPage(props: {

Body 参数

application/json - - - - + {isFileDoc ? ( + <> + + + + ) : ( + <> + + + + + + )} diff --git a/apps/web/src/pages/PlaygroundPage.tsx b/apps/web/src/pages/PlaygroundPage.tsx index 7fd456a..0ab04b0 100644 --- a/apps/web/src/pages/PlaygroundPage.tsx +++ b/apps/web/src/pages/PlaygroundPage.tsx @@ -19,9 +19,9 @@ import { code } from '@streamdown/code'; import { math } from '@streamdown/math'; import { mermaid } from '@streamdown/mermaid'; import type { GatewayApiKey, GatewayTask, PlatformModel } from '@easyai-ai-gateway/contracts'; -import { Bot, ChevronDown, Image as ImageIcon, MessageSquarePlus, Paperclip, Send, Sparkles, Video } from 'lucide-react'; +import { Bot, ChevronDown, FileText, Image as ImageIcon, LoaderCircle, MessageSquarePlus, Music2, Paperclip, Send, Sparkles, Video, X } from 'lucide-react'; import { Badge, Button, Select, Textarea } from '../components/ui'; -import { GatewayApiError, createImageGenerationTask, createVideoGenerationTask, pollTaskUntilSettled, streamChatCompletionText, taskIsPending } from '../api'; +import { GatewayApiError, createImageEditTask, createImageGenerationTask, createVideoGenerationTask, pollTaskUntilSettled, streamChatCompletionText, taskIsPending, uploadFileToStorage } from '../api'; import type { PlaygroundMode } from '../types'; import { defaultMediaGenerationSettings, @@ -58,6 +58,18 @@ interface ModelOption { value: string; } +type PlaygroundUploadKind = 'audio' | 'file' | 'image' | 'video'; + +interface PlaygroundUpload { + contentType: string; + id: string; + kind: PlaygroundUploadKind; + name: string; + raw: Record; + size: number; + url: string; +} + const modeOptions: Array<{ description: string; icon: ReactNode; label: string; value: PlaygroundMode }> = [ { value: 'chat', label: '大模型对话', description: '对话、推理、结构化输出', icon: }, { value: 'image', label: '图像生成', description: '文生图、图像编辑参数预览', icon: }, @@ -82,6 +94,35 @@ const quickPrompts: Record = { video: ['5 秒运镜', '首帧转视频', '宣传短片'], }; +const mediaUploadAccept = 'image/*,video/*,audio/*'; +const chatUploadAccept = [ + mediaUploadAccept, + '.csv', + '.doc', + '.docx', + '.json', + '.jsonl', + '.md', + '.markdown', + '.pdf', + '.ppt', + '.pptx', + '.txt', + '.xls', + '.xlsx', + '.yaml', + '.yml', + 'application/json', + 'application/msword', + 'application/pdf', + 'application/vnd.ms-excel', + 'application/vnd.ms-powerpoint', + 'application/vnd.openxmlformats-officedocument.presentationml.presentation', + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'text/*', +].join(','); + const publicWorks = [ { title: '雨夜霓虹街区', type: '图像生成', image: 'https://picsum.photos/seed/easyai-neon-city/720/960' }, { title: '玻璃温室晨光', type: '图像生成', image: 'https://picsum.photos/seed/easyai-glasshouse/720/540' }, @@ -119,13 +160,17 @@ export function PlaygroundPage(props: { const [mediaSettings, setMediaSettings] = useState(defaultMediaGenerationSettings); const [mediaRuns, setMediaRuns] = useState(readStoredMediaRuns); const [mediaMessage, setMediaMessage] = useState(''); + const [mediaUploadMessage, setMediaUploadMessage] = useState(''); + const [mediaUploads, setMediaUploads] = useState([]); + const [mediaUploading, setMediaUploading] = useState(false); const isMountedRef = useRef(false); const pendingMediaModelRef = useRef(''); const resumedTaskIdsRef = useRef(new Set()); const activeMode = useMemo(() => modeOptions.find((item) => item.value === props.mode) ?? modeOptions[0], [props.mode]); + const effectiveImageHasReference = imageHasReference || (props.mode === 'image' && mediaUploads.some((item) => item.kind === 'image')); const modelOptions = useMemo( - () => buildModelOptions(filterModelsForMode(props.models, props.mode, imageHasReference, videoMode)), - [imageHasReference, props.mode, props.models, videoMode], + () => buildModelOptions(filterModelsForMode(props.models, props.mode, effectiveImageHasReference, videoMode)), + [effectiveImageHasReference, props.mode, props.models, videoMode], ); const activeApiKeyId = resolveSelectedApiKeyId(props.apiKeys, props.apiKeySecretsById, props.selectedApiKeyId); const activeApiKeySecret = activeApiKeyId ? props.apiKeySecretsById[activeApiKeyId] ?? '' : ''; @@ -170,6 +215,38 @@ export function PlaygroundPage(props: { writeStoredMediaRuns(mediaRuns); }, [mediaRuns]); + async function uploadMediaFiles(files: File[]) { + if (!files.length) return; + const credential = activeApiKeySecret || props.token; + if (!props.token) { + props.onLogin(); + return; + } + if (!credential) { + setMediaUploadMessage('请选择可用于测试的 API Key 后再上传。'); + return; + } + setMediaUploading(true); + setMediaUploadMessage(''); + try { + const { items, warnings } = await uploadPlaygroundFiles(credential, files, { + allowFiles: false, + source: `ai-gateway-playground-${props.mode}`, + }); + if (items.length) { + setMediaUploads((current) => [...current, ...items]); + if (props.mode === 'image' && items.some((item) => item.kind === 'image')) { + setImageHasReference(true); + } + } + setMediaUploadMessage(warnings[0] ?? (items.length ? `已上传 ${items.length} 个参考素材。` : '')); + } catch (err) { + setMediaUploadMessage(err instanceof Error ? err.message : '文件上传失败'); + } finally { + setMediaUploading(false); + } + } + useEffect(() => { const credential = activeApiKeySecret || props.token; if (!credential) return; @@ -227,6 +304,7 @@ export function PlaygroundPage(props: { } const localId = newLocalId(); + const runUploads = overrides ? [] : mediaUploads; const modelLabel = modelOptions.find((item) => item.value === runModel)?.label ?? runModel; const run: MediaGenerationRun = { createdAt: new Date().toISOString(), @@ -242,15 +320,24 @@ export function PlaygroundPage(props: { setMediaRuns((current) => [...current, run]); setMediaMessage(''); try { + const uploadPayload = mediaUploadRequestPayload(runUploads, runMode); const requestPayload = { model: runModel, - prompt: trimmedPrompt, + prompt: promptWithUploadSummary(trimmedPrompt, runUploads), ...mediaRequestPayload(runSettings, runMode), + ...uploadPayload, }; const response = runMode === 'video' ? await createVideoGenerationTask(credential, requestPayload) - : await createImageGenerationTask(credential, requestPayload); + : runUploads.some((item) => item.kind === 'image') + ? await createImageEditTask(credential, requestPayload) + : await createImageGenerationTask(credential, requestPayload); setMediaRuns((current) => updateMediaRun(current, localId, { status: response.task.status, task: response.task })); + if (!overrides) { + setMediaUploads([]); + setMediaUploadMessage(''); + setImageHasReference(false); + } void pollMediaRunUntilSettled(credential, localId, response.task); } catch (err) { const errorMessage = err instanceof Error ? err.message : '生成任务提交失败'; @@ -325,9 +412,13 @@ export function PlaygroundPage(props: { prompt={prompt} selectedApiKeyId={activeApiKeyId} selectedModel={selectedModel} - imageHasReference={imageHasReference} + imageHasReference={effectiveImageHasReference} mediaSettings={mediaSettings} mediaCapabilities={mediaCapabilities} + uploadAccept={mediaUploadAccept} + uploadMessage={mediaUploadMessage} + uploads={mediaUploads} + uploading={mediaUploading} videoMode={videoMode} onApiKeyChange={props.onApiKeyChange} onCreateApiKey={props.onCreateApiKey} @@ -336,7 +427,15 @@ export function PlaygroundPage(props: { onModeChange={props.onModeChange} onModelChange={setSelectedModel} onPromptChange={setPrompt} + onRemoveUpload={(id) => setMediaUploads((current) => { + const next = current.filter((item) => item.id !== id); + if (!next.some((item) => item.kind === 'image')) { + setImageHasReference(false); + } + return next; + })} onSubmit={() => void submitMediaTask()} + onUploadFiles={(files) => void uploadMediaFiles(files)} onVideoModeChange={setVideoMode} /> ); @@ -491,6 +590,42 @@ function AssistantChatPlayground(props: { const canRun = Boolean(props.token && props.selectedModel && activeApiKeySecret); const apiKeyNotice = apiKeyNoticeText(props.apiKeys, props.apiKeySecretsById); const initialMessages = useMemo(() => readStoredChatMessages(), []); + const [chatUploadMessage, setChatUploadMessage] = useState(''); + const [chatUploads, setChatUploads] = useState([]); + const [chatUploading, setChatUploading] = useState(false); + const chatUploadsRef = useRef(chatUploads); + useEffect(() => { + chatUploadsRef.current = chatUploads; + }, [chatUploads]); + + async function uploadChatFiles(files: File[]) { + if (!files.length) return; + if (!props.token) { + props.onLogin(); + return; + } + if (!activeApiKeySecret) { + setChatUploadMessage('请选择可用于测试的 API Key 后再上传。'); + return; + } + setChatUploading(true); + setChatUploadMessage(''); + try { + const { items, warnings } = await uploadPlaygroundFiles(activeApiKeySecret, files, { + allowFiles: true, + source: 'ai-gateway-playground-chat', + }); + if (items.length) { + setChatUploads((current) => [...current, ...items]); + } + setChatUploadMessage(warnings[0] ?? (items.length ? `已上传 ${items.length} 个附件。` : '')); + } catch (err) { + setChatUploadMessage(err instanceof Error ? err.message : '文件上传失败'); + } finally { + setChatUploading(false); + } + } + const adapter = useMemo(() => ({ async *run({ abortSignal, messages }) { if (!props.token) { @@ -503,11 +638,17 @@ function AssistantChatPlayground(props: { if (!props.selectedModel) { throw new GatewayApiError('当前没有可用的大模型,请确认用户组权限或平台模型配置。'); } + const requestUploads = chatUploadsRef.current; + if (requestUploads.length) { + chatUploadsRef.current = []; + setChatUploads([]); + setChatUploadMessage(''); + } let text = ''; for await (const delta of streamChatCompletionText( activeApiKeySecret, { - messages: toGatewayChatMessages(messages), + messages: toGatewayChatMessages(messages, requestUploads), model: props.selectedModel, }, abortSignal, @@ -541,10 +682,16 @@ function AssistantChatPlayground(props: { selectedModel={props.selectedModel} token={props.token} activeApiKeySecret={activeApiKeySecret} + uploadAccept={chatUploadAccept} + uploadMessage={chatUploadMessage} + uploads={chatUploads} + uploading={chatUploading} onApiKeyChange={props.onApiKeyChange} onCreateApiKey={props.onCreateApiKey} onModeChange={props.onModeChange} onModelChange={props.onModelChange} + onRemoveUpload={(id) => setChatUploads((current) => current.filter((item) => item.id !== id))} + onUploadFiles={(files) => void uploadChatFiles(files)} /> @@ -573,10 +720,16 @@ function AssistantChatPlayground(props: { placeholder={assistantPlaceholder(props.token, props.selectedModel, activeApiKeySecret)} selectedApiKeyId={activeApiKeyId} selectedModel={props.selectedModel} + uploadAccept={chatUploadAccept} + uploadMessage={chatUploadMessage} + uploads={chatUploads} + uploading={chatUploading} onApiKeyChange={props.onApiKeyChange} onCreateApiKey={props.onCreateApiKey} onModeChange={props.onModeChange} onModelChange={props.onModelChange} + onRemoveUpload={(id) => setChatUploads((current) => current.filter((item) => item.id !== id))} + onUploadFiles={(files) => void uploadChatFiles(files)} /> @@ -645,10 +798,16 @@ function AssistantEmptyState(props: { selectedApiKeyId: string; selectedModel: string; token: string; + uploadAccept: string; + uploadMessage: string; + uploads: PlaygroundUpload[]; + uploading: boolean; onApiKeyChange: (apiKeyId: string) => void; onCreateApiKey: () => void; onModeChange: (mode: PlaygroundMode) => void; onModelChange: (value: string) => void; + onRemoveUpload: (id: string) => void; + onUploadFiles: (files: File[]) => void; }) { const activeMode = modeOptions.find((item) => item.value === 'chat') ?? modeOptions[0]; const placeholder = props.canRun ? placeholderByMode.chat : assistantPlaceholder(props.token, props.selectedModel, props.activeApiKeySecret); @@ -666,10 +825,16 @@ function AssistantEmptyState(props: { placeholder={placeholder} selectedApiKeyId={props.selectedApiKeyId} selectedModel={props.selectedModel} + uploadAccept={props.uploadAccept} + uploadMessage={props.uploadMessage} + uploads={props.uploads} + uploading={props.uploading} onApiKeyChange={props.onApiKeyChange} onCreateApiKey={props.onCreateApiKey} onModeChange={props.onModeChange} onModelChange={props.onModelChange} + onRemoveUpload={props.onRemoveUpload} + onUploadFiles={props.onUploadFiles} /> ); @@ -685,24 +850,41 @@ function AssistantChatComposer(props: { placeholder: string; selectedApiKeyId: string; selectedModel: string; + uploadAccept?: string; + uploadMessage?: string; + uploads?: PlaygroundUpload[]; + uploading?: boolean; onApiKeyChange: (apiKeyId: string) => void; onCreateApiKey: () => void; onModeChange: (mode: PlaygroundMode) => void; onModelChange: (value: string) => void; + onRemoveUpload?: (id: string) => void; + onUploadFiles?: (files: File[]) => void; }) { const className = ['playgroundComposer', 'assistantChatComposer', props.docked ? 'assistantDockComposer' : 'assistantEmptyComposer'].join(' '); return (
- - +
+ + +