package runner import ( "context" "encoding/base64" "fmt" "io" "net/http" "net/netip" "net/url" "os" "path/filepath" "strings" "time" "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" "github.com/easyai/easyai-ai-gateway/apps/api/internal/config" "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) func (s *Service) restoreTaskRequestReferences(ctx context.Context, task store.GatewayTask) (map[string]any, error) { body := cloneMap(task.Request) if body["messages"] != nil || body["messageRefs"] == nil || s.store == nil { return body, nil } refs, err := s.store.ListTaskConversationMessages(ctx, task.ID) if err != nil { return nil, err } if len(refs) == 0 { return body, nil } messages := make([]any, 0, len(refs)) for _, ref := range refs { messages = append(messages, ref.Message) } body["messages"] = messages return body, nil } func (s *Service) slimTaskRequestSnapshot(task store.GatewayTask, body map[string]any) map[string]any { out := cloneMap(body) messageRefs := task.Request["messageRefs"] if messageRefs == nil { return out } delete(out, "messages") out["messageRefs"] = messageRefs for _, key := range []string{"conversationId", "conversationRecordId", "newMessageCount"} { if value := task.Request[key]; value != nil { out[key] = value } } return out } func (s *Service) slimParameterPreprocessingLog(task store.GatewayTask, log parameterPreprocessingLog) parameterPreprocessingLog { log.Input = s.slimTaskRequestSnapshot(task, log.Input) log.Output = s.slimTaskRequestSnapshot(task, log.Output) return log } type requestAssetHydrationStyle string const ( requestAssetHydrateURL requestAssetHydrationStyle = "url" requestAssetHydrateDataURL requestAssetHydrationStyle = "data_url" requestAssetHydrateRawBase64 requestAssetHydrationStyle = "raw_base64" requestAssetHydrateUnsupported requestAssetHydrationStyle = "unsupported" ) type requestAssetInputFormatFlag struct { Set bool Value bool } type requestAssetInputFormatSupport struct { URL requestAssetInputFormatFlag Base64 requestAssetInputFormatFlag } func (s *Service) hydrateProviderRequestAssets(ctx context.Context, body map[string]any, candidate store.RuntimeModelCandidate) (map[string]any, error) { value, err := s.hydrateProviderRequestAssetValue(ctx, body, nil, candidate) if err != nil { return nil, err } out, _ := value.(map[string]any) if out == nil { return map[string]any{}, nil } return out, nil } func (s *Service) hydrateProviderRequestAssetValue(ctx context.Context, value any, path []string, candidate store.RuntimeModelCandidate) (any, error) { switch typed := value.(type) { case map[string]any: if ref, ok := typed["assetRef"].(map[string]any); ok { return s.hydrateProviderRequestAssetRef(ctx, ref, path, candidate) } next := make(map[string]any, len(typed)) for key, item := range typed { hydrated, err := s.hydrateProviderRequestAssetValue(ctx, item, append(path, key), candidate) if err != nil { return nil, err } next[key] = hydrated } return next, nil case []any: next := make([]any, 0, len(typed)) for index, item := range typed { hydrated, err := s.hydrateProviderRequestAssetValue(ctx, item, append(path, fmt.Sprintf("[%d]", index)), candidate) if err != nil { return nil, err } next = append(next, hydrated) } return next, nil case string: return s.hydrateProviderRequestAssetString(ctx, typed, path, candidate) default: return value, nil } } func (s *Service) hydrateProviderRequestAssetRef(ctx context.Context, ref map[string]any, path []string, candidate store.RuntimeModelCandidate) (any, error) { asset, err := s.resolveRequestAsset(ctx, ref) if err != nil { return nil, err } switch requestAssetHydrationForField(path, asset, candidate) { case requestAssetHydrateUnsupported: return nil, requestAssetUnsupportedInputFormatError(path, asset) case requestAssetHydrateDataURL: payload, err := s.readRequestAssetBytes(ctx, asset) if err != nil { return nil, err } contentType := strings.TrimSpace(asset.ContentType) if contentType == "" { contentType = "application/octet-stream" } return "data:" + contentType + ";base64," + base64.StdEncoding.EncodeToString(payload), nil case requestAssetHydrateRawBase64: payload, err := s.readRequestAssetBytes(ctx, asset) if err != nil { return nil, err } return base64.StdEncoding.EncodeToString(payload), nil } if strings.TrimSpace(asset.URL) == "" { return nil, requestAssetExpiredError(asset) } return asset.URL, nil } func (s *Service) hydrateProviderRequestAssetString(ctx context.Context, value string, path []string, candidate store.RuntimeModelCandidate) (any, error) { raw := strings.TrimSpace(value) if raw == "" || !imageInputFieldNeedsHydration(path) { return value, nil } style, ok := requestAssetCapabilityHydrationForMedia("image", candidate, raw, "") if !ok { return value, nil } switch style { case requestAssetHydrateUnsupported: return nil, requestAssetUnsupportedInputFormatError(path, store.RequestAsset{URL: raw}) case requestAssetHydrateURL: return value, nil case requestAssetHydrateRawBase64: return s.hydrateProviderRequestAssetStringBase64(ctx, raw, false) case requestAssetHydrateDataURL: return s.hydrateProviderRequestAssetStringBase64(ctx, raw, true) default: return value, nil } } func (s *Service) hydrateProviderRequestAssetStringBase64(ctx context.Context, value string, withPrefix bool) (string, error) { if strings.HasPrefix(value, "data:") { if withPrefix { return value, nil } return stripDataURLPrefix(value), nil } if requestAssetStringLooksURL(value) { asset := store.RequestAsset{URL: value} payload, err := s.readRequestAssetBytes(ctx, asset) if err != nil { return "", err } contentType := http.DetectContentType(payload) if contentType == "" { contentType = "application/octet-stream" } encoded := base64.StdEncoding.EncodeToString(payload) if withPrefix { return "data:" + contentType + ";base64," + encoded, nil } return encoded, nil } payload, err := decodeBase64Payload(value) if err != nil { return "", &clients.ClientError{Code: "request_asset_input_format_unsupported", Message: "image input must be a URL or base64 payload for the selected model", Retryable: false} } encoded := base64.StdEncoding.EncodeToString(payload) if !withPrefix { return encoded, nil } contentType := http.DetectContentType(payload) if contentType == "" { contentType = "application/octet-stream" } return "data:" + contentType + ";base64," + encoded, nil } func (s *Service) resolveRequestAsset(ctx context.Context, ref map[string]any) (store.RequestAsset, error) { sha := stringFromAny(ref["sha256"]) contentType := stringFromAny(ref["contentType"]) asset := store.RequestAsset{ SHA256: sha, ContentType: contentType, URL: stringFromAny(ref["url"]), StorageProvider: stringFromAny(ref["storageProvider"]), } if size := floatFromAny(ref["size"]); size > 0 { asset.ByteSize = int64(size) } if expiresAt := stringFromAny(ref["expiresAt"]); expiresAt != "" { if parsed, err := time.Parse(time.RFC3339, expiresAt); err == nil { asset.ExpiresAt = &parsed } } if s.store != nil && sha != "" && contentType != "" { if stored, ok, err := s.store.FindRequestAsset(ctx, sha, contentType); err != nil && !store.IsUndefinedDatabaseObject(err) { return store.RequestAsset{}, err } else if ok { asset = stored } } if requestAssetIsExpired(asset, time.Now()) { return store.RequestAsset{}, requestAssetExpiredError(asset) } return asset, nil } func (s *Service) readRequestAssetBytes(ctx context.Context, asset store.RequestAsset) ([]byte, error) { if requestAssetIsExpired(asset, time.Now()) { return nil, requestAssetExpiredError(asset) } if strings.TrimSpace(asset.LocalPath) != "" { payload, err := os.ReadFile(asset.LocalPath) if err != nil { return nil, requestAssetExpiredError(asset) } return payload, nil } if localPath := s.localPathFromRequestAssetURL(asset.URL); localPath != "" { payload, err := os.ReadFile(localPath) if err != nil { return nil, requestAssetExpiredError(asset) } return payload, nil } if strings.HasPrefix(asset.URL, "http://") || strings.HasPrefix(asset.URL, "https://") { req, err := http.NewRequestWithContext(ctx, http.MethodGet, asset.URL, nil) if err != nil { return nil, err } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, &clients.ClientError{Code: "request_asset_fetch_failed", Message: err.Error(), Retryable: true} } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, &clients.ClientError{Code: "request_asset_fetch_failed", Message: resp.Status, StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} } payload, err := io.ReadAll(io.LimitReader(resp.Body, 256<<20)) if err != nil { return nil, &clients.ClientError{Code: "request_asset_fetch_failed", Message: err.Error(), Retryable: true} } return payload, nil } return nil, requestAssetExpiredError(asset) } func (s *Service) localPathFromRequestAssetURL(value string) string { raw := strings.TrimSpace(value) if raw == "" { return "" } pathValue := raw if parsed, err := url.Parse(raw); err == nil && parsed.Path != "" { pathValue = parsed.Path } const uploadedPrefix = "/static/uploaded/" if !strings.HasPrefix(pathValue, uploadedPrefix) { return "" } fileName := filepath.Base(strings.TrimPrefix(pathValue, uploadedPrefix)) if !strings.HasPrefix(fileName, "gateway-request-asset-") { return "" } storageDir := strings.TrimSpace(s.cfg.LocalUploadedStorageDir) if storageDir == "" { storageDir = config.DefaultLocalUploadedStorageDir } return filepath.Join(storageDir, fileName) } func requestAssetHydrationForField(path []string, asset store.RequestAsset, candidate store.RuntimeModelCandidate) requestAssetHydrationStyle { if providerFieldNeedsRawBase64(path) { return requestAssetHydrateRawBase64 } if requestAssetMediaKindForHydration(path, asset) == "image" { if style, ok := requestAssetCapabilityHydrationForMedia("image", candidate, asset.URL, asset.StorageProvider); ok { return style } } if mediaURLFieldNeedsHydration(path) { if style := configuredRequestAssetMediaURLHydration(candidate, requestAssetMediaURLKind(path)); style != "" { return style } if providerMediaURLNeedsDataURL(candidate) { return requestAssetHydrateDataURL } } return requestAssetHydrateURL } func requestAssetMediaKindForHydration(path []string, asset store.RequestAsset) string { if mediaURLFieldNeedsHydration(path) { return requestAssetMediaURLKind(path) } if imageInputFieldNeedsHydration(path) { return "image" } return "" } func requestAssetCapabilityHydrationForMedia(kind string, candidate store.RuntimeModelCandidate, urlValue string, storageProvider string) (requestAssetHydrationStyle, bool) { if kind != "image" { return "", false } support := requestAssetInputFormatSupportForCandidate(candidate) if !support.URL.Set && !support.Base64.Set { return "", false } hasPublicURL := requestAssetURLIsPublic(storageProvider, urlValue) if support.URL.Set && !support.URL.Value && support.Base64.Set && !support.Base64.Value { return requestAssetHydrateUnsupported, true } if support.URL.Set && !support.URL.Value { if support.Base64.Set && support.Base64.Value { return requestAssetBase64CapabilityHydration(candidate), true } return requestAssetBase64CapabilityHydration(candidate), true } if support.Base64.Set && !support.Base64.Value { if support.URL.Set && support.URL.Value { if hasPublicURL { return requestAssetHydrateURL, true } return requestAssetHydrateUnsupported, true } if hasPublicURL { return requestAssetHydrateURL, true } return requestAssetHydrateUnsupported, true } if support.URL.Set && support.URL.Value && hasPublicURL { return requestAssetHydrateURL, true } if support.URL.Set && support.URL.Value { return requestAssetBase64CapabilityHydration(candidate), true } if support.Base64.Set && support.Base64.Value { return requestAssetBase64CapabilityHydration(candidate), true } return "", false } func requestAssetBase64CapabilityHydration(candidate store.RuntimeModelCandidate) requestAssetHydrationStyle { switch configuredRequestAssetMediaURLHydration(candidate, "image") { case requestAssetHydrateRawBase64: return requestAssetHydrateRawBase64 case requestAssetHydrateDataURL: return requestAssetHydrateDataURL default: return requestAssetHydrateDataURL } } func requestAssetInputFormatSupportForCandidate(candidate store.RuntimeModelCandidate) requestAssetInputFormatSupport { support := requestAssetInputFormatSupport{} for _, values := range requestAssetCandidateCapabilityMaps(candidate) { mergeRequestAssetInputFormatSupport(&support, values) if support.URL.Set || support.Base64.Set { return support } } mergeRequestAssetInputFormatSupport(&support, candidate.PlatformConfig) return support } func requestAssetCandidateCapabilityMaps(candidate store.RuntimeModelCandidate) []map[string]any { capabilities := effectiveModelCapability(candidate) out := make([]map[string]any, 0) seen := map[string]bool{} for _, modelType := range []string{candidate.ModelType, "image_edit", "image_generate", "image_analysis", "image_to_video", "video_generate", "omni_video", "omni"} { modelType = strings.TrimSpace(modelType) if modelType == "" || seen[modelType] { continue } seen[modelType] = true if values := capabilityForType(capabilities, modelType); len(values) > 0 { out = append(out, values) } } if len(capabilities) > 0 { out = append(out, capabilities) } return out } func mergeRequestAssetInputFormatSupport(support *requestAssetInputFormatSupport, values map[string]any) { if values == nil { return } if value, ok := requestAssetBoolSetting(values, "support_url_input", "supportUrlInput", "support_url", "supportUrl", "url_input", "urlInput", "supportURLInput"); ok { support.URL = requestAssetInputFormatFlag{Set: true, Value: value} } if value, ok := requestAssetBoolSetting(values, "support_base64_input", "supportBase64Input", "support_base64", "supportBase64", "base64_input", "base64Input", "supportBase64"); ok { support.Base64 = requestAssetInputFormatFlag{Set: true, Value: value} } } func requestAssetBoolSetting(values map[string]any, keys ...string) (bool, bool) { for _, key := range keys { value, ok := values[key] if !ok { continue } switch typed := value.(type) { case bool: return typed, true case string: switch strings.ToLower(strings.TrimSpace(typed)) { case "true", "1", "yes", "y", "on", "enabled", "enable": return true, true case "false", "0", "no", "n", "off", "disabled", "disable": return false, true } } } return false, false } func imageInputFieldNeedsHydration(path []string) bool { key, parent := requestAssetFieldPath(path) switch key { case "image", "images", "image_url", "imageurl", "image_urls", "imageurls", "mask", "first_frame", "firstframe", "last_frame", "lastframe", "reference_image", "referenceimage", "reference_images", "referenceimages", "frontal_image_url", "frontalimageurl", "front_image_url", "frontimageurl", "refer_images", "referimages": return true case "url": switch parent { case "image_url", "imageurl", "image_urls", "imageurls", "refer_images", "referimages": return true } } return false } func requestAssetStringLooksURL(value string) bool { lower := strings.ToLower(strings.TrimSpace(value)) return strings.HasPrefix(lower, "http://") || strings.HasPrefix(lower, "https://") || strings.HasPrefix(lower, "/static/uploaded/") } func requestAssetURLIsPublic(storageProvider string, value string) bool { if strings.EqualFold(strings.TrimSpace(storageProvider), "local_static") { return false } raw := strings.TrimSpace(value) if raw == "" { return false } parsed, err := url.Parse(raw) if err != nil { return false } if !strings.EqualFold(parsed.Scheme, "http") && !strings.EqualFold(parsed.Scheme, "https") { return false } return requestAssetHostIsPublic(parsed.Hostname()) } func requestAssetHostIsPublic(host string) bool { normalized := strings.ToLower(strings.Trim(strings.TrimSpace(host), "[]")) if normalized == "" || normalized == "localhost" || strings.HasSuffix(normalized, ".local") { return false } if addr, err := netip.ParseAddr(normalized); err == nil { return !addr.IsPrivate() && !addr.IsLoopback() && !addr.IsLinkLocalUnicast() && !addr.IsLinkLocalMulticast() && !addr.IsUnspecified() } return strings.Contains(normalized, ".") } func mediaURLFieldNeedsHydration(path []string) bool { key, parent := requestAssetFieldPath(path) return key == "url" && (parent == "image_url" || parent == "audio_url" || parent == "video_url" || parent == "file_url") } func providerFieldNeedsRawBase64(path []string) bool { key, parent := requestAssetFieldPath(path) return key == "b64_json" || key == "base64" || key == "b64" || strings.Contains(key, "base64") || strings.Contains(key, "_b64") || (parent == "input_audio" && key == "data") } func requestAssetMediaURLKind(path []string) string { _, parent := requestAssetFieldPath(path) switch parent { case "image_url": return "image" case "audio_url": return "audio" case "video_url": return "video" case "file_url": return "file" default: return "" } } func configuredRequestAssetMediaURLHydration(candidate store.RuntimeModelCandidate, kind string) requestAssetHydrationStyle { keys := []string{} switch kind { case "image": keys = append(keys, "requestAssetImageURLFormat", "request_asset_image_url_format") case "audio": keys = append(keys, "requestAssetAudioURLFormat", "request_asset_audio_url_format") case "video": keys = append(keys, "requestAssetVideoURLFormat", "request_asset_video_url_format") case "file": keys = append(keys, "requestAssetFileURLFormat", "request_asset_file_url_format") } keys = append(keys, "requestAssetMediaURLFormat", "request_asset_media_url_format", "mediaURLAssetFormat", "media_url_asset_format", ) for _, key := range keys { if style := requestAssetHydrationStyleFromString(stringFromAny(candidate.PlatformConfig[key])); style != "" { return style } } return "" } func requestAssetHydrationStyleFromString(value string) requestAssetHydrationStyle { normalized := strings.ToLower(strings.TrimSpace(value)) normalized = strings.ReplaceAll(normalized, "-", "_") normalized = strings.ReplaceAll(normalized, " ", "_") switch normalized { case "url", "remote_url", "public_url": return requestAssetHydrateURL case "data_url", "dataurl", "prefixed_base64", "base64_with_prefix", "base64_with_data_url_prefix": return requestAssetHydrateDataURL case "raw_base64", "base64", "bare_base64", "naked_base64": return requestAssetHydrateRawBase64 default: return "" } } func providerMediaURLNeedsDataURL(candidate store.RuntimeModelCandidate) bool { for _, name := range []string{candidate.Provider, candidate.SpecType, candidate.PlatformKey} { switch normalizeProviderKey(name) { case "openai", "volces", "volces_openai", "gemini", "vidu": return true } } return false } func normalizeProviderKey(value string) string { normalized := strings.ToLower(strings.TrimSpace(value)) normalized = strings.ReplaceAll(normalized, "-", "_") normalized = strings.ReplaceAll(normalized, " ", "_") return normalized } func requestAssetFieldPath(path []string) (string, string) { if len(path) == 0 { return "", "" } names := make([]string, 0, len(path)) for _, segment := range path { name := strings.ToLower(strings.TrimSpace(strings.Trim(segment, "[]"))) if name == "" || requestAssetPathSegmentIsIndex(name) { continue } names = append(names, name) } if len(names) == 0 { return "", "" } key := names[len(names)-1] parent := "" if len(names) > 1 { parent = names[len(names)-2] } return key, parent } func requestAssetPathSegmentIsIndex(value string) bool { if value == "" { return false } for _, char := range value { if char < '0' || char > '9' { return false } } return true } func requestAssetIsExpired(asset store.RequestAsset, now time.Time) bool { if asset.ExpiredAt != nil { return true } if asset.ExpiresAt != nil && !asset.ExpiresAt.After(now) { return true } return false } func requestAssetExpiredError(asset store.RequestAsset) error { message := "request asset is expired or unavailable" if asset.SHA256 != "" { message = "request asset is expired or unavailable: " + asset.SHA256 } return &clients.ClientError{Code: "request_asset_expired", Message: message, Retryable: false} } func requestAssetUnsupportedInputFormatError(path []string, asset store.RequestAsset) error { field := strings.Join(path, ".") if field == "" { field = "media" } message := "selected model does not support URL or base64 input for " + field if asset.SHA256 != "" { message += ": " + asset.SHA256 } return &clients.ClientError{Code: "request_asset_input_format_unsupported", Message: message, Retryable: false} }