package clients import ( "bytes" "context" "encoding/base64" "encoding/json" "fmt" "io" "math" "net/http" "sort" "strings" "time" "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" "github.com/golang-jwt/jwt/v5" ) type KelingClient struct { HTTPClient *http.Client } type kelingPreparedTask struct { Endpoint string Payload map[string]any RemoteTaskPayload map[string]any CleanupElementIDs []string } func (c KelingClient) Run(ctx context.Context, request Request) (Response, error) { if request.Kind != "videos.generations" { return Response{}, &ClientError{Code: "unsupported_kind", Message: "unsupported keling request kind", Retryable: false} } token, err := kelingAuthToken(request.Candidate) if err != nil { return Response{}, err } return c.runVideo(ctx, request, token) } func (c KelingClient) runVideo(ctx context.Context, request Request, token string) (Response, error) { submitStartedAt := time.Now() submitRequestID := strings.TrimSpace(request.RemoteTaskID) upstreamTaskID := strings.TrimSpace(request.RemoteTaskID) prepared := kelingResumePreparedTask(request) if upstreamTaskID == "" { var err error prepared, err = c.prepareVideoTask(ctx, request, token) if err != nil { return Response{}, err } } defer func() { if upstreamTaskID == "" { _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) } }() if upstreamTaskID == "" { submitResult, requestID, err := c.postJSON(ctx, request, prepared.Endpoint, token, prepared.Payload) submitRequestID = requestID if err != nil { return Response{}, annotateResponseError(err, submitRequestID, submitStartedAt, time.Now()) } upstreamTaskID = strings.TrimSpace(stringFromAny(kelingData(submitResult)["task_id"])) if upstreamTaskID == "" { _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) return Response{}, &ClientError{Code: "invalid_response", Message: "keling video task id is missing", RequestID: submitRequestID, Retryable: false} } prepared.RemoteTaskPayload["submit"] = submitResult if request.OnRemoteTaskSubmitted != nil { if err := request.OnRemoteTaskSubmitted(upstreamTaskID, prepared.RemoteTaskPayload); err != nil { return Response{}, err } } } pollEndpoint := kelingPollEndpoint(request, prepared.Endpoint) interval := kelingPollInterval(request) timeout := kelingPollTimeout(request) deadline := time.NewTimer(timeout) defer deadline.Stop() ticker := time.NewTicker(interval) defer ticker.Stop() var lastResult map[string]any for { select { case <-ctx.Done(): return Response{}, &ClientError{Code: "cancelled", Message: ctx.Err().Error(), RequestID: submitRequestID, Retryable: true} default: } pollStartedAt := time.Now() pollResult, pollRequestID, err := c.getJSON(ctx, request, pollEndpoint+"/"+upstreamTaskID, token) pollFinishedAt := time.Now() requestID := firstNonEmpty(pollRequestID, submitRequestID, upstreamTaskID) if err != nil { return Response{}, annotateResponseError(err, requestID, pollStartedAt, pollFinishedAt) } lastResult = pollResult switch kelingTaskStatus(pollResult) { case "succeed": _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) prepared.CleanupElementIDs = nil result := kelingVideoSuccessResult(request, upstreamTaskID, pollResult) return Response{ Result: result, RequestID: requestID, Progress: kelingVideoProgress(request, upstreamTaskID), ResponseStartedAt: submitStartedAt, ResponseFinishedAt: pollFinishedAt, ResponseDurationMS: responseDurationMS(submitStartedAt, pollFinishedAt), }, nil case "failed": _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) prepared.CleanupElementIDs = nil return Response{}, &ClientError{ Code: kelingTaskErrorCode(pollResult), Message: kelingTaskErrorMessage(request.Candidate, pollResult), RequestID: requestID, ResponseStartedAt: submitStartedAt, ResponseFinishedAt: pollFinishedAt, ResponseDurationMS: responseDurationMS(submitStartedAt, pollFinishedAt), Retryable: false, } } select { case <-ctx.Done(): return Response{}, &ClientError{Code: "cancelled", Message: ctx.Err().Error(), RequestID: requestID, Retryable: true} case <-deadline.C: return Response{}, &ClientError{ Code: "timeout", Message: fmt.Sprintf("keling video task %s did not finish before timeout; last status: %s", upstreamTaskID, kelingTaskStatus(lastResult)), RequestID: requestID, Retryable: true, } case <-ticker.C: } } } func (c KelingClient) prepareVideoTask(ctx context.Context, request Request, token string) (kelingPreparedTask, error) { if kelingIsOmniRequest(request) { payload, cleanupIDs, err := c.kelingOmniPayload(ctx, request, token) if err != nil { return kelingPreparedTask{}, err } return kelingPreparedTask{ Endpoint: "/videos/omni-video", Payload: payload, RemoteTaskPayload: map[string]any{"endpoint": "/videos/omni-video", "mode": "omni_video", "cleanupElementIds": cleanupIDs}, CleanupElementIDs: cleanupIDs, }, nil } payload, taskType, err := kelingVideoPayload(ctx, request) if err != nil { return kelingPreparedTask{}, err } endpoint := "/videos/" + taskType return kelingPreparedTask{ Endpoint: endpoint, Payload: payload, RemoteTaskPayload: map[string]any{"endpoint": endpoint, "taskType": taskType}, }, nil } func kelingResumePreparedTask(request Request) kelingPreparedTask { endpoint := "" for _, key := range []string{"endpoint", "pollEndpoint"} { if value := strings.TrimSpace(stringFromAny(request.RemoteTaskPayload[key])); value != "" { endpoint = value break } } if endpoint == "" { if kelingIsOmniRequest(request) { endpoint = "/videos/omni-video" } else { endpoint = "/videos/" + kelingTaskTypeFromRequest(request) } } return kelingPreparedTask{Endpoint: endpoint, RemoteTaskPayload: map[string]any{"endpoint": endpoint}} } func kelingVideoPayload(ctx context.Context, request Request) (map[string]any, string, error) { body := cleanProviderBody(request.Body) content := contentItems(body["content"]) if len(content) == 0 { content = buildVolcesContentFromBody(body) } prompt := firstKelingPrompt(content) if prompt == "" { return nil, "", &ClientError{Code: "invalid_parameter", Message: "keling video prompt is required", StatusCode: 400, Retryable: false} } firstFrame, lastFrame, referenceImages := kelingImageInputs(content) isImage2Video := firstFrame != "" || lastFrame != "" || len(referenceImages) > 0 primaryImage := firstFrame if primaryImage == "" && len(referenceImages) <= 1 && len(referenceImages) > 0 { primaryImage = referenceImages[0] } if primaryImage == "" { primaryImage = lastFrame } payload := map[string]any{ "prompt": prompt, "model_name": upstreamModelName(request.Candidate), "duration": fmtDuration(body["duration"], 5), } if value := strings.TrimSpace(stringFromAny(body["negative_prompt"])); value != "" { payload["negative_prompt"] = value } if value, ok := body["cfg_scale"]; ok && numericValue(value, 0) > 0 { payload["cfg_scale"] = value } if boolValue(body, "audio") || boolValue(body, "output_audio") { payload["sound"] = "on" } if mode := kelingModeByResolution(firstNonEmptyStringValue(body, "resolution", "size")); mode != "" { payload["mode"] = mode } if ratio := strings.TrimSpace(firstNonEmptyStringValue(body, "aspect_ratio", "aspectRatio", "ratio")); strings.Contains(ratio, ":") { payload["aspect_ratio"] = ratio } if camera := kelingCameraControl(body); camera != nil { payload["camera_control"] = camera } if primaryImage != "" { encoded, err := kelingImageToBase64(ctx, request, primaryImage) if err != nil { return nil, "", err } payload["image"] = encoded } if lastFrame != "" { encoded, err := kelingImageToBase64(ctx, request, lastFrame) if err != nil { return nil, "", err } payload["image_tail"] = encoded } if len(referenceImages) > 0 { imageList := make([]any, 0, len(referenceImages)) for _, url := range referenceImages { encoded, err := kelingImageToBase64(ctx, request, url) if err != nil { return nil, "", err } imageList = append(imageList, map[string]any{"image": encoded}) } payload["image_list"] = imageList } if !strings.Contains(stringFromAny(payload["aspect_ratio"]), ":") || isImage2Video { delete(payload, "aspect_ratio") } taskType := "text2video" if primaryImage != "" { taskType = "image2video" } else if len(referenceImages) > 1 { taskType = "multi-image2video" } return payload, taskType, nil } func kelingTaskTypeFromRequest(request Request) string { body := cleanProviderBody(request.Body) content := contentItems(body["content"]) if len(content) == 0 { content = buildVolcesContentFromBody(body) } firstFrame, lastFrame, referenceImages := kelingImageInputs(content) if firstFrame != "" || lastFrame != "" || len(referenceImages) == 1 { return "image2video" } if len(referenceImages) > 1 { return "multi-image2video" } return "text2video" } func (c KelingClient) kelingOmniPayload(ctx context.Context, request Request, token string) (map[string]any, []string, error) { body := cleanProviderBody(request.Body) content := contentItems(body["content"]) if len(content) == 0 { content = buildVolcesContentFromBody(body) } prompt := firstKelingPrompt(content) images := kelingOmniImageList(content) videos := kelingOmniVideoList(content) uploadedElementIDs := make([]string, 0) elements, createdIDs, err := c.kelingOmniElementList(ctx, request, token, content) if err != nil { return nil, nil, err } uploadedElementIDs = append(uploadedElementIDs, createdIDs...) shots := kelingShotPrompts(content) hasMultiPrompt := len(shots) > 0 hasVideo := len(videos) > 0 hasVideoEdit := kelingHasBaseVideo(videos) hasFirstFrame := kelingHasFirstFrame(images) payload := map[string]any{ "model_name": upstreamModelName(request.Candidate), "mode": kelingModeByResolution(firstNonEmptyStringValue(body, "resolution", "size")), "watermark_info": map[string]any{"enabled": false}, "negative_prompt": strings.TrimSpace(stringFromAny(body["negative_prompt"])), } if !hasMultiPrompt { payload["prompt"] = prompt if body["duration"] != nil { payload["duration"] = fmtDuration(body["duration"], 0) } } if ratio := strings.TrimSpace(firstNonEmptyStringValue(body, "aspect_ratio", "aspectRatio", "ratio")); strings.Contains(ratio, ":") { payload["aspect_ratio"] = ratio } if len(images) > 0 { payload["image_list"] = images } if len(videos) > 0 { payload["video_list"] = videos } if len(elements) > 0 { payload["element_list"] = elements } if (boolValue(body, "audio") || boolValue(body, "output_audio")) && !hasVideo { payload["sound"] = "on" } if hasMultiPrompt { payload["multi_shot"] = true payload["shot_type"] = "customize" total := 0.0 multiPrompt := make([]any, 0, len(shots)) for index, shot := range shots { duration := shot.duration if duration <= 0 { duration = 5 } total += duration multiPrompt = append(multiPrompt, map[string]any{ "index": index + 1, "prompt": shot.text, "duration": fmtDuration(duration, 5), }) } delete(payload, "prompt") payload["multi_prompt"] = multiPrompt payload["duration"] = fmtDuration(total, 0) } deleteEmptyStringFields(payload) if hasVideoEdit { delete(payload, "duration") delete(payload, "aspect_ratio") } if hasVideo && !hasVideoEdit && !strings.Contains(stringFromAny(payload["aspect_ratio"]), ":") { payload["aspect_ratio"] = "16:9" } if !hasVideoEdit && !hasFirstFrame && !strings.Contains(stringFromAny(payload["aspect_ratio"]), ":") { payload["aspect_ratio"] = "16:9" } return payload, uploadedElementIDs, nil } func (c KelingClient) kelingOmniElementList(ctx context.Context, request Request, token string, content []map[string]any) ([]any, []string, error) { elements := make([]any, 0) createdIDs := make([]string, 0) for _, item := range content { if stringFromAny(item["type"]) != "element" { continue } element := mapFromAny(item["element"]) if element == nil { continue } if id := kelingStringFromAny(firstPresent(element["element_id"], element["id"])); id != "" { elements = append(elements, map[string]any{"element_id": id}) continue } inline := mapFromAny(element["inline_element"]) if inline == nil { continue } payload := kelingCreateElementPayload(inline) if payload == nil { continue } id, err := c.createKelingElement(ctx, request, token, payload) if err != nil { return nil, createdIDs, err } elements = append(elements, map[string]any{"element_id": id}) createdIDs = append(createdIDs, id) } return elements, createdIDs, nil } func (c KelingClient) postJSON(ctx context.Context, request Request, path string, token string, body map[string]any) (map[string]any, string, error) { raw, _ := json.Marshal(body) req, err := http.NewRequestWithContext(ctx, http.MethodPost, joinURL(request.Candidate.BaseURL, path), bytes.NewReader(raw)) if err != nil { return nil, "", err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+token) resp, err := httpClient(request.HTTPClient, c.HTTPClient).Do(req) if err != nil { return nil, "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} } requestID := requestIDFromHTTPResponse(resp) result, err := decodeHTTPResponse(resp) if err != nil { return result, requestID, err } if code := intFromAny(result["code"]); code != 0 { return result, requestID, &ClientError{Code: kelingEnvelopeErrorCode(result), Message: kelingEnvelopeErrorMessage(result), RequestID: firstNonEmpty(requestID, stringFromAny(result["request_id"])), Retryable: false} } return result, firstNonEmpty(requestID, stringFromAny(result["request_id"])), nil } func (c KelingClient) getJSON(ctx context.Context, request Request, path string, token string) (map[string]any, string, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, joinURL(request.Candidate.BaseURL, path), nil) if err != nil { return nil, "", err } req.Header.Set("Authorization", "Bearer "+token) resp, err := httpClient(request.HTTPClient, c.HTTPClient).Do(req) if err != nil { return nil, "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} } requestID := requestIDFromHTTPResponse(resp) result, err := decodeHTTPResponse(resp) if err != nil { return result, requestID, err } if code := intFromAny(result["code"]); code != 0 { return result, requestID, &ClientError{Code: kelingEnvelopeErrorCode(result), Message: kelingEnvelopeErrorMessage(result), RequestID: firstNonEmpty(requestID, stringFromAny(result["request_id"])), Retryable: false} } return result, firstNonEmpty(requestID, stringFromAny(result["request_id"])), nil } func (c KelingClient) createKelingElement(ctx context.Context, request Request, token string, payload map[string]any) (string, error) { raw, _ := json.Marshal(payload) req, err := http.NewRequestWithContext(ctx, http.MethodPost, joinURL(request.Candidate.BaseURL, "/general/custom-elements"), bytes.NewReader(raw)) if err != nil { return "", err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+token) resp, err := httpClient(request.HTTPClient, c.HTTPClient).Do(req) if err != nil { return "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} } defer resp.Body.Close() body, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024)) if resp.StatusCode < 200 || resp.StatusCode >= 300 { return "", &ClientError{Code: statusCodeName(resp.StatusCode), Message: errorMessage(body, resp.Status), StatusCode: resp.StatusCode, RequestID: requestIDFromHTTPResponse(resp), Retryable: HTTPRetryable(resp.StatusCode)} } var parsed struct { Code int `json:"code"` Message string `json:"message"` RequestID string `json:"request_id"` Data map[string]any `json:"data"` } decoder := json.NewDecoder(bytes.NewReader(body)) decoder.UseNumber() if err := decoder.Decode(&parsed); err != nil { return "", &ClientError{Code: "invalid_response", Message: err.Error(), Retryable: false} } if parsed.Code != 0 { return "", &ClientError{Code: "keling_element_create_failed", Message: parsed.Message, RequestID: parsed.RequestID, Retryable: false} } id := kelingStringFromAny(parsed.Data["element_id"]) if id == "" { return "", &ClientError{Code: "invalid_response", Message: "keling element id is missing", RequestID: parsed.RequestID, Retryable: false} } return id, nil } func (c KelingClient) cleanupKelingElements(ctx context.Context, request Request, token string, elementIDs []string) error { for _, id := range elementIDs { id = strings.TrimSpace(id) if id == "" { continue } _, _, _ = c.postJSON(ctx, request, "/general/delete-elements", token, map[string]any{"element_id": id}) } return nil } func kelingAuthToken(candidate store.RuntimeModelCandidate) (string, error) { apiKey := credential(candidate.Credentials, "apiKey", "api_key", "key", "token") accessKey := credential(candidate.Credentials, "accessKey", "access_key", "ak") secretKey := credential(candidate.Credentials, "secretKey", "secret_key", "sk") if accessKey != "" || secretKey != "" || strings.EqualFold(strings.TrimSpace(candidate.AuthType), "AccessKey-SecretKey") { if accessKey == "" || secretKey == "" { return "", &ClientError{Code: "missing_credentials", Message: "keling accessKey and secretKey are required", Retryable: false} } now := time.Now() claims := jwt.MapClaims{ "iss": accessKey, "exp": now.Add(30 * time.Minute).Unix(), "nbf": now.Add(-5 * time.Second).Unix(), } token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) signed, err := token.SignedString([]byte(secretKey)) if err != nil { return "", &ClientError{Code: "auth_failed", Message: err.Error(), Retryable: false} } return signed, nil } if apiKey == "" { return "", &ClientError{Code: "missing_credentials", Message: "keling api key is required", Retryable: false} } return apiKey, nil } func kelingImageToBase64(ctx context.Context, request Request, value string) (string, error) { value = strings.TrimSpace(value) if value == "" { return "", nil } if strings.HasPrefix(value, "data:") { parts := strings.SplitN(value, ",", 2) if len(parts) == 2 { return strings.TrimSpace(parts[1]), nil } } if strings.HasPrefix(value, "http://") || strings.HasPrefix(value, "https://") { req, err := http.NewRequestWithContext(ctx, http.MethodGet, value, nil) if err != nil { return "", err } resp, err := httpClient(request.HTTPClient).Do(req) if err != nil { return "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { raw, _ := io.ReadAll(io.LimitReader(resp.Body, 64*1024)) return "", &ClientError{Code: statusCodeName(resp.StatusCode), Message: errorMessage(raw, resp.Status), StatusCode: resp.StatusCode, RequestID: requestIDFromHTTPResponse(resp), Retryable: HTTPRetryable(resp.StatusCode)} } raw, err := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024)) if err != nil { return "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} } return base64.StdEncoding.EncodeToString(raw), nil } return value, nil } func kelingIsOmniRequest(request Request) bool { modelType := strings.TrimSpace(request.ModelType) return modelType == "omni_video" || modelType == "omni" || request.Candidate.Capabilities["omni_video"] != nil || request.Candidate.Capabilities["omni"] != nil } func firstKelingPrompt(content []map[string]any) string { for _, item := range content { if stringFromAny(item["type"]) == "text" && stringFromAny(item["role"]) != "shot_prompt" && item["shot_index"] == nil { if text := strings.TrimSpace(stringFromAny(item["text"])); text != "" { return text } } } return "" } func kelingImageInputs(content []map[string]any) (string, string, []string) { firstFrame := "" lastFrame := "" references := make([]string, 0) for _, item := range content { if !isKelingImageContent(item) { continue } url := kelingNestedURL(item, "image_url") if url == "" { continue } switch stringFromAny(item["role"]) { case "first_frame": if firstFrame == "" { firstFrame = url } case "last_frame": if lastFrame == "" { lastFrame = url } default: references = append(references, url) } } return firstFrame, lastFrame, references } func kelingOmniImageList(content []map[string]any) []any { out := make([]any, 0) for _, item := range content { if !isKelingImageContent(item) { continue } url := kelingNestedURL(item, "image_url") if url == "" { continue } image := map[string]any{"image_url": url} switch stringFromAny(item["role"]) { case "first_frame": image["type"] = "first_frame" case "last_frame": image["type"] = "end_frame" } out = append(out, image) } return out } func kelingOmniVideoList(content []map[string]any) []map[string]any { out := make([]map[string]any, 0) for _, item := range content { if !isKelingVideoContent(item) { continue } nested := mapFromAny(item["video_url"]) url := strings.TrimSpace(stringFromAny(nested["url"])) if url == "" { continue } video := map[string]any{"video_url": url} referType := strings.TrimSpace(stringFromAny(nested["refer_type"])) if referType == "" { switch stringFromAny(item["role"]) { case "video_base": referType = "base" case "video_feature", "reference_video": referType = "feature" } } if referType == "base" || referType == "feature" { video["refer_type"] = referType } if keep := strings.TrimSpace(stringFromAny(nested["keep_original_sound"])); keep != "" { video["keep_original_sound"] = keep } out = append(out, video) } return out } type kelingShotPrompt struct { index int text string duration float64 } func kelingShotPrompts(content []map[string]any) []kelingShotPrompt { shots := make([]kelingShotPrompt, 0) for index, item := range content { if stringFromAny(item["type"]) != "text" { continue } if stringFromAny(item["role"]) != "shot_prompt" && item["shot_index"] == nil { continue } text := strings.TrimSpace(stringFromAny(item["text"])) if text == "" { continue } shotIndex := int(math.Floor(numericValue(item["shot_index"], float64(index)))) shots = append(shots, kelingShotPrompt{index: shotIndex, text: text, duration: numericValue(item["duration"], 5)}) } sort.SliceStable(shots, func(i, j int) bool { return shots[i].index < shots[j].index }) return shots } func kelingHasBaseVideo(videos []map[string]any) bool { for _, video := range videos { if stringFromAny(video["refer_type"]) == "base" { return true } } return false } func kelingHasFirstFrame(images []any) bool { for _, item := range images { image := mapFromAny(item) if stringFromAny(image["type"]) == "first_frame" { return true } } return false } func kelingCreateElementPayload(inline map[string]any) map[string]any { frontURL := strings.TrimSpace(firstNonEmptyStringValue(inline, "frontal_image_url", "frontalImageUrl", "element_frontal_image", "image_url", "imageUrl", "url")) if frontURL == "" { return nil } name := firstNonEmptyStringValue(inline, "name", "element_name", "elementName") if name == "" { name = "temporary element" } payload := map[string]any{ "element_name": name, "element_description": firstNonEmpty(firstNonEmptyStringValue(inline, "description"), name), "element_frontal_image": frontURL, } referImages := make([]any, 0) for _, ref := range mapListFromAny(firstPresent(inline["refer_images"], inline["referImages"], inline["element_refer_list"])) { url := strings.TrimSpace(firstNonEmptyStringValue(ref, "url", "image_url", "imageUrl")) if url != "" { referImages = append(referImages, map[string]any{"image_url": url}) } } if len(referImages) > 0 { payload["element_refer_list"] = referImages } if tags := kelingElementTagList(inline["tags"]); len(tags) > 0 { payload["tag_list"] = tags } return payload } func kelingElementTagList(value any) []any { mapping := map[string]string{ "hot_meme": "o_101", "character": "o_102", "animal": "o_103", "prop": "o_104", "costume": "o_105", "scene": "o_106", "effect": "o_107", "other": "o_108", } out := make([]any, 0) for _, tag := range stringListFromAny(value) { id := mapping[strings.TrimSpace(tag)] if id == "" { id = mapping["other"] } out = append(out, map[string]any{"tag_id": id}) } return out } func kelingNestedURL(item map[string]any, key string) string { nested := mapFromAny(item[key]) if nested != nil { if value := strings.TrimSpace(stringFromAny(nested["url"])); value != "" { return value } } return strings.TrimSpace(stringFromAny(item[key])) } func isKelingImageContent(item map[string]any) bool { return stringFromAny(item["type"]) == "image_url" || mapFromAny(item["image_url"]) != nil || strings.TrimSpace(stringFromAny(item["image_url"])) != "" } func isKelingVideoContent(item map[string]any) bool { return stringFromAny(item["type"]) == "video_url" || mapFromAny(item["video_url"]) != nil || strings.TrimSpace(stringFromAny(item["video_url"])) != "" } func kelingModeByResolution(resolution string) string { switch strings.TrimSpace(resolution) { case "2160p": return "4k" case "1080p": return "pro" case "480p", "720p", "": return "std" default: if strings.HasSuffix(strings.TrimSpace(resolution), "p") { return "std" } return "" } } func kelingCameraControl(body map[string]any) map[string]any { cameraControl := strings.TrimSpace(stringFromAny(body["camera_control"])) if cameraControl == "" { return nil } if strings.HasPrefix(cameraControl, "simple") { directions := []string{"horizontal", "vertical", "pan", "tilt", "roll", "zoom"} current := "" parts := strings.SplitN(cameraControl, ":", 2) if len(parts) == 2 { current = parts[1] } strength := firstPresent(body["camera_control_strength"], body["cameraControlStrength"]) config := map[string]any{} for _, direction := range directions { if direction == current { config[direction] = strength } else { config[direction] = 0 } } return map[string]any{"type": "simple", "config": config} } return map[string]any{"type": cameraControl} } func kelingData(result map[string]any) map[string]any { data, _ := result["data"].(map[string]any) if data == nil { return map[string]any{} } return data } func kelingTaskStatus(result map[string]any) string { return strings.ToLower(strings.TrimSpace(stringFromAny(kelingData(result)["task_status"]))) } func kelingTaskErrorCode(result map[string]any) string { if code := intFromAny(result["code"]); code != 0 { return fmt.Sprintf("keling_%d", code) } return "keling_task_failed" } func kelingTaskErrorMessage(candidate store.RuntimeModelCandidate, result map[string]any) string { message := strings.TrimSpace(stringFromAny(kelingData(result)["task_status_msg"])) if message == "" { message = strings.TrimSpace(stringFromAny(result["message"])) } if message == "" { message = "keling video task failed" } return fmt.Sprintf("Platform:%s,Code:%v,requestId:%s,message:%s", candidate.Provider, result["code"], stringFromAny(result["request_id"]), message) } func kelingEnvelopeErrorCode(result map[string]any) string { if code := intFromAny(result["code"]); code != 0 { return fmt.Sprintf("keling_%d", code) } return "keling_error" } func kelingEnvelopeErrorMessage(result map[string]any) string { if message := strings.TrimSpace(stringFromAny(result["message"])); message != "" { return message } return "keling request failed" } func kelingVideoSuccessResult(request Request, upstreamTaskID string, raw map[string]any) map[string]any { data := kelingData(raw) taskResult, _ := data["task_result"].(map[string]any) videos, _ := taskResult["videos"].([]any) items := make([]any, 0, len(videos)) for _, rawVideo := range videos { video := mapFromAny(rawVideo) url := strings.TrimSpace(stringFromAny(video["url"])) if url == "" { continue } item := map[string]any{"url": url, "video_url": url, "type": "video"} if duration := intFromAny(video["duration"]); duration > 0 { item["duration"] = duration } items = append(items, item) } created := intFromAny(data["created_at"]) if created == 0 { created = int(nowUnix()) } return map[string]any{ "id": upstreamTaskID, "object": "video.generation", "created": created, "model": upstreamModelName(request.Candidate), "status": "succeeded", "upstream_task_id": upstreamTaskID, "data": items, "raw": raw, } } func kelingVideoProgress(request Request, upstreamTaskID string) []Progress { progress := providerProgress(request) progress = append(progress, Progress{ Phase: "polling_result", Progress: 0.9, Message: "keling video task completed", Payload: map[string]any{"upstreamTaskId": upstreamTaskID}, }) return progress } func kelingPollEndpoint(request Request, fallback string) string { for _, key := range []string{"endpoint", "pollEndpoint"} { if value := strings.TrimSpace(stringFromAny(request.RemoteTaskPayload[key])); value != "" { return value } } return fallback } func kelingPollInterval(request Request) time.Duration { ms := numericValue(firstPresent(request.Candidate.PlatformConfig["kelingPollIntervalMs"], request.Candidate.PlatformConfig["klingPollIntervalMs"], request.Body["pollIntervalMs"], request.Body["poll_interval_ms"]), 15000) if ms < 100 { ms = 100 } return time.Duration(ms) * time.Millisecond } func kelingPollTimeout(request Request) time.Duration { seconds := numericValue(firstPresent(request.Candidate.PlatformConfig["kelingPollTimeoutSeconds"], request.Candidate.PlatformConfig["klingPollTimeoutSeconds"], request.Body["pollTimeoutSeconds"], request.Body["poll_timeout_seconds"]), 600) if seconds < 1 { seconds = 600 } return time.Duration(seconds) * time.Second } func fmtDuration(value any, fallback float64) string { duration := numericValue(value, fallback) if math.Abs(duration-math.Round(duration)) < 1e-9 { return fmt.Sprintf("%d", int(math.Round(duration))) } return strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.6f", duration), "0"), ".") } func deleteEmptyStringFields(payload map[string]any) { for key, value := range payload { if text, ok := value.(string); ok && strings.TrimSpace(text) == "" { delete(payload, key) } } } func kelingStringFromAny(value any) string { switch typed := value.(type) { case json.Number: return typed.String() case float64: if math.Abs(typed-math.Round(typed)) < 1e-9 { return fmt.Sprintf("%.0f", typed) } return fmt.Sprintf("%v", typed) case int: return fmt.Sprintf("%d", typed) case int64: return fmt.Sprintf("%d", typed) case string: return strings.TrimSpace(typed) default: return "" } }