easyai-ai-gateway/apps/api/internal/runner/upload.go

1196 lines
36 KiB
Go

package runner
import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"net/textproto"
"net/url"
"os"
"path/filepath"
"strings"
"time"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/clients"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/config"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
)
// Upload pipeline constants.
const (
	// defaultServerMainOpenAPIUploadURL is the endpoint used when a
	// server_main_openapi storage channel has no upload URL configured.
	defaultServerMainOpenAPIUploadURL = "http://127.0.0.1:3001/v1/files/upload"

	// maxGeneratedAssetFetchBytes caps the body read when fetching a
	// generated media URL (256 MiB).
	maxGeneratedAssetFetchBytes = 256 << 20

	// Path prefixes used to build public URLs for files written to local
	// static storage: generated results and user uploads respectively.
	localStaticGeneratedPathPrefix = "/static/generated/"
	localStaticUploadedPathPrefix  = "/static/uploaded/"
)
// FileUploadPayload is one file to persist, either through a file storage
// channel (multipart upload) or to local static storage.
type FileUploadPayload struct {
// ContentType is the MIME type sent with the file; may be empty.
ContentType string
// FileName is used as the multipart filename and, after filepath.Base, as
// the local file name.
FileName string
// Scene selects which storage channels apply and is sent as the "scene"
// form field.
Scene string
// Source identifies the uploader and is sent as the "source" form field.
Source string
// Bytes is the raw file content.
Bytes []byte
}
// generatedAssetUploadPolicy controls how media found in generated results is
// persisted.
type generatedAssetUploadPolicy struct {
// UploadInlineMedia enables persisting inline (base64 / byte array) media.
UploadInlineMedia bool
// UploadURLMedia enables fetching and re-uploading media referenced by URL;
// when false, URL items are kept and only their inline copies are stripped.
UploadURLMedia bool
// StoreInlineMediaLocally writes inline media to local static storage
// instead of uploading it through storage channels.
StoreInlineMediaLocally bool
}
// generatedAssetDecision describes what to do with one item of a result's
// "data" array. generatedAssetDecisionForItem sets at most one of Inline/URL.
type generatedAssetDecision struct {
// Inline is set when the item carries inline media to persist.
Inline *generatedInlineAsset
// URL is set when the item references media by URL that should be re-uploaded.
URL *generatedURLAsset
// StripKeys are item keys removed from the copied item (raw media fields).
StripKeys []string
}
// generatedInlineAsset is media decoded from an inline field of a result item.
type generatedInlineAsset struct {
// Bytes is the decoded media payload.
Bytes []byte
// ContentType is the declared or inferred MIME type.
ContentType string
// Kind is "image", "video" or "audio" (see mediaKindForAsset).
Kind string
// SourceKey is the item key the payload was read from.
SourceKey string
}
// generatedURLAsset is media referenced by URL from a result item.
type generatedURLAsset struct {
// URL is the absolute or root-relative media URL.
URL string
// ContentType is the MIME type declared on the item, if any.
ContentType string
// Kind is "image", "video" or "audio" (see mediaKindForAsset).
Kind string
// SourceKey is the item key the URL was read from.
SourceKey string
}
// defaultGeneratedAssetUploadPolicy uploads inline media through storage
// channels and leaves URL-referenced media untouched.
func defaultGeneratedAssetUploadPolicy() generatedAssetUploadPolicy {
	var policy generatedAssetUploadPolicy
	policy.UploadInlineMedia = true
	return policy
}
// uploadGeneratedAssets persists media found in result["data"] according to
// the configured result upload policy and returns a copy of result whose data
// items reference the stored copies.
//
// Every map item in result["data"] is classified first, so a decoding error
// aborts before anything is uploaded. Each item is then shallow-copied, its
// decision's StripKeys are deleted, and, when it has an inline or URL asset,
// the asset is uploaded or stored locally. The copy then gains "upload",
// "assetStorage", and (when the upload returned a URL) "url" plus
// "video_url"/"image_url" for video/image kinds. "type" and "mime_type" are
// filled only when missing. Non-map items pass through unchanged.
//
// The input maps are not modified. result itself is returned when there is no
// data or nothing to upload or strip. Helper errors are returned as-is. Policy
// or channel lookup failures become a retryable *clients.ClientError with code
// "upload_config_failed".
func (s *Service) uploadGeneratedAssets(ctx context.Context, taskID string, taskKind string, result map[string]any) (map[string]any, error) {
data, _ := result["data"].([]any)
if len(data) == 0 {
return result, nil
}
policy, err := s.generatedAssetUploadPolicy(ctx)
if err != nil {
return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true}
}
// Pass 1: classify every item before doing any I/O.
decisions := make([]generatedAssetDecision, len(data))
needsUpload := false
changed := false
for index, rawItem := range data {
item, _ := rawItem.(map[string]any)
if item == nil {
continue
}
decision, err := generatedAssetDecisionForItem(taskKind, item, policy)
if err != nil {
return nil, err
}
decisions[index] = decision
if decision.Inline != nil || decision.URL != nil {
needsUpload = true
}
if len(decision.StripKeys) > 0 {
changed = true
}
}
if !needsUpload && !changed {
return result, nil
}
// Channels are only looked up when at least one asset will go to a remote
// channel (URL media, or inline media not forced to local storage).
var channels []store.FileStorageChannel
if needsUpload && generatedAssetNeedsChannelLookup(policy, decisions) {
channels, err = s.activeFileStorageChannels(ctx, store.FileStorageSceneImageResult)
if err != nil {
return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true}
}
}
// Pass 2: build a shallow copy of result with rewritten data items.
next := map[string]any{}
for key, value := range result {
next[key] = value
}
nextData := make([]any, 0, len(data))
for index, rawItem := range data {
item, _ := rawItem.(map[string]any)
if item == nil {
nextData = append(nextData, rawItem)
continue
}
decision := decisions[index]
merged := map[string]any{}
for key, value := range item {
merged[key] = value
}
for _, key := range decision.StripKeys {
delete(merged, key)
}
if decision.Inline != nil || decision.URL != nil {
var upload map[string]any
var sourceKey string
var strategy string
var kind string
var contentType string
// Shadows the outer err; scoped to this item's upload.
var err error
if decision.Inline != nil {
upload, contentType, kind, strategy, err = s.uploadGeneratedAsset(ctx, taskID, decision.Inline, index, channels, policy.StoreInlineMediaLocally)
sourceKey = decision.Inline.SourceKey
} else {
upload, contentType, kind, strategy, err = s.uploadGeneratedURLAsset(ctx, taskID, decision.URL, index, channels)
sourceKey = decision.URL.SourceKey
}
if err != nil {
return nil, err
}
merged["upload"] = upload
merged["assetStorage"] = map[string]any{
"scene": store.FileStorageSceneImageResult,
"source": sourceKey,
"strategy": strategy,
}
if contentType != "" {
merged["assetStorage"].(map[string]any)["contentType"] = contentType
}
// Point the item at the stored copy; kind-specific aliases are set too.
if urlValue := stringFromAny(upload["url"]); urlValue != "" {
merged["url"] = urlValue
if kind == "video" {
merged["video_url"] = urlValue
}
if kind == "image" {
merged["image_url"] = urlValue
}
}
// Never overwrite type/mime_type already present on the item.
if kind != "" && stringFromAny(merged["type"]) == "" {
merged["type"] = kind
}
if contentType != "" && stringFromAny(merged["mime_type"]) == "" {
merged["mime_type"] = contentType
}
}
nextData = append(nextData, merged)
}
next["data"] = nextData
return next, nil
}
// generatedAssetUploadPolicy loads the configured result upload policy.
// When the store reports an undefined database object (presumably settings
// storage that has not been migrated yet), the default policy is used.
// Any other store error is returned with a zero policy.
func (s *Service) generatedAssetUploadPolicy(ctx context.Context) (generatedAssetUploadPolicy, error) {
	settings, err := s.store.GetFileStorageSettings(ctx)
	switch {
	case err == nil:
		return generatedAssetUploadPolicyFromName(settings.ResultUploadPolicy), nil
	case store.IsUndefinedDatabaseObject(err):
		return defaultGeneratedAssetUploadPolicy(), nil
	default:
		return generatedAssetUploadPolicy{}, err
	}
}
// generatedAssetUploadPolicyFromName maps a stored policy name to a policy.
// "upload all" also re-uploads URL media. "upload none" still decodes inline
// media but keeps it in local static storage. Any other name yields the
// default policy.
func generatedAssetUploadPolicyFromName(policyName string) generatedAssetUploadPolicy {
	switch store.NormalizeFileStorageResultUploadPolicy(policyName) {
	case store.FileStorageResultUploadPolicyUploadAll:
		return generatedAssetUploadPolicy{
			UploadInlineMedia: true,
			UploadURLMedia:    true,
		}
	case store.FileStorageResultUploadPolicyUploadNone:
		return generatedAssetUploadPolicy{
			UploadInlineMedia:       true,
			StoreInlineMediaLocally: true,
		}
	}
	return defaultGeneratedAssetUploadPolicy()
}
// generatedAssetNeedsChannelLookup reports whether any decision will go to a
// remote storage channel. This is true for URL media always, and for inline
// media unless the policy keeps it local.
func generatedAssetNeedsChannelLookup(policy generatedAssetUploadPolicy, decisions []generatedAssetDecision) bool {
	for i := range decisions {
		d := &decisions[i]
		switch {
		case d.URL != nil:
			return true
		case d.Inline != nil && !policy.StoreInlineMediaLocally:
			return true
		}
	}
	return false
}
// uploadGeneratedAsset persists one inline generated asset.
//
// It writes to local static storage when forceLocal is set or no channels
// are available; otherwise it uploads with channel failover. It returns the
// upload metadata, the resolved content type, the kind, and the strategy name
// recorded in assetStorage.
func (s *Service) uploadGeneratedAsset(ctx context.Context, taskID string, asset *generatedInlineAsset, index int, channels []store.FileStorageChannel, forceLocal bool) (map[string]any, string, string, string, error) {
	resolvedType := resolvedGeneratedAssetContentType(asset.ContentType, asset.Kind, asset.Bytes)
	resolvedKind := generatedAssetKindFromContentType(asset.Kind, resolvedType)
	payload := FileUploadPayload{
		ContentType: resolvedType,
		FileName:    generatedAssetFileName(taskID, index, resolvedType, resolvedKind),
		Scene:       store.FileStorageSceneImageResult,
		Source:      "ai-gateway",
		Bytes:       asset.Bytes,
	}
	useLocal := forceLocal || len(channels) == 0
	if !useLocal {
		upload, err := s.uploadFileWithFailover(ctx, payload, channels)
		return upload, resolvedType, resolvedKind, "upload_inline_media", err
	}
	upload, err := s.storeFileLocally(payload, s.cfg.LocalGeneratedStorageDir, config.DefaultLocalGeneratedStorageDir, localStaticGeneratedPathPrefix)
	return upload, resolvedType, resolvedKind, "local_static_inline_media", err
}
// uploadGeneratedURLAsset fetches media referenced by URL and persists it,
// uploading through channels when any exist and storing locally otherwise.
//
// It returns the upload metadata, the resolved content type, the kind, and
// the strategy name. Fetch errors are returned unchanged.
func (s *Service) uploadGeneratedURLAsset(ctx context.Context, taskID string, asset *generatedURLAsset, index int, channels []store.FileStorageChannel) (map[string]any, string, string, string, error) {
	body, fetchedType, err := s.readGeneratedURLAsset(ctx, asset)
	if err != nil {
		return nil, "", "", "", err
	}
	declared := firstNonEmptyString(fetchedType, asset.ContentType)
	contentType := resolvedGeneratedAssetContentType(declared, asset.Kind, body)
	kind := generatedAssetKindFromContentType(asset.Kind, contentType)
	payload := FileUploadPayload{
		ContentType: contentType,
		FileName:    generatedAssetFileName(taskID, index, contentType, kind),
		Scene:       store.FileStorageSceneImageResult,
		Source:      "ai-gateway",
		Bytes:       body,
	}
	if len(channels) > 0 {
		upload, uploadErr := s.uploadFileWithFailover(ctx, payload, channels)
		return upload, contentType, kind, "upload_url_media", uploadErr
	}
	upload, storeErr := s.storeFileLocally(payload, s.cfg.LocalGeneratedStorageDir, config.DefaultLocalGeneratedStorageDir, localStaticGeneratedPathPrefix)
	return upload, contentType, kind, "local_static_url_media", storeErr
}
// storeFileLocally writes payload.Bytes into storageDir, or into
// fallbackStorageDir when storageDir is blank, creating the directory if
// needed.
//
// Only the base name of payload.FileName is used, so a path cannot escape the
// directory. A degenerate name is replaced with a generated one. Existing
// files are never overwritten (O_EXCL), and a partially written file is
// removed. Every failure is a retryable *clients.ClientError with code
// "local_static_store_failed". On success it returns upload metadata
// describing the "local-static" storage channel.
func (s *Service) storeFileLocally(payload FileUploadPayload, storageDir string, fallbackStorageDir string, pathPrefix string) (map[string]any, error) {
	storeErr := func(cause error) error {
		return &clients.ClientError{Code: "local_static_store_failed", Message: cause.Error(), Retryable: true}
	}

	dir := strings.TrimSpace(storageDir)
	if dir == "" {
		dir = fallbackStorageDir
	}
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, storeErr(err)
	}

	name := filepath.Base(strings.TrimSpace(payload.FileName))
	switch name {
	case "", ".", "..", string(filepath.Separator):
		name = generatedAssetFileName("generated", 0, payload.ContentType, generatedAssetKindFromContentType("", payload.ContentType))
	}

	target := filepath.Join(dir, name)
	file, err := os.OpenFile(target, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
	if err != nil {
		return nil, storeErr(err)
	}
	_, writeErr := file.Write(payload.Bytes)
	closeErr := file.Close()
	for _, failure := range []error{writeErr, closeErr} {
		if failure != nil {
			_ = os.Remove(target)
			return nil, storeErr(failure)
		}
	}

	channel := map[string]any{
		"id":         "local-static",
		"channelKey": "local-static",
		"name":       "AI Gateway local static storage",
		"provider":   "local_static",
	}
	return map[string]any{
		"url":            s.localStaticFileURL(name, pathPrefix),
		"fileName":       name,
		"contentType":    payload.ContentType,
		"size":           len(payload.Bytes),
		"storageChannel": channel,
	}, nil
}
// localStaticFileURL builds the URL for a locally stored file. The path is
// pathPrefix (or the uploaded prefix when blank) plus the escaped base name.
// It is made absolute with cfg.PublicBaseURL when that is configured.
func (s *Service) localStaticFileURL(fileName string, pathPrefix string) string {
	prefix := pathPrefix
	if strings.TrimSpace(prefix) == "" {
		prefix = localStaticUploadedPathPrefix
	}
	relative := prefix + url.PathEscape(filepath.Base(fileName))
	if base := strings.TrimRight(strings.TrimSpace(s.cfg.PublicBaseURL), "/"); base != "" {
		return base + relative
	}
	return relative
}
// localStaticUploadFileName derives a safe, unique local file name for a user
// upload: "<sanitized stem>-<random hex><extension>".
//
// The stem is the sanitized original name without its extension, limited to
// 48 characters, with "gateway-upload" used when nothing remains. The
// extension comes from contentType, falling back to the original extension
// (see uploadFileExtension).
//
// The extension is trimmed in its original case before being lower-cased.
// Trimming the lower-cased form left upper-case extensions in the stem, so
// "Photo.JPG" produced "photojpg-….jpg".
func localStaticUploadFileName(originalName string, contentType string) string {
	baseName := filepath.Base(strings.TrimSpace(originalName))
	rawExt := filepath.Ext(baseName)
	namePart := sanitizeGeneratedAssetNamePart(strings.TrimSuffix(baseName, rawExt))
	if namePart == "" {
		namePart = "gateway-upload"
	}
	// The sanitized stem is ASCII-only, so byte slicing cannot split a rune.
	if len(namePart) > 48 {
		namePart = namePart[:48]
	}
	return fmt.Sprintf("%s-%s%s", namePart, randomHexSuffix(6), uploadFileExtension(contentType, strings.ToLower(rawExt)))
}
// uploadFileExtension picks a file extension for an uploaded file. The
// priority is:
//   - the media mapping, for image/video/audio types;
//   - the first registered extension for other known types;
//   - fallbackExt, when it is a safe extension;
//   - .json, .txt or .bin, by type.
func uploadFileExtension(contentType string, fallbackExt string) string {
	ct := normalizeGeneratedContentType(contentType)
	if generatedContentTypeIsMedia(ct) {
		return fileExtensionForContentType(ct, generatedAssetKindFromContentType("", ct))
	}
	if ct != "" && ct != "application/octet-stream" {
		registered, err := mime.ExtensionsByType(ct)
		if err == nil && len(registered) > 0 {
			if ext := sanitizeFileExtension(registered[0]); ext != "" {
				return ext
			}
		}
	}
	if ext := sanitizeFileExtension(fallbackExt); ext != "" {
		return ext
	}
	switch {
	case ct == "application/json":
		return ".json"
	case strings.HasPrefix(ct, "text/"):
		return ".txt"
	default:
		return ".bin"
	}
}
// sanitizeFileExtension normalizes an extension to lower case with a leading
// dot. It returns "" when the input is blank, longer than 16 bytes including
// the dot, or contains anything other than [a-z0-9] after the dot.
func sanitizeFileExtension(value string) string {
	ext := strings.ToLower(strings.TrimSpace(value))
	if ext == "" {
		return ""
	}
	ext = "." + strings.TrimPrefix(ext, ".")
	if len(ext) > 16 {
		return ""
	}
	invalid := strings.IndexFunc(ext[1:], func(r rune) bool {
		return !((r >= 'a' && r <= 'z') || (r >= '0' && r <= '9'))
	})
	if invalid >= 0 {
		return ""
	}
	return ext
}
// readGeneratedURLAsset downloads the media referenced by asset.URL.
//
// It returns the body and a content type: the response Content-Type, else
// asset.ContentType, with parameters removed. The error cases are:
//   - a non-2xx status gives "upload_source_fetch_failed", using up to 4 KiB
//     of the body as the message;
//   - a transport error gives a retryable "upload_source_fetch_failed";
//   - a body larger than maxGeneratedAssetFetchBytes gives
//     "upload_source_too_large".
//
// NOTE(review): http.DefaultClient has no timeout; this relies on ctx
// carrying a deadline — confirm with callers.
func (s *Service) readGeneratedURLAsset(ctx context.Context, asset *generatedURLAsset) ([]byte, string, error) {
	target, err := s.generatedAssetFetchURL(asset.URL)
	if err != nil {
		return nil, "", err
	}
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return nil, "", err
	}
	response, err := http.DefaultClient.Do(request)
	if err != nil {
		return nil, "", &clients.ClientError{Code: "upload_source_fetch_failed", Message: err.Error(), Retryable: true}
	}
	defer response.Body.Close()

	if status := response.StatusCode; status < 200 || status >= 300 {
		snippet, _ := io.ReadAll(io.LimitReader(response.Body, 4096))
		message := strings.TrimSpace(string(snippet))
		if message == "" {
			message = "generated media source fetch failed"
		}
		return nil, "", &clients.ClientError{
			Code:       "upload_source_fetch_failed",
			Message:    message,
			StatusCode: status,
			Retryable:  clients.HTTPRetryable(status),
		}
	}

	// Read one byte past the limit so an oversized body can be detected.
	body, err := io.ReadAll(io.LimitReader(response.Body, maxGeneratedAssetFetchBytes+1))
	switch {
	case err != nil:
		return nil, "", &clients.ClientError{Code: "upload_source_read_failed", Message: err.Error(), StatusCode: response.StatusCode, Retryable: clients.HTTPRetryable(response.StatusCode)}
	case len(body) > maxGeneratedAssetFetchBytes:
		return nil, "", &clients.ClientError{Code: "upload_source_too_large", Message: "generated media source exceeds upload fetch limit", StatusCode: response.StatusCode, Retryable: false}
	}
	mediaType, _, _ := strings.Cut(firstNonEmptyString(response.Header.Get("Content-Type"), asset.ContentType), ";")
	return body, strings.TrimSpace(mediaType), nil
}
// generatedAssetFetchURL validates a media source URL and makes it fetchable.
//
// Absolute http/https URLs are returned as-is. Root-relative paths are
// resolved against the gateway's own HTTP address. Any other form is rejected
// with a non-retryable *clients.ClientError.
func (s *Service) generatedAssetFetchURL(raw string) (string, error) {
	invalid := func(message string) error {
		return &clients.ClientError{Code: "upload_source_invalid_url", Message: message, Retryable: false}
	}

	value := strings.TrimSpace(raw)
	if value == "" {
		return "", invalid("generated media source URL is empty")
	}
	parsed, err := url.Parse(value)
	if err != nil {
		return "", invalid(err.Error())
	}

	switch {
	case parsed.IsAbs():
		if parsed.Scheme != "http" && parsed.Scheme != "https" {
			return "", &clients.ClientError{Code: "upload_source_unsupported_url", Message: "unsupported generated media source URL scheme: " + parsed.Scheme, Retryable: false}
		}
		return value, nil
	case strings.HasPrefix(value, "/"):
		base := generatedAssetLocalBaseURL(s.cfg.HTTPAddr)
		if base == "" {
			return "", invalid("generated media source uses a relative URL without a local HTTP address")
		}
		return base + value, nil
	default:
		return "", invalid("generated media source URL must be absolute or root-relative")
	}
}
// generatedAssetLocalBaseURL converts the configured listen address into a
// base URL for fetching the gateway's own resources:
//   - blank gives http://127.0.0.1:8088;
//   - an http(s) URL is returned without its trailing slash;
//   - ":port" binds to loopback;
//   - any other scheme gives "";
//   - a bare host:port is prefixed with http://.
func generatedAssetLocalBaseURL(httpAddr string) string {
	addr := strings.TrimSpace(httpAddr)
	switch {
	case addr == "":
		return "http://127.0.0.1:8088"
	case strings.HasPrefix(addr, "http://"), strings.HasPrefix(addr, "https://"):
		return strings.TrimRight(addr, "/")
	case strings.HasPrefix(addr, ":"):
		return "http://127.0.0.1" + addr
	case strings.Contains(addr, "://"):
		return ""
	}
	return "http://" + strings.TrimRight(addr, "/")
}
// UploadFile stores a user-supplied file for payload.Scene, which defaults to
// store.FileStorageSceneUpload.
//
// With enabled storage channels it uploads with failover. Otherwise it writes
// the file to local static storage under a sanitized, randomized name and
// attaches "assetStorage" metadata. Channel lookup failures are returned as a
// retryable "upload_config_failed" *clients.ClientError.
func (s *Service) UploadFile(ctx context.Context, payload FileUploadPayload) (map[string]any, error) {
	if strings.TrimSpace(payload.Scene) == "" {
		payload.Scene = store.FileStorageSceneUpload
	}
	channels, err := s.activeFileStorageChannels(ctx, payload.Scene)
	if err != nil {
		return nil, &clients.ClientError{Code: "upload_config_failed", Message: err.Error(), Retryable: true}
	}
	if len(channels) > 0 {
		return s.uploadFileWithFailover(ctx, payload, channels)
	}

	payload.FileName = localStaticUploadFileName(payload.FileName, payload.ContentType)
	stored, err := s.storeFileLocally(payload, s.cfg.LocalUploadedStorageDir, config.DefaultLocalUploadedStorageDir, localStaticUploadedPathPrefix)
	if err != nil {
		return nil, err
	}
	stored["assetStorage"] = map[string]any{
		"scene":    payload.Scene,
		"source":   firstNonEmptyString(payload.Source, "ai-gateway-openapi"),
		"strategy": "local_static_upload",
	}
	return stored, nil
}
// activeFileStorageChannels lists the enabled storage channels for scene.
//
// It returns nil (and no error) when there is no store, when none are
// enabled, or when the store reports an undefined database object. The last
// case is presumably a schema that has not been migrated yet.
func (s *Service) activeFileStorageChannels(ctx context.Context, scene string) ([]store.FileStorageChannel, error) {
	if s.store == nil {
		return nil, nil
	}
	channels, err := s.store.ListEnabledFileStorageChannelsForScene(ctx, scene)
	switch {
	case err != nil && !store.IsUndefinedDatabaseObject(err):
		return nil, err
	case len(channels) == 0:
		return nil, nil
	default:
		return channels, nil
	}
}
// uploadFileWithFailover tries each channel in order, with per-channel
// retries, and returns the first successful upload.
//
// Channel health is recorded best-effort; store errors are ignored and
// context.WithoutCancel is used so the bookkeeping still runs once ctx ends.
// A failure caused by the caller cancelling ctx is not recorded against the
// channel, because it says nothing about the channel's health. Once ctx is
// done the remaining channels are skipped, since they would fail the same
// way.
//
// The last channel error is returned when all channels fail. A
// non-retryable "upload_no_channel" error is returned when channels is empty.
func (s *Service) uploadFileWithFailover(ctx context.Context, payload FileUploadPayload, channels []store.FileStorageChannel) (map[string]any, error) {
	var lastErr error
	for _, channel := range channels {
		upload, err := s.uploadWithChannelRetries(ctx, payload, channel)
		if err == nil {
			if s.store != nil {
				_ = s.store.MarkFileStorageChannelSuccess(context.WithoutCancel(ctx), channel.ID)
			}
			return upload, nil
		}
		lastErr = err
		ctxErr := ctx.Err()
		if s.store != nil && ctxErr != context.Canceled {
			_ = s.store.MarkFileStorageChannelFailure(context.WithoutCancel(ctx), channel.ID, err.Error())
		}
		if ctxErr != nil {
			break
		}
	}
	if lastErr != nil {
		return nil, lastErr
	}
	return nil, &clients.ClientError{Code: "upload_no_channel", Message: "no enabled file storage channel", Retryable: false}
}
// uploadWithChannelRetries uploads through one channel. It retries retryable
// errors up to the channel's retry schedule and sleeps between attempts.
//
// It returns the last upload error, or ctx's error if ctx ends during a
// backoff.
func (s *Service) uploadWithChannelRetries(ctx context.Context, payload FileUploadPayload, channel store.FileStorageChannel) (map[string]any, error) {
	maxRetries, delays := uploadRetrySchedule(channel.RetryPolicy)
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		upload, uploadErr := s.uploadOnce(ctx, payload, channel)
		if uploadErr == nil {
			return upload, nil
		}
		lastErr = uploadErr
		finalAttempt := attempt >= maxRetries
		if finalAttempt || !clients.IsRetryable(uploadErr) {
			break
		}
		if sleepErr := sleepWithContext(ctx, retryDelayForAttempt(attempt, delays)); sleepErr != nil {
			return nil, sleepErr
		}
	}
	return nil, lastErr
}
// uploadOnce performs one multipart upload of payload to a
// "server_main_openapi" storage channel and returns the normalized JSON reply.
//
// The request POSTs to channel.UploadURL, or defaultServerMainOpenAPIUploadURL
// when unset. It carries a "file" part plus "source" and "scene" form fields,
// with the channel API key as a bearer token. Errors are *clients.ClientError
// values:
//   - upload_unsupported_channel / missing_credentials: misconfigured channel, not retryable
//   - upload_network: transport failure, retryable
//   - upload_read_failed / upload_failed: retryable per clients.HTTPRetryable(status)
//   - upload_invalid_response: reply not JSON or over the size limit, not retryable
//
// Errors from building the multipart body or the request are returned
// unwrapped.
//
// The reply is read through a size limit so a misbehaving endpoint cannot
// force unbounded buffering. Error messages built from the reply are also
// truncated, since they end up in channel failure records.
func (s *Service) uploadOnce(ctx context.Context, payload FileUploadPayload, channel store.FileStorageChannel) (map[string]any, error) {
	const (
		// maxUploadResponseBytes bounds the buffered reply; a successful
		// upload reply is expected to be a small JSON document.
		maxUploadResponseBytes = 1 << 20
		// maxUploadErrorMessageBytes keeps error messages short.
		maxUploadErrorMessageBytes = 4096
	)
	if strings.ToLower(strings.TrimSpace(channel.Provider)) != "server_main_openapi" {
		return nil, &clients.ClientError{Code: "upload_unsupported_channel", Message: "unsupported file storage channel: " + channel.Provider, Retryable: false}
	}
	uploadURL := strings.TrimSpace(channel.UploadURL)
	if uploadURL == "" {
		uploadURL = defaultServerMainOpenAPIUploadURL
	}
	apiKey := strings.TrimSpace(channel.APIKey)
	if apiKey == "" {
		return nil, &clients.ClientError{Code: "missing_credentials", Message: "file storage channel API key is required", Retryable: false}
	}

	var body bytes.Buffer
	writer := multipart.NewWriter(&body)
	fileWriter, err := createUploadFormFile(writer, "file", firstNonEmptyString(payload.FileName, "upload.bin"), payload.ContentType)
	if err != nil {
		return nil, err
	}
	if _, err := fileWriter.Write(payload.Bytes); err != nil {
		return nil, err
	}
	// The writer targets an in-memory bytes.Buffer, so these writes are not
	// expected to fail.
	_ = writer.WriteField("source", firstNonEmptyString(payload.Source, "ai-gateway"))
	_ = writer.WriteField("scene", firstNonEmptyString(payload.Scene, store.FileStorageSceneUpload))
	if err := writer.Close(); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, &body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", writer.FormDataContentType())
	if payload.ContentType != "" {
		req.Header.Set("X-Upload-Content-Type", payload.ContentType)
	}
	req.Header.Set("Authorization", "Bearer "+apiKey)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, &clients.ClientError{Code: "upload_network", Message: err.Error(), Retryable: true}
	}
	defer resp.Body.Close()

	// Read one byte past the limit so an oversized reply can be detected.
	responseBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxUploadResponseBytes+1))
	if readErr != nil {
		return nil, &clients.ClientError{Code: "upload_read_failed", Message: readErr.Error(), StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)}
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		snippet := responseBody
		if len(snippet) > maxUploadErrorMessageBytes {
			snippet = snippet[:maxUploadErrorMessageBytes]
		}
		message := strings.TrimSpace(string(snippet))
		if message == "" {
			message = "file upload failed"
		}
		return nil, &clients.ClientError{Code: "upload_failed", Message: message, StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)}
	}
	if len(responseBody) > maxUploadResponseBytes {
		return nil, &clients.ClientError{Code: "upload_invalid_response", Message: "file upload response exceeds size limit", Retryable: false}
	}
	var decoded map[string]any
	if err := json.Unmarshal(responseBody, &decoded); err != nil {
		return nil, &clients.ClientError{Code: "upload_invalid_response", Message: err.Error(), Retryable: false}
	}
	return normalizeUploadResponse(decoded, channel), nil
}
// createUploadFormFile starts a multipart file part named fieldName with the
// given filename. Both values are escaped for the quoted Content-Disposition
// parameters. A Content-Type header is added only when contentType is
// non-blank.
func createUploadFormFile(writer *multipart.Writer, fieldName string, fileName string, contentType string) (io.Writer, error) {
	disposition := fmt.Sprintf(`form-data; name="%s"; filename="%s"`, escapeMultipartValue(fieldName), escapeMultipartValue(fileName))
	partHeader := textproto.MIMEHeader{}
	partHeader.Set("Content-Disposition", disposition)
	if trimmed := strings.TrimSpace(contentType); trimmed != "" {
		partHeader.Set("Content-Type", trimmed)
	}
	return writer.CreatePart(partHeader)
}
// multipartValueEscaper escapes values placed inside quoted
// Content-Disposition parameters. Backslashes and quotes are backslash-escaped,
// as mime/multipart does. CR and LF are percent-encoded, as the HTML
// multipart/form-data encoding does, so a user-supplied filename cannot end
// the header line and inject extra part headers.
var multipartValueEscaper = strings.NewReplacer(
	`\`, `\\`,
	`"`, `\"`,
	"\r", "%0D",
	"\n", "%0A",
)

// escapeMultipartValue makes value safe to embed between double quotes in a
// multipart part header.
func escapeMultipartValue(value string) string {
	return multipartValueEscaper.Replace(value)
}
// stripDataURLPrefix returns everything after the first comma when value
// starts with a lowercase "data:". Otherwise value is returned unchanged.
func stripDataURLPrefix(value string) string {
	if !strings.HasPrefix(value, "data:") {
		return value
	}
	if _, rest, found := strings.Cut(value, ","); found {
		return rest
	}
	return value
}
// generatedAssetDecisionForItem decides how one result item's media should
// be handled.
//
// An item with a media URL is re-uploaded when the policy allows it. That
// strips all URL and inline media keys. Otherwise only its inline copies are
// stripped. Without a URL, inline media is decoded when the policy allows it,
// and decoding errors are returned.
func generatedAssetDecisionForItem(taskKind string, item map[string]any, policy generatedAssetUploadPolicy) (generatedAssetDecision, error) {
	var decision generatedAssetDecision

	if urlKey, mediaURL := mediaURLSourceFromItem(item); mediaURL != "" {
		if policy.UploadURLMedia {
			declaredType := mediaContentTypeFromItem(item)
			decision.URL = &generatedURLAsset{
				URL:         mediaURL,
				ContentType: declaredType,
				Kind:        mediaKindForAsset(taskKind, item, urlKey, declaredType),
				SourceKey:   urlKey,
			}
			decision.StripKeys = uniqueStringList(append(mediaURLKeys(item), inlineMediaKeys(item)...))
		} else {
			decision.StripKeys = inlineMediaKeys(item)
		}
		return decision, nil
	}

	if !policy.UploadInlineMedia {
		return decision, nil
	}
	asset, keys, err := inlineAssetFromItem(taskKind, item)
	switch {
	case err != nil:
		return decision, err
	case asset != nil:
		decision.Inline = asset
		decision.StripKeys = keys
	}
	return decision, nil
}
// inlineAssetFromItem returns the first inline media payload found under the
// candidate keys, in priority order. It also returns all inline media keys
// present, so they can be stripped.
//
// The content type is the payload's own (for example from a data URL), then
// the item's declared type, then a default for the inferred kind. It returns
// (nil, nil, nil) when no inline media is found.
func inlineAssetFromItem(taskKind string, item map[string]any) (*generatedInlineAsset, []string, error) {
	for _, key := range inlineMediaCandidateKeys() {
		value := item[key]
		if value == nil {
			continue
		}
		payload, embeddedType, found, err := inlineMediaPayload(value, inlineMediaKeyIsStrictBase64(key))
		if err != nil {
			return nil, nil, err
		}
		if !found {
			continue
		}
		fallbackType := defaultContentTypeForGeneratedAsset(mediaKindForAsset(taskKind, item, key, embeddedType))
		contentType := firstNonEmptyString(embeddedType, mediaContentTypeFromItem(item), fallbackType)
		asset := &generatedInlineAsset{
			Bytes:       payload,
			ContentType: contentType,
			Kind:        mediaKindForAsset(taskKind, item, key, contentType),
			SourceKey:   key,
		}
		return asset, inlineMediaKeys(item), nil
	}
	return nil, nil, nil
}
func inlineMediaPayload(value any, strictBase64 bool) ([]byte, string, bool, error) {
switch typed := value.(type) {
case []byte:
if len(typed) == 0 {
return nil, "", false, nil
}
payload := make([]byte, len(typed))
copy(payload, typed)
return payload, "", true, nil
case []any:
payload, ok := bytesFromNumberArray(typed)
return payload, "", ok, nil
case map[string]any:
if data, ok := typed["data"].([]any); ok {
payload, ok := bytesFromNumberArray(data)
return payload, firstNonEmptyString(stringFromAny(typed["mime_type"]), stringFromAny(typed["mimeType"])), ok, nil
}
if data, ok := typed["data"].([]byte); ok && len(data) > 0 {
payload := make([]byte, len(data))
copy(payload, data)
return payload, firstNonEmptyString(stringFromAny(typed["mime_type"]), stringFromAny(typed["mimeType"])), true, nil
}
return nil, "", false, nil
case string:
return inlineMediaPayloadFromString(typed, strictBase64)
default:
return nil, "", false, nil
}
}
// inlineMediaPayloadFromString decodes media from a string value.
//
// URL-looking strings are skipped. Base64 data URLs are decoded, and
// malformed ones are an error. Other strings are decoded as bare base64. For
// non-strict keys, strings shorter than 64 characters are skipped and decode
// failures are ignored. For strict base64 keys, decode failures are errors.
func inlineMediaPayloadFromString(value string, strictBase64 bool) ([]byte, string, bool, error) {
	raw := strings.TrimSpace(value)
	if raw == "" || mediaURLString(raw) {
		return nil, "", false, nil
	}

	if isDataURL := strings.HasPrefix(strings.ToLower(raw), "data:"); isDataURL {
		contentType, encoded, ok, err := parseBase64DataURL(raw)
		if err != nil || !ok {
			return nil, "", false, err
		}
		decoded, decodeErr := decodeBase64Payload(encoded)
		if decodeErr != nil {
			return nil, "", false, &clients.ClientError{Code: "upload_decode_failed", Message: decodeErr.Error(), Retryable: false}
		}
		return decoded, contentType, true, nil
	}

	// Short values under loosely named keys are unlikely to be media.
	if !strictBase64 && len(raw) < 64 {
		return nil, "", false, nil
	}
	decoded, err := decodeBase64Payload(raw)
	switch {
	case err == nil:
		return decoded, "", true, nil
	case strictBase64:
		return nil, "", false, &clients.ClientError{Code: "upload_decode_failed", Message: err.Error(), Retryable: false}
	default:
		return nil, "", false, nil
	}
}
// parseBase64DataURL splits a "data:<type>[;params];base64,<payload>" URL into
// its content type and still-encoded payload. The returned bool is true
// whenever the error is nil.
//
// The "data:" scheme is matched case-insensitively (RFC 2397 / RFC 3986), so
// "Data:" or "dAtA:" no longer leaks into the content type. A missing comma
// or a missing ";base64" parameter is a non-retryable "upload_decode_failed"
// *clients.ClientError.
func parseBase64DataURL(value string) (string, string, bool, error) {
	const scheme = "data:"
	prefix, payload, ok := strings.Cut(value, ",")
	if !ok {
		return "", "", false, &clients.ClientError{Code: "upload_decode_failed", Message: "invalid data URL media payload", Retryable: false}
	}
	meta := prefix
	if len(meta) >= len(scheme) && strings.EqualFold(meta[:len(scheme)], scheme) {
		meta = meta[len(scheme):]
	}
	parts := strings.Split(meta, ";")
	contentType := strings.TrimSpace(parts[0])
	isBase64 := false
	for _, part := range parts[1:] {
		if strings.EqualFold(strings.TrimSpace(part), "base64") {
			isBase64 = true
			break
		}
	}
	if !isBase64 {
		return "", "", false, &clients.ClientError{Code: "upload_decode_failed", Message: "data URL media payload is not base64 encoded", Retryable: false}
	}
	return contentType, payload, true, nil
}
// decodeBase64Payload decodes base64 in any of the four standard alphabets
// (padded/raw, standard/URL-safe), ignoring spaces, tabs and newlines. A
// lowercase "data:...," prefix is removed first.
//
// The first non-empty decode wins. Otherwise the last decode error is
// returned, or "empty base64 payload" when every decode succeeded but was
// empty.
func decodeBase64Payload(value string) ([]byte, error) {
	compact := strings.Map(func(r rune) rune {
		if r == '\n' || r == '\r' || r == '\t' || r == ' ' {
			return -1
		}
		return r
	}, stripDataURLPrefix(value))

	var lastErr error
	for _, enc := range [...]*base64.Encoding{base64.StdEncoding, base64.RawStdEncoding, base64.URLEncoding, base64.RawURLEncoding} {
		decoded, err := enc.DecodeString(compact)
		if err != nil {
			lastErr = err
			continue
		}
		if len(decoded) > 0 {
			return decoded, nil
		}
	}
	if lastErr != nil {
		return nil, lastErr
	}
	return nil, fmt.Errorf("empty base64 payload")
}
// bytesFromNumberArray converts a non-empty slice of byte-valued numbers, as
// produced by JSON-encoded buffers, into bytes. It returns false for an empty
// slice or any element that is not a valid byte.
func bytesFromNumberArray(values []any) ([]byte, bool) {
	if len(values) == 0 {
		return nil, false
	}
	out := make([]byte, len(values))
	for i, raw := range values {
		b, ok := byteFromAny(raw)
		if !ok {
			return nil, false
		}
		out[i] = b
	}
	return out, true
}
// byteFromAny converts a numeric value in [0, 255] with no fractional part
// into a byte. Any Go integer kind is accepted, as are float32/float64 (the
// JSON decoder's number type). Anything else, NaN included, reports false.
func byteFromAny(value any) (byte, bool) {
	switch typed := value.(type) {
	case byte:
		return typed, true
	case int:
		return byteFromInt64(int64(typed))
	case int8:
		return byteFromInt64(int64(typed))
	case int16:
		return byteFromInt64(int64(typed))
	case int32:
		return byteFromInt64(int64(typed))
	case int64:
		return byteFromInt64(typed)
	case uint:
		return byteFromUint64(uint64(typed))
	case uint16:
		return byteFromUint64(uint64(typed))
	case uint32:
		return byteFromUint64(uint64(typed))
	case uint64:
		return byteFromUint64(typed)
	case float32:
		return byteFromFloat64(float64(typed))
	case float64:
		return byteFromFloat64(typed)
	}
	return 0, false
}

// byteFromInt64 range-checks a signed integer as a byte.
func byteFromInt64(v int64) (byte, bool) {
	if v < 0 || v > 255 {
		return 0, false
	}
	return byte(v), true
}

// byteFromUint64 range-checks an unsigned integer as a byte.
func byteFromUint64(v uint64) (byte, bool) {
	if v > 255 {
		return 0, false
	}
	return byte(v), true
}

// byteFromFloat64 accepts whole floats in [0, 255]. The range is checked
// before converting because an out-of-range float-to-integer conversion is
// implementation-defined in Go. The negated form also rejects NaN.
func byteFromFloat64(v float64) (byte, bool) {
	if !(v >= 0 && v <= 255) {
		return 0, false
	}
	b := byte(v)
	if float64(b) != v {
		return 0, false
	}
	return b, true
}
// inlineMediaKeys lists the candidate keys of item that hold inline media,
// deduplicated and in candidate order.
//
// A strict base64 key counts whenever stringFromAny yields a non-empty value
// for it, even if that value would not decode. Other keys count only when a
// payload can actually be extracted.
func inlineMediaKeys(item map[string]any) []string {
	var found []string
	for _, key := range inlineMediaCandidateKeys() {
		value := item[key]
		if value == nil {
			continue
		}
		strict := inlineMediaKeyIsStrictBase64(key)
		if strict && stringFromAny(value) != "" {
			found = append(found, key)
			continue
		}
		if _, _, extracted, _ := inlineMediaPayload(value, strict); extracted {
			found = append(found, key)
		}
	}
	return uniqueStringList(found)
}
// inlineMediaCandidateKeys returns a fresh slice of item keys that may carry
// inline media, in priority order: explicit base64 fields, URL-ish fields
// (which may hold base64 or data URLs), then raw buffer fields.
func inlineMediaCandidateKeys() []string {
	base64Keys := []string{"b64_json", "image_base64", "image_b64", "video_base64", "video_b64", "base64", "b64"}
	urlKeys := []string{"url", "image_url", "imageUrl", "video_url", "videoUrl", "output_url", "outputUrl", "output_video_url", "outputVideoUrl"}
	rawKeys := []string{"image", "video", "image_buffer", "image_bytes", "video_buffer", "video_bytes", "buffer", "bytes", "data"}

	keys := make([]string, 0, len(base64Keys)+len(urlKeys)+len(rawKeys))
	keys = append(keys, base64Keys...)
	keys = append(keys, urlKeys...)
	return append(keys, rawKeys...)
}
// inlineMediaKeyIsStrictBase64 reports whether key names a field that must
// hold base64, so that decode failures under it are errors rather than
// ignored. Matching is case-insensitive.
func inlineMediaKeyIsStrictBase64(key string) bool {
	switch lower := strings.ToLower(key); {
	case lower == "b64_json", lower == "b64":
		return true
	default:
		return strings.Contains(lower, "base64") || strings.Contains(lower, "_b64")
	}
}
// mediaURLSourceFromItem returns the first candidate key whose value looks
// like a media URL, together with that value. It returns ("", "") if none
// does.
func mediaURLSourceFromItem(item map[string]any) (string, string) {
	keys := mediaURLCandidateKeys()
	for i := 0; i < len(keys); i++ {
		value := stringFromAny(item[keys[i]])
		if value == "" || !mediaURLString(value) {
			continue
		}
		return keys[i], value
	}
	return "", ""
}
// mediaURLKeys lists every candidate key of item whose value looks like a
// media URL, deduplicated and in candidate order.
func mediaURLKeys(item map[string]any) []string {
	var matched []string
	for _, key := range mediaURLCandidateKeys() {
		if value := stringFromAny(item[key]); value != "" && mediaURLString(value) {
			matched = append(matched, key)
		}
	}
	return uniqueStringList(matched)
}
// mediaURLCandidateKeys returns a fresh slice of item keys that may hold a
// media URL, in priority order.
func mediaURLCandidateKeys() []string {
	return []string{
		"url",
		"image_url", "imageUrl",
		"video_url", "videoUrl",
		"output_url", "outputUrl",
		"output_video_url", "outputVideoUrl",
		"download_url", "downloadUrl",
		"file_url", "fileUrl",
	}
}
// mediaURLString reports whether value looks like a media URL rather than
// inline data:
//   - "data:" URLs are never URLs;
//   - anything with a scheme ("http://", "https://", any "://") is;
//   - a root-relative "/..." value is, unless it is really standard base64
//     media.
//
// The last rule matters because JPEG bytes (FF D8 FF) base64-encode to
// "/9j/...". Without it, a JPEG "b64_json" payload was never decoded, and
// base64 stored under "url" was fetched as a local path.
func mediaURLString(value string) bool {
	raw := strings.TrimSpace(value)
	if raw == "" {
		return false
	}
	lower := strings.ToLower(raw)
	switch {
	case strings.HasPrefix(lower, "data:"):
		return false
	case strings.HasPrefix(lower, "http://"), strings.HasPrefix(lower, "https://"), strings.Contains(lower, "://"):
		return true
	case strings.HasPrefix(raw, "/"):
		return !rootRelativeValueIsBase64Media(raw)
	}
	return false
}

// rootRelativeValueIsBase64Media reports whether raw is standard base64 whose
// decoded header http.DetectContentType sniffs as image, video or audio.
//
// Only the first 64 characters are decoded, which is enough for the magic
// numbers. Real paths almost always contain '.', '-' or similar characters
// and fail to decode. Media whose header the sniffer does not recognize
// (for example MP3 frames without ID3) is not detected here.
func rootRelativeValueIsBase64Media(raw string) bool {
	const probeChars = 64 // multiple of 4: decodes to 48 bytes without padding
	if len(raw) < probeChars {
		return false
	}
	header, err := base64.StdEncoding.DecodeString(raw[:probeChars])
	if err != nil {
		return false
	}
	sniffed := http.DetectContentType(header)
	return strings.HasPrefix(sniffed, "image/") ||
		strings.HasPrefix(sniffed, "video/") ||
		strings.HasPrefix(sniffed, "audio/")
}
func mediaContentTypeFromItem(item map[string]any) string {
return firstNonEmptyString(
stringFromAny(item["mime_type"]),
stringFromAny(item["mimeType"]),
stringFromAny(item["content_type"]),
stringFromAny(item["contentType"]),
)
}
// mediaKindForAsset infers "image", "video" or "audio" for an asset.
//
// The content type's major type decides first. Otherwise the hints are
// checked in this order: the item's "type", the source key, then the task
// kind. Within each hint "video" wins over "image". The default is "image".
func mediaKindForAsset(taskKind string, item map[string]any, sourceKey string, contentType string) string {
	switch ct := strings.ToLower(strings.TrimSpace(contentType)); {
	case strings.HasPrefix(ct, "image/"):
		return "image"
	case strings.HasPrefix(ct, "video/"):
		return "video"
	case strings.HasPrefix(ct, "audio/"):
		return "audio"
	}
	hints := []string{
		strings.ToLower(strings.TrimSpace(stringFromAny(item["type"]))),
		strings.ToLower(sourceKey),
		strings.ToLower(strings.TrimSpace(taskKind)),
	}
	for _, hint := range hints {
		if strings.Contains(hint, "video") {
			return "video"
		}
		if strings.Contains(hint, "image") {
			return "image"
		}
	}
	return "image"
}
// defaultContentTypeForGeneratedAsset returns the assumed MIME type for a
// kind when nothing better is known: video/mp4, audio/mpeg, or image/png.
func defaultContentTypeForGeneratedAsset(kind string) string {
	normalized := strings.ToLower(strings.TrimSpace(kind))
	if normalized == "video" {
		return "video/mp4"
	}
	if normalized == "audio" {
		return "audio/mpeg"
	}
	return "image/png"
}
func resolvedGeneratedAssetContentType(declared string, kind string, payload []byte) string {
declared = normalizeGeneratedContentType(declared)
detected := detectGeneratedAssetContentType(payload)
if generatedContentTypeIsMedia(detected) {
return detected
}
if generatedContentTypeIsMedia(declared) {
return declared
}
return defaultContentTypeForGeneratedAsset(kind)
}
// detectGeneratedAssetContentType sniffs payload with http.DetectContentType
// and returns the normalized media type, or "" for an empty payload.
func detectGeneratedAssetContentType(payload []byte) string {
	if len(payload) > 0 {
		return normalizeGeneratedContentType(http.DetectContentType(payload))
	}
	return ""
}
// normalizeGeneratedContentType drops MIME parameters and returns the
// trimmed, lower-cased media type.
func normalizeGeneratedContentType(contentType string) string {
	mediaType, _, _ := strings.Cut(contentType, ";")
	return strings.ToLower(strings.TrimSpace(mediaType))
}
// generatedContentTypeIsMedia reports whether an already-normalized content
// type is an image, video or audio type. The check is case-sensitive.
func generatedContentTypeIsMedia(contentType string) bool {
	for _, prefix := range [...]string{"image/", "video/", "audio/"} {
		if strings.HasPrefix(contentType, prefix) {
			return true
		}
	}
	return false
}
// generatedAssetKindFromContentType maps a content type to "image", "video"
// or "audio". Otherwise it returns the lower-cased fallback, or "image" when
// the fallback is blank.
func generatedAssetKindFromContentType(fallback string, contentType string) string {
	normalized := normalizeGeneratedContentType(contentType)
	for _, kind := range [...]string{"image", "video", "audio"} {
		if strings.HasPrefix(normalized, kind+"/") {
			return kind
		}
	}
	if hint := strings.ToLower(strings.TrimSpace(fallback)); hint != "" {
		return hint
	}
	return "image"
}
// generatedAssetFileName builds "gateway-result-<token>-<NN>-<hex><ext>".
// The token is the sanitized task ID, limited to 32 characters; a timestamp
// is used when the ID sanitizes to nothing. NN is the 1-based index, and the
// extension is derived from contentType/kind.
func generatedAssetFileName(taskID string, index int, contentType string, kind string) string {
	token := sanitizeGeneratedAssetNamePart(taskID)
	switch {
	case token == "":
		token = fmt.Sprintf("%d", time.Now().UTC().UnixNano())
	case len(token) > 32:
		token = token[:32]
	}
	suffix := randomHexSuffix(6)
	ext := fileExtensionForContentType(contentType, kind)
	return fmt.Sprintf("gateway-result-%s-%02d-%s%s", token, index+1, suffix, ext)
}
// sanitizeGeneratedAssetNamePart lower-cases value, keeps only [a-z0-9_-], and
// trims leading and trailing '-' and '_'.
func sanitizeGeneratedAssetNamePart(value string) string {
	keep := func(r rune) rune {
		if ('a' <= r && r <= 'z') || ('0' <= r && r <= '9') || r == '-' || r == '_' {
			return r
		}
		return -1
	}
	filtered := strings.Map(keep, strings.ToLower(strings.TrimSpace(value)))
	return strings.Trim(filtered, "-_")
}
// randomHexSuffix returns byteCount (default 6) cryptographically random
// bytes as lowercase hex. If the random source fails, it returns the current
// UnixNano timestamp instead.
func randomHexSuffix(byteCount int) string {
	n := byteCount
	if n <= 0 {
		n = 6
	}
	buf := make([]byte, n)
	if _, err := rand.Read(buf); err != nil {
		return fmt.Sprintf("%d", time.Now().UTC().UnixNano())
	}
	return hex.EncodeToString(buf)
}
// fileExtensionForContentType maps a media content type (parameters and case
// ignored) to a file extension. Unknown types fall back by kind: .mp4 for
// video, .mp3 for audio, .png otherwise.
//
// The table covers the media types http.DetectContentType actually returns,
// such as "audio/wave", "video/avi", "image/x-icon", "audio/aiff" and
// "audio/midi". Previously these fell through to the kind fallback, so a
// sniffed WAV file was saved as ".mp3" and an AVI as ".mp4".
func fileExtensionForContentType(contentType string, kind string) string {
	mediaType, _, _ := strings.Cut(contentType, ";")
	switch strings.ToLower(strings.TrimSpace(mediaType)) {
	case "image/png":
		return ".png"
	case "image/jpeg", "image/jpg", "image/pjpeg":
		return ".jpg"
	case "image/webp":
		return ".webp"
	case "image/gif":
		return ".gif"
	case "image/avif":
		return ".avif"
	case "image/bmp":
		return ".bmp"
	case "image/svg+xml":
		return ".svg"
	case "image/x-icon", "image/vnd.microsoft.icon":
		return ".ico"
	case "image/tiff":
		return ".tiff"
	case "image/heic":
		return ".heic"
	case "video/mp4":
		return ".mp4"
	case "video/webm":
		return ".webm"
	case "video/quicktime":
		return ".mov"
	case "video/avi", "video/x-msvideo":
		return ".avi"
	case "audio/mpeg", "audio/mp3":
		return ".mp3"
	case "audio/wav", "audio/x-wav", "audio/wave", "audio/vnd.wave":
		return ".wav"
	case "audio/ogg":
		return ".ogg"
	case "audio/aiff", "audio/x-aiff":
		return ".aiff"
	case "audio/midi":
		return ".mid"
	case "audio/basic":
		return ".au"
	case "audio/mp4", "audio/x-m4a":
		return ".m4a"
	case "audio/aac":
		return ".aac"
	case "audio/flac", "audio/x-flac":
		return ".flac"
	case "audio/webm":
		return ".webm"
	}
	switch {
	case strings.EqualFold(kind, "video"):
		return ".mp4"
	case strings.EqualFold(kind, "audio"):
		return ".mp3"
	default:
		return ".png"
	}
}
// uniqueStringList trims each value and returns the non-empty results in
// first-seen order without duplicates. The result is never nil.
func uniqueStringList(values []string) []string {
	result := make([]string, 0, len(values))
	seen := make(map[string]struct{}, len(values))
	for _, raw := range values {
		trimmed := strings.TrimSpace(raw)
		if trimmed == "" {
			continue
		}
		if _, dup := seen[trimmed]; dup {
			continue
		}
		seen[trimmed] = struct{}{}
		result = append(result, trimmed)
	}
	return result
}
// uploadRetrySchedule turns a channel retry policy into a retry count and a
// backoff list. A nil policy uses defaultUploadRetryPolicy.
//
// An explicit "enabled": false gives (0, nil). A non-positive "maxRetries"
// becomes 3, and an empty or absent "backoffSeconds" becomes 60s/120s/180s.
//
// NOTE(review): the policy's "strategy" key is not consulted here; delays are
// used exactly as listed.
func uploadRetrySchedule(policy map[string]any) (int, []time.Duration) {
	const fallbackRetries = 3
	effective := policy
	if effective == nil {
		effective = defaultUploadRetryPolicy()
	}
	if enabled, isBool := effective["enabled"].(bool); isBool && !enabled {
		return 0, nil
	}
	retries := intFromPolicy(effective, "maxRetries")
	if retries <= 0 {
		retries = fallbackRetries
	}
	delays := uploadRetryDelays(effective["backoffSeconds"])
	if len(delays) == 0 {
		delays = []time.Duration{60 * time.Second, 120 * time.Second, 180 * time.Second}
	}
	return retries, delays
}
// uploadRetryDelays converts a []any of second counts into durations. Values
// are truncated to whole seconds and non-positive entries are skipped. It
// returns nil when value is not a []any.
func uploadRetryDelays(value any) []time.Duration {
	items, isList := value.([]any)
	if !isList {
		return nil
	}
	delays := make([]time.Duration, 0, len(items))
	for i := range items {
		if seconds := int(floatFromAny(items[i])); seconds > 0 {
			delays = append(delays, time.Second*time.Duration(seconds))
		}
	}
	return delays
}
// retryDelayForAttempt returns the backoff before retry number attempt
// (0-based). Attempts past the end of the schedule reuse its last delay.
//
// An empty schedule yields 0 and a negative attempt yields the first delay.
// Both cases previously panicked with an index out of range.
func retryDelayForAttempt(attempt int, delays []time.Duration) time.Duration {
	switch {
	case len(delays) == 0:
		return 0
	case attempt < 0:
		return delays[0]
	case attempt < len(delays):
		return delays[attempt]
	default:
		return delays[len(delays)-1]
	}
}
// sleepWithContext waits for delay, or until ctx is done. In the second case
// it returns ctx.Err(); otherwise it returns nil.
func sleepWithContext(ctx context.Context, delay time.Duration) error {
	wait := time.NewTimer(delay)
	defer wait.Stop()
	var err error
	select {
	case <-ctx.Done():
		err = ctx.Err()
	case <-wait.C:
	}
	return err
}
// defaultUploadRetryPolicy returns a fresh copy of the retry policy used when
// a channel has none configured.
func defaultUploadRetryPolicy() map[string]any {
	policy := make(map[string]any, 4)
	policy["enabled"] = true
	policy["maxRetries"] = 3
	policy["backoffSeconds"] = []any{60, 120, 180}
	policy["strategy"] = "exponential"
	return policy
}
// normalizeUploadResponse ensures a decoded upload reply has a top-level
// "url", lifting it from nested fields when needed. It also records which
// channel handled the upload under "storageChannel". The decoded map is
// modified in place and returned; nil becomes a new map.
func normalizeUploadResponse(decoded map[string]any, channel store.FileStorageChannel) map[string]any {
	response := decoded
	if response == nil {
		response = make(map[string]any)
	}
	if stringFromAny(response["url"]) == "" {
		if found := uploadResponseURL(response); found != "" {
			response["url"] = found
		}
	}
	response["storageChannel"] = map[string]any{
		"id":         channel.ID,
		"channelKey": channel.ChannelKey,
		"name":       channel.Name,
		"provider":   channel.Provider,
	}
	return response
}
// uploadResponseURL finds the file URL in an upload reply.
//
// It checks "url", "fileUrl" and "file_url" at this level first. It then
// recurses into "data", "file" and "result", in that order; each may be an
// object or a list whose first element is an object. It returns "" when
// nothing is found.
func uploadResponseURL(decoded map[string]any) string {
	for _, key := range [...]string{"url", "fileUrl", "file_url"} {
		if direct := stringFromAny(decoded[key]); direct != "" {
			return direct
		}
	}
	for _, key := range [...]string{"data", "file", "result"} {
		var nested map[string]any
		switch child := decoded[key].(type) {
		case map[string]any:
			nested = child
		case []any:
			if len(child) > 0 {
				nested, _ = child[0].(map[string]any)
			}
		}
		if nested == nil {
			continue
		}
		if found := uploadResponseURL(nested); found != "" {
			return found
		}
	}
	return ""
}