package runner import ( "bytes" "context" "crypto/rand" "encoding/base64" "encoding/hex" "encoding/json" "fmt" "io" "mime/multipart" "net/http" "net/textproto" "net/url" "strings" "time" "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) const defaultServerMainOpenAPIUploadURL = "http://127.0.0.1:3001/v1/files/upload" const maxGeneratedAssetFetchBytes = 256 << 20 type FileUploadPayload struct { ContentType string FileName string Scene string Source string Bytes []byte } type generatedAssetUploadPolicy struct { UploadInlineMedia bool UploadURLMedia bool } type generatedAssetDecision struct { Inline *generatedInlineAsset URL *generatedURLAsset StripKeys []string } type generatedInlineAsset struct { Bytes []byte ContentType string Kind string SourceKey string } type generatedURLAsset struct { URL string ContentType string Kind string SourceKey string } func defaultGeneratedAssetUploadPolicy() generatedAssetUploadPolicy { return generatedAssetUploadPolicy{ UploadInlineMedia: true, UploadURLMedia: false, } } func (s *Service) uploadGeneratedAssets(ctx context.Context, taskID string, taskKind string, result map[string]any) (map[string]any, error) { data, _ := result["data"].([]any) if len(data) == 0 { return result, nil } policy, err := s.generatedAssetUploadPolicy(ctx) if err != nil { return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true} } decisions := make([]generatedAssetDecision, len(data)) needsUpload := false changed := false for index, rawItem := range data { item, _ := rawItem.(map[string]any) if item == nil { continue } decision, err := generatedAssetDecisionForItem(taskKind, item, policy) if err != nil { return nil, err } decisions[index] = decision if decision.Inline != nil || decision.URL != nil { needsUpload = true } if len(decision.StripKeys) > 0 { changed = true } } if !needsUpload && !changed { return result, nil } var channels []store.FileStorageChannel if needsUpload { channels, err = s.activeFileStorageChannels(ctx, store.FileStorageSceneImageResult) if err != nil { return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true} } if len(channels) == 0 { return nil, &clients.ClientError{Code: "upload_no_channel", Message: "no enabled file storage channel for generated media results", Retryable: false} } } next := map[string]any{} for key, value := range result { next[key] = value } nextData := make([]any, 0, len(data)) for index, rawItem := range data { item, _ := rawItem.(map[string]any) if item == nil { nextData = append(nextData, rawItem) continue } decision := decisions[index] merged := map[string]any{} for key, value := range item { merged[key] = value } for _, key := range decision.StripKeys { delete(merged, key) } if decision.Inline != nil || decision.URL != nil { var upload map[string]any var sourceKey string var strategy string var kind string var contentType string var err error if decision.Inline != nil { upload, contentType, kind, err = s.uploadGeneratedAsset(ctx, taskID, decision.Inline, index, channels) sourceKey = decision.Inline.SourceKey strategy = "upload_inline_media" } else { upload, contentType, kind, err = s.uploadGeneratedURLAsset(ctx, taskID, decision.URL, index, channels) sourceKey = decision.URL.SourceKey strategy = "upload_url_media" } if err != nil { return nil, err } merged["upload"] = upload merged["assetStorage"] = map[string]any{ "scene": store.FileStorageSceneImageResult, "source": sourceKey, "strategy": strategy, } if contentType != "" { merged["assetStorage"].(map[string]any)["contentType"] = contentType } if urlValue := stringFromAny(upload["url"]); urlValue != "" { merged["url"] = urlValue if kind == "video" { merged["video_url"] = urlValue } if kind == "image" { merged["image_url"] = urlValue } } if kind != "" && stringFromAny(merged["type"]) == "" { merged["type"] = kind } if contentType != "" && stringFromAny(merged["mime_type"]) == "" { merged["mime_type"] = contentType } } nextData = append(nextData, merged) } next["data"] = nextData return next, nil } func (s *Service) generatedAssetUploadPolicy(ctx context.Context) (generatedAssetUploadPolicy, error) { settings, err := s.store.GetFileStorageSettings(ctx) if err != nil { if store.IsUndefinedDatabaseObject(err) { return defaultGeneratedAssetUploadPolicy(), nil } return generatedAssetUploadPolicy{}, err } return generatedAssetUploadPolicyFromName(settings.ResultUploadPolicy), nil } func generatedAssetUploadPolicyFromName(policyName string) generatedAssetUploadPolicy { policyName = store.NormalizeFileStorageResultUploadPolicy(policyName) switch policyName { case store.FileStorageResultUploadPolicyUploadAll: return generatedAssetUploadPolicy{UploadInlineMedia: true, UploadURLMedia: true} case store.FileStorageResultUploadPolicyUploadNone: return generatedAssetUploadPolicy{UploadInlineMedia: false, UploadURLMedia: false} default: return defaultGeneratedAssetUploadPolicy() } } func (s *Service) uploadGeneratedAsset(ctx context.Context, taskID string, asset *generatedInlineAsset, index int, channels []store.FileStorageChannel) (map[string]any, string, string, error) { contentType := resolvedGeneratedAssetContentType(asset.ContentType, asset.Kind, asset.Bytes) kind := generatedAssetKindFromContentType(asset.Kind, contentType) upload, err := s.uploadFileWithFailover(ctx, FileUploadPayload{ Bytes: asset.Bytes, ContentType: contentType, FileName: generatedAssetFileName(taskID, index, contentType, kind), Scene: store.FileStorageSceneImageResult, Source: "ai-gateway", }, channels) return upload, contentType, kind, err } func (s *Service) uploadGeneratedURLAsset(ctx context.Context, taskID string, asset *generatedURLAsset, index int, channels []store.FileStorageChannel) (map[string]any, string, string, error) { payload, contentType, err := s.readGeneratedURLAsset(ctx, asset) if err != nil { return nil, "", "", err } contentType = resolvedGeneratedAssetContentType(firstNonEmptyString(contentType, asset.ContentType), asset.Kind, payload) kind := generatedAssetKindFromContentType(asset.Kind, contentType) upload, err := s.uploadFileWithFailover(ctx, FileUploadPayload{ Bytes: payload, ContentType: contentType, FileName: generatedAssetFileName(taskID, index, contentType, kind), Scene: store.FileStorageSceneImageResult, Source: "ai-gateway", }, channels) return upload, contentType, kind, err } func (s *Service) readGeneratedURLAsset(ctx context.Context, asset *generatedURLAsset) ([]byte, string, error) { fetchURL, err := s.generatedAssetFetchURL(asset.URL) if err != nil { return nil, "", err } req, err := http.NewRequestWithContext(ctx, http.MethodGet, fetchURL, nil) if err != nil { return nil, "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, "", &clients.ClientError{Code: "upload_source_fetch_failed", Message: err.Error(), Retryable: true} } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) message := strings.TrimSpace(string(body)) if message == "" { message = "generated media source fetch failed" } return nil, "", &clients.ClientError{ Code: "upload_source_fetch_failed", Message: message, StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode), } } payload, err := io.ReadAll(io.LimitReader(resp.Body, maxGeneratedAssetFetchBytes+1)) if err != nil { return nil, "", &clients.ClientError{Code: "upload_source_read_failed", Message: err.Error(), StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} } if len(payload) > maxGeneratedAssetFetchBytes { return nil, "", &clients.ClientError{Code: "upload_source_too_large", Message: "generated media source exceeds upload fetch limit", StatusCode: resp.StatusCode, Retryable: false} } contentType := firstNonEmptyString(resp.Header.Get("Content-Type"), asset.ContentType) return payload, strings.TrimSpace(strings.Split(contentType, ";")[0]), nil } func (s *Service) generatedAssetFetchURL(raw string) (string, error) { value := strings.TrimSpace(raw) if value == "" { return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: "generated media source URL is empty", Retryable: false} } parsed, err := url.Parse(value) if err != nil { return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: err.Error(), Retryable: false} } if parsed.IsAbs() { if parsed.Scheme == "http" || parsed.Scheme == "https" { return value, nil } return "", &clients.ClientError{Code: "upload_source_unsupported_url", Message: "unsupported generated media source URL scheme: " + parsed.Scheme, Retryable: false} } if strings.HasPrefix(value, "/") { baseURL := generatedAssetLocalBaseURL(s.cfg.HTTPAddr) if baseURL == "" { return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: "generated media source uses a relative URL without a local HTTP address", Retryable: false} } return baseURL + value, nil } return "", &clients.ClientError{Code: "upload_source_invalid_url", Message: "generated media source URL must be absolute or root-relative", Retryable: false} } func generatedAssetLocalBaseURL(httpAddr string) string { addr := strings.TrimSpace(httpAddr) if addr == "" { return "http://127.0.0.1:8088" } if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") { return strings.TrimRight(addr, "/") } if strings.HasPrefix(addr, ":") { return "http://127.0.0.1" + addr } if strings.Contains(addr, "://") { return "" } return "http://" + strings.TrimRight(addr, "/") } func (s *Service) UploadFile(ctx context.Context, payload FileUploadPayload) (map[string]any, error) { if strings.TrimSpace(payload.Scene) == "" { payload.Scene = store.FileStorageSceneUpload } channels, err := s.activeFileStorageChannels(ctx, payload.Scene) if err != nil { return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true} } if len(channels) == 0 { return nil, &clients.ClientError{Code: "upload_no_channel", Message: "no enabled file storage channel", Retryable: false} } return s.uploadFileWithFailover(ctx, payload, channels) } func (s *Service) activeFileStorageChannels(ctx context.Context, scene string) ([]store.FileStorageChannel, error) { channels, err := s.store.ListEnabledFileStorageChannelsForScene(ctx, scene) if err != nil && !store.IsUndefinedDatabaseObject(err) { return nil, err } if len(channels) > 0 { return channels, nil } fallback := s.fallbackFileStorageChannel() if fallback == nil { return nil, nil } if !fileStorageChannelSupportsScene(*fallback, scene) { return nil, nil } return []store.FileStorageChannel{*fallback}, nil } func (s *Service) fallbackFileStorageChannel() *store.FileStorageChannel { baseURL := strings.TrimRight(strings.TrimSpace(s.cfg.ServerMainBaseURL), "/") apiKey := strings.TrimSpace(s.cfg.ServerMainInternalToken) if baseURL == "" || apiKey == "" { return nil } return &store.FileStorageChannel{ ChannelKey: "server-main-env-fallback", Name: "server-main env fallback", Provider: "server_main_openapi", UploadURL: baseURL + "/v1/files/upload", APIKey: apiKey, Scenes: []string{store.FileStorageSceneUpload, store.FileStorageSceneImageResult}, RetryPolicy: defaultUploadRetryPolicy(), Priority: 100, Status: "enabled", } } func (s *Service) uploadFileWithFailover(ctx context.Context, payload FileUploadPayload, channels []store.FileStorageChannel) (map[string]any, error) { var lastErr error for _, channel := range channels { upload, err := s.uploadWithChannelRetries(ctx, payload, channel) if err == nil { _ = s.store.MarkFileStorageChannelSuccess(context.WithoutCancel(ctx), channel.ID) return upload, nil } lastErr = err _ = s.store.MarkFileStorageChannelFailure(context.WithoutCancel(ctx), channel.ID, err.Error()) } if lastErr != nil { return nil, lastErr } return nil, &clients.ClientError{Code: "upload_no_channel", Message: "no enabled file storage channel", Retryable: false} } func fileStorageChannelSupportsScene(channel store.FileStorageChannel, scene string) bool { scene = strings.TrimSpace(scene) if scene == "" { return true } scenes := channel.Scenes if len(scenes) == 0 { scenes = []string{store.FileStorageSceneUpload, store.FileStorageSceneImageResult} } for _, item := range scenes { if strings.TrimSpace(item) == scene { return true } } return false } func (s *Service) uploadWithChannelRetries(ctx context.Context, payload FileUploadPayload, channel store.FileStorageChannel) (map[string]any, error) { maxRetries, delays := uploadRetrySchedule(channel.RetryPolicy) var lastErr error for attempt := 0; attempt <= maxRetries; attempt++ { upload, err := s.uploadOnce(ctx, payload, channel) if err == nil { return upload, nil } lastErr = err if attempt >= maxRetries || !clients.IsRetryable(err) { break } delay := retryDelayForAttempt(attempt, delays) if err := sleepWithContext(ctx, delay); err != nil { return nil, err } } return nil, lastErr } func (s *Service) uploadOnce(ctx context.Context, payload FileUploadPayload, channel store.FileStorageChannel) (map[string]any, error) { if strings.ToLower(strings.TrimSpace(channel.Provider)) != "server_main_openapi" { return nil, &clients.ClientError{Code: "upload_unsupported_channel", Message: "unsupported file storage channel: " + channel.Provider, Retryable: false} } uploadURL := strings.TrimSpace(channel.UploadURL) if uploadURL == "" { uploadURL = defaultServerMainOpenAPIUploadURL } apiKey := strings.TrimSpace(channel.APIKey) if apiKey == "" { return nil, &clients.ClientError{Code: "missing_credentials", Message: "file storage channel API key is required", Retryable: false} } var body bytes.Buffer writer := multipart.NewWriter(&body) fileWriter, err := createUploadFormFile(writer, "file", firstNonEmptyString(payload.FileName, "upload.bin"), payload.ContentType) if err != nil { return nil, err } if _, err := fileWriter.Write(payload.Bytes); err != nil { return nil, err } _ = writer.WriteField("source", firstNonEmptyString(payload.Source, "ai-gateway")) _ = writer.WriteField("scene", firstNonEmptyString(payload.Scene, store.FileStorageSceneUpload)) if err := writer.Close(); err != nil { return nil, err } req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, &body) if err != nil { return nil, err } req.Header.Set("Content-Type", writer.FormDataContentType()) if payload.ContentType != "" { req.Header.Set("X-Upload-Content-Type", payload.ContentType) } req.Header.Set("Authorization", "Bearer "+apiKey) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, &clients.ClientError{Code: "upload_network", Message: err.Error(), Retryable: true} } defer resp.Body.Close() responseBody, readErr := io.ReadAll(resp.Body) if readErr != nil { return nil, &clients.ClientError{Code: "upload_read_failed", Message: readErr.Error(), StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} } if resp.StatusCode < 200 || resp.StatusCode >= 300 { message := strings.TrimSpace(string(responseBody)) if message == "" { message = "file upload failed" } return nil, &clients.ClientError{Code: "upload_failed", Message: message, StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} } var decoded map[string]any if err := json.Unmarshal(responseBody, &decoded); err != nil { return nil, &clients.ClientError{Code: "upload_invalid_response", Message: err.Error(), Retryable: false} } return normalizeUploadResponse(decoded, channel), nil } func createUploadFormFile(writer *multipart.Writer, fieldName string, fileName string, contentType string) (io.Writer, error) { header := make(textproto.MIMEHeader) header.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s"; filename="%s"`, escapeMultipartValue(fieldName), escapeMultipartValue(fileName))) if strings.TrimSpace(contentType) != "" { header.Set("Content-Type", strings.TrimSpace(contentType)) } return writer.CreatePart(header) } func escapeMultipartValue(value string) string { return strings.NewReplacer("\\", "\\\\", `"`, "\\\"").Replace(value) } func stripDataURLPrefix(value string) string { if index := strings.Index(value, ","); strings.HasPrefix(value, "data:") && index >= 0 { return value[index+1:] } return value } func generatedAssetDecisionForItem(taskKind string, item map[string]any, policy generatedAssetUploadPolicy) (generatedAssetDecision, error) { decision := generatedAssetDecision{} urlKey, mediaURL := mediaURLSourceFromItem(item) if mediaURL != "" { if !policy.UploadURLMedia { decision.StripKeys = inlineMediaKeys(item) return decision, nil } contentType := mediaContentTypeFromItem(item) kind := mediaKindForAsset(taskKind, item, urlKey, contentType) decision.URL = &generatedURLAsset{ URL: mediaURL, ContentType: contentType, Kind: kind, SourceKey: urlKey, } decision.StripKeys = uniqueStringList(append(mediaURLKeys(item), inlineMediaKeys(item)...)) return decision, nil } if !policy.UploadInlineMedia { return decision, nil } asset, keys, err := inlineAssetFromItem(taskKind, item) if err != nil { return decision, err } if asset == nil { return decision, nil } decision.Inline = asset decision.StripKeys = keys return decision, nil } func inlineAssetFromItem(taskKind string, item map[string]any) (*generatedInlineAsset, []string, error) { for _, key := range inlineMediaCandidateKeys() { value, ok := item[key] if !ok || value == nil { continue } strictBase64 := inlineMediaKeyIsStrictBase64(key) payload, contentType, ok, err := inlineMediaPayload(value, strictBase64) if err != nil { return nil, nil, err } if !ok { continue } contentType = firstNonEmptyString(contentType, mediaContentTypeFromItem(item), defaultContentTypeForGeneratedAsset(mediaKindForAsset(taskKind, item, key, contentType))) kind := mediaKindForAsset(taskKind, item, key, contentType) return &generatedInlineAsset{ Bytes: payload, ContentType: contentType, Kind: kind, SourceKey: key, }, inlineMediaKeys(item), nil } return nil, nil, nil } func inlineMediaPayload(value any, strictBase64 bool) ([]byte, string, bool, error) { switch typed := value.(type) { case []byte: if len(typed) == 0 { return nil, "", false, nil } payload := make([]byte, len(typed)) copy(payload, typed) return payload, "", true, nil case []any: payload, ok := bytesFromNumberArray(typed) return payload, "", ok, nil case map[string]any: if data, ok := typed["data"].([]any); ok { payload, ok := bytesFromNumberArray(data) return payload, firstNonEmptyString(stringFromAny(typed["mime_type"]), stringFromAny(typed["mimeType"])), ok, nil } if data, ok := typed["data"].([]byte); ok && len(data) > 0 { payload := make([]byte, len(data)) copy(payload, data) return payload, firstNonEmptyString(stringFromAny(typed["mime_type"]), stringFromAny(typed["mimeType"])), true, nil } return nil, "", false, nil case string: return inlineMediaPayloadFromString(typed, strictBase64) default: return nil, "", false, nil } } func inlineMediaPayloadFromString(value string, strictBase64 bool) ([]byte, string, bool, error) { raw := strings.TrimSpace(value) if raw == "" || mediaURLString(raw) { return nil, "", false, nil } if strings.HasPrefix(strings.ToLower(raw), "data:") { contentType, encoded, ok, err := parseBase64DataURL(raw) if err != nil || !ok { return nil, "", false, err } payload, err := decodeBase64Payload(encoded) if err != nil { return nil, "", false, &clients.ClientError{Code: "upload_decode_failed", Message: err.Error(), Retryable: false} } return payload, contentType, true, nil } if !strictBase64 && len(raw) < 64 { return nil, "", false, nil } payload, err := decodeBase64Payload(raw) if err != nil { if strictBase64 { return nil, "", false, &clients.ClientError{Code: "upload_decode_failed", Message: err.Error(), Retryable: false} } return nil, "", false, nil } return payload, "", true, nil } func parseBase64DataURL(value string) (string, string, bool, error) { prefix, payload, ok := strings.Cut(value, ",") if !ok { return "", "", false, &clients.ClientError{Code: "upload_decode_failed", Message: "invalid data URL media payload", Retryable: false} } meta := strings.TrimPrefix(prefix, "data:") meta = strings.TrimPrefix(meta, "DATA:") parts := strings.Split(meta, ";") contentType := strings.TrimSpace(parts[0]) isBase64 := false for _, part := range parts[1:] { if strings.EqualFold(strings.TrimSpace(part), "base64") { isBase64 = true break } } if !isBase64 { return "", "", false, &clients.ClientError{Code: "upload_decode_failed", Message: "data URL media payload is not base64 encoded", Retryable: false} } return contentType, payload, true, nil } func decodeBase64Payload(value string) ([]byte, error) { normalized := strings.Map(func(r rune) rune { switch r { case '\n', '\r', '\t', ' ': return -1 default: return r } }, stripDataURLPrefix(value)) encodings := []*base64.Encoding{ base64.StdEncoding, base64.RawStdEncoding, base64.URLEncoding, base64.RawURLEncoding, } var lastErr error for _, encoding := range encodings { payload, err := encoding.DecodeString(normalized) if err == nil && len(payload) > 0 { return payload, nil } if err != nil { lastErr = err } } if lastErr == nil { lastErr = fmt.Errorf("empty base64 payload") } return nil, lastErr } func bytesFromNumberArray(values []any) ([]byte, bool) { if len(values) == 0 { return nil, false } payload := make([]byte, 0, len(values)) for _, value := range values { next, ok := byteFromAny(value) if !ok { return nil, false } payload = append(payload, next) } return payload, true } func byteFromAny(value any) (byte, bool) { switch typed := value.(type) { case byte: return typed, true case int: if typed >= 0 && typed <= 255 { return byte(typed), true } case int64: if typed >= 0 && typed <= 255 { return byte(typed), true } case float64: asInt := int(typed) if typed == float64(asInt) && asInt >= 0 && asInt <= 255 { return byte(asInt), true } } return 0, false } func inlineMediaKeys(item map[string]any) []string { keys := []string{} for _, key := range inlineMediaCandidateKeys() { value, ok := item[key] if !ok || value == nil { continue } strictBase64 := inlineMediaKeyIsStrictBase64(key) if strictBase64 && stringFromAny(value) != "" { keys = append(keys, key) continue } if _, _, ok, _ := inlineMediaPayload(value, strictBase64); ok { keys = append(keys, key) } } return uniqueStringList(keys) } func inlineMediaCandidateKeys() []string { return []string{ "b64_json", "image_base64", "image_b64", "video_base64", "video_b64", "base64", "b64", "url", "image_url", "imageUrl", "video_url", "videoUrl", "output_url", "outputUrl", "output_video_url", "outputVideoUrl", "image", "video", "image_buffer", "image_bytes", "video_buffer", "video_bytes", "buffer", "bytes", "data", } } func inlineMediaKeyIsStrictBase64(key string) bool { lower := strings.ToLower(key) return lower == "b64_json" || lower == "base64" || lower == "b64" || strings.Contains(lower, "base64") || strings.Contains(lower, "_b64") } func mediaURLSourceFromItem(item map[string]any) (string, string) { for _, key := range mediaURLCandidateKeys() { value := stringFromAny(item[key]) if value != "" && mediaURLString(value) { return key, value } } return "", "" } func mediaURLKeys(item map[string]any) []string { keys := []string{} for _, key := range mediaURLCandidateKeys() { value := stringFromAny(item[key]) if value != "" && mediaURLString(value) { keys = append(keys, key) } } return uniqueStringList(keys) } func mediaURLCandidateKeys() []string { return []string{"url", "image_url", "imageUrl", "video_url", "videoUrl", "output_url", "outputUrl", "output_video_url", "outputVideoUrl", "download_url", "downloadUrl", "file_url", "fileUrl"} } func mediaURLString(value string) bool { raw := strings.TrimSpace(value) if raw == "" { return false } lower := strings.ToLower(raw) if strings.HasPrefix(lower, "data:") { return false } return strings.HasPrefix(lower, "http://") || strings.HasPrefix(lower, "https://") || strings.HasPrefix(lower, "/") || strings.Contains(lower, "://") } func mediaContentTypeFromItem(item map[string]any) string { return firstNonEmptyString( stringFromAny(item["mime_type"]), stringFromAny(item["mimeType"]), stringFromAny(item["content_type"]), stringFromAny(item["contentType"]), ) } func mediaKindForAsset(taskKind string, item map[string]any, sourceKey string, contentType string) string { contentType = strings.ToLower(strings.TrimSpace(contentType)) if strings.HasPrefix(contentType, "image/") { return "image" } if strings.HasPrefix(contentType, "video/") { return "video" } if strings.HasPrefix(contentType, "audio/") { return "audio" } itemType := strings.ToLower(strings.TrimSpace(stringFromAny(item["type"]))) if strings.Contains(itemType, "video") { return "video" } if strings.Contains(itemType, "image") { return "image" } key := strings.ToLower(sourceKey) if strings.Contains(key, "video") { return "video" } if strings.Contains(key, "image") { return "image" } kind := strings.ToLower(strings.TrimSpace(taskKind)) if strings.Contains(kind, "video") { return "video" } if strings.Contains(kind, "image") { return "image" } return "image" } func defaultContentTypeForGeneratedAsset(kind string) string { switch strings.ToLower(strings.TrimSpace(kind)) { case "video": return "video/mp4" case "audio": return "audio/mpeg" default: return "image/png" } } func resolvedGeneratedAssetContentType(declared string, kind string, payload []byte) string { declared = normalizeGeneratedContentType(declared) detected := detectGeneratedAssetContentType(payload) if generatedContentTypeIsMedia(detected) { return detected } if generatedContentTypeIsMedia(declared) { return declared } return defaultContentTypeForGeneratedAsset(kind) } func detectGeneratedAssetContentType(payload []byte) string { if len(payload) == 0 { return "" } return normalizeGeneratedContentType(http.DetectContentType(payload)) } func normalizeGeneratedContentType(contentType string) string { return strings.ToLower(strings.TrimSpace(strings.Split(contentType, ";")[0])) } func generatedContentTypeIsMedia(contentType string) bool { return strings.HasPrefix(contentType, "image/") || strings.HasPrefix(contentType, "video/") || strings.HasPrefix(contentType, "audio/") } func generatedAssetKindFromContentType(fallback string, contentType string) string { contentType = normalizeGeneratedContentType(contentType) if strings.HasPrefix(contentType, "image/") { return "image" } if strings.HasPrefix(contentType, "video/") { return "video" } if strings.HasPrefix(contentType, "audio/") { return "audio" } fallback = strings.ToLower(strings.TrimSpace(fallback)) if fallback != "" { return fallback } return "image" } func generatedAssetFileName(taskID string, index int, contentType string, kind string) string { token := sanitizeGeneratedAssetNamePart(taskID) if token == "" { token = fmt.Sprintf("%d", time.Now().UTC().UnixNano()) } if len(token) > 32 { token = token[:32] } return fmt.Sprintf("gateway-result-%s-%02d-%s%s", token, index+1, randomHexSuffix(6), fileExtensionForContentType(contentType, kind)) } func sanitizeGeneratedAssetNamePart(value string) string { value = strings.ToLower(strings.TrimSpace(value)) var builder strings.Builder for _, item := range value { if (item >= 'a' && item <= 'z') || (item >= '0' && item <= '9') || item == '-' || item == '_' { builder.WriteRune(item) } } return strings.Trim(builder.String(), "-_") } func randomHexSuffix(byteCount int) string { if byteCount <= 0 { byteCount = 6 } payload := make([]byte, byteCount) if _, err := rand.Read(payload); err == nil { return hex.EncodeToString(payload) } return fmt.Sprintf("%d", time.Now().UTC().UnixNano()) } func fileExtensionForContentType(contentType string, kind string) string { normalized := strings.ToLower(strings.TrimSpace(strings.Split(contentType, ";")[0])) switch normalized { case "image/jpeg", "image/jpg": return ".jpg" case "image/webp": return ".webp" case "image/gif": return ".gif" case "image/avif": return ".avif" case "image/bmp": return ".bmp" case "image/svg+xml": return ".svg" case "video/webm": return ".webm" case "video/quicktime": return ".mov" case "video/mp4": return ".mp4" case "audio/wav", "audio/x-wav": return ".wav" case "audio/ogg": return ".ogg" case "audio/mpeg", "audio/mp3": return ".mp3" case "image/png": return ".png" } if strings.EqualFold(kind, "video") { return ".mp4" } if strings.EqualFold(kind, "audio") { return ".mp3" } return ".png" } func uniqueStringList(values []string) []string { seen := map[string]bool{} out := make([]string, 0, len(values)) for _, value := range values { value = strings.TrimSpace(value) if value == "" || seen[value] { continue } seen[value] = true out = append(out, value) } return out } func uploadRetrySchedule(policy map[string]any) (int, []time.Duration) { if policy == nil { policy = defaultUploadRetryPolicy() } if enabled, ok := policy["enabled"].(bool); ok && !enabled { return 0, nil } maxRetries := intFromPolicy(policy, "maxRetries") if maxRetries <= 0 { maxRetries = 3 } delays := uploadRetryDelays(policy["backoffSeconds"]) if len(delays) == 0 { delays = []time.Duration{60 * time.Second, 120 * time.Second, 180 * time.Second} } return maxRetries, delays } func uploadRetryDelays(value any) []time.Duration { items, ok := value.([]any) if !ok { return nil } delays := make([]time.Duration, 0, len(items)) for _, item := range items { seconds := int(floatFromAny(item)) if seconds > 0 { delays = append(delays, time.Duration(seconds)*time.Second) } } return delays } func retryDelayForAttempt(attempt int, delays []time.Duration) time.Duration { if attempt < len(delays) { return delays[attempt] } return delays[len(delays)-1] } func sleepWithContext(ctx context.Context, delay time.Duration) error { timer := time.NewTimer(delay) defer timer.Stop() select { case <-ctx.Done(): return ctx.Err() case <-timer.C: return nil } } func defaultUploadRetryPolicy() map[string]any { return map[string]any{ "enabled": true, "maxRetries": 3, "backoffSeconds": []any{60, 120, 180}, "strategy": "exponential", } } func normalizeUploadResponse(decoded map[string]any, channel store.FileStorageChannel) map[string]any { if decoded == nil { decoded = map[string]any{} } if stringFromAny(decoded["url"]) == "" { if urlValue := uploadResponseURL(decoded); urlValue != "" { decoded["url"] = urlValue } } decoded["storageChannel"] = map[string]any{ "id": channel.ID, "channelKey": channel.ChannelKey, "name": channel.Name, "provider": channel.Provider, } return decoded } func uploadResponseURL(decoded map[string]any) string { for _, key := range []string{"url", "fileUrl", "file_url"} { if value := stringFromAny(decoded[key]); value != "" { return value } } for _, key := range []string{"data", "file", "result"} { if nested, ok := decoded[key].(map[string]any); ok { if value := uploadResponseURL(nested); value != "" { return value } } if items, ok := decoded[key].([]any); ok && len(items) > 0 { if nested, ok := items[0].(map[string]any); ok { if value := uploadResponseURL(nested); value != "" { return value } } } } return "" }