feat: add parameter preprocessing audit trail
This commit is contained in:
parent
9ea83be718
commit
b9c9f457e9
@ -836,8 +836,10 @@ WHERE reference_type = 'gateway_task'
|
||||
}
|
||||
var imageToVideoTask struct {
|
||||
Task struct {
|
||||
Status string `json:"status"`
|
||||
ModelType string `json:"modelType"`
|
||||
ID string `json:"id"`
|
||||
Status string `json:"status"`
|
||||
ModelType string `json:"modelType"`
|
||||
Metrics map[string]any `json:"metrics"`
|
||||
} `json:"task"`
|
||||
}
|
||||
doJSON(t, server.URL, http.MethodPost, "/api/v1/videos/generations", apiKeyResponse.Secret, map[string]any{
|
||||
@ -851,6 +853,29 @@ WHERE reference_type = 'gateway_task'
|
||||
if imageToVideoTask.Task.Status != "succeeded" || imageToVideoTask.Task.ModelType != "image_to_video" {
|
||||
t.Fatalf("image-to-video request should use image_to_video model_type: %+v", imageToVideoTask.Task)
|
||||
}
|
||||
if _, ok := imageToVideoTask.Task.Metrics["parameterPreprocessing"]; ok {
|
||||
t.Fatalf("task metrics should not embed full parameter preprocessing log: %+v", imageToVideoTask.Task.Metrics)
|
||||
}
|
||||
if imageToVideoTask.Task.Metrics["parameterPreprocessingSummary"] == nil {
|
||||
t.Fatalf("task metrics should keep lightweight preprocessing summary: %+v", imageToVideoTask.Task.Metrics)
|
||||
}
|
||||
var preprocessingDetail struct {
|
||||
Items []map[string]any `json:"items"`
|
||||
}
|
||||
doJSON(t, server.URL, http.MethodGet, "/api/v1/tasks/"+imageToVideoTask.Task.ID+"/param-preprocessing", apiKeyResponse.Secret, nil, http.StatusOK, &preprocessingDetail)
|
||||
if len(preprocessingDetail.Items) == 0 {
|
||||
t.Fatalf("task preprocessing endpoint should expose persisted preprocessing logs: %+v", preprocessingDetail)
|
||||
}
|
||||
if preprocessingDetail.Items[0]["actualInput"] == nil || preprocessingDetail.Items[0]["convertedOutput"] == nil {
|
||||
t.Fatalf("preprocessing log should store actual input and converted output: %+v", preprocessingDetail.Items)
|
||||
}
|
||||
var preprocessingRows int
|
||||
if err := testPool.QueryRow(ctx, `SELECT count(*) FROM gateway_task_param_preprocessing_logs WHERE task_id = $1::uuid`, imageToVideoTask.Task.ID).Scan(&preprocessingRows); err != nil {
|
||||
t.Fatalf("count preprocessing logs: %v", err)
|
||||
}
|
||||
if preprocessingRows == 0 {
|
||||
t.Fatalf("expected preprocessing logs in dedicated table for task %s", imageToVideoTask.Task.ID)
|
||||
}
|
||||
|
||||
failoverModel := "phase1-failover-" + suffixText
|
||||
var failedPlatform struct {
|
||||
|
||||
@ -795,6 +795,26 @@ func (s *Server) getTask(w http.ResponseWriter, r *http.Request) {
|
||||
writeError(w, http.StatusInternalServerError, "get task failed")
|
||||
}
|
||||
|
||||
func (s *Server) taskParamPreprocessing(w http.ResponseWriter, r *http.Request) {
|
||||
task, err := s.store.GetTask(r.Context(), r.PathValue("taskID"))
|
||||
if err != nil {
|
||||
if store.IsNotFound(err) {
|
||||
writeError(w, http.StatusNotFound, "task not found")
|
||||
return
|
||||
}
|
||||
s.logger.Error("get task failed", "error", err)
|
||||
writeError(w, http.StatusInternalServerError, "get task failed")
|
||||
return
|
||||
}
|
||||
logs, err := s.store.ListTaskParamPreprocessingLogs(r.Context(), task.ID)
|
||||
if err != nil {
|
||||
s.logger.Error("list task parameter preprocessing logs failed", "taskID", task.ID, "error", err)
|
||||
writeError(w, http.StatusInternalServerError, "list task parameter preprocessing logs failed")
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"items": logs})
|
||||
}
|
||||
|
||||
func (s *Server) taskEvents(w http.ResponseWriter, r *http.Request) {
|
||||
task, err := s.store.GetTask(r.Context(), r.PathValue("taskID"))
|
||||
if err != nil {
|
||||
|
||||
@ -87,6 +87,7 @@ func NewServerWithContext(ctx context.Context, cfg config.Config, db *store.Stor
|
||||
mux.Handle("GET /api/workspace/wallet/transactions", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listWalletTransactions)))
|
||||
mux.Handle("GET /api/workspace/tasks", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listTasks)))
|
||||
mux.Handle("GET /api/workspace/tasks/{taskID}", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.getTask)))
|
||||
mux.Handle("GET /api/workspace/tasks/{taskID}/param-preprocessing", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.taskParamPreprocessing)))
|
||||
mux.Handle("GET /api/workspace/tasks/{taskID}/events", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.taskEvents)))
|
||||
mux.Handle("GET /api/admin/pricing/rules", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listPricingRules)))
|
||||
mux.Handle("GET /api/admin/pricing/rule-sets", server.requireAdmin(auth.PermissionPower, http.HandlerFunc(server.listPricingRuleSets)))
|
||||
@ -123,6 +124,7 @@ func NewServerWithContext(ctx context.Context, cfg config.Config, db *store.Stor
|
||||
mux.Handle("POST /api/v1/videos/generations", server.auth.Require(auth.PermissionBasic, server.createTask("videos.generations", false)))
|
||||
mux.Handle("GET /api/v1/tasks", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.listTasks)))
|
||||
mux.Handle("GET /api/v1/tasks/{taskID}", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.getTask)))
|
||||
mux.Handle("GET /api/v1/tasks/{taskID}/param-preprocessing", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.taskParamPreprocessing)))
|
||||
mux.Handle("GET /api/v1/tasks/{taskID}/events", server.auth.Require(auth.PermissionBasic, http.HandlerFunc(server.taskEvents)))
|
||||
mux.Handle("POST /chat/completions", server.auth.Require(auth.PermissionBasic, server.createTask("chat.completions", true)))
|
||||
mux.Handle("POST /v1/chat/completions", server.auth.Require(auth.PermissionBasic, server.createTask("chat.completions", true)))
|
||||
|
||||
1450
apps/api/internal/runner/param_processor.go
Normal file
1450
apps/api/internal/runner/param_processor.go
Normal file
File diff suppressed because it is too large
Load Diff
207
apps/api/internal/runner/param_processor_test.go
Normal file
207
apps/api/internal/runner/param_processor_test.go
Normal file
@ -0,0 +1,207 @@
|
||||
package runner
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
|
||||
)
|
||||
|
||||
func TestParamProcessorOmniFiltersUnsupportedVideoAndAudioContent(t *testing.T) {
|
||||
body := map[string]any{
|
||||
"model": "可灵O1",
|
||||
"prompt": "edit the source video",
|
||||
"content": []any{
|
||||
map[string]any{"type": "text", "text": "edit the source video"},
|
||||
map[string]any{"type": "video_url", "role": "video_base", "video_url": map[string]any{"url": "https://example.com/base.mp4", "refer_type": "base"}},
|
||||
map[string]any{"type": "video_url", "role": "reference_video", "video_url": map[string]any{"url": "https://example.com/ref.mp4", "refer_type": "feature"}},
|
||||
map[string]any{"type": "audio_url", "role": "reference_audio", "audio_url": map[string]any{"url": "https://example.com/ref.mp3"}},
|
||||
},
|
||||
}
|
||||
candidate := store.RuntimeModelCandidate{
|
||||
ModelType: "omni_video",
|
||||
Capabilities: map[string]any{
|
||||
"omni_video": map[string]any{
|
||||
"supported_modes": []any{"video_edit"},
|
||||
"max_videos": 1,
|
||||
"input_audio": false,
|
||||
"max_audios": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result := preprocessRequestWithLog("videos.generations", body, candidate)
|
||||
processed := result.Body
|
||||
content := contentItems(processed["content"])
|
||||
if len(content) != 2 {
|
||||
t.Fatalf("expected text plus one video item, got %+v", content)
|
||||
}
|
||||
if stringFromAny(content[1]["role"]) != "video_base" || isAudioContent(content[1]) {
|
||||
t.Fatalf("unexpected retained content: %+v", content)
|
||||
}
|
||||
for _, item := range content {
|
||||
if isAudioContent(item) || stringFromAny(item["role"]) == "reference_video" {
|
||||
t.Fatalf("unsupported content was not filtered: %+v", content)
|
||||
}
|
||||
}
|
||||
if !result.Log.Changed || len(result.Log.Changes) < 2 {
|
||||
t.Fatalf("expected preprocessing log with filtered video and audio changes, got %+v", result.Log)
|
||||
}
|
||||
if result.Log.Input["content"] == nil || result.Log.Output["content"] == nil {
|
||||
t.Fatalf("preprocessing log should keep actual input and converted output: %+v", result.Log)
|
||||
}
|
||||
foundAudioReason := false
|
||||
for _, change := range result.Log.Changes {
|
||||
if change.Path == "content[3]" && change.CapabilityPath == "capabilities.omni_video.input_audio" {
|
||||
foundAudioReason = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundAudioReason {
|
||||
t.Fatalf("expected audio filtering reason to reference omni_video.input_audio, got %+v", result.Log.Changes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParamProcessorOmniFiltersConvenienceReferenceFields(t *testing.T) {
|
||||
body := map[string]any{
|
||||
"model": "可灵V3多模态",
|
||||
"prompt": "text only",
|
||||
"reference_video": "https://example.com/ref.mp4",
|
||||
"reference_audio": "https://example.com/ref.mp3",
|
||||
}
|
||||
candidate := store.RuntimeModelCandidate{
|
||||
ModelType: "omni_video",
|
||||
Capabilities: map[string]any{
|
||||
"omni_video": map[string]any{
|
||||
"supported_modes": []any{"text_to_video"},
|
||||
"max_videos": 0,
|
||||
"input_audio": false,
|
||||
"max_audios": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result := preprocessRequestWithLog("videos.generations", body, candidate)
|
||||
processed := result.Body
|
||||
content := contentItems(processed["content"])
|
||||
if len(content) != 1 || stringFromAny(content[0]["type"]) != "text" {
|
||||
t.Fatalf("expected only text content, got %+v", content)
|
||||
}
|
||||
for _, key := range []string{"reference_video", "reference_audio"} {
|
||||
if processed[key] != nil {
|
||||
t.Fatalf("%s should be removed when capability rejects it: %+v", key, processed)
|
||||
}
|
||||
}
|
||||
if len(result.Log.Changes) == 0 {
|
||||
t.Fatalf("expected convenience-field filtering to be logged")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParamProcessorOmniCapabilityLogUsesActualCapabilityKey(t *testing.T) {
|
||||
body := map[string]any{
|
||||
"model": "Omni",
|
||||
"content": []any{
|
||||
map[string]any{"type": "text", "text": "animate"},
|
||||
map[string]any{"type": "audio_url", "role": "reference_audio", "audio_url": map[string]any{"url": "https://example.com/ref.mp3"}},
|
||||
},
|
||||
}
|
||||
candidate := store.RuntimeModelCandidate{
|
||||
ModelType: "omni",
|
||||
Capabilities: map[string]any{
|
||||
"omni": map[string]any{
|
||||
"input_audio": false,
|
||||
"max_audios": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result := preprocessRequestWithLog("videos.generations", body, candidate)
|
||||
for _, change := range result.Log.Changes {
|
||||
if change.Path == "content[1]" && change.CapabilityPath == "capabilities.omni.input_audio" {
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatalf("expected log to reference capabilities.omni.input_audio, got %+v", result.Log.Changes)
|
||||
}
|
||||
|
||||
func TestParamProcessorVideoCapabilitiesNormalizeAndFilter(t *testing.T) {
|
||||
body := map[string]any{
|
||||
"model": "Seedance",
|
||||
"duration": 13,
|
||||
"aspect_ratio": "4:3",
|
||||
"resolution": "1080p",
|
||||
"audio": true,
|
||||
"output_audio": true,
|
||||
"content": []any{
|
||||
map[string]any{"type": "text", "text": "animate it"},
|
||||
map[string]any{"type": "image_url", "role": "first_frame", "image_url": map[string]any{"url": "https://example.com/first.png"}},
|
||||
map[string]any{"type": "image_url", "role": "last_frame", "image_url": map[string]any{"url": "https://example.com/last.png"}},
|
||||
map[string]any{"type": "audio_url", "role": "reference_audio", "audio_url": map[string]any{"url": "https://example.com/ref.mp3"}},
|
||||
},
|
||||
}
|
||||
candidate := store.RuntimeModelCandidate{
|
||||
ModelType: "image_to_video",
|
||||
Capabilities: map[string]any{
|
||||
"image_to_video": map[string]any{
|
||||
"aspect_ratio_allowed": []any{"16:9", "1:1"},
|
||||
"duration_options": []any{4, 8, 12},
|
||||
"input_first_last_frame": false,
|
||||
"input_audio": false,
|
||||
"output_audio": false,
|
||||
"max_images_for_last_frame": 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result := preprocessRequestWithLog("videos.generations", body, candidate)
|
||||
processed := result.Body
|
||||
if processed["duration"] != float64(12) && processed["duration"] != 12 {
|
||||
t.Fatalf("duration should be snapped to 12, got %+v", processed["duration"])
|
||||
}
|
||||
if processed["aspect_ratio"] != "16:9" {
|
||||
t.Fatalf("aspect_ratio should fall back to first allowed value, got %+v", processed["aspect_ratio"])
|
||||
}
|
||||
if processed["audio"] != nil || processed["output_audio"] != nil {
|
||||
t.Fatalf("output audio flags should be removed: %+v", processed)
|
||||
}
|
||||
for _, item := range contentItems(processed["content"]) {
|
||||
if stringFromAny(item["role"]) == "last_frame" || isAudioContent(item) {
|
||||
t.Fatalf("unsupported content remained: %+v", processed["content"])
|
||||
}
|
||||
}
|
||||
foundDuration := false
|
||||
for _, change := range result.Log.Changes {
|
||||
if change.Path == "duration" && change.CapabilityPath == "capabilities.image_to_video.duration_options" {
|
||||
foundDuration = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !foundDuration {
|
||||
t.Fatalf("expected duration adjustment to reference duration_options, got %+v", result.Log.Changes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParamProcessorImageResolutionAndOutputCount(t *testing.T) {
|
||||
body := map[string]any{
|
||||
"model": "即梦V4.0",
|
||||
"prompt": "draw",
|
||||
"size": "2K",
|
||||
"n": 8,
|
||||
}
|
||||
candidate := store.RuntimeModelCandidate{
|
||||
ModelType: "image_generate",
|
||||
Capabilities: map[string]any{
|
||||
"image_generate": map[string]any{
|
||||
"output_multiple_images": true,
|
||||
"output_max_images_count": 4,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
processed := preprocessRequest("images.generations", body, candidate)
|
||||
if processed["resolution"] != "2K" {
|
||||
t.Fatalf("size resolution should be copied to resolution, got %+v", processed)
|
||||
}
|
||||
if processed["n"] != 4 {
|
||||
t.Fatalf("image count should be capped to 4, got %+v", processed["n"])
|
||||
}
|
||||
}
|
||||
@ -16,11 +16,13 @@ type EstimateResult struct {
|
||||
}
|
||||
|
||||
func (s *Service) Estimate(ctx context.Context, kind string, model string, body map[string]any, user *auth.User) (EstimateResult, error) {
|
||||
body = normalizeRequest(kind, body)
|
||||
candidates, err := s.store.ListModelCandidates(ctx, model, modelTypeFromKind(kind, body), user)
|
||||
if err != nil {
|
||||
return EstimateResult{}, err
|
||||
}
|
||||
candidate := candidates[0]
|
||||
body = preprocessRequest(kind, body, candidate)
|
||||
return EstimateResult{
|
||||
Items: s.estimatedBillings(ctx, user, kind, body, candidate),
|
||||
Resolver: "effective-pricing-v1",
|
||||
|
||||
@ -243,6 +243,9 @@ func summarizeAttempts(attempts []store.TaskAttempt) []map[string]any {
|
||||
if trace, ok := attempt.Metrics["trace"]; ok {
|
||||
item["trace"] = trace
|
||||
}
|
||||
if preprocessing, ok := attempt.Metrics["parameterPreprocessingSummary"]; ok {
|
||||
item["parameterPreprocessingSummary"] = preprocessing
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return items
|
||||
|
||||
@ -96,11 +96,24 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
}
|
||||
return Result{Task: failed, Output: failed.Result}, err
|
||||
}
|
||||
firstCandidateBody := body
|
||||
normalizedModelType := modelType
|
||||
var firstPreprocessing parameterPreprocessingLog
|
||||
if len(candidates) > 0 {
|
||||
estimatedBillings := s.estimatedBillings(ctx, user, task.Kind, body, candidates[0])
|
||||
preprocessing := preprocessRequestWithLog(task.Kind, body, candidates[0])
|
||||
firstCandidateBody = preprocessing.Body
|
||||
firstPreprocessing = preprocessing.Log
|
||||
normalizedModelType = candidates[0].ModelType
|
||||
if err := s.store.MarkTaskRunning(ctx, task.ID, candidates[0].ModelType, firstCandidateBody); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
estimatedBillings := s.estimatedBillings(ctx, user, task.Kind, firstCandidateBody, candidates[0])
|
||||
if err := s.ensureWalletBalance(ctx, user, estimatedBillings); err != nil {
|
||||
if errors.Is(err, store.ErrInsufficientWalletBalance) {
|
||||
failed, finishErr := s.failTask(ctx, task.ID, "insufficient_balance", err.Error(), task.RunMode == "simulation", err)
|
||||
if logErr := s.recordTaskParameterPreprocessing(ctx, task.ID, "", 0, candidates[0], firstPreprocessing); logErr != nil {
|
||||
return Result{}, logErr
|
||||
}
|
||||
failed, finishErr := s.failTask(ctx, task.ID, "insufficient_balance", err.Error(), task.RunMode == "simulation", err, parameterPreprocessingMetrics(firstPreprocessing))
|
||||
if finishErr != nil {
|
||||
return Result{}, finishErr
|
||||
}
|
||||
@ -109,7 +122,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
return Result{}, err
|
||||
}
|
||||
}
|
||||
if err := s.emit(ctx, task.ID, "task.progress", "running", "normalizing", 0.15, "request normalized", map[string]any{"modelType": modelType}, task.RunMode == "simulation"); err != nil {
|
||||
if err := s.emit(ctx, task.ID, "task.progress", "running", "normalizing", 0.15, "request normalized", map[string]any{"modelType": normalizedModelType}, task.RunMode == "simulation"); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
|
||||
@ -122,6 +135,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
attemptNo := task.AttemptCount
|
||||
var lastErr error
|
||||
var lastCandidate store.RuntimeModelCandidate
|
||||
var lastPreprocessing *parameterPreprocessingLog
|
||||
candidatesLoop:
|
||||
for index, candidate := range candidates {
|
||||
if index >= maxPlatforms {
|
||||
@ -132,11 +146,16 @@ candidatesLoop:
|
||||
var candidateErr error
|
||||
for clientAttempt := 1; clientAttempt <= clientAttempts; clientAttempt++ {
|
||||
nextAttemptNo := attemptNo + 1
|
||||
response, err := s.runCandidate(ctx, task, user, body, candidate, nextAttemptNo, onDelta)
|
||||
preprocessing := preprocessRequestWithLog(task.Kind, body, candidate)
|
||||
preprocessingLog := preprocessing.Log
|
||||
lastPreprocessing = &preprocessingLog
|
||||
candidateBody := preprocessing.Body
|
||||
response, err := s.runCandidate(ctx, task, user, candidateBody, preprocessing.Log, candidate, nextAttemptNo, onDelta)
|
||||
if err == nil {
|
||||
attemptNo = nextAttemptNo
|
||||
billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate))
|
||||
record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate))
|
||||
billings := s.billings(ctx, user, task.Kind, candidateBody, candidate, response, isSimulation(task, candidate))
|
||||
record := buildSuccessRecord(task, user, candidateBody, candidate, response, billings, isSimulation(task, candidate))
|
||||
record.Metrics = mergeMetrics(record.Metrics, parameterPreprocessingMetrics(preprocessing.Log))
|
||||
record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics)
|
||||
finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{
|
||||
TaskID: task.ID,
|
||||
@ -305,15 +324,20 @@ candidatesLoop:
|
||||
}
|
||||
return Result{Task: queued, Output: queued.Result}, &TaskQueuedError{Delay: delay}
|
||||
}
|
||||
failed, err := s.failTask(ctx, task.ID, code, message, task.RunMode == "simulation", lastErr)
|
||||
extraMetrics := []map[string]any{}
|
||||
if lastPreprocessing != nil {
|
||||
extraMetrics = append(extraMetrics, parameterPreprocessingMetrics(*lastPreprocessing))
|
||||
}
|
||||
failed, err := s.failTask(ctx, task.ID, code, message, task.RunMode == "simulation", lastErr, extraMetrics...)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
return Result{Task: failed, Output: failed.Result}, lastErr
|
||||
}
|
||||
|
||||
func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) {
|
||||
func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, preprocessing parameterPreprocessingLog, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) {
|
||||
simulated := isSimulation(task, candidate)
|
||||
baseAttemptMetrics := mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), parameterPreprocessingMetrics(preprocessing))
|
||||
reservations := s.rateLimitReservations(ctx, user, candidate, body)
|
||||
limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, "", reservations)
|
||||
if err != nil {
|
||||
@ -339,18 +363,30 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
Status: "running",
|
||||
Simulated: simulated,
|
||||
RequestSnapshot: body,
|
||||
Metrics: attemptMetrics(candidate, attemptNo, simulated),
|
||||
Metrics: baseAttemptMetrics,
|
||||
})
|
||||
if err != nil {
|
||||
return clients.Response{}, fmt.Errorf("create task attempt: %w", err)
|
||||
}
|
||||
if err := s.recordTaskParameterPreprocessing(ctx, task.ID, attemptID, attemptNo, candidate, preprocessing); err != nil {
|
||||
clientErr := &clients.ClientError{Code: "runtime_error", Message: err.Error(), Retryable: false}
|
||||
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
Retryable: false,
|
||||
Metrics: mergeMetrics(baseAttemptMetrics, map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(clientErr, false)}}),
|
||||
ErrorCode: clients.ErrorCode(clientErr),
|
||||
ErrorMessage: err.Error(),
|
||||
})
|
||||
return clients.Response{}, fmt.Errorf("record parameter preprocessing: %w", err)
|
||||
}
|
||||
if err := s.store.AttachRateLimitResultToAttempt(ctx, attemptID, limitResult); err != nil {
|
||||
clientErr := &clients.ClientError{Code: "runtime_error", Message: err.Error(), Retryable: false}
|
||||
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
Retryable: false,
|
||||
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(clientErr, false)}}),
|
||||
Metrics: mergeMetrics(baseAttemptMetrics, map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(clientErr, false)}}),
|
||||
ErrorCode: clients.ErrorCode(clientErr),
|
||||
ErrorMessage: err.Error(),
|
||||
})
|
||||
@ -371,7 +407,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
Retryable: false,
|
||||
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(err, false)}}),
|
||||
Metrics: mergeMetrics(baseAttemptMetrics, map[string]any{"error": err.Error(), "retryable": false, "trace": []any{failureTraceEntry(err, false)}}),
|
||||
ErrorCode: clients.ErrorCode(err),
|
||||
ErrorMessage: err.Error(),
|
||||
})
|
||||
@ -425,7 +461,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
responseDurationMS = 0
|
||||
}
|
||||
}
|
||||
metrics = mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), metrics)
|
||||
metrics = mergeMetrics(baseAttemptMetrics, metrics)
|
||||
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
@ -444,7 +480,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
}
|
||||
uploadedResult, err := s.uploadGeneratedAssets(ctx, response.Result)
|
||||
if err != nil {
|
||||
metrics := mergeMetrics(taskMetrics(task, user, body, candidate, response, simulated), map[string]any{
|
||||
metrics := mergeMetrics(taskMetrics(task, user, body, candidate, response, simulated), parameterPreprocessingMetrics(preprocessing), map[string]any{
|
||||
"error": err.Error(),
|
||||
"retryable": clients.IsRetryable(err),
|
||||
"trace": []any{failureTraceEntry(err, clients.IsRetryable(err))},
|
||||
@ -480,7 +516,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
Status: "succeeded",
|
||||
RequestID: response.RequestID,
|
||||
Usage: usageToMap(response.Usage),
|
||||
Metrics: taskMetrics(task, user, body, candidate, response, simulated),
|
||||
Metrics: mergeMetrics(taskMetrics(task, user, body, candidate, response, simulated), parameterPreprocessingMetrics(preprocessing)),
|
||||
ResponseSnapshot: response.Result,
|
||||
ResponseStartedAt: response.ResponseStartedAt,
|
||||
ResponseFinishedAt: response.ResponseFinishedAt,
|
||||
@ -491,6 +527,25 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (s *Service) recordTaskParameterPreprocessing(ctx context.Context, taskID string, attemptID string, attemptNo int, candidate store.RuntimeModelCandidate, log parameterPreprocessingLog) error {
|
||||
_, err := s.store.CreateTaskParamPreprocessingLog(ctx, store.CreateTaskParamPreprocessingLogInput{
|
||||
TaskID: taskID,
|
||||
AttemptID: attemptID,
|
||||
AttemptNo: attemptNo,
|
||||
ModelType: log.ModelType,
|
||||
PlatformID: candidate.PlatformID,
|
||||
PlatformModelID: candidate.PlatformModelID,
|
||||
ClientID: candidate.ClientID,
|
||||
Changed: log.Changed,
|
||||
ChangeCount: len(log.Changes),
|
||||
ActualInput: log.Input,
|
||||
ConvertedOutput: log.Output,
|
||||
Changes: log.Changes,
|
||||
ModelSnapshot: log.Model,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) clientFor(candidate store.RuntimeModelCandidate, simulated bool) clients.Client {
|
||||
if simulated {
|
||||
return s.clients["simulation"]
|
||||
@ -505,8 +560,12 @@ func (s *Service) clientFor(candidate store.RuntimeModelCandidate, simulated boo
|
||||
return s.clients["openai"]
|
||||
}
|
||||
|
||||
func (s *Service) failTask(ctx context.Context, taskID string, code string, message string, simulated bool, cause error) (store.GatewayTask, error) {
|
||||
func (s *Service) failTask(ctx context.Context, taskID string, code string, message string, simulated bool, cause error, extraMetrics ...map[string]any) (store.GatewayTask, error) {
|
||||
requestID, metrics, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(cause, simulated)
|
||||
if len(extraMetrics) > 0 {
|
||||
values := append([]map[string]any{metrics}, extraMetrics...)
|
||||
metrics = mergeMetrics(values...)
|
||||
}
|
||||
metrics = s.withAttemptHistory(ctx, taskID, metrics)
|
||||
failed, err := s.store.FinishTaskFailure(ctx, store.FinishTaskFailureInput{
|
||||
TaskID: taskID,
|
||||
@ -589,6 +648,9 @@ func (s *Service) emit(ctx context.Context, taskID string, eventType string, sta
|
||||
}
|
||||
|
||||
func modelTypeFromKind(kind string, body map[string]any) string {
|
||||
if requested := requestedModelTypeFromBody(body); requested != "" {
|
||||
return requested
|
||||
}
|
||||
switch kind {
|
||||
case "chat.completions", "responses":
|
||||
return "text_generate"
|
||||
@ -598,6 +660,9 @@ func modelTypeFromKind(kind string, body map[string]any) string {
|
||||
}
|
||||
return "image_generate"
|
||||
case "videos.generations":
|
||||
if videoRequestHasVideoOrAudioReference(body) {
|
||||
return "omni_video"
|
||||
}
|
||||
if videoRequestHasReferenceImage(body) {
|
||||
return "image_to_video"
|
||||
}
|
||||
@ -607,6 +672,25 @@ func modelTypeFromKind(kind string, body map[string]any) string {
|
||||
}
|
||||
}
|
||||
|
||||
func requestedModelTypeFromBody(body map[string]any) string {
|
||||
for _, key := range []string{"modelType", "model_type", "capability", "capabilityType"} {
|
||||
value := strings.TrimSpace(stringFromMap(body, key))
|
||||
if isKnownModelType(value) {
|
||||
return value
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func isKnownModelType(value string) bool {
|
||||
switch value {
|
||||
case "text_generate", "image_generate", "image_edit", "video_generate", "image_to_video", "text_to_video", "video_edit", "omni_video", "omni":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func videoRequestHasReferenceImage(body map[string]any) bool {
|
||||
if body == nil {
|
||||
return false
|
||||
@ -622,6 +706,23 @@ func videoRequestHasReferenceImage(body map[string]any) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func videoRequestHasVideoOrAudioReference(body map[string]any) bool {
|
||||
if body == nil {
|
||||
return false
|
||||
}
|
||||
for _, key := range []string{"video", "video_url", "videoUrl", "reference_video", "referenceVideo", "audio_url", "audioUrl", "reference_audio", "referenceAudio"} {
|
||||
if hasAnyString(body, key) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
for _, item := range contentItems(body["content"]) {
|
||||
if isVideoContent(item) || isAudioContent(item) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isTextGenerationKind(kind string) bool {
|
||||
return kind == "chat.completions" || kind == "responses"
|
||||
}
|
||||
@ -692,10 +793,7 @@ func failoverTimeBudgetExceeded(start time.Time, maxDuration time.Duration) bool
|
||||
}
|
||||
|
||||
func normalizeRequest(kind string, body map[string]any) map[string]any {
|
||||
out := map[string]any{}
|
||||
for key, value := range body {
|
||||
out[key] = value
|
||||
}
|
||||
out := cloneMap(body)
|
||||
if kind == "responses" && out["messages"] == nil && out["input"] != nil {
|
||||
out["messages"] = []any{map[string]any{"role": "user", "content": out["input"]}}
|
||||
}
|
||||
|
||||
@ -488,6 +488,24 @@ type TaskAttempt struct {
|
||||
FinishedAt string `json:"finishedAt,omitempty"`
|
||||
}
|
||||
|
||||
type TaskParamPreprocessingLog struct {
|
||||
ID string `json:"id"`
|
||||
TaskID string `json:"taskId"`
|
||||
AttemptID string `json:"attemptId,omitempty"`
|
||||
AttemptNo int `json:"attemptNo,omitempty"`
|
||||
ModelType string `json:"modelType,omitempty"`
|
||||
PlatformID string `json:"platformId,omitempty"`
|
||||
PlatformModelID string `json:"platformModelId,omitempty"`
|
||||
ClientID string `json:"clientId,omitempty"`
|
||||
Changed bool `json:"changed"`
|
||||
ChangeCount int `json:"changeCount"`
|
||||
ActualInput map[string]any `json:"actualInput,omitempty"`
|
||||
ConvertedOutput map[string]any `json:"convertedOutput,omitempty"`
|
||||
Changes []any `json:"changes,omitempty"`
|
||||
ModelSnapshot map[string]any `json:"model,omitempty"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
}
|
||||
|
||||
func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) {
|
||||
rows, err := s.pool.Query(ctx, `
|
||||
SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
|
||||
|
||||
@ -214,3 +214,19 @@ type FinishTaskFailureInput struct {
|
||||
ResponseFinishedAt time.Time
|
||||
ResponseDurationMS int64
|
||||
}
|
||||
|
||||
type CreateTaskParamPreprocessingLogInput struct {
|
||||
TaskID string
|
||||
AttemptID string
|
||||
AttemptNo int
|
||||
ModelType string
|
||||
PlatformID string
|
||||
PlatformModelID string
|
||||
ClientID string
|
||||
Changed bool
|
||||
ChangeCount int
|
||||
ActualInput map[string]any
|
||||
ConvertedOutput map[string]any
|
||||
Changes any
|
||||
ModelSnapshot map[string]any
|
||||
}
|
||||
|
||||
@ -328,6 +328,47 @@ WHERE id = $1::uuid`, input.TaskID, input.AttemptNo); err != nil {
|
||||
return attemptID, tx.Commit(ctx)
|
||||
}
|
||||
|
||||
func (s *Store) CreateTaskParamPreprocessingLog(ctx context.Context, input CreateTaskParamPreprocessingLogInput) (string, error) {
|
||||
actualInputJSON, _ := json.Marshal(emptyObjectIfNil(input.ActualInput))
|
||||
convertedOutputJSON, _ := json.Marshal(emptyObjectIfNil(input.ConvertedOutput))
|
||||
changesJSON, _ := json.Marshal(input.Changes)
|
||||
if input.Changes == nil {
|
||||
changesJSON = []byte("[]")
|
||||
}
|
||||
modelSnapshotJSON, _ := json.Marshal(emptyObjectIfNil(input.ModelSnapshot))
|
||||
var attemptNo any
|
||||
if input.AttemptNo > 0 {
|
||||
attemptNo = input.AttemptNo
|
||||
}
|
||||
var id string
|
||||
err := s.pool.QueryRow(ctx, `
|
||||
INSERT INTO gateway_task_param_preprocessing_logs (
|
||||
task_id, attempt_id, attempt_no, model_type, platform_id, platform_model_id, client_id,
|
||||
changed, change_count, actual_input, converted_output, changes, model_snapshot
|
||||
)
|
||||
VALUES (
|
||||
$1::uuid, NULLIF($2::text, '')::uuid, $3::int, NULLIF($4::text, ''),
|
||||
NULLIF($5::text, '')::uuid, NULLIF($6::text, '')::uuid, NULLIF($7::text, ''),
|
||||
$8, $9::int, $10::jsonb, $11::jsonb, $12::jsonb, $13::jsonb
|
||||
)
|
||||
RETURNING id::text`,
|
||||
input.TaskID,
|
||||
input.AttemptID,
|
||||
attemptNo,
|
||||
input.ModelType,
|
||||
input.PlatformID,
|
||||
input.PlatformModelID,
|
||||
input.ClientID,
|
||||
input.Changed,
|
||||
input.ChangeCount,
|
||||
string(actualInputJSON),
|
||||
string(convertedOutputJSON),
|
||||
string(changesJSON),
|
||||
string(modelSnapshotJSON),
|
||||
).Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
func (s *Store) attachTaskAttempts(ctx context.Context, items []GatewayTask) ([]GatewayTask, error) {
|
||||
if len(items) == 0 {
|
||||
return items, nil
|
||||
@ -354,6 +395,31 @@ func (s *Store) ListTaskAttempts(ctx context.Context, taskID string) ([]TaskAtte
|
||||
return attemptsByTaskID[taskID], nil
|
||||
}
|
||||
|
||||
func (s *Store) ListTaskParamPreprocessingLogs(ctx context.Context, taskID string) ([]TaskParamPreprocessingLog, error) {
|
||||
rows, err := s.pool.Query(ctx, `
|
||||
SELECT id::text, task_id::text, COALESCE(attempt_id::text, ''), COALESCE(attempt_no, 0),
|
||||
COALESCE(model_type, ''), COALESCE(platform_id::text, ''), COALESCE(platform_model_id::text, ''),
|
||||
COALESCE(client_id, ''), changed, change_count,
|
||||
COALESCE(actual_input, '{}'::jsonb), COALESCE(converted_output, '{}'::jsonb),
|
||||
COALESCE(changes, '[]'::jsonb), COALESCE(model_snapshot, '{}'::jsonb), created_at
|
||||
FROM gateway_task_param_preprocessing_logs
|
||||
WHERE task_id = $1::uuid
|
||||
ORDER BY COALESCE(attempt_no, 0), created_at`, taskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := make([]TaskParamPreprocessingLog, 0)
|
||||
for rows.Next() {
|
||||
item, err := scanTaskParamPreprocessingLog(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return items, rows.Err()
|
||||
}
|
||||
|
||||
func (s *Store) AppendTaskAttemptTrace(ctx context.Context, taskID string, attemptNo int, entry map[string]any) error {
|
||||
entryJSON, _ := json.Marshal(emptyObjectIfNil(entry))
|
||||
_, err := s.pool.Exec(ctx, `
|
||||
@ -459,6 +525,38 @@ func scanTaskAttempt(scanner taskScanner) (TaskAttempt, error) {
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func scanTaskParamPreprocessingLog(scanner taskScanner) (TaskParamPreprocessingLog, error) {
|
||||
var item TaskParamPreprocessingLog
|
||||
var actualInputBytes []byte
|
||||
var convertedOutputBytes []byte
|
||||
var changesBytes []byte
|
||||
var modelSnapshotBytes []byte
|
||||
if err := scanner.Scan(
|
||||
&item.ID,
|
||||
&item.TaskID,
|
||||
&item.AttemptID,
|
||||
&item.AttemptNo,
|
||||
&item.ModelType,
|
||||
&item.PlatformID,
|
||||
&item.PlatformModelID,
|
||||
&item.ClientID,
|
||||
&item.Changed,
|
||||
&item.ChangeCount,
|
||||
&actualInputBytes,
|
||||
&convertedOutputBytes,
|
||||
&changesBytes,
|
||||
&modelSnapshotBytes,
|
||||
&item.CreatedAt,
|
||||
); err != nil {
|
||||
return TaskParamPreprocessingLog{}, err
|
||||
}
|
||||
item.ActualInput = decodeObject(actualInputBytes)
|
||||
item.ConvertedOutput = decodeObject(convertedOutputBytes)
|
||||
item.Changes = decodeArray(changesBytes)
|
||||
item.ModelSnapshot = decodeObject(modelSnapshotBytes)
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func enrichTaskAttemptFromMetrics(item *TaskAttempt) {
|
||||
if item == nil || len(item.Metrics) == 0 {
|
||||
return
|
||||
|
||||
30
apps/api/migrations/0033_task_param_preprocessing_logs.sql
Normal file
30
apps/api/migrations/0033_task_param_preprocessing_logs.sql
Normal file
@ -0,0 +1,30 @@
|
||||
CREATE TABLE IF NOT EXISTS gateway_task_param_preprocessing_logs (
|
||||
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
task_id uuid NOT NULL REFERENCES gateway_tasks(id) ON DELETE CASCADE,
|
||||
attempt_id uuid REFERENCES gateway_task_attempts(id) ON DELETE SET NULL,
|
||||
attempt_no integer,
|
||||
model_type text NOT NULL DEFAULT '',
|
||||
platform_id uuid REFERENCES integration_platforms(id) ON DELETE SET NULL,
|
||||
platform_model_id uuid REFERENCES platform_models(id) ON DELETE SET NULL,
|
||||
client_id text,
|
||||
changed boolean NOT NULL DEFAULT false,
|
||||
change_count integer NOT NULL DEFAULT 0,
|
||||
actual_input jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
converted_output jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
changes jsonb NOT NULL DEFAULT '[]'::jsonb,
|
||||
model_snapshot jsonb NOT NULL DEFAULT '{}'::jsonb,
|
||||
created_at timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_gateway_task_param_pre_logs_task
|
||||
ON gateway_task_param_preprocessing_logs(task_id, created_at);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_gateway_task_param_pre_logs_attempt
|
||||
ON gateway_task_param_preprocessing_logs(attempt_id)
|
||||
WHERE attempt_id IS NOT NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_gateway_task_param_pre_logs_changed
|
||||
ON gateway_task_param_preprocessing_logs(changed, created_at DESC);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_gateway_task_param_pre_logs_changes
|
||||
ON gateway_task_param_preprocessing_logs USING gin(changes);
|
||||
@ -942,6 +942,7 @@ export function App() {
|
||||
message={coreMessage}
|
||||
section={workspaceSection}
|
||||
state={coreState}
|
||||
token={token}
|
||||
taskQuery={workspaceTaskQuery}
|
||||
taskTotal={taskTotal}
|
||||
transactionQuery={workspaceTransactionQuery}
|
||||
|
||||
@ -16,6 +16,7 @@ import type {
|
||||
GatewayTenantUpsertRequest,
|
||||
GatewayNetworkProxyConfig,
|
||||
GatewayTask,
|
||||
GatewayTaskParamPreprocessingLog,
|
||||
GatewayUser,
|
||||
GatewayUserUpsertRequest,
|
||||
GatewayWalletTransaction,
|
||||
@ -660,6 +661,13 @@ export async function getTask(token: string, taskId: string): Promise<GatewayTas
|
||||
return request<GatewayTask>(`/api/workspace/tasks/${taskId}`, { token });
|
||||
}
|
||||
|
||||
export async function listTaskParamPreprocessing(
|
||||
token: string,
|
||||
taskId: string,
|
||||
): Promise<ListResponse<GatewayTaskParamPreprocessingLog>> {
|
||||
return request<ListResponse<GatewayTaskParamPreprocessingLog>>(`/api/workspace/tasks/${taskId}/param-preprocessing`, { token });
|
||||
}
|
||||
|
||||
export async function pollTaskUntilSettled(
|
||||
token: string,
|
||||
task: GatewayTask,
|
||||
|
||||
@ -1,12 +1,13 @@
|
||||
import { useEffect, useMemo, useState, type FormEvent, type ReactNode } from 'react';
|
||||
import { Popover as AntPopover } from 'antd';
|
||||
import { ChevronLeft, ChevronRight, Copy, CreditCard, Eye, KeyRound, ListChecks, Plus, ReceiptText, RotateCcw, Search, ShieldCheck, Trash2, UserRound } from 'lucide-react';
|
||||
import type { GatewayAccessRuleBatchRequest, GatewayApiKey, GatewayTask, GatewayWalletAccount, GatewayWalletTransaction, IntegrationPlatform, PlatformModel } from '@easyai-ai-gateway/contracts';
|
||||
import { ChevronLeft, ChevronRight, Copy, CreditCard, Eye, KeyRound, ListChecks, Plus, ReceiptText, RotateCcw, Search, ShieldCheck, SlidersHorizontal, Trash2, UserRound } from 'lucide-react';
|
||||
import type { GatewayAccessRuleBatchRequest, GatewayApiKey, GatewayTask, GatewayTaskParamPreprocessingLog, GatewayWalletAccount, GatewayWalletTransaction, IntegrationPlatform, PlatformModel } from '@easyai-ai-gateway/contracts';
|
||||
import type { ConsoleData } from '../app-state';
|
||||
import { EntityTable } from '../components/EntityTable';
|
||||
import { Badge, Button, Card, CardContent, CardHeader, CardTitle, ConfirmDialog, DateTimePicker, DateTimeRangePicker, FormDialog, Input, Label, Select, Table, TableCell, TableFooter, TableHead, TablePageActions, TableRow, TableToolbar, TableViewportLayout, Tabs } from '../components/ui';
|
||||
import { AccessPermissionEditor, countAccessPermissionRules } from './admin/AccessPermissionEditor';
|
||||
import type { ApiKeyForm, LoadState, WorkspaceSection, WorkspaceTaskQuery, WorkspaceTransactionQuery } from '../types';
|
||||
import { listTaskParamPreprocessing } from '../api';
|
||||
|
||||
const tabs = [
|
||||
{ value: 'overview', label: '个人总览', icon: <UserRound size={15} /> },
|
||||
@ -27,6 +28,7 @@ export function WorkspacePage(props: {
|
||||
message: string;
|
||||
section: WorkspaceSection;
|
||||
state: LoadState;
|
||||
token: string;
|
||||
taskQuery: WorkspaceTaskQuery;
|
||||
taskTotal: number;
|
||||
transactionQuery: WorkspaceTransactionQuery;
|
||||
@ -48,7 +50,7 @@ export function WorkspacePage(props: {
|
||||
{props.section === 'overview' && <WorkspaceOverview data={props.data} />}
|
||||
{props.section === 'billing' && <BillingPanel walletAccounts={props.data.walletAccounts} />}
|
||||
{props.section === 'apiKeys' && <ApiKeyPanel {...props} />}
|
||||
{props.section === 'tasks' && <TaskPanel data={props.data} query={props.taskQuery} total={props.taskTotal} onQueryChange={props.onTaskQueryChange} />}
|
||||
{props.section === 'tasks' && <TaskPanel data={props.data} query={props.taskQuery} token={props.token} total={props.taskTotal} onQueryChange={props.onTaskQueryChange} />}
|
||||
{props.section === 'transactions' && <ConsumptionPanel data={props.data} query={props.transactionQuery} total={props.transactionTotal} onQueryChange={props.onTransactionQueryChange} />}
|
||||
</div>
|
||||
</div>
|
||||
@ -545,6 +547,7 @@ function ApiKeyPanel(props: {
|
||||
function TaskPanel(props: {
|
||||
data: ConsoleData;
|
||||
query: WorkspaceTaskQuery;
|
||||
token: string;
|
||||
total: number;
|
||||
onQueryChange: (value: WorkspaceTaskQuery) => void;
|
||||
}) {
|
||||
@ -675,6 +678,7 @@ function TaskPanel(props: {
|
||||
<TableHead>状态</TableHead>
|
||||
<TableHead>模型</TableHead>
|
||||
<TableHead>尝试链路</TableHead>
|
||||
<TableHead>参数转换</TableHead>
|
||||
<TableHead>类型</TableHead>
|
||||
<TableHead>API Key</TableHead>
|
||||
<TableHead>Token</TableHead>
|
||||
@ -684,7 +688,7 @@ function TaskPanel(props: {
|
||||
<TableHead>原始 JSON</TableHead>
|
||||
</TableRow>
|
||||
{tasks.map((task) => (
|
||||
<TaskRecord key={task.id} task={task} onCopyRequestId={copyTaskRequestId} onOpenJson={setJsonTask} />
|
||||
<TaskRecord key={task.id} task={task} token={props.token} onCopyRequestId={copyTaskRequestId} onOpenJson={setJsonTask} />
|
||||
))}
|
||||
</Table>
|
||||
) : (
|
||||
@ -759,7 +763,7 @@ function TaskPanel(props: {
|
||||
);
|
||||
}
|
||||
|
||||
function TaskRecord(props: { task: GatewayTask; onCopyRequestId: (task: GatewayTask) => Promise<void>; onOpenJson: (task: GatewayTask) => void }) {
|
||||
function TaskRecord(props: { task: GatewayTask; token: string; onCopyRequestId: (task: GatewayTask) => Promise<void>; onOpenJson: (task: GatewayTask) => void }) {
|
||||
const usage = props.task.usage ?? {};
|
||||
const tokenUsage = formatTokenUsage(usage);
|
||||
const chargeText = props.task.finalChargeAmount !== undefined ? formatCellValue(props.task.finalChargeAmount) : '-';
|
||||
@ -801,6 +805,9 @@ function TaskRecord(props: { task: GatewayTask; onCopyRequestId: (task: GatewayT
|
||||
<TableCell className="taskRecordAttemptCell">
|
||||
<TaskAttemptChain task={props.task} />
|
||||
</TableCell>
|
||||
<TableCell className="taskRecordParamCell">
|
||||
<TaskParamConversionCell task={props.task} token={props.token} />
|
||||
</TableCell>
|
||||
<TableCell>{props.task.modelType || '-'}</TableCell>
|
||||
<TableCell>{props.task.apiKeyName || props.task.apiKeyPrefix || props.task.apiKeyId || '-'}</TableCell>
|
||||
<TableCell className="taskRecordTokenCell">{tokenUsage}</TableCell>
|
||||
@ -817,6 +824,108 @@ function TaskRecord(props: { task: GatewayTask; onCopyRequestId: (task: GatewayT
|
||||
);
|
||||
}
|
||||
|
||||
type TaskParamConversionSummary = {
|
||||
changed: boolean;
|
||||
changeCount: number;
|
||||
actions: string[];
|
||||
paths: string[];
|
||||
capabilityPaths: string[];
|
||||
};
|
||||
|
||||
function TaskParamConversionCell(props: { task: GatewayTask; token: string }) {
|
||||
const summary = taskParamConversionSummary(props.task);
|
||||
const [open, setOpen] = useState(false);
|
||||
const [logs, setLogs] = useState<GatewayTaskParamPreprocessingLog[] | null>(null);
|
||||
const [loadState, setLoadState] = useState<'idle' | 'loading' | 'ready' | 'error'>('idle');
|
||||
const [error, setError] = useState('');
|
||||
|
||||
useEffect(() => {
|
||||
if (!open || !summary.changed || logs || loadState === 'loading' || !props.token) return;
|
||||
let cancelled = false;
|
||||
setLoadState('loading');
|
||||
setError('');
|
||||
listTaskParamPreprocessing(props.token, props.task.id)
|
||||
.then((response) => {
|
||||
if (cancelled) return;
|
||||
setLogs(response.items ?? []);
|
||||
setLoadState('ready');
|
||||
})
|
||||
.catch((err) => {
|
||||
if (cancelled) return;
|
||||
setError(err instanceof Error ? err.message : '参数转换明细加载失败');
|
||||
setLoadState('error');
|
||||
});
|
||||
return () => {
|
||||
cancelled = true;
|
||||
};
|
||||
}, [loadState, logs, open, props.task.id, props.token, summary.changed]);
|
||||
|
||||
if (!summary.changed) {
|
||||
return <span className="taskParamConversionEmpty">无转换</span>;
|
||||
}
|
||||
|
||||
return (
|
||||
<AntPopover
|
||||
align={{ offset: [0, 8] }}
|
||||
content={<TaskParamConversionPopover error={error} loadState={loadState} logs={logs} summary={summary} />}
|
||||
overlayClassName="taskParamConversionAntPopover"
|
||||
placement="bottomLeft"
|
||||
trigger={['hover', 'focus']}
|
||||
onOpenChange={setOpen}
|
||||
>
|
||||
<button className="taskParamConversionTrigger" type="button" aria-label={`参数转换 ${summary.changeCount} 项`}>
|
||||
<SlidersHorizontal size={14} />
|
||||
<span>{summary.changeCount || '有'} 项</span>
|
||||
</button>
|
||||
</AntPopover>
|
||||
);
|
||||
}
|
||||
|
||||
function TaskParamConversionPopover(props: {
|
||||
error: string;
|
||||
loadState: 'idle' | 'loading' | 'ready' | 'error';
|
||||
logs: GatewayTaskParamPreprocessingLog[] | null;
|
||||
summary: TaskParamConversionSummary;
|
||||
}) {
|
||||
const logs = props.logs ?? [];
|
||||
return (
|
||||
<span className="taskParamConversionPopover" role="tooltip">
|
||||
<span className="taskParamConversionPopoverHeader">
|
||||
<strong>参数转换汇总</strong>
|
||||
<small>{taskParamSummaryText(props.summary)}</small>
|
||||
</span>
|
||||
{props.loadState === 'loading' && <span className="taskParamConversionState">正在加载转换明细...</span>}
|
||||
{props.loadState === 'error' && <span className="taskParamConversionState error">{props.error || '参数转换明细加载失败'}</span>}
|
||||
{props.loadState === 'ready' && logs.length === 0 && <span className="taskParamConversionState">暂无转换明细。</span>}
|
||||
{logs.map((log) => (
|
||||
<span key={log.id} className="taskParamConversionLog">
|
||||
<span className="taskParamConversionLogHeader">
|
||||
<strong>{taskParamLogTitle(log)}</strong>
|
||||
<Badge variant={log.changed ? 'secondary' : 'outline'}>{log.changeCount} 项</Badge>
|
||||
</span>
|
||||
{(log.changes ?? []).slice(0, 8).map((change, index) => (
|
||||
<span key={`${log.id}-change-${index}`} className="taskParamConversionChange">
|
||||
<span className="taskParamConversionChangeTop">
|
||||
<Badge variant="outline">{taskParamActionLabel(objectString(change, 'action'))}</Badge>
|
||||
<strong>{objectString(change, 'path') || '-'}</strong>
|
||||
</span>
|
||||
<span>{objectString(change, 'reason') || '按模型能力配置调整参数。'}</span>
|
||||
{objectString(change, 'capabilityPath') && <small>能力配置:{objectString(change, 'capabilityPath')}</small>}
|
||||
<code>{taskParamChangePreview(change)}</code>
|
||||
</span>
|
||||
))}
|
||||
{(log.changes?.length ?? 0) > 8 && <small>还有 {(log.changes?.length ?? 0) - 8} 项转换未展开。</small>}
|
||||
</span>
|
||||
))}
|
||||
{props.loadState !== 'ready' && props.summary.capabilityPaths.length > 0 && (
|
||||
<span className="taskParamConversionSummaryPaths">
|
||||
{props.summary.capabilityPaths.slice(0, 4).map((path) => <code key={path}>{path}</code>)}
|
||||
</span>
|
||||
)}
|
||||
</span>
|
||||
);
|
||||
}
|
||||
|
||||
function TaskAttemptChain(props: { task: GatewayTask }) {
|
||||
const attempts = props.task.attempts ?? [];
|
||||
if (!attempts.length) return <span>-</span>;
|
||||
@ -999,6 +1108,73 @@ function taskAttemptTraceReasonLabel(reason: string) {
|
||||
return labels[reason] ?? reason;
|
||||
}
|
||||
|
||||
function taskParamConversionSummary(task: GatewayTask): TaskParamConversionSummary {
|
||||
const summary: TaskParamConversionSummary = {
|
||||
changed: false,
|
||||
changeCount: 0,
|
||||
actions: [],
|
||||
paths: [],
|
||||
capabilityPaths: [],
|
||||
};
|
||||
mergeTaskParamSummary(summary, metadataObject(task.metrics, 'parameterPreprocessingSummary'));
|
||||
for (const attempt of task.attempts ?? []) {
|
||||
mergeTaskParamSummary(summary, metadataObject(attempt.metrics, 'parameterPreprocessingSummary'));
|
||||
}
|
||||
return summary;
|
||||
}
|
||||
|
||||
function mergeTaskParamSummary(target: TaskParamConversionSummary, raw: Record<string, unknown>) {
|
||||
if (!Object.keys(raw).length) return;
|
||||
target.changed = target.changed || raw.changed === true;
|
||||
const changeCount = metadataNumber(raw, 'changeCount');
|
||||
if (changeCount) target.changeCount += Math.max(0, Math.trunc(changeCount));
|
||||
for (const action of metadataStringList(raw, 'actions')) appendUniqueText(target.actions, action);
|
||||
for (const path of metadataStringList(raw, 'paths')) appendUniqueText(target.paths, path);
|
||||
for (const path of metadataStringList(raw, 'capabilityPaths')) appendUniqueText(target.capabilityPaths, path);
|
||||
}
|
||||
|
||||
function taskParamSummaryText(summary: TaskParamConversionSummary) {
|
||||
const actionText = summary.actions.map(taskParamActionLabel).join('、');
|
||||
const parts = [
|
||||
`${summary.changeCount || summary.paths.length || 1} 项转换`,
|
||||
actionText ? `动作 ${actionText}` : '',
|
||||
summary.capabilityPaths.length ? `涉及 ${summary.capabilityPaths.length} 项能力配置` : '',
|
||||
].filter(Boolean);
|
||||
return parts.join(' · ');
|
||||
}
|
||||
|
||||
function taskParamLogTitle(log: GatewayTaskParamPreprocessingLog) {
|
||||
const parts = [
|
||||
log.attemptNo ? `#${log.attemptNo}` : '',
|
||||
log.modelType || '',
|
||||
log.clientId || '',
|
||||
].filter(Boolean);
|
||||
return parts.join(' · ') || '预处理记录';
|
||||
}
|
||||
|
||||
function taskParamActionLabel(action: string) {
|
||||
if (action === 'remove') return '移除';
|
||||
if (action === 'adjust') return '调整';
|
||||
if (action === 'set') return '补齐';
|
||||
return action || '转换';
|
||||
}
|
||||
|
||||
function taskParamChangePreview(change: Record<string, unknown>) {
|
||||
const before = previewCompactValue(change.before);
|
||||
const after = previewCompactValue(change.after);
|
||||
const action = objectString(change, 'action');
|
||||
if (action === 'remove') return `原值 ${before}`;
|
||||
if (action === 'set') return `新值 ${after}`;
|
||||
return `原值 ${before} -> 新值 ${after}`;
|
||||
}
|
||||
|
||||
function previewCompactValue(value: unknown) {
|
||||
if (value === undefined || value === null || value === '') return '-';
|
||||
const text = typeof value === 'string' ? value : JSON.stringify(value);
|
||||
if (!text) return '-';
|
||||
return text.length > 150 ? `${text.slice(0, 150)}...` : text;
|
||||
}
|
||||
|
||||
function formatCellValue(value: unknown) {
|
||||
if (value === undefined || value === null || value === '') return '-';
|
||||
return String(value);
|
||||
@ -1039,6 +1215,12 @@ function metadataString(metadata: Record<string, unknown> | undefined, key: stri
|
||||
return typeof value === 'string' && value.trim() ? value.trim() : '';
|
||||
}
|
||||
|
||||
function metadataStringList(metadata: Record<string, unknown> | undefined, key: string) {
|
||||
const value = metadata?.[key];
|
||||
if (!Array.isArray(value)) return [];
|
||||
return value.filter((item): item is string => typeof item === 'string' && item.trim() !== '').map((item) => item.trim());
|
||||
}
|
||||
|
||||
function metadataNumber(metadata: Record<string, unknown> | undefined, key: string) {
|
||||
const value = metadata?.[key];
|
||||
if (value === undefined || value === null || value === '') return null;
|
||||
@ -1057,6 +1239,11 @@ function objectString(value: Record<string, unknown>, key: string) {
|
||||
return typeof next === 'string' && next.trim() ? next.trim() : '';
|
||||
}
|
||||
|
||||
function appendUniqueText(values: string[], value: string) {
|
||||
if (!value || values.includes(value)) return;
|
||||
values.push(value);
|
||||
}
|
||||
|
||||
function transactionChargeAmount(transaction: GatewayWalletTransaction) {
|
||||
return metadataNumber(transaction.metadata, 'finalChargeAmount') ?? transaction.amount;
|
||||
}
|
||||
|
||||
@ -261,8 +261,8 @@ strong {
|
||||
}
|
||||
|
||||
.taskRecordTable .shTableRow {
|
||||
grid-template-columns: minmax(190px, 0.9fr) minmax(220px, 1fr) minmax(94px, 0.4fr) minmax(280px, 1.45fr) minmax(104px, 0.42fr) minmax(126px, 0.55fr) minmax(150px, 0.66fr) minmax(154px, 0.62fr) minmax(82px, 0.36fr) minmax(98px, 0.42fr) minmax(150px, 0.66fr) minmax(130px, 0.54fr);
|
||||
min-width: 1778px;
|
||||
grid-template-columns: minmax(190px, 0.9fr) minmax(220px, 1fr) minmax(94px, 0.4fr) minmax(280px, 1.45fr) minmax(104px, 0.42fr) minmax(118px, 0.44fr) minmax(126px, 0.55fr) minmax(150px, 0.66fr) minmax(154px, 0.62fr) minmax(82px, 0.36fr) minmax(98px, 0.42fr) minmax(150px, 0.66fr) minmax(130px, 0.54fr);
|
||||
min-width: 1904px;
|
||||
align-items: start;
|
||||
}
|
||||
|
||||
@ -359,6 +359,11 @@ strong {
|
||||
white-space: normal;
|
||||
}
|
||||
|
||||
.taskRecordParamCell {
|
||||
overflow: visible;
|
||||
white-space: normal;
|
||||
}
|
||||
|
||||
.taskRecordAttemptCount {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
@ -437,6 +442,113 @@ strong {
|
||||
overflow-wrap: anywhere;
|
||||
}
|
||||
|
||||
.taskParamConversionEmpty {
|
||||
color: var(--text-soft);
|
||||
font-size: var(--font-size-xs);
|
||||
}
|
||||
|
||||
.taskParamConversionTrigger {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: 0.35rem;
|
||||
min-height: 1.5rem;
|
||||
padding: 0;
|
||||
border: 0;
|
||||
background: transparent;
|
||||
color: var(--text-strong);
|
||||
cursor: default;
|
||||
font: inherit;
|
||||
font-weight: var(--font-weight-medium);
|
||||
}
|
||||
|
||||
.taskParamConversionTrigger svg {
|
||||
color: var(--primary);
|
||||
}
|
||||
|
||||
.taskParamConversionAntPopover {
|
||||
z-index: 1200;
|
||||
}
|
||||
|
||||
.taskParamConversionPopover {
|
||||
display: grid;
|
||||
width: min(42rem, calc(100vw - 2rem));
|
||||
max-height: min(34rem, calc(100vh - 7rem));
|
||||
overflow-y: auto;
|
||||
gap: 0.65rem;
|
||||
}
|
||||
|
||||
.taskParamConversionPopoverHeader,
|
||||
.taskParamConversionLogHeader,
|
||||
.taskParamConversionChangeTop {
|
||||
display: flex;
|
||||
min-width: 0;
|
||||
align-items: center;
|
||||
justify-content: space-between;
|
||||
gap: 0.5rem;
|
||||
}
|
||||
|
||||
.taskParamConversionPopoverHeader strong,
|
||||
.taskParamConversionLogHeader strong,
|
||||
.taskParamConversionChangeTop strong {
|
||||
min-width: 0;
|
||||
color: var(--text-strong);
|
||||
font-weight: var(--font-weight-semibold);
|
||||
overflow-wrap: anywhere;
|
||||
}
|
||||
|
||||
.taskParamConversionPopoverHeader small,
|
||||
.taskParamConversionLog small,
|
||||
.taskParamConversionChange small {
|
||||
color: var(--text-soft);
|
||||
font-size: var(--font-size-xs);
|
||||
line-height: 1.4;
|
||||
}
|
||||
|
||||
.taskParamConversionState {
|
||||
color: var(--text-soft);
|
||||
font-size: var(--font-size-sm);
|
||||
}
|
||||
|
||||
.taskParamConversionState.error {
|
||||
color: var(--destructive);
|
||||
}
|
||||
|
||||
.taskParamConversionLog {
|
||||
display: grid;
|
||||
min-width: 0;
|
||||
gap: 0.5rem;
|
||||
padding-top: 0.65rem;
|
||||
border-top: 1px solid var(--border);
|
||||
}
|
||||
|
||||
.taskParamConversionChange {
|
||||
display: grid;
|
||||
gap: 0.28rem;
|
||||
padding: 0.5rem;
|
||||
border: 1px solid var(--border-subtle);
|
||||
border-radius: var(--radius-sm);
|
||||
background: var(--surface-subtle);
|
||||
color: var(--text-normal);
|
||||
font-size: var(--font-size-xs);
|
||||
line-height: 1.45;
|
||||
overflow-wrap: anywhere;
|
||||
}
|
||||
|
||||
.taskParamConversionChange code,
|
||||
.taskParamConversionSummaryPaths code {
|
||||
color: var(--text-soft);
|
||||
font-family: var(--font-mono);
|
||||
font-size: var(--font-size-xs);
|
||||
overflow-wrap: anywhere;
|
||||
white-space: normal;
|
||||
}
|
||||
|
||||
.taskParamConversionSummaryPaths {
|
||||
display: flex;
|
||||
flex-wrap: wrap;
|
||||
gap: 0.35rem;
|
||||
}
|
||||
|
||||
.taskRecordJsonButton {
|
||||
width: 100%;
|
||||
justify-content: flex-start;
|
||||
|
||||
@ -829,6 +829,24 @@ export interface GatewayTask {
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
export interface GatewayTaskParamPreprocessingLog {
|
||||
id: string;
|
||||
taskId: string;
|
||||
attemptId?: string;
|
||||
attemptNo?: number;
|
||||
modelType?: string;
|
||||
platformId?: string;
|
||||
platformModelId?: string;
|
||||
clientId?: string;
|
||||
changed: boolean;
|
||||
changeCount: number;
|
||||
actualInput?: Record<string, unknown>;
|
||||
convertedOutput?: Record<string, unknown>;
|
||||
changes?: Array<Record<string, unknown>>;
|
||||
model?: Record<string, unknown>;
|
||||
createdAt: string;
|
||||
}
|
||||
|
||||
export interface GatewayTaskAttempt {
|
||||
id: string;
|
||||
taskId: string;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user