feat: record task attempt chains
This commit is contained in:
parent
4c2de4b4c9
commit
0049b246c1
@ -722,6 +722,31 @@ WHERE reference_type = 'gateway_task'
|
||||
if failoverTask.Task.Status != "succeeded" {
|
||||
t.Fatalf("failover task should succeed through second client: %+v", failoverTask.Task)
|
||||
}
|
||||
var failoverDetail struct {
|
||||
Attempts []struct {
|
||||
AttemptNo int `json:"attemptNo"`
|
||||
PlatformName string `json:"platformName"`
|
||||
Status string `json:"status"`
|
||||
Retryable bool `json:"retryable"`
|
||||
ErrorCode string `json:"errorCode"`
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
ResponseMS int64 `json:"responseDurationMs"`
|
||||
} `json:"attempts"`
|
||||
Metrics map[string]any `json:"metrics"`
|
||||
}
|
||||
doJSON(t, server.URL, http.MethodGet, "/api/v1/tasks/"+failoverTask.Task.ID, apiKeyResponse.Secret, nil, http.StatusOK, &failoverDetail)
|
||||
if len(failoverDetail.Attempts) != 2 {
|
||||
t.Fatalf("failover task history should include two attempts, got %+v", failoverDetail.Attempts)
|
||||
}
|
||||
if failoverDetail.Attempts[0].PlatformName != "OpenAI Retryable Failure" || failoverDetail.Attempts[0].Status != "failed" || !failoverDetail.Attempts[0].Retryable || failoverDetail.Attempts[0].ErrorCode == "" {
|
||||
t.Fatalf("first failover attempt should preserve failed platform and reason: %+v", failoverDetail.Attempts[0])
|
||||
}
|
||||
if failoverDetail.Attempts[1].PlatformName != "OpenAI Retry Success" || failoverDetail.Attempts[1].Status != "succeeded" || failoverDetail.Attempts[1].ResponseMS <= 0 {
|
||||
t.Fatalf("second failover attempt should preserve successful platform: %+v", failoverDetail.Attempts[1])
|
||||
}
|
||||
if summary, ok := failoverDetail.Metrics["attempts"].([]any); !ok || len(summary) != 2 {
|
||||
t.Fatalf("task metrics should keep attempt-chain summary, got %+v", failoverDetail.Metrics)
|
||||
}
|
||||
|
||||
var degradePolicySet struct {
|
||||
ID string `json:"id"`
|
||||
|
||||
@ -40,7 +40,8 @@ func buildSuccessRecord(task store.GatewayTask, user *auth.User, body map[string
|
||||
}
|
||||
|
||||
func taskMetrics(task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, simulated bool) map[string]any {
|
||||
metrics := map[string]any{
|
||||
metrics := attemptMetrics(candidate, 0, simulated)
|
||||
for key, value := range map[string]any{
|
||||
"kind": task.Kind,
|
||||
"runMode": task.RunMode,
|
||||
"requestedModel": task.Model,
|
||||
@ -57,6 +58,8 @@ func taskMetrics(task store.GatewayTask, user *auth.User, body map[string]any, c
|
||||
"queueKey": candidate.QueueKey,
|
||||
"requestId": response.RequestID,
|
||||
"simulated": simulated,
|
||||
} {
|
||||
metrics[key] = value
|
||||
}
|
||||
if user != nil {
|
||||
metrics["apiKeyId"] = user.APIKeyID
|
||||
@ -94,6 +97,29 @@ func taskMetrics(task store.GatewayTask, user *auth.User, body map[string]any, c
|
||||
return metrics
|
||||
}
|
||||
|
||||
func attemptMetrics(candidate store.RuntimeModelCandidate, attemptNo int, simulated bool) map[string]any {
|
||||
metrics := map[string]any{
|
||||
"resolvedModel": candidate.ModelName,
|
||||
"modelName": candidate.ModelName,
|
||||
"modelAlias": candidate.ModelAlias,
|
||||
"providerModel": candidate.ProviderModelName,
|
||||
"canonicalModel": candidate.CanonicalModelKey,
|
||||
"modelType": candidate.ModelType,
|
||||
"provider": candidate.Provider,
|
||||
"platformId": candidate.PlatformID,
|
||||
"platformKey": candidate.PlatformKey,
|
||||
"platformName": candidate.PlatformName,
|
||||
"platformModelId": candidate.PlatformModelID,
|
||||
"clientId": candidate.ClientID,
|
||||
"queueKey": candidate.QueueKey,
|
||||
"simulated": simulated,
|
||||
}
|
||||
if attemptNo > 0 {
|
||||
metrics["attempt"] = attemptNo
|
||||
}
|
||||
return metrics
|
||||
}
|
||||
|
||||
func usageToMap(usage clients.Usage) map[string]any {
|
||||
out := map[string]any{}
|
||||
if usage.InputTokens > 0 {
|
||||
@ -172,6 +198,46 @@ func failureMetrics(err error, simulated bool) (string, map[string]any, time.Tim
|
||||
return meta.RequestID, metrics, meta.ResponseStartedAt, meta.ResponseFinishedAt, meta.ResponseDurationMS
|
||||
}
|
||||
|
||||
func mergeMetrics(values ...map[string]any) map[string]any {
|
||||
out := map[string]any{}
|
||||
for _, value := range values {
|
||||
for key, item := range value {
|
||||
out[key] = item
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func summarizeAttempts(attempts []store.TaskAttempt) []map[string]any {
|
||||
items := make([]map[string]any, 0, len(attempts))
|
||||
for _, attempt := range attempts {
|
||||
item := map[string]any{
|
||||
"attempt": attempt.AttemptNo,
|
||||
"status": attempt.Status,
|
||||
"platformId": attempt.PlatformID,
|
||||
"platformName": attempt.PlatformName,
|
||||
"provider": attempt.Provider,
|
||||
"platformModelId": attempt.PlatformModelID,
|
||||
"modelName": attempt.ModelName,
|
||||
"providerModelName": attempt.ProviderModelName,
|
||||
"modelAlias": attempt.ModelAlias,
|
||||
"modelType": attempt.ModelType,
|
||||
"clientId": attempt.ClientID,
|
||||
"queueKey": attempt.QueueKey,
|
||||
"requestId": attempt.RequestID,
|
||||
"retryable": attempt.Retryable,
|
||||
"simulated": attempt.Simulated,
|
||||
"errorCode": attempt.ErrorCode,
|
||||
"errorMessage": attempt.ErrorMessage,
|
||||
"responseDurationMs": attempt.ResponseDurationMS,
|
||||
"startedAt": attempt.StartedAt,
|
||||
"finishedAt": attempt.FinishedAt,
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
func messageCount(body map[string]any) int {
|
||||
messages, _ := body["messages"].([]any)
|
||||
return len(messages)
|
||||
|
||||
@ -85,6 +85,7 @@ func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *aut
|
||||
if err == nil {
|
||||
billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate))
|
||||
record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate))
|
||||
record.Metrics = s.withAttemptHistory(ctx, task.ID, record.Metrics)
|
||||
finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{
|
||||
TaskID: task.ID,
|
||||
Result: response.Result,
|
||||
@ -161,6 +162,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
Status: "running",
|
||||
Simulated: simulated,
|
||||
RequestSnapshot: body,
|
||||
Metrics: attemptMetrics(candidate, attemptNo, simulated),
|
||||
})
|
||||
if err != nil {
|
||||
return clients.Response{}, err
|
||||
@ -172,7 +174,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
Retryable: false,
|
||||
Metrics: map[string]any{"error": err.Error(), "candidateModel": candidate.ModelName, "clientId": candidate.ClientID},
|
||||
Metrics: mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), map[string]any{"error": err.Error(), "retryable": false}),
|
||||
ErrorCode: "rate_limit",
|
||||
ErrorMessage: err.Error(),
|
||||
})
|
||||
@ -224,6 +226,7 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
responseDurationMS = 0
|
||||
}
|
||||
}
|
||||
metrics = mergeMetrics(attemptMetrics(candidate, attemptNo, simulated), metrics)
|
||||
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
@ -242,9 +245,10 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
|
||||
}
|
||||
uploadedResult, err := s.uploadGeneratedAssets(ctx, response.Result)
|
||||
if err != nil {
|
||||
metrics := taskMetrics(task, user, body, candidate, response, simulated)
|
||||
metrics["error"] = err.Error()
|
||||
metrics["retryable"] = clients.IsRetryable(err)
|
||||
metrics := mergeMetrics(taskMetrics(task, user, body, candidate, response, simulated), map[string]any{
|
||||
"error": err.Error(),
|
||||
"retryable": clients.IsRetryable(err),
|
||||
})
|
||||
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
|
||||
AttemptID: attemptID,
|
||||
Status: "failed",
|
||||
@ -299,6 +303,7 @@ func (s *Service) clientFor(candidate store.RuntimeModelCandidate, simulated boo
|
||||
|
||||
func (s *Service) failTask(ctx context.Context, taskID string, code string, message string, simulated bool, cause error) (store.GatewayTask, error) {
|
||||
requestID, metrics, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(cause, simulated)
|
||||
metrics = s.withAttemptHistory(ctx, taskID, metrics)
|
||||
failed, err := s.store.FinishTaskFailure(ctx, store.FinishTaskFailureInput{
|
||||
TaskID: taskID,
|
||||
Code: code,
|
||||
@ -318,6 +323,21 @@ func (s *Service) failTask(ctx context.Context, taskID string, code string, mess
|
||||
return failed, nil
|
||||
}
|
||||
|
||||
func (s *Service) withAttemptHistory(ctx context.Context, taskID string, metrics map[string]any) map[string]any {
|
||||
attempts, err := s.store.ListTaskAttempts(ctx, taskID)
|
||||
if err != nil {
|
||||
s.logger.Warn("list task attempts for metrics failed", "taskID", taskID, "error", err)
|
||||
return metrics
|
||||
}
|
||||
if len(attempts) == 0 {
|
||||
return metrics
|
||||
}
|
||||
metrics = mergeMetrics(metrics)
|
||||
metrics["attemptCount"] = len(attempts)
|
||||
metrics["attempts"] = summarizeAttempts(attempts)
|
||||
return metrics
|
||||
}
|
||||
|
||||
func (s *Service) emit(ctx context.Context, taskID string, eventType string, status string, phase string, progress float64, message string, payload map[string]any, simulated bool) error {
|
||||
event, err := s.store.AddTaskEvent(ctx, taskID, eventType, status, phase, progress, message, payload, simulated)
|
||||
if err != nil {
|
||||
|
||||
@ -393,6 +393,7 @@ type GatewayTask struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
ErrorCode string `json:"errorCode,omitempty"`
|
||||
ErrorMessage string `json:"errorMessage,omitempty"`
|
||||
Attempts []TaskAttempt `json:"attempts,omitempty"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
}
|
||||
@ -424,6 +425,37 @@ type TaskEvent struct {
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
}
|
||||
|
||||
type TaskAttempt struct {
|
||||
ID string `json:"id"`
|
||||
TaskID string `json:"taskId"`
|
||||
AttemptNo int `json:"attemptNo"`
|
||||
PlatformID string `json:"platformId,omitempty"`
|
||||
PlatformName string `json:"platformName,omitempty"`
|
||||
Provider string `json:"provider,omitempty"`
|
||||
PlatformModelID string `json:"platformModelId,omitempty"`
|
||||
ModelName string `json:"modelName,omitempty"`
|
||||
ProviderModelName string `json:"providerModelName,omitempty"`
|
||||
ModelAlias string `json:"modelAlias,omitempty"`
|
||||
ModelType string `json:"modelType,omitempty"`
|
||||
ClientID string `json:"clientId,omitempty"`
|
||||
QueueKey string `json:"queueKey"`
|
||||
Status string `json:"status"`
|
||||
Retryable bool `json:"retryable"`
|
||||
Simulated bool `json:"simulated"`
|
||||
RequestID string `json:"requestId,omitempty"`
|
||||
Usage map[string]any `json:"usage,omitempty"`
|
||||
Metrics map[string]any `json:"metrics,omitempty"`
|
||||
RequestSnapshot map[string]any `json:"requestSnapshot,omitempty"`
|
||||
ResponseSnapshot map[string]any `json:"responseSnapshot,omitempty"`
|
||||
ResponseStartedAt string `json:"responseStartedAt,omitempty"`
|
||||
ResponseFinishedAt string `json:"responseFinishedAt,omitempty"`
|
||||
ResponseDurationMS int64 `json:"responseDurationMs"`
|
||||
ErrorCode string `json:"errorCode,omitempty"`
|
||||
ErrorMessage string `json:"errorMessage,omitempty"`
|
||||
StartedAt time.Time `json:"startedAt"`
|
||||
FinishedAt string `json:"finishedAt,omitempty"`
|
||||
}
|
||||
|
||||
func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) {
|
||||
rows, err := s.pool.Query(ctx, `
|
||||
SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
|
||||
@ -1623,6 +1655,11 @@ SELECT `+gatewayTaskColumns+`
|
||||
if err != nil {
|
||||
return GatewayTask{}, err
|
||||
}
|
||||
attempts, err := s.ListTaskAttempts(ctx, task.ID)
|
||||
if err != nil {
|
||||
return GatewayTask{}, err
|
||||
}
|
||||
task.Attempts = attempts
|
||||
return task, nil
|
||||
}
|
||||
|
||||
|
||||
@ -104,6 +104,7 @@ type CreateTaskAttemptInput struct {
|
||||
Status string
|
||||
Simulated bool
|
||||
RequestSnapshot map[string]any
|
||||
Metrics map[string]any
|
||||
}
|
||||
|
||||
type FinishTaskAttemptInput struct {
|
||||
|
||||
@ -132,6 +132,10 @@ LIMIT $8 OFFSET $9`, queryArgs...)
|
||||
if err := rows.Err(); err != nil {
|
||||
return TaskListResult{}, err
|
||||
}
|
||||
items, err = s.attachTaskAttempts(ctx, items)
|
||||
if err != nil {
|
||||
return TaskListResult{}, err
|
||||
}
|
||||
return TaskListResult{
|
||||
Items: items,
|
||||
Total: total,
|
||||
@ -163,6 +167,7 @@ WHERE id = $1::uuid`, taskID, modelType, string(normalizedJSON))
|
||||
|
||||
func (s *Store) CreateTaskAttempt(ctx context.Context, input CreateTaskAttemptInput) (string, error) {
|
||||
requestJSON, _ := json.Marshal(emptyObjectIfNil(input.RequestSnapshot))
|
||||
metricsJSON, _ := json.Marshal(emptyObjectIfNil(input.Metrics))
|
||||
tx, err := s.pool.Begin(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -173,11 +178,11 @@ func (s *Store) CreateTaskAttempt(ctx context.Context, input CreateTaskAttemptIn
|
||||
err = tx.QueryRow(ctx, `
|
||||
INSERT INTO gateway_task_attempts (
|
||||
task_id, attempt_no, platform_id, platform_model_id, client_id, queue_key,
|
||||
status, simulated, request_snapshot
|
||||
status, simulated, request_snapshot, metrics
|
||||
)
|
||||
VALUES (
|
||||
$1::uuid, $2, NULLIF($3, '')::uuid, NULLIF($4, '')::uuid, NULLIF($5, ''), $6,
|
||||
$7, $8, $9::jsonb
|
||||
$7, $8, $9::jsonb, $10::jsonb
|
||||
)
|
||||
RETURNING id::text`,
|
||||
input.TaskID,
|
||||
@ -189,6 +194,7 @@ RETURNING id::text`,
|
||||
firstNonEmpty(input.Status, "running"),
|
||||
input.Simulated,
|
||||
string(requestJSON),
|
||||
string(metricsJSON),
|
||||
).Scan(&attemptID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@ -202,6 +208,136 @@ WHERE id = $1::uuid`, input.TaskID, input.AttemptNo); err != nil {
|
||||
return attemptID, tx.Commit(ctx)
|
||||
}
|
||||
|
||||
func (s *Store) attachTaskAttempts(ctx context.Context, items []GatewayTask) ([]GatewayTask, error) {
|
||||
if len(items) == 0 {
|
||||
return items, nil
|
||||
}
|
||||
taskIDs := make([]string, 0, len(items))
|
||||
for _, item := range items {
|
||||
taskIDs = append(taskIDs, item.ID)
|
||||
}
|
||||
attemptsByTaskID, err := s.listTaskAttemptsByTaskIDs(ctx, taskIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for index := range items {
|
||||
items[index].Attempts = attemptsByTaskID[items[index].ID]
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (s *Store) ListTaskAttempts(ctx context.Context, taskID string) ([]TaskAttempt, error) {
|
||||
attemptsByTaskID, err := s.listTaskAttemptsByTaskIDs(ctx, []string{taskID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return attemptsByTaskID[taskID], nil
|
||||
}
|
||||
|
||||
func (s *Store) listTaskAttemptsByTaskIDs(ctx context.Context, taskIDs []string) (map[string][]TaskAttempt, error) {
|
||||
itemsByTaskID := map[string][]TaskAttempt{}
|
||||
if len(taskIDs) == 0 {
|
||||
return itemsByTaskID, nil
|
||||
}
|
||||
rows, err := s.pool.Query(ctx, `
|
||||
SELECT a.id::text, a.task_id::text, a.attempt_no,
|
||||
COALESCE(a.platform_id::text, ''), COALESCE(p.name, ''), COALESCE(p.provider, ''),
|
||||
COALESCE(a.platform_model_id::text, ''), COALESCE(pm.model_name, ''),
|
||||
COALESCE(NULLIF(pm.provider_model_name, ''), pm.model_name, ''),
|
||||
COALESCE(pm.model_alias, ''),
|
||||
COALESCE(a.client_id, ''), a.queue_key, a.status, a.retryable, a.simulated,
|
||||
COALESCE(a.request_id, ''), COALESCE(a.usage, '{}'::jsonb), COALESCE(a.metrics, '{}'::jsonb),
|
||||
a.request_snapshot, COALESCE(a.response_snapshot, '{}'::jsonb),
|
||||
COALESCE(a.response_started_at::text, ''), COALESCE(a.response_finished_at::text, ''),
|
||||
COALESCE(a.response_duration_ms, 0), COALESCE(a.error_code, ''), COALESCE(a.error_message, ''),
|
||||
a.started_at, COALESCE(a.finished_at::text, '')
|
||||
FROM gateway_task_attempts a
|
||||
LEFT JOIN integration_platforms p ON p.id = a.platform_id
|
||||
LEFT JOIN platform_models pm ON pm.id = a.platform_model_id
|
||||
WHERE a.task_id::text = ANY($1)
|
||||
ORDER BY a.task_id, a.attempt_no`, taskIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
item, err := scanTaskAttempt(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
itemsByTaskID[item.TaskID] = append(itemsByTaskID[item.TaskID], item)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return itemsByTaskID, nil
|
||||
}
|
||||
|
||||
func scanTaskAttempt(scanner taskScanner) (TaskAttempt, error) {
|
||||
var item TaskAttempt
|
||||
var usageBytes []byte
|
||||
var metricsBytes []byte
|
||||
var requestBytes []byte
|
||||
var responseBytes []byte
|
||||
if err := scanner.Scan(
|
||||
&item.ID,
|
||||
&item.TaskID,
|
||||
&item.AttemptNo,
|
||||
&item.PlatformID,
|
||||
&item.PlatformName,
|
||||
&item.Provider,
|
||||
&item.PlatformModelID,
|
||||
&item.ModelName,
|
||||
&item.ProviderModelName,
|
||||
&item.ModelAlias,
|
||||
&item.ClientID,
|
||||
&item.QueueKey,
|
||||
&item.Status,
|
||||
&item.Retryable,
|
||||
&item.Simulated,
|
||||
&item.RequestID,
|
||||
&usageBytes,
|
||||
&metricsBytes,
|
||||
&requestBytes,
|
||||
&responseBytes,
|
||||
&item.ResponseStartedAt,
|
||||
&item.ResponseFinishedAt,
|
||||
&item.ResponseDurationMS,
|
||||
&item.ErrorCode,
|
||||
&item.ErrorMessage,
|
||||
&item.StartedAt,
|
||||
&item.FinishedAt,
|
||||
); err != nil {
|
||||
return TaskAttempt{}, err
|
||||
}
|
||||
item.Usage = decodeObject(usageBytes)
|
||||
item.Metrics = decodeObject(metricsBytes)
|
||||
item.RequestSnapshot = decodeObject(requestBytes)
|
||||
item.ResponseSnapshot = decodeObject(responseBytes)
|
||||
enrichTaskAttemptFromMetrics(&item)
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func enrichTaskAttemptFromMetrics(item *TaskAttempt) {
|
||||
if item == nil || len(item.Metrics) == 0 {
|
||||
return
|
||||
}
|
||||
item.PlatformID = firstNonEmpty(item.PlatformID, taskAttemptMetricString(item.Metrics, "platformId"))
|
||||
item.PlatformName = firstNonEmpty(item.PlatformName, taskAttemptMetricString(item.Metrics, "platformName"))
|
||||
item.Provider = firstNonEmpty(item.Provider, taskAttemptMetricString(item.Metrics, "provider"))
|
||||
item.PlatformModelID = firstNonEmpty(item.PlatformModelID, taskAttemptMetricString(item.Metrics, "platformModelId"))
|
||||
item.ModelName = firstNonEmpty(item.ModelName, taskAttemptMetricString(item.Metrics, "resolvedModel"), taskAttemptMetricString(item.Metrics, "modelName"))
|
||||
item.ProviderModelName = firstNonEmpty(item.ProviderModelName, taskAttemptMetricString(item.Metrics, "providerModel"))
|
||||
item.ModelAlias = firstNonEmpty(item.ModelAlias, taskAttemptMetricString(item.Metrics, "modelAlias"))
|
||||
item.ModelType = firstNonEmpty(item.ModelType, taskAttemptMetricString(item.Metrics, "modelType"))
|
||||
item.ClientID = firstNonEmpty(item.ClientID, taskAttemptMetricString(item.Metrics, "clientId"))
|
||||
}
|
||||
|
||||
func taskAttemptMetricString(metrics map[string]any, key string) string {
|
||||
value, _ := metrics[key].(string)
|
||||
return strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
func (s *Store) FinishTaskAttempt(ctx context.Context, input FinishTaskAttemptInput) error {
|
||||
responseJSON, _ := json.Marshal(emptyObjectIfNil(input.ResponseSnapshot))
|
||||
usageJSON, _ := json.Marshal(emptyObjectIfNil(input.Usage))
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import { useEffect, useMemo, useState, type FormEvent, type ReactNode } from 'react';
|
||||
import { Popover as AntPopover } from 'antd';
|
||||
import { ChevronLeft, ChevronRight, Copy, CreditCard, Eye, KeyRound, ListChecks, Plus, RotateCcw, Search, ShieldCheck, Trash2, UserRound } from 'lucide-react';
|
||||
import type { GatewayAccessRuleBatchRequest, GatewayApiKey, GatewayTask, IntegrationPlatform, PlatformModel } from '@easyai-ai-gateway/contracts';
|
||||
import type { ConsoleData } from '../app-state';
|
||||
@ -433,6 +434,7 @@ function TaskPanel(props: {
|
||||
<TableHead>RequestID</TableHead>
|
||||
<TableHead>状态</TableHead>
|
||||
<TableHead>模型</TableHead>
|
||||
<TableHead>尝试链路</TableHead>
|
||||
<TableHead>类型</TableHead>
|
||||
<TableHead>API Key</TableHead>
|
||||
<TableHead>Token</TableHead>
|
||||
@ -556,6 +558,9 @@ function TaskRecord(props: { task: GatewayTask; onCopyRequestId: (task: GatewayT
|
||||
{props.task.requestedModel && props.task.requestedModel !== resolvedModel && <small>{props.task.requestedModel}</small>}
|
||||
</span>
|
||||
</TableCell>
|
||||
<TableCell className="taskRecordAttemptCell">
|
||||
<TaskAttemptChain task={props.task} />
|
||||
</TableCell>
|
||||
<TableCell>{props.task.modelType || '-'}</TableCell>
|
||||
<TableCell>{props.task.apiKeyName || props.task.apiKeyPrefix || props.task.apiKeyId || '-'}</TableCell>
|
||||
<TableCell className="taskRecordTokenCell">{tokenUsage}</TableCell>
|
||||
@ -572,11 +577,103 @@ function TaskRecord(props: { task: GatewayTask; onCopyRequestId: (task: GatewayT
|
||||
);
|
||||
}
|
||||
|
||||
function TaskAttemptChain(props: { task: GatewayTask }) {
|
||||
const attempts = props.task.attempts ?? [];
|
||||
if (!attempts.length) return <span>-</span>;
|
||||
|
||||
return (
|
||||
<AntPopover
|
||||
align={{ offset: [0, 8] }}
|
||||
content={<TaskAttemptPopoverContent task={props.task} />}
|
||||
overlayClassName="taskRecordAttemptAntPopover"
|
||||
placement="bottomLeft"
|
||||
trigger={['hover', 'focus']}
|
||||
>
|
||||
<button className="taskRecordAttemptCount" type="button" aria-label={attempts.map(taskAttemptTitle).join('\n')}>
|
||||
{attempts.length} 次尝试
|
||||
</button>
|
||||
</AntPopover>
|
||||
);
|
||||
}
|
||||
|
||||
function TaskAttemptPopoverContent(props: { task: GatewayTask }) {
|
||||
const attempts = props.task.attempts ?? [];
|
||||
return (
|
||||
<span className="taskRecordAttemptPopover" role="tooltip">
|
||||
{attempts.map((attempt) => (
|
||||
<span
|
||||
key={attempt.id || `${props.task.id}-${attempt.attemptNo}`}
|
||||
className={`taskRecordAttemptDetail ${attempt.status === 'failed' ? 'failed' : attempt.status === 'succeeded' ? 'succeeded' : ''}`}
|
||||
>
|
||||
<span className="taskRecordAttemptDetailHeader">
|
||||
<strong>#{attempt.attemptNo} {taskAttemptTarget(attempt)}</strong>
|
||||
<Badge variant={attempt.status === 'succeeded' ? 'success' : attempt.status === 'failed' ? 'destructive' : 'secondary'}>{taskAttemptStatusText(attempt.status)}</Badge>
|
||||
</span>
|
||||
<small>{taskAttemptMeta(attempt)}</small>
|
||||
{attempt.status === 'failed' && <span className="taskRecordAttemptError">{taskAttemptFailureReason(attempt)}</span>}
|
||||
</span>
|
||||
))}
|
||||
</span>
|
||||
);
|
||||
}
|
||||
|
||||
function taskAttemptTitle(attempt: NonNullable<GatewayTask['attempts']>[number]) {
|
||||
const parts = [
|
||||
`#${attempt.attemptNo}`,
|
||||
attempt.platformName || attempt.provider || attempt.clientId || '',
|
||||
attempt.status,
|
||||
attempt.errorMessage || attempt.errorCode || metadataString(attempt.metrics, 'error') || '',
|
||||
].filter(Boolean);
|
||||
return parts.join(' · ');
|
||||
}
|
||||
|
||||
function taskAttemptTarget(attempt: NonNullable<GatewayTask['attempts']>[number]) {
|
||||
return attempt.platformName || attempt.provider || attempt.clientId || `尝试 ${attempt.attemptNo}`;
|
||||
}
|
||||
|
||||
function taskAttemptStatusText(status: string) {
|
||||
if (status === 'succeeded') return '成功';
|
||||
if (status === 'failed') return '失败';
|
||||
if (status === 'running') return '运行中';
|
||||
return status || '-';
|
||||
}
|
||||
|
||||
function taskAttemptMeta(attempt: NonNullable<GatewayTask['attempts']>[number]) {
|
||||
const values = [
|
||||
attempt.providerModelName || attempt.modelName || attempt.modelAlias,
|
||||
attempt.requestId ? `RequestID ${attempt.requestId}` : '',
|
||||
attempt.responseDurationMs ? formatDuration(attempt.responseDurationMs) : '',
|
||||
].filter(Boolean);
|
||||
return values.join(' · ') || attempt.clientId || '-';
|
||||
}
|
||||
|
||||
function taskAttemptFailureReason(attempt: NonNullable<GatewayTask['attempts']>[number]) {
|
||||
const detail = firstText(
|
||||
attempt.errorMessage,
|
||||
metadataString(attempt.metrics, 'error'),
|
||||
metadataString(attempt.metrics, 'message'),
|
||||
);
|
||||
const code = firstText(attempt.errorCode, metadataString(attempt.metrics, 'errorCode'));
|
||||
if (detail && code && detail !== code) return `${detail}(${code})`;
|
||||
return detail || code || '失败';
|
||||
}
|
||||
function formatCellValue(value: unknown) {
|
||||
if (value === undefined || value === null || value === '') return '-';
|
||||
return String(value);
|
||||
}
|
||||
|
||||
function firstText(...values: Array<unknown>) {
|
||||
for (const value of values) {
|
||||
if (typeof value === 'string' && value.trim()) return value.trim();
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
function metadataString(metadata: Record<string, unknown> | undefined, key: string) {
|
||||
const value = metadata?.[key];
|
||||
return typeof value === 'string' && value.trim() ? value.trim() : '';
|
||||
}
|
||||
|
||||
function formatTokenUsage(usage: Record<string, unknown>) {
|
||||
const input = tokenValue(usage.inputTokens ?? usage.promptTokens ?? usage.input_tokens ?? usage.prompt_tokens);
|
||||
const output = tokenValue(usage.outputTokens ?? usage.completionTokens ?? usage.output_tokens ?? usage.completion_tokens);
|
||||
|
||||
@ -261,8 +261,8 @@ strong {
|
||||
}
|
||||
|
||||
.taskRecordTable .shTableRow {
|
||||
grid-template-columns: minmax(190px, 0.95fr) minmax(220px, 1.05fr) minmax(94px, 0.42fr) minmax(280px, 1.55fr) minmax(126px, 0.58fr) minmax(150px, 0.7fr) minmax(154px, 0.66fr) minmax(82px, 0.38fr) minmax(98px, 0.45fr) minmax(150px, 0.7fr) minmax(130px, 0.58fr);
|
||||
min-width: 1674px;
|
||||
grid-template-columns: minmax(190px, 0.9fr) minmax(220px, 1fr) minmax(94px, 0.4fr) minmax(280px, 1.45fr) minmax(104px, 0.42fr) minmax(126px, 0.55fr) minmax(150px, 0.66fr) minmax(154px, 0.62fr) minmax(82px, 0.36fr) minmax(98px, 0.42fr) minmax(150px, 0.66fr) minmax(130px, 0.54fr);
|
||||
min-width: 1778px;
|
||||
align-items: start;
|
||||
}
|
||||
|
||||
@ -354,6 +354,75 @@ strong {
|
||||
word-break: break-word;
|
||||
}
|
||||
|
||||
.taskRecordAttemptCell {
|
||||
overflow: visible;
|
||||
white-space: normal;
|
||||
}
|
||||
|
||||
.taskRecordAttemptCount {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
justify-content: flex-start;
|
||||
min-height: 1.5rem;
|
||||
padding: 0;
|
||||
border: 0;
|
||||
background: transparent;
|
||||
color: var(--text-strong);
|
||||
cursor: default;
|
||||
font: inherit;
|
||||
font-weight: var(--font-weight-medium);
|
||||
}
|
||||
|
||||
.taskRecordAttemptAntPopover {
|
||||
z-index: 1200;
|
||||
}
|
||||
|
||||
.taskRecordAttemptPopover {
|
||||
display: grid;
|
||||
width: min(34rem, calc(100vw - 2rem));
|
||||
gap: 0.6rem;
|
||||
}
|
||||
|
||||
.taskRecordAttemptDetail {
|
||||
display: grid;
|
||||
min-width: 0;
|
||||
gap: 0.35rem;
|
||||
padding-bottom: 0.6rem;
|
||||
border-bottom: 1px solid var(--border);
|
||||
}
|
||||
|
||||
.taskRecordAttemptDetail:last-child {
|
||||
padding-bottom: 0;
|
||||
border-bottom: 0;
|
||||
}
|
||||
|
||||
.taskRecordAttemptDetailHeader {
|
||||
display: flex;
|
||||
min-width: 0;
|
||||
align-items: center;
|
||||
justify-content: space-between;
|
||||
gap: 0.5rem;
|
||||
}
|
||||
|
||||
.taskRecordAttemptDetailHeader strong {
|
||||
min-width: 0;
|
||||
color: var(--text-strong);
|
||||
font-weight: var(--font-weight-semibold);
|
||||
overflow-wrap: anywhere;
|
||||
}
|
||||
|
||||
.taskRecordAttemptDetail small {
|
||||
color: var(--text-soft);
|
||||
font-size: var(--font-size-xs);
|
||||
line-height: 1.4;
|
||||
}
|
||||
|
||||
.taskRecordAttemptError {
|
||||
color: var(--destructive);
|
||||
font-size: var(--font-size-xs);
|
||||
line-height: 1.45;
|
||||
overflow-wrap: anywhere;
|
||||
}
|
||||
.taskRecordJsonButton {
|
||||
width: 100%;
|
||||
justify-content: flex-start;
|
||||
|
||||
@ -715,10 +715,42 @@ export interface GatewayTask {
|
||||
error?: string;
|
||||
errorCode?: string;
|
||||
errorMessage?: string;
|
||||
attempts?: GatewayTaskAttempt[];
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
export interface GatewayTaskAttempt {
|
||||
id: string;
|
||||
taskId: string;
|
||||
attemptNo: number;
|
||||
platformId?: string;
|
||||
platformName?: string;
|
||||
provider?: string;
|
||||
platformModelId?: string;
|
||||
modelName?: string;
|
||||
providerModelName?: string;
|
||||
modelAlias?: string;
|
||||
modelType?: string;
|
||||
clientId?: string;
|
||||
queueKey: string;
|
||||
status: 'running' | 'succeeded' | 'failed' | string;
|
||||
retryable: boolean;
|
||||
simulated: boolean;
|
||||
requestId?: string;
|
||||
usage?: Record<string, unknown>;
|
||||
metrics?: Record<string, unknown>;
|
||||
requestSnapshot?: Record<string, unknown>;
|
||||
responseSnapshot?: Record<string, unknown>;
|
||||
responseStartedAt?: string;
|
||||
responseFinishedAt?: string;
|
||||
responseDurationMs?: number;
|
||||
errorCode?: string;
|
||||
errorMessage?: string;
|
||||
startedAt: string;
|
||||
finishedAt?: string;
|
||||
}
|
||||
|
||||
export interface GatewayTaskEvent {
|
||||
id: string;
|
||||
taskId: string;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user