package runner import ( "context" "encoding/base64" "fmt" "io" "net/http" "net/url" "os" "path/filepath" "strings" "time" "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" "github.com/easyai/easyai-ai-gateway/apps/api/internal/config" "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) func (s *Service) restoreTaskRequestReferences(ctx context.Context, task store.GatewayTask) (map[string]any, error) { body := cloneMap(task.Request) if body["messages"] != nil || body["messageRefs"] == nil || s.store == nil { return body, nil } refs, err := s.store.ListTaskConversationMessages(ctx, task.ID) if err != nil { return nil, err } if len(refs) == 0 { return body, nil } messages := make([]any, 0, len(refs)) for _, ref := range refs { messages = append(messages, ref.Message) } body["messages"] = messages return body, nil } func (s *Service) slimTaskRequestSnapshot(task store.GatewayTask, body map[string]any) map[string]any { out := cloneMap(body) messageRefs := task.Request["messageRefs"] if messageRefs == nil { return out } delete(out, "messages") out["messageRefs"] = messageRefs for _, key := range []string{"conversationId", "conversationRecordId", "newMessageCount"} { if value := task.Request[key]; value != nil { out[key] = value } } return out } func (s *Service) slimParameterPreprocessingLog(task store.GatewayTask, log parameterPreprocessingLog) parameterPreprocessingLog { log.Input = s.slimTaskRequestSnapshot(task, log.Input) log.Output = s.slimTaskRequestSnapshot(task, log.Output) return log } func (s *Service) hydrateProviderRequestAssets(ctx context.Context, body map[string]any) (map[string]any, error) { value, err := s.hydrateProviderRequestAssetValue(ctx, body, nil) if err != nil { return nil, err } out, _ := value.(map[string]any) if out == nil { return map[string]any{}, nil } return out, nil } func (s *Service) hydrateProviderRequestAssetValue(ctx context.Context, value any, path []string) (any, error) { switch typed := value.(type) { case map[string]any: if ref, ok := typed["assetRef"].(map[string]any); ok { return s.hydrateProviderRequestAssetRef(ctx, ref, path) } next := make(map[string]any, len(typed)) for key, item := range typed { hydrated, err := s.hydrateProviderRequestAssetValue(ctx, item, append(path, key)) if err != nil { return nil, err } next[key] = hydrated } return next, nil case []any: next := make([]any, 0, len(typed)) for index, item := range typed { hydrated, err := s.hydrateProviderRequestAssetValue(ctx, item, append(path, fmt.Sprintf("[%d]", index))) if err != nil { return nil, err } next = append(next, hydrated) } return next, nil default: return value, nil } } func (s *Service) hydrateProviderRequestAssetRef(ctx context.Context, ref map[string]any, path []string) (any, error) { asset, err := s.resolveRequestAsset(ctx, ref) if err != nil { return nil, err } if providerFieldNeedsBase64(path) { payload, err := s.readRequestAssetBytes(ctx, asset) if err != nil { return nil, err } return base64.StdEncoding.EncodeToString(payload), nil } if strings.TrimSpace(asset.URL) == "" { return nil, requestAssetExpiredError(asset) } return asset.URL, nil } func (s *Service) resolveRequestAsset(ctx context.Context, ref map[string]any) (store.RequestAsset, error) { sha := stringFromAny(ref["sha256"]) contentType := stringFromAny(ref["contentType"]) asset := store.RequestAsset{ SHA256: sha, ContentType: contentType, URL: stringFromAny(ref["url"]), StorageProvider: stringFromAny(ref["storageProvider"]), } if size := floatFromAny(ref["size"]); size > 0 { asset.ByteSize = int64(size) } if expiresAt := stringFromAny(ref["expiresAt"]); expiresAt != "" { if parsed, err := time.Parse(time.RFC3339, expiresAt); err == nil { asset.ExpiresAt = &parsed } } if s.store != nil && sha != "" && contentType != "" { if stored, ok, err := s.store.FindRequestAsset(ctx, sha, contentType); err != nil && !store.IsUndefinedDatabaseObject(err) { return store.RequestAsset{}, err } else if ok { asset = stored } } if requestAssetIsExpired(asset, time.Now()) { return store.RequestAsset{}, requestAssetExpiredError(asset) } return asset, nil } func (s *Service) readRequestAssetBytes(ctx context.Context, asset store.RequestAsset) ([]byte, error) { if requestAssetIsExpired(asset, time.Now()) { return nil, requestAssetExpiredError(asset) } if strings.TrimSpace(asset.LocalPath) != "" { payload, err := os.ReadFile(asset.LocalPath) if err != nil { return nil, requestAssetExpiredError(asset) } return payload, nil } if localPath := s.localPathFromRequestAssetURL(asset.URL); localPath != "" { payload, err := os.ReadFile(localPath) if err != nil { return nil, requestAssetExpiredError(asset) } return payload, nil } if strings.HasPrefix(asset.URL, "http://") || strings.HasPrefix(asset.URL, "https://") { req, err := http.NewRequestWithContext(ctx, http.MethodGet, asset.URL, nil) if err != nil { return nil, err } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, &clients.ClientError{Code: "request_asset_fetch_failed", Message: err.Error(), Retryable: true} } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { return nil, &clients.ClientError{Code: "request_asset_fetch_failed", Message: resp.Status, StatusCode: resp.StatusCode, Retryable: clients.HTTPRetryable(resp.StatusCode)} } payload, err := io.ReadAll(io.LimitReader(resp.Body, 256<<20)) if err != nil { return nil, &clients.ClientError{Code: "request_asset_fetch_failed", Message: err.Error(), Retryable: true} } return payload, nil } return nil, requestAssetExpiredError(asset) } func (s *Service) localPathFromRequestAssetURL(value string) string { raw := strings.TrimSpace(value) if raw == "" { return "" } pathValue := raw if parsed, err := url.Parse(raw); err == nil && parsed.Path != "" { pathValue = parsed.Path } const uploadedPrefix = "/static/uploaded/" if !strings.HasPrefix(pathValue, uploadedPrefix) { return "" } fileName := filepath.Base(strings.TrimPrefix(pathValue, uploadedPrefix)) if !strings.HasPrefix(fileName, "gateway-request-asset-") { return "" } storageDir := strings.TrimSpace(s.cfg.LocalUploadedStorageDir) if storageDir == "" { storageDir = config.DefaultLocalUploadedStorageDir } return filepath.Join(storageDir, fileName) } func providerFieldNeedsBase64(path []string) bool { if len(path) == 0 { return false } key := strings.ToLower(strings.Trim(path[len(path)-1], "[]")) parent := "" if len(path) > 1 { parent = strings.ToLower(strings.Trim(path[len(path)-2], "[]")) } return key == "b64_json" || key == "base64" || key == "b64" || strings.Contains(key, "base64") || strings.Contains(key, "_b64") || (parent == "input_audio" && key == "data") } func requestAssetIsExpired(asset store.RequestAsset, now time.Time) bool { if asset.ExpiredAt != nil { return true } if asset.ExpiresAt != nil && !asset.ExpiresAt.After(now) { return true } return false } func requestAssetExpiredError(asset store.RequestAsset) error { message := "request asset is expired or unavailable" if asset.SHA256 != "" { message = "request asset is expired or unavailable: " + asset.SHA256 } return &clients.ClientError{Code: "request_asset_expired", Message: message, Retryable: false} }