diff --git a/apps/api/internal/clients/clients_test.go b/apps/api/internal/clients/clients_test.go index 5476949..e883ba6 100644 --- a/apps/api/internal/clients/clients_test.go +++ b/apps/api/internal/clients/clients_test.go @@ -662,6 +662,266 @@ func TestVolcesClientVideoResumePollsExistingTaskID(t *testing.T) { } } +func TestKelingClientVideoSubmitsAndPollsImageTask(t *testing.T) { + var submitPath string + var pollPath string + var gotAuth string + var submittedTaskID string + var submittedPayload map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotAuth = r.Header.Get("Authorization") + switch r.Method + " " + r.URL.Path { + case "POST /videos/image2video": + submitPath = r.URL.Path + if err := json.NewDecoder(r.Body).Decode(&submittedPayload); err != nil { + t.Fatalf("decode keling submit: %v", err) + } + if _, ok := submittedPayload["aspect_ratio"]; ok { + t.Fatalf("image2video payload should not include aspect_ratio: %+v", submittedPayload) + } + _ = json.NewEncoder(w).Encode(map[string]any{ + "code": 0, + "request_id": "req-submit", + "data": map[string]any{"task_id": "keling-task-1"}, + }) + case "GET /videos/image2video/keling-task-1": + pollPath = r.URL.Path + _ = json.NewEncoder(w).Encode(map[string]any{ + "code": 0, + "request_id": "req-poll", + "data": map[string]any{ + "task_id": "keling-task-1", + "task_status": "succeed", + "created_at": 456, + "task_result": map[string]any{ + "videos": []any{map[string]any{"url": "https://example.com/keling.mp4", "duration": 6}}, + }, + }, + }) + default: + t.Fatalf("unexpected request %s %s", r.Method, r.URL.Path) + } + })) + defer server.Close() + + response, err := (KelingClient{HTTPClient: server.Client()}).Run(context.Background(), Request{ + Kind: "videos.generations", + ModelType: "image_to_video", + Model: "可灵2.6", + Body: map[string]any{ + "model": "可灵2.6", + "prompt": "A clean product reveal", + "first_frame": "data:image/png;base64,Zmlyc3Q=", + "last_frame": "data:image/png;base64,bGFzdA==", + "duration": 6, + "resolution": "1080p", + "aspect_ratio": "16:9", + "audio": true, + "camera_control": "simple:zoom", + "camera_control_strength": 0.6, + }, + Candidate: store.RuntimeModelCandidate{ + BaseURL: server.URL, + Provider: "keling", + AuthType: "AccessKey-SecretKey", + ModelName: "可灵2.6", + ProviderModelName: "kling-v2-6", + Credentials: map[string]any{"accessKey": "ak", "secretKey": "sk"}, + PlatformConfig: map[string]any{ + "kelingPollIntervalMs": 100, + "kelingPollTimeoutSeconds": 1, + }, + }, + OnRemoteTaskSubmitted: func(remoteTaskID string, payload map[string]any) error { + submittedTaskID = remoteTaskID + if payload["endpoint"] != "/videos/image2video" || payload["taskType"] != "image2video" { + t.Fatalf("unexpected submitted keling payload: %+v", payload) + } + return nil + }, + }) + if err != nil { + t.Fatalf("run keling video: %v", err) + } + if submitPath != "/videos/image2video" || pollPath != "/videos/image2video/keling-task-1" || !strings.HasPrefix(gotAuth, "Bearer ") { + t.Fatalf("unexpected keling paths/auth submit=%s poll=%s auth=%s", submitPath, pollPath, gotAuth) + } + if submittedTaskID != "keling-task-1" { + t.Fatalf("remote task submit callback did not receive task id, got %q", submittedTaskID) + } + if submittedPayload["model_name"] != "kling-v2-6" || + submittedPayload["prompt"] != "A clean product reveal" || + submittedPayload["duration"] != "6" || + submittedPayload["mode"] != "pro" || + submittedPayload["sound"] != "on" || + submittedPayload["image"] != "Zmlyc3Q=" || + submittedPayload["image_tail"] != "bGFzdA==" { + t.Fatalf("unexpected keling submit payload: %+v", submittedPayload) + } + camera, _ := submittedPayload["camera_control"].(map[string]any) + config, _ := camera["config"].(map[string]any) + if camera["type"] != "simple" || numericValue(config["zoom"], 0) != 0.6 || numericValue(config["pan"], -1) != 0 { + t.Fatalf("unexpected keling camera conversion: %+v", submittedPayload["camera_control"]) + } + data, _ := response.Result["data"].([]any) + item, _ := data[0].(map[string]any) + if response.Result["upstream_task_id"] != "keling-task-1" || item["url"] != "https://example.com/keling.mp4" || item["video_url"] != "https://example.com/keling.mp4" { + t.Fatalf("unexpected keling response: %+v", response.Result) + } +} + +func TestKelingOmniPayloadConvertsGatewayContent(t *testing.T) { + payload, cleanupIDs, err := (KelingClient{}).kelingOmniPayload(context.Background(), Request{ + Kind: "videos.generations", + ModelType: "omni_video", + Model: "可灵V3多模态", + Body: map[string]any{ + "model": "可灵V3多模态", + "duration": 8, + "aspect_ratio": "9:16", + "resolution": "2160p", + "audio": true, + "content": []any{ + map[string]any{"type": "text", "text": "Refine the base video"}, + map[string]any{"type": "image_url", "role": "first_frame", "image_url": map[string]any{"url": "https://example.com/first.png"}}, + map[string]any{"type": "image_url", "role": "last_frame", "image_url": map[string]any{"url": "https://example.com/last.png"}}, + map[string]any{ + "type": "video_url", + "role": "video_base", + "video_url": map[string]any{ + "url": "https://example.com/base.mp4", + "keep_original_sound": "yes", + }, + }, + }, + }, + Candidate: store.RuntimeModelCandidate{ + Provider: "keling", + ProviderModelName: "kling-v3-omni", + Capabilities: map[string]any{"omni_video": map[string]any{}}, + }, + }, "token") + if err != nil { + t.Fatalf("build keling omni payload: %v", err) + } + if len(cleanupIDs) != 0 { + t.Fatalf("unexpected cleanup ids: %+v", cleanupIDs) + } + if payload["model_name"] != "kling-v3-omni" || payload["mode"] != "4k" || payload["prompt"] != "Refine the base video" { + t.Fatalf("unexpected keling omni base fields: %+v", payload) + } + if _, ok := payload["sound"]; ok { + t.Fatalf("omni payload with base video should not include sound: %+v", payload) + } + if _, ok := payload["duration"]; ok { + t.Fatalf("base video edit should not include duration: %+v", payload) + } + if _, ok := payload["aspect_ratio"]; ok { + t.Fatalf("base video edit should not include aspect_ratio: %+v", payload) + } + watermark, _ := payload["watermark_info"].(map[string]any) + if watermark["enabled"] != false { + t.Fatalf("keling watermark should be disabled by default: %+v", payload) + } + images, _ := payload["image_list"].([]any) + if len(images) != 2 { + t.Fatalf("unexpected keling image_list: %+v", payload["image_list"]) + } + firstImage, _ := images[0].(map[string]any) + lastImage, _ := images[1].(map[string]any) + if firstImage["type"] != "first_frame" || lastImage["type"] != "end_frame" { + t.Fatalf("frame roles should convert to keling omni types: %+v", images) + } + videos, _ := payload["video_list"].([]map[string]any) + if len(videos) != 1 || videos[0]["refer_type"] != "base" || videos[0]["keep_original_sound"] != "yes" { + t.Fatalf("video roles should convert to keling omni refer_type: %+v", payload["video_list"]) + } +} + +func TestKelingClientVideoResumePollsWithoutSubmitting(t *testing.T) { + var submitCalled bool + var pollPath string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Method + " " + r.URL.Path { + case "POST /general/custom-elements", "POST /videos/omni-video": + submitCalled = true + t.Fatalf("resume should not submit or upload temporary elements") + case "GET /videos/omni-video/keling-existing": + pollPath = r.URL.Path + _ = json.NewEncoder(w).Encode(map[string]any{ + "code": 0, + "request_id": "req-resume", + "data": map[string]any{ + "task_id": "keling-existing", + "task_status": "succeed", + "task_result": map[string]any{ + "videos": []any{map[string]any{"url": "https://example.com/resumed-keling.mp4"}}, + }, + }, + }) + default: + t.Fatalf("unexpected request %s %s", r.Method, r.URL.Path) + } + })) + defer server.Close() + + response, err := (KelingClient{HTTPClient: server.Client()}).Run(context.Background(), Request{ + Kind: "videos.generations", + ModelType: "omni_video", + Model: "可灵V3多模态", + Body: map[string]any{"prompt": "resume", "pollIntervalMs": 100, "pollTimeoutSeconds": 1}, + RemoteTaskID: "keling-existing", + RemoteTaskPayload: map[string]any{ + "endpoint": "/videos/omni-video", + }, + Candidate: store.RuntimeModelCandidate{ + BaseURL: server.URL, + Provider: "keling", + AuthType: "AccessKey-SecretKey", + ProviderModelName: "kling-v3-omni", + Credentials: map[string]any{"accessKey": "ak", "secretKey": "sk"}, + }, + }) + if err != nil { + t.Fatalf("resume keling video: %v", err) + } + if submitCalled || pollPath != "/videos/omni-video/keling-existing" { + t.Fatalf("resume should poll existing task only, submit=%v poll=%s", submitCalled, pollPath) + } + data, _ := response.Result["data"].([]any) + item, _ := data[0].(map[string]any) + if response.Result["upstream_task_id"] != "keling-existing" || item["url"] != "https://example.com/resumed-keling.mp4" { + t.Fatalf("unexpected resumed keling response: %+v", response.Result) + } +} + +func TestKelingElementPayloadMapsTags(t *testing.T) { + payload := kelingCreateElementPayload(map[string]any{ + "name": "subject", + "frontal_image_url": "https://example.com/front.png", + "tags": []any{"character", "unknown"}, + "refer_images": []any{ + map[string]any{"url": "https://example.com/side.png"}, + }, + }) + if payload["element_name"] != "subject" || payload["element_frontal_image"] != "https://example.com/front.png" { + t.Fatalf("unexpected element payload base fields: %+v", payload) + } + tags, _ := payload["tag_list"].([]any) + if len(tags) != 2 { + t.Fatalf("unexpected tag list: %+v", payload["tag_list"]) + } + firstTag, _ := tags[0].(map[string]any) + secondTag, _ := tags[1].(map[string]any) + if firstTag["tag_id"] != "o_102" || secondTag["tag_id"] != "o_108" { + t.Fatalf("unexpected keling tag conversion: %+v", payload["tag_list"]) + } + refs, _ := payload["element_refer_list"].([]any) + if len(refs) != 1 { + t.Fatalf("unexpected element references: %+v", payload["element_refer_list"]) + } +} + func extractText(result map[string]any) string { choices, _ := result["choices"].([]any) choice, _ := choices[0].(map[string]any) diff --git a/apps/api/internal/clients/keling.go b/apps/api/internal/clients/keling.go new file mode 100644 index 0000000..dfe1a09 --- /dev/null +++ b/apps/api/internal/clients/keling.go @@ -0,0 +1,960 @@ +package clients + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "math" + "net/http" + "sort" + "strings" + "time" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" + "github.com/golang-jwt/jwt/v5" +) + +type KelingClient struct { + HTTPClient *http.Client +} + +type kelingPreparedTask struct { + Endpoint string + Payload map[string]any + RemoteTaskPayload map[string]any + CleanupElementIDs []string +} + +func (c KelingClient) Run(ctx context.Context, request Request) (Response, error) { + if request.Kind != "videos.generations" { + return Response{}, &ClientError{Code: "unsupported_kind", Message: "unsupported keling request kind", Retryable: false} + } + token, err := kelingAuthToken(request.Candidate) + if err != nil { + return Response{}, err + } + return c.runVideo(ctx, request, token) +} + +func (c KelingClient) runVideo(ctx context.Context, request Request, token string) (Response, error) { + submitStartedAt := time.Now() + submitRequestID := strings.TrimSpace(request.RemoteTaskID) + upstreamTaskID := strings.TrimSpace(request.RemoteTaskID) + prepared := kelingResumePreparedTask(request) + if upstreamTaskID == "" { + var err error + prepared, err = c.prepareVideoTask(ctx, request, token) + if err != nil { + return Response{}, err + } + } + defer func() { + if upstreamTaskID == "" { + _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) + } + }() + + if upstreamTaskID == "" { + submitResult, requestID, err := c.postJSON(ctx, request, prepared.Endpoint, token, prepared.Payload) + submitRequestID = requestID + if err != nil { + return Response{}, annotateResponseError(err, submitRequestID, submitStartedAt, time.Now()) + } + upstreamTaskID = strings.TrimSpace(stringFromAny(kelingData(submitResult)["task_id"])) + if upstreamTaskID == "" { + _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) + return Response{}, &ClientError{Code: "invalid_response", Message: "keling video task id is missing", RequestID: submitRequestID, Retryable: false} + } + prepared.RemoteTaskPayload["submit"] = submitResult + if request.OnRemoteTaskSubmitted != nil { + if err := request.OnRemoteTaskSubmitted(upstreamTaskID, prepared.RemoteTaskPayload); err != nil { + return Response{}, err + } + } + } + + pollEndpoint := kelingPollEndpoint(request, prepared.Endpoint) + interval := kelingPollInterval(request) + timeout := kelingPollTimeout(request) + deadline := time.NewTimer(timeout) + defer deadline.Stop() + ticker := time.NewTicker(interval) + defer ticker.Stop() + + var lastResult map[string]any + for { + select { + case <-ctx.Done(): + return Response{}, &ClientError{Code: "cancelled", Message: ctx.Err().Error(), RequestID: submitRequestID, Retryable: true} + default: + } + + pollStartedAt := time.Now() + pollResult, pollRequestID, err := c.getJSON(ctx, request, pollEndpoint+"/"+upstreamTaskID, token) + pollFinishedAt := time.Now() + requestID := firstNonEmpty(pollRequestID, submitRequestID, upstreamTaskID) + if err != nil { + return Response{}, annotateResponseError(err, requestID, pollStartedAt, pollFinishedAt) + } + lastResult = pollResult + + switch kelingTaskStatus(pollResult) { + case "succeed": + _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) + prepared.CleanupElementIDs = nil + result := kelingVideoSuccessResult(request, upstreamTaskID, pollResult) + return Response{ + Result: result, + RequestID: requestID, + Progress: kelingVideoProgress(request, upstreamTaskID), + ResponseStartedAt: submitStartedAt, + ResponseFinishedAt: pollFinishedAt, + ResponseDurationMS: responseDurationMS(submitStartedAt, pollFinishedAt), + }, nil + case "failed": + _ = c.cleanupKelingElements(context.WithoutCancel(ctx), request, token, prepared.CleanupElementIDs) + prepared.CleanupElementIDs = nil + return Response{}, &ClientError{ + Code: kelingTaskErrorCode(pollResult), + Message: kelingTaskErrorMessage(request.Candidate, pollResult), + RequestID: requestID, + ResponseStartedAt: submitStartedAt, + ResponseFinishedAt: pollFinishedAt, + ResponseDurationMS: responseDurationMS(submitStartedAt, pollFinishedAt), + Retryable: false, + } + } + + select { + case <-ctx.Done(): + return Response{}, &ClientError{Code: "cancelled", Message: ctx.Err().Error(), RequestID: requestID, Retryable: true} + case <-deadline.C: + return Response{}, &ClientError{ + Code: "timeout", + Message: fmt.Sprintf("keling video task %s did not finish before timeout; last status: %s", upstreamTaskID, kelingTaskStatus(lastResult)), + RequestID: requestID, + Retryable: true, + } + case <-ticker.C: + } + } +} + +func (c KelingClient) prepareVideoTask(ctx context.Context, request Request, token string) (kelingPreparedTask, error) { + if kelingIsOmniRequest(request) { + payload, cleanupIDs, err := c.kelingOmniPayload(ctx, request, token) + if err != nil { + return kelingPreparedTask{}, err + } + return kelingPreparedTask{ + Endpoint: "/videos/omni-video", + Payload: payload, + RemoteTaskPayload: map[string]any{"endpoint": "/videos/omni-video", "mode": "omni_video", "cleanupElementIds": cleanupIDs}, + CleanupElementIDs: cleanupIDs, + }, nil + } + payload, taskType, err := kelingVideoPayload(ctx, request) + if err != nil { + return kelingPreparedTask{}, err + } + endpoint := "/videos/" + taskType + return kelingPreparedTask{ + Endpoint: endpoint, + Payload: payload, + RemoteTaskPayload: map[string]any{"endpoint": endpoint, "taskType": taskType}, + }, nil +} + +func kelingResumePreparedTask(request Request) kelingPreparedTask { + endpoint := "" + for _, key := range []string{"endpoint", "pollEndpoint"} { + if value := strings.TrimSpace(stringFromAny(request.RemoteTaskPayload[key])); value != "" { + endpoint = value + break + } + } + if endpoint == "" { + if kelingIsOmniRequest(request) { + endpoint = "/videos/omni-video" + } else { + endpoint = "/videos/" + kelingTaskTypeFromRequest(request) + } + } + return kelingPreparedTask{Endpoint: endpoint, RemoteTaskPayload: map[string]any{"endpoint": endpoint}} +} + +func kelingVideoPayload(ctx context.Context, request Request) (map[string]any, string, error) { + body := cleanProviderBody(request.Body) + content := contentItems(body["content"]) + if len(content) == 0 { + content = buildVolcesContentFromBody(body) + } + prompt := firstKelingPrompt(content) + if prompt == "" { + return nil, "", &ClientError{Code: "invalid_parameter", Message: "keling video prompt is required", StatusCode: 400, Retryable: false} + } + firstFrame, lastFrame, referenceImages := kelingImageInputs(content) + isImage2Video := firstFrame != "" || lastFrame != "" || len(referenceImages) > 0 + primaryImage := firstFrame + if primaryImage == "" && len(referenceImages) <= 1 && len(referenceImages) > 0 { + primaryImage = referenceImages[0] + } + if primaryImage == "" { + primaryImage = lastFrame + } + + payload := map[string]any{ + "prompt": prompt, + "model_name": upstreamModelName(request.Candidate), + "duration": fmtDuration(body["duration"], 5), + } + if value := strings.TrimSpace(stringFromAny(body["negative_prompt"])); value != "" { + payload["negative_prompt"] = value + } + if value, ok := body["cfg_scale"]; ok && numericValue(value, 0) > 0 { + payload["cfg_scale"] = value + } + if boolValue(body, "audio") || boolValue(body, "output_audio") { + payload["sound"] = "on" + } + if mode := kelingModeByResolution(firstNonEmptyStringValue(body, "resolution", "size")); mode != "" { + payload["mode"] = mode + } + if ratio := strings.TrimSpace(firstNonEmptyStringValue(body, "aspect_ratio", "aspectRatio", "ratio")); strings.Contains(ratio, ":") { + payload["aspect_ratio"] = ratio + } + if camera := kelingCameraControl(body); camera != nil { + payload["camera_control"] = camera + } + if primaryImage != "" { + encoded, err := kelingImageToBase64(ctx, request, primaryImage) + if err != nil { + return nil, "", err + } + payload["image"] = encoded + } + if lastFrame != "" { + encoded, err := kelingImageToBase64(ctx, request, lastFrame) + if err != nil { + return nil, "", err + } + payload["image_tail"] = encoded + } + if len(referenceImages) > 0 { + imageList := make([]any, 0, len(referenceImages)) + for _, url := range referenceImages { + encoded, err := kelingImageToBase64(ctx, request, url) + if err != nil { + return nil, "", err + } + imageList = append(imageList, map[string]any{"image": encoded}) + } + payload["image_list"] = imageList + } + if !strings.Contains(stringFromAny(payload["aspect_ratio"]), ":") || isImage2Video { + delete(payload, "aspect_ratio") + } + + taskType := "text2video" + if primaryImage != "" { + taskType = "image2video" + } else if len(referenceImages) > 1 { + taskType = "multi-image2video" + } + return payload, taskType, nil +} + +func kelingTaskTypeFromRequest(request Request) string { + body := cleanProviderBody(request.Body) + content := contentItems(body["content"]) + if len(content) == 0 { + content = buildVolcesContentFromBody(body) + } + firstFrame, lastFrame, referenceImages := kelingImageInputs(content) + if firstFrame != "" || lastFrame != "" || len(referenceImages) == 1 { + return "image2video" + } + if len(referenceImages) > 1 { + return "multi-image2video" + } + return "text2video" +} + +func (c KelingClient) kelingOmniPayload(ctx context.Context, request Request, token string) (map[string]any, []string, error) { + body := cleanProviderBody(request.Body) + content := contentItems(body["content"]) + if len(content) == 0 { + content = buildVolcesContentFromBody(body) + } + prompt := firstKelingPrompt(content) + images := kelingOmniImageList(content) + videos := kelingOmniVideoList(content) + uploadedElementIDs := make([]string, 0) + elements, createdIDs, err := c.kelingOmniElementList(ctx, request, token, content) + if err != nil { + return nil, nil, err + } + uploadedElementIDs = append(uploadedElementIDs, createdIDs...) + shots := kelingShotPrompts(content) + hasMultiPrompt := len(shots) > 0 + hasVideo := len(videos) > 0 + hasVideoEdit := kelingHasBaseVideo(videos) + hasFirstFrame := kelingHasFirstFrame(images) + + payload := map[string]any{ + "model_name": upstreamModelName(request.Candidate), + "mode": kelingModeByResolution(firstNonEmptyStringValue(body, "resolution", "size")), + "watermark_info": map[string]any{"enabled": false}, + "negative_prompt": strings.TrimSpace(stringFromAny(body["negative_prompt"])), + } + if !hasMultiPrompt { + payload["prompt"] = prompt + if body["duration"] != nil { + payload["duration"] = fmtDuration(body["duration"], 0) + } + } + if ratio := strings.TrimSpace(firstNonEmptyStringValue(body, "aspect_ratio", "aspectRatio", "ratio")); strings.Contains(ratio, ":") { + payload["aspect_ratio"] = ratio + } + if len(images) > 0 { + payload["image_list"] = images + } + if len(videos) > 0 { + payload["video_list"] = videos + } + if len(elements) > 0 { + payload["element_list"] = elements + } + if (boolValue(body, "audio") || boolValue(body, "output_audio")) && !hasVideo { + payload["sound"] = "on" + } + if hasMultiPrompt { + payload["multi_shot"] = true + payload["shot_type"] = "customize" + total := 0.0 + multiPrompt := make([]any, 0, len(shots)) + for index, shot := range shots { + duration := shot.duration + if duration <= 0 { + duration = 5 + } + total += duration + multiPrompt = append(multiPrompt, map[string]any{ + "index": index + 1, + "prompt": shot.text, + "duration": fmtDuration(duration, 5), + }) + } + delete(payload, "prompt") + payload["multi_prompt"] = multiPrompt + payload["duration"] = fmtDuration(total, 0) + } + deleteEmptyStringFields(payload) + if hasVideoEdit { + delete(payload, "duration") + delete(payload, "aspect_ratio") + } + if hasVideo && !hasVideoEdit && !strings.Contains(stringFromAny(payload["aspect_ratio"]), ":") { + payload["aspect_ratio"] = "16:9" + } + if !hasVideoEdit && !hasFirstFrame && !strings.Contains(stringFromAny(payload["aspect_ratio"]), ":") { + payload["aspect_ratio"] = "16:9" + } + return payload, uploadedElementIDs, nil +} + +func (c KelingClient) kelingOmniElementList(ctx context.Context, request Request, token string, content []map[string]any) ([]any, []string, error) { + elements := make([]any, 0) + createdIDs := make([]string, 0) + for _, item := range content { + if stringFromAny(item["type"]) != "element" { + continue + } + element := mapFromAny(item["element"]) + if element == nil { + continue + } + if id := kelingStringFromAny(firstPresent(element["element_id"], element["id"])); id != "" { + elements = append(elements, map[string]any{"element_id": id}) + continue + } + inline := mapFromAny(element["inline_element"]) + if inline == nil { + continue + } + payload := kelingCreateElementPayload(inline) + if payload == nil { + continue + } + id, err := c.createKelingElement(ctx, request, token, payload) + if err != nil { + return nil, createdIDs, err + } + elements = append(elements, map[string]any{"element_id": id}) + createdIDs = append(createdIDs, id) + } + return elements, createdIDs, nil +} + +func (c KelingClient) postJSON(ctx context.Context, request Request, path string, token string, body map[string]any) (map[string]any, string, error) { + raw, _ := json.Marshal(body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, joinURL(request.Candidate.BaseURL, path), bytes.NewReader(raw)) + if err != nil { + return nil, "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + resp, err := httpClient(request.HTTPClient, c.HTTPClient).Do(req) + if err != nil { + return nil, "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} + } + requestID := requestIDFromHTTPResponse(resp) + result, err := decodeHTTPResponse(resp) + if err != nil { + return result, requestID, err + } + if code := intFromAny(result["code"]); code != 0 { + return result, requestID, &ClientError{Code: kelingEnvelopeErrorCode(result), Message: kelingEnvelopeErrorMessage(result), RequestID: firstNonEmpty(requestID, stringFromAny(result["request_id"])), Retryable: false} + } + return result, firstNonEmpty(requestID, stringFromAny(result["request_id"])), nil +} + +func (c KelingClient) getJSON(ctx context.Context, request Request, path string, token string) (map[string]any, string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, joinURL(request.Candidate.BaseURL, path), nil) + if err != nil { + return nil, "", err + } + req.Header.Set("Authorization", "Bearer "+token) + resp, err := httpClient(request.HTTPClient, c.HTTPClient).Do(req) + if err != nil { + return nil, "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} + } + requestID := requestIDFromHTTPResponse(resp) + result, err := decodeHTTPResponse(resp) + if err != nil { + return result, requestID, err + } + if code := intFromAny(result["code"]); code != 0 { + return result, requestID, &ClientError{Code: kelingEnvelopeErrorCode(result), Message: kelingEnvelopeErrorMessage(result), RequestID: firstNonEmpty(requestID, stringFromAny(result["request_id"])), Retryable: false} + } + return result, firstNonEmpty(requestID, stringFromAny(result["request_id"])), nil +} + +func (c KelingClient) createKelingElement(ctx context.Context, request Request, token string, payload map[string]any) (string, error) { + raw, _ := json.Marshal(payload) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, joinURL(request.Candidate.BaseURL, "/general/custom-elements"), bytes.NewReader(raw)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + resp, err := httpClient(request.HTTPClient, c.HTTPClient).Do(req) + if err != nil { + return "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} + } + defer resp.Body.Close() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024)) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", &ClientError{Code: statusCodeName(resp.StatusCode), Message: errorMessage(body, resp.Status), StatusCode: resp.StatusCode, RequestID: requestIDFromHTTPResponse(resp), Retryable: HTTPRetryable(resp.StatusCode)} + } + var parsed struct { + Code int `json:"code"` + Message string `json:"message"` + RequestID string `json:"request_id"` + Data map[string]any `json:"data"` + } + decoder := json.NewDecoder(bytes.NewReader(body)) + decoder.UseNumber() + if err := decoder.Decode(&parsed); err != nil { + return "", &ClientError{Code: "invalid_response", Message: err.Error(), Retryable: false} + } + if parsed.Code != 0 { + return "", &ClientError{Code: "keling_element_create_failed", Message: parsed.Message, RequestID: parsed.RequestID, Retryable: false} + } + id := kelingStringFromAny(parsed.Data["element_id"]) + if id == "" { + return "", &ClientError{Code: "invalid_response", Message: "keling element id is missing", RequestID: parsed.RequestID, Retryable: false} + } + return id, nil +} + +func (c KelingClient) cleanupKelingElements(ctx context.Context, request Request, token string, elementIDs []string) error { + for _, id := range elementIDs { + id = strings.TrimSpace(id) + if id == "" { + continue + } + _, _, _ = c.postJSON(ctx, request, "/general/delete-elements", token, map[string]any{"element_id": id}) + } + return nil +} + +func kelingAuthToken(candidate store.RuntimeModelCandidate) (string, error) { + apiKey := credential(candidate.Credentials, "apiKey", "api_key", "key", "token") + accessKey := credential(candidate.Credentials, "accessKey", "access_key", "ak") + secretKey := credential(candidate.Credentials, "secretKey", "secret_key", "sk") + if accessKey != "" || secretKey != "" || strings.EqualFold(strings.TrimSpace(candidate.AuthType), "AccessKey-SecretKey") { + if accessKey == "" || secretKey == "" { + return "", &ClientError{Code: "missing_credentials", Message: "keling accessKey and secretKey are required", Retryable: false} + } + now := time.Now() + claims := jwt.MapClaims{ + "iss": accessKey, + "exp": now.Add(30 * time.Minute).Unix(), + "nbf": now.Add(-5 * time.Second).Unix(), + } + token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) + signed, err := token.SignedString([]byte(secretKey)) + if err != nil { + return "", &ClientError{Code: "auth_failed", Message: err.Error(), Retryable: false} + } + return signed, nil + } + if apiKey == "" { + return "", &ClientError{Code: "missing_credentials", Message: "keling api key is required", Retryable: false} + } + return apiKey, nil +} + +func kelingImageToBase64(ctx context.Context, request Request, value string) (string, error) { + value = strings.TrimSpace(value) + if value == "" { + return "", nil + } + if strings.HasPrefix(value, "data:") { + parts := strings.SplitN(value, ",", 2) + if len(parts) == 2 { + return strings.TrimSpace(parts[1]), nil + } + } + if strings.HasPrefix(value, "http://") || strings.HasPrefix(value, "https://") { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, value, nil) + if err != nil { + return "", err + } + resp, err := httpClient(request.HTTPClient).Do(req) + if err != nil { + return "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 64*1024)) + return "", &ClientError{Code: statusCodeName(resp.StatusCode), Message: errorMessage(raw, resp.Status), StatusCode: resp.StatusCode, RequestID: requestIDFromHTTPResponse(resp), Retryable: HTTPRetryable(resp.StatusCode)} + } + raw, err := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024)) + if err != nil { + return "", &ClientError{Code: "network", Message: err.Error(), Retryable: true} + } + return base64.StdEncoding.EncodeToString(raw), nil + } + return value, nil +} + +func kelingIsOmniRequest(request Request) bool { + modelType := strings.TrimSpace(request.ModelType) + return modelType == "omni_video" || modelType == "omni" || + request.Candidate.Capabilities["omni_video"] != nil || + request.Candidate.Capabilities["omni"] != nil +} + +func firstKelingPrompt(content []map[string]any) string { + for _, item := range content { + if stringFromAny(item["type"]) == "text" && stringFromAny(item["role"]) != "shot_prompt" && item["shot_index"] == nil { + if text := strings.TrimSpace(stringFromAny(item["text"])); text != "" { + return text + } + } + } + return "" +} + +func kelingImageInputs(content []map[string]any) (string, string, []string) { + firstFrame := "" + lastFrame := "" + references := make([]string, 0) + for _, item := range content { + if !isKelingImageContent(item) { + continue + } + url := kelingNestedURL(item, "image_url") + if url == "" { + continue + } + switch stringFromAny(item["role"]) { + case "first_frame": + if firstFrame == "" { + firstFrame = url + } + case "last_frame": + if lastFrame == "" { + lastFrame = url + } + default: + references = append(references, url) + } + } + return firstFrame, lastFrame, references +} + +func kelingOmniImageList(content []map[string]any) []any { + out := make([]any, 0) + for _, item := range content { + if !isKelingImageContent(item) { + continue + } + url := kelingNestedURL(item, "image_url") + if url == "" { + continue + } + image := map[string]any{"image_url": url} + switch stringFromAny(item["role"]) { + case "first_frame": + image["type"] = "first_frame" + case "last_frame": + image["type"] = "end_frame" + } + out = append(out, image) + } + return out +} + +func kelingOmniVideoList(content []map[string]any) []map[string]any { + out := make([]map[string]any, 0) + for _, item := range content { + if !isKelingVideoContent(item) { + continue + } + nested := mapFromAny(item["video_url"]) + url := strings.TrimSpace(stringFromAny(nested["url"])) + if url == "" { + continue + } + video := map[string]any{"video_url": url} + referType := strings.TrimSpace(stringFromAny(nested["refer_type"])) + if referType == "" { + switch stringFromAny(item["role"]) { + case "video_base": + referType = "base" + case "video_feature", "reference_video": + referType = "feature" + } + } + if referType == "base" || referType == "feature" { + video["refer_type"] = referType + } + if keep := strings.TrimSpace(stringFromAny(nested["keep_original_sound"])); keep != "" { + video["keep_original_sound"] = keep + } + out = append(out, video) + } + return out +} + +type kelingShotPrompt struct { + index int + text string + duration float64 +} + +func kelingShotPrompts(content []map[string]any) []kelingShotPrompt { + shots := make([]kelingShotPrompt, 0) + for index, item := range content { + if stringFromAny(item["type"]) != "text" { + continue + } + if stringFromAny(item["role"]) != "shot_prompt" && item["shot_index"] == nil { + continue + } + text := strings.TrimSpace(stringFromAny(item["text"])) + if text == "" { + continue + } + shotIndex := int(math.Floor(numericValue(item["shot_index"], float64(index)))) + shots = append(shots, kelingShotPrompt{index: shotIndex, text: text, duration: numericValue(item["duration"], 5)}) + } + sort.SliceStable(shots, func(i, j int) bool { return shots[i].index < shots[j].index }) + return shots +} + +func kelingHasBaseVideo(videos []map[string]any) bool { + for _, video := range videos { + if stringFromAny(video["refer_type"]) == "base" { + return true + } + } + return false +} + +func kelingHasFirstFrame(images []any) bool { + for _, item := range images { + image := mapFromAny(item) + if stringFromAny(image["type"]) == "first_frame" { + return true + } + } + return false +} + +func kelingCreateElementPayload(inline map[string]any) map[string]any { + frontURL := strings.TrimSpace(firstNonEmptyStringValue(inline, "frontal_image_url", "frontalImageUrl", "element_frontal_image", "image_url", "imageUrl", "url")) + if frontURL == "" { + return nil + } + name := firstNonEmptyStringValue(inline, "name", "element_name", "elementName") + if name == "" { + name = "temporary element" + } + payload := map[string]any{ + "element_name": name, + "element_description": firstNonEmpty(firstNonEmptyStringValue(inline, "description"), name), + "element_frontal_image": frontURL, + } + referImages := make([]any, 0) + for _, ref := range mapListFromAny(firstPresent(inline["refer_images"], inline["referImages"], inline["element_refer_list"])) { + url := strings.TrimSpace(firstNonEmptyStringValue(ref, "url", "image_url", "imageUrl")) + if url != "" { + referImages = append(referImages, map[string]any{"image_url": url}) + } + } + if len(referImages) > 0 { + payload["element_refer_list"] = referImages + } + if tags := kelingElementTagList(inline["tags"]); len(tags) > 0 { + payload["tag_list"] = tags + } + return payload +} + +func kelingElementTagList(value any) []any { + mapping := map[string]string{ + "hot_meme": "o_101", + "character": "o_102", + "animal": "o_103", + "prop": "o_104", + "costume": "o_105", + "scene": "o_106", + "effect": "o_107", + "other": "o_108", + } + out := make([]any, 0) + for _, tag := range stringListFromAny(value) { + id := mapping[strings.TrimSpace(tag)] + if id == "" { + id = mapping["other"] + } + out = append(out, map[string]any{"tag_id": id}) + } + return out +} + +func kelingNestedURL(item map[string]any, key string) string { + nested := mapFromAny(item[key]) + if nested != nil { + if value := strings.TrimSpace(stringFromAny(nested["url"])); value != "" { + return value + } + } + return strings.TrimSpace(stringFromAny(item[key])) +} + +func isKelingImageContent(item map[string]any) bool { + return stringFromAny(item["type"]) == "image_url" || mapFromAny(item["image_url"]) != nil || strings.TrimSpace(stringFromAny(item["image_url"])) != "" +} + +func isKelingVideoContent(item map[string]any) bool { + return stringFromAny(item["type"]) == "video_url" || mapFromAny(item["video_url"]) != nil || strings.TrimSpace(stringFromAny(item["video_url"])) != "" +} + +func kelingModeByResolution(resolution string) string { + switch strings.TrimSpace(resolution) { + case "2160p": + return "4k" + case "1080p": + return "pro" + case "480p", "720p", "": + return "std" + default: + if strings.HasSuffix(strings.TrimSpace(resolution), "p") { + return "std" + } + return "" + } +} + +func kelingCameraControl(body map[string]any) map[string]any { + cameraControl := strings.TrimSpace(stringFromAny(body["camera_control"])) + if cameraControl == "" { + return nil + } + if strings.HasPrefix(cameraControl, "simple") { + directions := []string{"horizontal", "vertical", "pan", "tilt", "roll", "zoom"} + current := "" + parts := strings.SplitN(cameraControl, ":", 2) + if len(parts) == 2 { + current = parts[1] + } + strength := firstPresent(body["camera_control_strength"], body["cameraControlStrength"]) + config := map[string]any{} + for _, direction := range directions { + if direction == current { + config[direction] = strength + } else { + config[direction] = 0 + } + } + return map[string]any{"type": "simple", "config": config} + } + return map[string]any{"type": cameraControl} +} + +func kelingData(result map[string]any) map[string]any { + data, _ := result["data"].(map[string]any) + if data == nil { + return map[string]any{} + } + return data +} + +func kelingTaskStatus(result map[string]any) string { + return strings.ToLower(strings.TrimSpace(stringFromAny(kelingData(result)["task_status"]))) +} + +func kelingTaskErrorCode(result map[string]any) string { + if code := intFromAny(result["code"]); code != 0 { + return fmt.Sprintf("keling_%d", code) + } + return "keling_task_failed" +} + +func kelingTaskErrorMessage(candidate store.RuntimeModelCandidate, result map[string]any) string { + message := strings.TrimSpace(stringFromAny(kelingData(result)["task_status_msg"])) + if message == "" { + message = strings.TrimSpace(stringFromAny(result["message"])) + } + if message == "" { + message = "keling video task failed" + } + return fmt.Sprintf("Platform:%s,Code:%v,requestId:%s,message:%s", candidate.Provider, result["code"], stringFromAny(result["request_id"]), message) +} + +func kelingEnvelopeErrorCode(result map[string]any) string { + if code := intFromAny(result["code"]); code != 0 { + return fmt.Sprintf("keling_%d", code) + } + return "keling_error" +} + +func kelingEnvelopeErrorMessage(result map[string]any) string { + if message := strings.TrimSpace(stringFromAny(result["message"])); message != "" { + return message + } + return "keling request failed" +} + +func kelingVideoSuccessResult(request Request, upstreamTaskID string, raw map[string]any) map[string]any { + data := kelingData(raw) + taskResult, _ := data["task_result"].(map[string]any) + videos, _ := taskResult["videos"].([]any) + items := make([]any, 0, len(videos)) + for _, rawVideo := range videos { + video := mapFromAny(rawVideo) + url := strings.TrimSpace(stringFromAny(video["url"])) + if url == "" { + continue + } + item := map[string]any{"url": url, "video_url": url, "type": "video"} + if duration := intFromAny(video["duration"]); duration > 0 { + item["duration"] = duration + } + items = append(items, item) + } + created := intFromAny(data["created_at"]) + if created == 0 { + created = int(nowUnix()) + } + return map[string]any{ + "id": upstreamTaskID, + "object": "video.generation", + "created": created, + "model": upstreamModelName(request.Candidate), + "status": "succeeded", + "upstream_task_id": upstreamTaskID, + "data": items, + "raw": raw, + } +} + +func kelingVideoProgress(request Request, upstreamTaskID string) []Progress { + progress := providerProgress(request) + progress = append(progress, Progress{ + Phase: "polling_result", + Progress: 0.9, + Message: "keling video task completed", + Payload: map[string]any{"upstreamTaskId": upstreamTaskID}, + }) + return progress +} + +func kelingPollEndpoint(request Request, fallback string) string { + for _, key := range []string{"endpoint", "pollEndpoint"} { + if value := strings.TrimSpace(stringFromAny(request.RemoteTaskPayload[key])); value != "" { + return value + } + } + return fallback +} + +func kelingPollInterval(request Request) time.Duration { + ms := numericValue(firstPresent(request.Candidate.PlatformConfig["kelingPollIntervalMs"], request.Candidate.PlatformConfig["klingPollIntervalMs"], request.Body["pollIntervalMs"], request.Body["poll_interval_ms"]), 15000) + if ms < 100 { + ms = 100 + } + return time.Duration(ms) * time.Millisecond +} + +func kelingPollTimeout(request Request) time.Duration { + seconds := numericValue(firstPresent(request.Candidate.PlatformConfig["kelingPollTimeoutSeconds"], request.Candidate.PlatformConfig["klingPollTimeoutSeconds"], request.Body["pollTimeoutSeconds"], request.Body["poll_timeout_seconds"]), 600) + if seconds < 1 { + seconds = 600 + } + return time.Duration(seconds) * time.Second +} + +func fmtDuration(value any, fallback float64) string { + duration := numericValue(value, fallback) + if math.Abs(duration-math.Round(duration)) < 1e-9 { + return fmt.Sprintf("%d", int(math.Round(duration))) + } + return strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.6f", duration), "0"), ".") +} + +func deleteEmptyStringFields(payload map[string]any) { + for key, value := range payload { + if text, ok := value.(string); ok && strings.TrimSpace(text) == "" { + delete(payload, key) + } + } +} + +func kelingStringFromAny(value any) string { + switch typed := value.(type) { + case json.Number: + return typed.String() + case float64: + if math.Abs(typed-math.Round(typed)) < 1e-9 { + return fmt.Sprintf("%.0f", typed) + } + return fmt.Sprintf("%v", typed) + case int: + return fmt.Sprintf("%d", typed) + case int64: + return fmt.Sprintf("%d", typed) + case string: + return strings.TrimSpace(typed) + default: + return "" + } +} diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index 9bb6b50..890e09f 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -55,6 +55,8 @@ func New(cfg config.Config, db *store.Store, logger *slog.Logger) *Service { "openai": clients.OpenAIClient{HTTPClient: httpClients.none}, "gemini": clients.GeminiClient{HTTPClient: httpClients.none}, "volces": clients.VolcesClient{HTTPClient: httpClients.none}, + "keling": clients.KelingClient{HTTPClient: httpClients.none}, + "kling": clients.KelingClient{HTTPClient: httpClients.none}, "simulation": clients.SimulationClient{}, }, httpClients: httpClients, diff --git a/apps/api/migrations/0038_keling_omni_audio_flags.sql b/apps/api/migrations/0038_keling_omni_audio_flags.sql new file mode 100644 index 0000000..17cda6f --- /dev/null +++ b/apps/api/migrations/0038_keling_omni_audio_flags.sql @@ -0,0 +1,25 @@ +UPDATE base_model_catalog +SET capabilities = jsonb_set( + jsonb_set(capabilities, '{omni_video,input_audio}', 'false'::jsonb, true), + '{omni_video,max_audios}', '0'::jsonb, true + ), + metadata = jsonb_set( + jsonb_set(metadata, '{rawModel,capabilities,omni_video,input_audio}', 'false'::jsonb, true), + '{rawModel,capabilities,omni_video,max_audios}', '0'::jsonb, true + ), + updated_at = now() +WHERE provider_key = 'keling' + AND provider_model_name IN ('kling-video-o1', 'kling-v3-omni') + AND capabilities ? 'omni_video'; + +UPDATE platform_models m +SET capabilities = jsonb_set( + jsonb_set(m.capabilities, '{omni_video,input_audio}', 'false'::jsonb, true), + '{omni_video,max_audios}', '0'::jsonb, true + ), + updated_at = now() +FROM integration_platforms p +WHERE m.platform_id = p.id + AND p.provider = 'keling' + AND COALESCE(NULLIF(m.provider_model_name, ''), m.model_name) IN ('kling-video-o1', 'kling-v3-omni') + AND m.capabilities ? 'omni_video';