From 53f8edfb6712d4d2972a7b06e18e9c26c4440b71 Mon Sep 17 00:00:00 2001 From: wangbo Date: Sun, 10 May 2026 22:33:58 +0800 Subject: [PATCH] feat: enrich task record details --- apps/api/internal/auth/auth.go | 30 + apps/api/internal/clients/clients_test.go | 58 ++ apps/api/internal/clients/gemini.go | 23 +- apps/api/internal/clients/helpers.go | 169 ++++++ apps/api/internal/clients/openai.go | 33 +- apps/api/internal/clients/simulation.go | 14 +- apps/api/internal/clients/types.go | 92 ++- .../httpapi/core_flow_integration_test.go | 66 ++- apps/api/internal/runner/helpers.go | 5 + apps/api/internal/runner/pricing.go | 73 ++- apps/api/internal/runner/recording.go | 221 +++++++ apps/api/internal/runner/service.go | 177 ++++-- apps/api/internal/store/postgres.go | 549 ++++++++++++++++-- apps/api/internal/store/runtime_types.go | 55 +- apps/api/internal/store/tasks_runtime.go | 84 ++- .../0017_task_record_enrichment.sql | 72 +++ apps/web/src/pages/WorkspacePage.tsx | 27 +- apps/web/src/styles.css | 10 + packages/contracts/src/index.ts | 188 +++++- 19 files changed, 1781 insertions(+), 165 deletions(-) create mode 100644 apps/api/internal/runner/recording.go create mode 100644 apps/api/migrations/0017_task_record_enrichment.sql diff --git a/apps/api/internal/auth/auth.go b/apps/api/internal/auth/auth.go index d4be656..3d76b91 100644 --- a/apps/api/internal/auth/auth.go +++ b/apps/api/internal/auth/auth.go @@ -39,6 +39,7 @@ type User struct { APIKeyID string `json:"apiKeyId,omitempty"` APIKeySecret string `json:"apiKeySecret,omitempty"` APIKeyName string `json:"apiKeyName,omitempty"` + APIKeyPrefix string `json:"apiKeyPrefix,omitempty"` } type contextKey string @@ -135,6 +136,7 @@ func (a *Authenticator) verifyJWT(tokenString string) (*User, error) { APIKeyID: stringClaim(claims, "apiKeyId"), APIKeySecret: stringClaim(claims, "apiKeySecret"), APIKeyName: stringClaim(claims, "apiKeyName"), + APIKeyPrefix: stringClaim(claims, "apiKeyPrefix"), } if user.Source == "" { user.Source = "gateway" @@ -162,6 +164,9 @@ func (a *Authenticator) SignJWT(user *User, ttl time.Duration) (string, error) { "userGroupId": user.UserGroupID, "userGroupKey": user.UserGroupKey, "userGroupKeys": user.UserGroupKeys, + "apiKeyId": user.APIKeyID, + "apiKeyName": user.APIKeyName, + "apiKeyPrefix": user.APIKeyPrefix, "iat": now.Unix(), "exp": now.Add(ttl).Unix(), } @@ -235,6 +240,31 @@ func hasPermission(roles []string, required Permission) bool { return granted[required] } +func PermissionLevel(roles []string) int { + level := 0 + for _, role := range roles { + switch role { + case "admin", "manager": + if level < 4 { + level = 4 + } + case "operator": + if level < 3 { + level = 3 + } + case "creator": + if level < 2 { + level = 2 + } + case "user": + if level < 1 { + level = 1 + } + } + } + return level +} + func permissionsForRole(role string) []Permission { switch role { case "admin", "manager": diff --git a/apps/api/internal/clients/clients_test.go b/apps/api/internal/clients/clients_test.go index 5f9fdc8..7e9c332 100644 --- a/apps/api/internal/clients/clients_test.go +++ b/apps/api/internal/clients/clients_test.go @@ -17,6 +17,7 @@ func TestOpenAIClientChatContract(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { gotPath = r.URL.Path gotAuth = r.Header.Get("Authorization") + w.Header().Set("X-Request-Id", "req-chat-test") var body map[string]any if err := json.NewDecoder(r.Body).Decode(&body); err != nil { t.Fatalf("decode request: %v", err) @@ -53,6 +54,55 @@ func TestOpenAIClientChatContract(t *testing.T) { if response.Usage.TotalTokens != 5 || response.Result["id"] != "chatcmpl-test" { t.Fatalf("unexpected response: %+v", response) } + if response.RequestID != "req-chat-test" || response.ResponseStartedAt.IsZero() || response.ResponseFinishedAt.IsZero() { + t.Fatalf("response metadata was not captured: %+v", response) + } +} + +func TestOpenAIClientChatStreamContract(t *testing.T) { + var gotStream bool + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body map[string]any + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("decode request: %v", err) + } + gotStream, _ = body["stream"].(bool) + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"model\":\"deepseek-v4-flash\",\"choices\":[{\"delta\":{\"content\":\"hello\"}}]}\n\n")) + _, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"model\":\"deepseek-v4-flash\",\"choices\":[{\"delta\":{\"content\":\" world\"},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":1,\"completion_tokens\":2,\"total_tokens\":3}}\n\n")) + _, _ = w.Write([]byte("data: [DONE]\n\n")) + })) + defer server.Close() + + response, err := (OpenAIClient{HTTPClient: server.Client()}).Run(context.Background(), Request{ + Kind: "chat.completions", + Model: "DeepSeek-V4-Flash", + Body: map[string]any{ + "model": "DeepSeek-V4-Flash", + "messages": []any{map[string]any{"role": "user", "content": "ping"}}, + "stream": true, + }, + Candidate: store.RuntimeModelCandidate{ + BaseURL: server.URL, + ModelName: "deepseek-v4-flash", + Credentials: map[string]any{"apiKey": "test-key"}, + }, + }) + if err != nil { + t.Fatalf("run openai stream client: %v", err) + } + if !gotStream { + t.Fatalf("expected upstream stream request") + } + if response.Usage.TotalTokens != 3 { + t.Fatalf("unexpected usage: %+v", response.Usage) + } + choices, _ := response.Result["choices"].([]any) + choice, _ := choices[0].(map[string]any) + message, _ := choice["message"].(map[string]any) + if message["content"] != "hello world" { + t.Fatalf("unexpected stream response: %+v", response.Result) + } } func TestGeminiClientChatContract(t *testing.T) { @@ -111,6 +161,14 @@ func TestGeminiClientChatContract(t *testing.T) { } } +func TestGeminiURLAcceptsVersionedBaseURL(t *testing.T) { + got := geminiURL("https://generativelanguage.googleapis.com/v1beta", "gemini-2.5-flash", "test-key") + want := "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key=test-key" + if got != want { + t.Fatalf("unexpected gemini url: %s", got) + } +} + func extractText(result map[string]any) string { choices, _ := result["choices"].([]any) choice, _ := choices[0].(map[string]any) diff --git a/apps/api/internal/clients/gemini.go b/apps/api/internal/clients/gemini.go index 411b17e..8c6b0e5 100644 --- a/apps/api/internal/clients/gemini.go +++ b/apps/api/internal/clients/gemini.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strings" + "time" ) type GeminiClient struct { @@ -30,11 +31,26 @@ func (c GeminiClient) Run(ctx context.Context, request Request) (Response, error if err != nil { return Response{}, &ClientError{Code: "network", Message: err.Error(), Retryable: true} } + responseStartedAt := time.Now() + requestID := requestIDFromHTTPResponse(resp) result, err := decodeHTTPResponse(resp) + responseFinishedAt := time.Now() if err != nil { - return Response{}, err + return Response{}, annotateResponseError(err, requestID, responseStartedAt, responseFinishedAt) } - return Response{Result: geminiResult(request, result), Usage: geminiUsage(result), Progress: providerProgress(request)}, nil + output := geminiResult(request, result) + if requestID == "" { + requestID = requestIDFromResult(output) + } + return Response{ + Result: output, + RequestID: requestID, + Usage: geminiUsage(result), + Progress: providerProgress(request), + ResponseStartedAt: responseStartedAt, + ResponseFinishedAt: responseFinishedAt, + ResponseDurationMS: responseDurationMS(responseStartedAt, responseFinishedAt), + }, nil } func geminiURL(baseURL string, model string, apiKey string) string { @@ -42,6 +58,9 @@ func geminiURL(baseURL string, model string, apiKey string) string { if base == "" { base = "https://generativelanguage.googleapis.com" } + if strings.HasSuffix(base, "/v1beta") { + base = strings.TrimSuffix(base, "/v1beta") + } escapedModel := url.PathEscape(model) return fmt.Sprintf("%s/v1beta/models/%s:generateContent?key=%s", base, escapedModel, url.QueryEscape(apiKey)) } diff --git a/apps/api/internal/clients/helpers.go b/apps/api/internal/clients/helpers.go index f453ce5..8461a38 100644 --- a/apps/api/internal/clients/helpers.go +++ b/apps/api/internal/clients/helpers.go @@ -1,6 +1,8 @@ package clients import ( + "bufio" + "bytes" "encoding/json" "fmt" "io" @@ -48,6 +50,7 @@ func decodeHTTPResponse(resp *http.Response) (map[string]any, error) { Code: statusCodeName(resp.StatusCode), Message: errorMessage(raw, resp.Status), StatusCode: resp.StatusCode, + RequestID: requestIDFromHTTPResponse(resp), Retryable: HTTPRetryable(resp.StatusCode), } } @@ -61,6 +64,137 @@ func decodeHTTPResponse(resp *http.Response) (map[string]any, error) { return out, nil } +func decodeOpenAIStreamResponse(resp *http.Response, onDelta StreamDelta) (map[string]any, error) { + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024)) + return nil, &ClientError{ + Code: statusCodeName(resp.StatusCode), + Message: errorMessage(raw, resp.Status), + StatusCode: resp.StatusCode, + RequestID: requestIDFromHTTPResponse(resp), + Retryable: HTTPRetryable(resp.StatusCode), + } + } + if result, ok, err := decodeOpenAIStreamReader(resp.Body, onDelta); ok || err != nil { + return result, err + } + return map[string]any{}, nil +} + +func decodeOpenAIStreamReader(reader io.Reader, onDelta StreamDelta) (map[string]any, bool, error) { + scanner := bufio.NewScanner(reader) + scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024) + rawLines := make([]string, 0) + parts := make([]string, 0) + var last map[string]any + var usage Usage + for scanner.Scan() { + rawLine := scanner.Text() + rawLines = append(rawLines, rawLine) + line := strings.TrimSpace(rawLine) + if !strings.HasPrefix(line, "data:") { + continue + } + payload := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if payload == "" || payload == "[DONE]" { + continue + } + var event map[string]any + if err := json.Unmarshal([]byte(payload), &event); err != nil { + continue + } + last = event + if text := streamEventText(event); text != "" { + parts = append(parts, text) + if onDelta != nil { + if err := onDelta(text); err != nil { + return nil, true, err + } + } + } + if eventUsage := usageFromOpenAI(event); eventUsage.TotalTokens > 0 { + usage = eventUsage + } + } + if err := scanner.Err(); err != nil { + return nil, true, &ClientError{Code: "stream_read_error", Message: err.Error(), Retryable: true} + } + if last == nil { + raw := []byte(strings.Join(rawLines, "\n")) + if len(raw) == 0 { + return map[string]any{}, true, nil + } + var out map[string]any + if err := json.Unmarshal(raw, &out); err != nil { + return nil, false, nil + } + return out, true, nil + } + return buildOpenAIStreamResult(last, parts, usage), true, nil +} + +func decodeOpenAIStream(raw []byte) (map[string]any, bool) { + if !bytes.Contains(raw, []byte("data:")) { + return nil, false + } + result, ok, err := decodeOpenAIStreamReader(bytes.NewReader(raw), nil) + return result, ok && err == nil +} + +func buildOpenAIStreamResult(last map[string]any, parts []string, usage Usage) map[string]any { + if len(parts) == 0 { + return last + } + var out map[string]any + out = map[string]any{ + "id": stringFromAny(firstPresent(last["id"], "chatcmpl-stream")), + "object": "chat.completion", + "model": stringFromAny(last["model"]), + "choices": []any{map[string]any{ + "index": 0, + "message": map[string]any{ + "role": "assistant", + "content": strings.Join(parts, ""), + }, + "finish_reason": "stop", + }}, + } + if usage.TotalTokens > 0 { + out["usage"] = map[string]any{ + "prompt_tokens": usage.InputTokens, + "completion_tokens": usage.OutputTokens, + "total_tokens": usage.TotalTokens, + } + } + return out +} + +func streamEventText(event map[string]any) string { + if choices, ok := event["choices"].([]any); ok { + for _, rawChoice := range choices { + choice, _ := rawChoice.(map[string]any) + if delta, ok := choice["delta"].(map[string]any); ok { + if content, ok := delta["content"].(string); ok { + return content + } + } + if message, ok := choice["message"].(map[string]any); ok { + if content, ok := message["content"].(string); ok { + return content + } + } + } + } + if delta, ok := event["delta"].(string); ok { + return delta + } + if text, ok := event["output_text"].(string); ok { + return text + } + return "" +} + func usageFromOpenAI(result map[string]any) Usage { usage, _ := result["usage"].(map[string]any) input := intFromAny(firstPresent(usage["prompt_tokens"], usage["input_tokens"])) @@ -72,6 +206,34 @@ func usageFromOpenAI(result map[string]any) Usage { return Usage{InputTokens: input, OutputTokens: output, TotalTokens: total} } +func requestIDFromHTTPResponse(resp *http.Response) string { + if resp == nil { + return "" + } + for _, key := range []string{ + "x-request-id", + "x-requestid", + "request-id", + "x-amzn-requestid", + "x-amz-request-id", + "cf-ray", + } { + if value := strings.TrimSpace(resp.Header.Get(key)); value != "" { + return value + } + } + return "" +} + +func requestIDFromResult(result map[string]any) string { + for _, key := range []string{"request_id", "requestId", "id", "response_id", "responseId"} { + if value := strings.TrimSpace(stringFromAny(result[key])); value != "" { + return value + } + } + return "" +} + func intFromAny(value any) int { switch typed := value.(type) { case float64: @@ -85,6 +247,13 @@ func intFromAny(value any) int { } } +func stringFromAny(value any) string { + if text, ok := value.(string); ok { + return text + } + return "" +} + func firstPresent(values ...any) any { for _, value := range values { if value != nil { diff --git a/apps/api/internal/clients/openai.go b/apps/api/internal/clients/openai.go index 310ea09..822c729 100644 --- a/apps/api/internal/clients/openai.go +++ b/apps/api/internal/clients/openai.go @@ -6,6 +6,7 @@ import ( "encoding/json" "net/http" "strings" + "time" ) type OpenAIClient struct { @@ -23,6 +24,7 @@ func (c OpenAIClient) Run(ctx context.Context, request Request) (Response, error } body := cloneBody(request.Body) body["model"] = request.Candidate.ModelName + stream := request.Stream || boolValue(body, "stream") raw, _ := json.Marshal(body) req, err := http.NewRequestWithContext(ctx, http.MethodPost, joinURL(request.Candidate.BaseURL, endpoint), bytes.NewReader(raw)) if err != nil { @@ -34,11 +36,36 @@ func (c OpenAIClient) Run(ctx context.Context, request Request) (Response, error if err != nil { return Response{}, &ClientError{Code: "network", Message: err.Error(), Retryable: true} } - result, err := decodeHTTPResponse(resp) + responseStartedAt := time.Now() + requestID := requestIDFromHTTPResponse(resp) + result, err := decodeOpenAIResponse(resp, stream, request.StreamDelta) + responseFinishedAt := time.Now() if err != nil { - return Response{}, err + return Response{}, annotateResponseError(err, requestID, responseStartedAt, responseFinishedAt) } - return Response{Result: result, Usage: usageFromOpenAI(result), Progress: providerProgress(request)}, nil + if requestID == "" { + requestID = requestIDFromResult(result) + } + return Response{ + Result: result, + RequestID: requestID, + Usage: usageFromOpenAI(result), + Progress: providerProgress(request), + ResponseStartedAt: responseStartedAt, + ResponseFinishedAt: responseFinishedAt, + ResponseDurationMS: responseDurationMS(responseStartedAt, responseFinishedAt), + }, nil +} + +func decodeOpenAIResponse(resp *http.Response, stream bool, onDelta StreamDelta) (map[string]any, error) { + if stream { + result, err := decodeOpenAIStreamResponse(resp, onDelta) + if err == nil { + return result, nil + } + return nil, err + } + return decodeHTTPResponse(resp) } func openAIEndpoint(kind string) string { diff --git a/apps/api/internal/clients/simulation.go b/apps/api/internal/clients/simulation.go index 8396fc2..c4c2b3d 100644 --- a/apps/api/internal/clients/simulation.go +++ b/apps/api/internal/clients/simulation.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" ) type SimulationClient struct{} @@ -17,10 +18,17 @@ func (c SimulationClient) Run(ctx context.Context, request Request) (Response, e if profile == "fatal_failure" || profile == "non_retryable_failure" { return Response{}, &ClientError{Code: "bad_request", Message: "simulated non-retryable failure", Retryable: false} } + responseStartedAt := time.Now() + result := simulatedResult(request) + responseFinishedAt := time.Now() return Response{ - Result: simulatedResult(request), - Usage: simulatedUsage(request), - Progress: simulatedProgress(request), + Result: result, + RequestID: requestIDFromResult(result), + Usage: simulatedUsage(request), + Progress: simulatedProgress(request), + ResponseStartedAt: responseStartedAt, + ResponseFinishedAt: responseFinishedAt, + ResponseDurationMS: responseDurationMS(responseStartedAt, responseFinishedAt), }, nil } diff --git a/apps/api/internal/clients/types.go b/apps/api/internal/clients/types.go index f5f539d..7c30f4a 100644 --- a/apps/api/internal/clients/types.go +++ b/apps/api/internal/clients/types.go @@ -4,23 +4,30 @@ import ( "context" "errors" "net/http" + "time" "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" ) type Request struct { - Kind string - ModelType string - Model string - Body map[string]any - Candidate store.RuntimeModelCandidate - Stream bool + Kind string + ModelType string + Model string + Body map[string]any + Candidate store.RuntimeModelCandidate + Stream bool + StreamDelta StreamDelta } type Response struct { - Result map[string]any - Usage Usage - Progress []Progress + Result map[string]any + RequestID string + Usage Usage + Progress []Progress + Metrics map[string]any + ResponseStartedAt time.Time + ResponseFinishedAt time.Time + ResponseDurationMS int64 } type Usage struct { @@ -36,15 +43,21 @@ type Progress struct { Payload map[string]any } +type StreamDelta func(text string) error + type Client interface { Run(ctx context.Context, request Request) (Response, error) } type ClientError struct { - Code string - Message string - StatusCode int - Retryable bool + Code string + Message string + StatusCode int + RequestID string + ResponseStartedAt time.Time + ResponseFinishedAt time.Time + ResponseDurationMS int64 + Retryable bool } func (e *ClientError) Error() string { @@ -67,6 +80,59 @@ func ErrorCode(err error) string { return "client_error" } +type ResponseMetadata struct { + RequestID string + ResponseStartedAt time.Time + ResponseFinishedAt time.Time + ResponseDurationMS int64 + StatusCode int +} + +func ErrorResponseMetadata(err error) ResponseMetadata { + var clientErr *ClientError + if errors.As(err, &clientErr) { + return ResponseMetadata{ + RequestID: clientErr.RequestID, + ResponseStartedAt: clientErr.ResponseStartedAt, + ResponseFinishedAt: clientErr.ResponseFinishedAt, + ResponseDurationMS: clientErr.ResponseDurationMS, + StatusCode: clientErr.StatusCode, + } + } + return ResponseMetadata{} +} + +func annotateResponseError(err error, requestID string, startedAt time.Time, finishedAt time.Time) error { + var clientErr *ClientError + if !errors.As(err, &clientErr) { + return err + } + if clientErr.RequestID == "" { + clientErr.RequestID = requestID + } + if clientErr.ResponseStartedAt.IsZero() { + clientErr.ResponseStartedAt = startedAt + } + if clientErr.ResponseFinishedAt.IsZero() { + clientErr.ResponseFinishedAt = finishedAt + } + if clientErr.ResponseDurationMS == 0 { + clientErr.ResponseDurationMS = responseDurationMS(clientErr.ResponseStartedAt, clientErr.ResponseFinishedAt) + } + return err +} + func HTTPRetryable(status int) bool { return status == http.StatusTooManyRequests || status == http.StatusRequestTimeout || status >= 500 } + +func responseDurationMS(startedAt time.Time, finishedAt time.Time) int64 { + if startedAt.IsZero() || finishedAt.IsZero() { + return 0 + } + duration := finishedAt.Sub(startedAt).Milliseconds() + if duration < 0 { + return 0 + } + return duration +} diff --git a/apps/api/internal/httpapi/core_flow_integration_test.go b/apps/api/internal/httpapi/core_flow_integration_test.go index 7806b02..9a1bab8 100644 --- a/apps/api/internal/httpapi/core_flow_integration_test.go +++ b/apps/api/internal/httpapi/core_flow_integration_test.go @@ -93,6 +93,7 @@ func TestCoreLocalFlow(t *testing.T) { Secret string `json:"secret"` APIKey struct { ID string `json:"id"` + Name string `json:"name"` KeyPrefix string `json:"keyPrefix"` Status string `json:"status"` } `json:"apiKey"` @@ -187,10 +188,10 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil { var baseModels struct { Items []struct { - ID string `json:"id"` - CanonicalModelKey string `json:"canonicalModelKey"` - ProviderModelName string `json:"providerModelName"` - ModelType string `json:"modelType"` + ID string `json:"id"` + CanonicalModelKey string `json:"canonicalModelKey"` + ProviderModelName string `json:"providerModelName"` + ModelType []string `json:"modelType"` } `json:"items"` } doJSON(t, server.URL, http.MethodGet, "/api/v1/catalog/base-models", apiKeyResponse.Secret, nil, http.StatusOK, &baseModels) @@ -202,36 +203,47 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil { "providerKey": "openai", "canonicalModelKey": "openai:smoke-base-" + suffixText, "providerModelName": "smoke-base-" + suffixText, - "modelType": "text_generate", - "displayName": "Smoke Base Model", + "modelType": []string{"text_generate"}, + "modelAlias": "Smoke Base Model", "capabilities": map[string]any{"originalTypes": []string{"text_generate"}}, "metadata": map[string]any{"source": "test"}, } var createdBaseModel struct { ID string `json:"id"` CanonicalModelKey string `json:"canonicalModelKey"` - DisplayName string `json:"displayName"` + ModelAlias string `json:"modelAlias"` } doJSON(t, server.URL, http.MethodPost, "/api/v1/catalog/base-models", apiKeyResponse.Secret, baseModelInput, http.StatusCreated, &createdBaseModel) if createdBaseModel.ID == "" || createdBaseModel.CanonicalModelKey != baseModelInput["canonicalModelKey"] { t.Fatalf("unexpected created base model: %+v", createdBaseModel) } - baseModelInput["displayName"] = "Smoke Base Model Updated" + baseModelInput["modelAlias"] = "Smoke Base Model Updated" var updatedBaseModel struct { - DisplayName string `json:"displayName"` + ModelAlias string `json:"modelAlias"` } doJSON(t, server.URL, http.MethodPatch, "/api/v1/catalog/base-models/"+createdBaseModel.ID, apiKeyResponse.Secret, baseModelInput, http.StatusOK, &updatedBaseModel) - if updatedBaseModel.DisplayName != "Smoke Base Model Updated" { + if updatedBaseModel.ModelAlias != "Smoke Base Model Updated" { t.Fatalf("unexpected updated base model: %+v", updatedBaseModel) } doJSON(t, server.URL, http.MethodDelete, "/api/v1/catalog/base-models/"+createdBaseModel.ID, apiKeyResponse.Secret, nil, http.StatusNoContent, nil) var taskResponse struct { Task struct { - ID string `json:"id"` - Status string `json:"status"` - RunMode string `json:"runMode"` - Result map[string]any `json:"result"` + ID string `json:"id"` + Status string `json:"status"` + RunMode string `json:"runMode"` + APIKeyID string `json:"apiKeyId"` + APIKeyName string `json:"apiKeyName"` + RequestID string `json:"requestId"` + ResolvedModel string `json:"resolvedModel"` + Usage map[string]any `json:"usage"` + Metrics map[string]any `json:"metrics"` + BillingSummary map[string]any `json:"billingSummary"` + FinalChargeAmount float64 `json:"finalChargeAmount"` + ResponseStartedAt string `json:"responseStartedAt"` + ResponseFinishedAt string `json:"responseFinishedAt"` + ResponseDurationMS int64 `json:"responseDurationMs"` + Result map[string]any `json:"result"` } `json:"task"` } doJSON(t, server.URL, http.MethodPost, "/api/v1/chat/completions", apiKeyResponse.Secret, map[string]any{ @@ -243,6 +255,18 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil { if taskResponse.Task.ID == "" || taskResponse.Task.Status != "succeeded" || taskResponse.Task.RunMode != "simulation" { t.Fatalf("unexpected task response: %+v", taskResponse.Task) } + if taskResponse.Task.APIKeyID != apiKeyResponse.APIKey.ID || taskResponse.Task.APIKeyName != apiKeyResponse.APIKey.Name { + t.Fatalf("task should record full api key identity: %+v key=%+v", taskResponse.Task, apiKeyResponse.APIKey) + } + if taskResponse.Task.RequestID == "" || taskResponse.Task.ResolvedModel == "" || taskResponse.Task.ResponseStartedAt == "" || taskResponse.Task.ResponseFinishedAt == "" { + t.Fatalf("task should record provider request and response timing: %+v", taskResponse.Task) + } + if taskResponse.Task.Usage["totalTokens"] == nil || taskResponse.Task.FinalChargeAmount <= 0 { + t.Fatalf("task should record token usage and final charge: %+v", taskResponse.Task) + } + if taskResponse.Task.BillingSummary["finalCharge"] == nil || taskResponse.Task.Metrics["requestedModel"] == nil { + t.Fatalf("task should record billing summary and task metrics: %+v", taskResponse.Task) + } var compatChat map[string]any doJSON(t, server.URL, http.MethodPost, "/v1/chat/completions", apiKeyResponse.Secret, map[string]any{ @@ -348,14 +372,22 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil { } var taskDetail struct { - ID string `json:"id"` - Status string `json:"status"` - Result map[string]any `json:"result"` + ID string `json:"id"` + Status string `json:"status"` + APIKeyName string `json:"apiKeyName"` + RequestID string `json:"requestId"` + Usage map[string]any `json:"usage"` + BillingSummary map[string]any `json:"billingSummary"` + FinalChargeAmount float64 `json:"finalChargeAmount"` + Result map[string]any `json:"result"` } doJSON(t, server.URL, http.MethodGet, "/api/v1/tasks/"+taskResponse.Task.ID, apiKeyResponse.Secret, nil, http.StatusOK, &taskDetail) if taskDetail.Status != "succeeded" || taskDetail.Result["id"] == "" { t.Fatalf("unexpected task detail: %+v", taskDetail) } + if taskDetail.APIKeyName != apiKeyResponse.APIKey.Name || taskDetail.RequestID == "" || taskDetail.Usage["totalTokens"] == nil || taskDetail.FinalChargeAmount <= 0 { + t.Fatalf("task detail should expose enriched record fields: %+v", taskDetail) + } req, err := http.NewRequest(http.MethodGet, server.URL+"/api/v1/tasks/"+taskResponse.Task.ID+"/events", nil) if err != nil { diff --git a/apps/api/internal/runner/helpers.go b/apps/api/internal/runner/helpers.go index efb8a48..8b0834f 100644 --- a/apps/api/internal/runner/helpers.go +++ b/apps/api/internal/runner/helpers.go @@ -10,6 +10,11 @@ func stringFromMap(values map[string]any, key string) string { return strings.TrimSpace(value) } +func stringFromAny(value any) string { + text, _ := value.(string) + return strings.TrimSpace(text) +} + func boolFromMap(values map[string]any, key string) bool { value, _ := values[key].(bool) return value diff --git a/apps/api/internal/runner/pricing.go b/apps/api/internal/runner/pricing.go index 440ade3..1b4418c 100644 --- a/apps/api/internal/runner/pricing.go +++ b/apps/api/internal/runner/pricing.go @@ -16,7 +16,7 @@ type EstimateResult struct { } func (s *Service) Estimate(ctx context.Context, kind string, model string, body map[string]any, user *auth.User) (EstimateResult, error) { - candidates, err := s.store.ListModelCandidates(ctx, model, modelTypeFromKind(kind)) + candidates, err := s.store.ListModelCandidates(ctx, model, modelTypeFromKind(kind), user) if err != nil { return EstimateResult{}, err } @@ -40,15 +40,15 @@ func (s *Service) Estimate(ctx context.Context, kind string, model string, body func (s *Service) billings(ctx context.Context, user *auth.User, kind string, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, simulated bool) []any { config := effectiveBillingConfig(candidate) discount := effectiveDiscount(ctx, s.store, user, candidate) - if modelTypeFromKind(kind) == "chat" { + if isTextGenerationKind(kind) { inputTokens := response.Usage.InputTokens outputTokens := response.Usage.OutputTokens if inputTokens == 0 && outputTokens == 0 { inputTokens = estimateRequestTokens(body) outputTokens = 1 } - inputAmount := roundPrice(float64(inputTokens) / 1000 * price(config, "textInputPer1k") * discount) - outputAmount := roundPrice(float64(outputTokens) / 1000 * price(config, "textOutputPer1k") * discount) + inputAmount := roundPrice(float64(inputTokens) / 1000 * resourcePrice(config, "text", "textInputPer1k", "inputTokenPrice", "basePrice") * discount) + outputAmount := roundPrice(float64(outputTokens) / 1000 * resourcePrice(config, "text", "textOutputPer1k", "outputTokenPrice", "basePrice") * discount) return []any{ billingLine(candidate, "text_input", "1k_tokens", inputTokens, inputAmount, discount, simulated), billingLine(candidate, "text_output", "1k_tokens", outputTokens, outputAmount, discount, simulated), @@ -59,13 +59,19 @@ func (s *Service) billings(ctx context.Context, user *auth.User, kind string, bo count = 1 } resource := "image" + unit := "image" baseKey := "imageBase" if kind == "images.edits" { resource = "image_edit" baseKey = "editBase" } - amount := float64(count) * price(config, baseKey) * weighted(config, "qualityWeights", stringFromMap(body, "quality")) * weighted(config, "sizeWeights", stringFromMap(body, "size")) * discount - return []any{billingLine(candidate, resource, "image", count, roundPrice(amount), discount, simulated)} + if kind == "videos.generations" { + resource = "video" + unit = "video" + baseKey = "videoBase" + } + amount := float64(count) * resourcePrice(config, resource, baseKey, "basePrice") * resourceWeight(config, resource, "qualityWeights", stringFromMap(body, "quality")) * resourceWeight(config, resource, "sizeWeights", stringFromMap(body, "size")) * resourceWeight(config, resource, "resolutionWeights", firstNonEmptyString(stringFromMap(body, "resolution"), stringFromMap(body, "size"))) * discount + return []any{billingLine(candidate, resource, unit, count, roundPrice(amount), discount, simulated)} } func effectiveBillingConfig(candidate store.RuntimeModelCandidate) map[string]any { @@ -121,6 +127,28 @@ func price(config map[string]any, key string) float64 { return 0 } +func resourcePrice(config map[string]any, resource string, keys ...string) float64 { + for _, key := range keys { + if value := price(config, key); value > 0 { + return value + } + } + if resourceConfig, ok := config[resource].(map[string]any); ok { + for _, key := range keys { + if value := floatFromAny(resourceConfig[key]); value > 0 { + return value + } + } + if value := floatFromAny(resourceConfig["basePrice"]); value > 0 { + return value + } + } + if resource == "image_edit" { + return resourcePrice(config, "image", keys...) + } + return 0 +} + func weighted(config map[string]any, key string, name string) float64 { if strings.TrimSpace(name) == "" { return 1 @@ -132,6 +160,39 @@ func weighted(config map[string]any, key string, name string) float64 { return 1 } +func resourceWeight(config map[string]any, resource string, key string, name string) float64 { + if value := weighted(config, key, name); value != 1 { + return value + } + if strings.TrimSpace(name) == "" { + return 1 + } + resourceConfig, _ := config[resource].(map[string]any) + if len(resourceConfig) == 0 && resource == "image_edit" { + resourceConfig, _ = config["image"].(map[string]any) + } + if weights, ok := resourceConfig["dynamicWeight"].(map[string]any); ok { + if value := floatFromAny(weights[name]); value > 0 { + return value + } + } + if weights, ok := resourceConfig[key].(map[string]any); ok { + if value := floatFromAny(weights[name]); value > 0 { + return value + } + } + return 1 +} + +func firstNonEmptyString(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return strings.TrimSpace(value) + } + } + return "" +} + func roundPrice(value float64) float64 { return math.Round(value*1000000) / 1000000 } diff --git a/apps/api/internal/runner/recording.go b/apps/api/internal/runner/recording.go new file mode 100644 index 0000000..6eae8e3 --- /dev/null +++ b/apps/api/internal/runner/recording.go @@ -0,0 +1,221 @@ +package runner + +import ( + "strings" + "time" + + "github.com/easyai/easyai-ai-gateway/apps/api/internal/auth" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/clients" + "github.com/easyai/easyai-ai-gateway/apps/api/internal/store" +) + +type taskRecordDetails struct { + RequestID string + ResolvedModel string + Usage map[string]any + Metrics map[string]any + BillingSummary map[string]any + FinalChargeAmount float64 + ResponseStartedAt time.Time + ResponseFinishedAt time.Time + ResponseDurationMS int64 +} + +func buildSuccessRecord(task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, billings []any, simulated bool) taskRecordDetails { + usage := usageToMap(response.Usage) + metrics := taskMetrics(task, user, body, candidate, response, simulated) + summary := summarizeBillings(billings, simulated) + finalAmount := floatFromAny(summary["totalAmount"]) + return taskRecordDetails{ + RequestID: response.RequestID, + ResolvedModel: candidate.ModelName, + Usage: usage, + Metrics: metrics, + BillingSummary: summary, + FinalChargeAmount: finalAmount, + ResponseStartedAt: response.ResponseStartedAt, + ResponseFinishedAt: response.ResponseFinishedAt, + ResponseDurationMS: response.ResponseDurationMS, + } +} + +func taskMetrics(task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, simulated bool) map[string]any { + metrics := map[string]any{ + "kind": task.Kind, + "runMode": task.RunMode, + "requestedModel": task.Model, + "resolvedModel": candidate.ModelName, + "modelAlias": candidate.ModelAlias, + "providerModel": candidate.ProviderModelName, + "canonicalModel": candidate.CanonicalModelKey, + "modelType": candidate.ModelType, + "provider": candidate.Provider, + "platformId": candidate.PlatformID, + "platformName": candidate.PlatformName, + "platformModelId": candidate.PlatformModelID, + "clientId": candidate.ClientID, + "queueKey": candidate.QueueKey, + "requestId": response.RequestID, + "simulated": simulated, + } + if user != nil { + metrics["apiKeyId"] = user.APIKeyID + metrics["apiKeyName"] = user.APIKeyName + metrics["apiKeyPrefix"] = user.APIKeyPrefix + } + if response.ResponseDurationMS > 0 { + metrics["responseDurationMs"] = response.ResponseDurationMS + } + switch task.Kind { + case "chat.completions", "responses": + metrics["stream"] = boolFromMap(body, "stream") + metrics["messageCount"] = messageCount(body) + copyIfPresent(metrics, body, "temperature") + copyIfPresent(metrics, body, "max_tokens") + copyIfPresent(metrics, body, "max_output_tokens") + case "images.generations", "images.edits": + metrics["imageCount"] = requestedCount(body) + metrics["outputImageCount"] = outputDataCount(response.Result) + metrics["inputImageCount"] = imageInputCount(body) + metrics["hasMask"] = stringFromMap(body, "mask") != "" + copyIfPresent(metrics, body, "size") + copyIfPresent(metrics, body, "quality") + copyIfPresent(metrics, body, "style") + case "videos.generations": + metrics["hasReferenceImage"] = imageInputCount(body) > 0 + metrics["hasReferenceVideo"] = hasAnyString(body, "video", "video_url", "videoUrl", "reference_video", "referenceVideo") + copyIfPresent(metrics, body, "duration") + copyIfPresent(metrics, body, "resolution") + copyIfPresent(metrics, body, "size") + copyIfPresent(metrics, body, "aspect_ratio") + copyIfPresent(metrics, body, "aspectRatio") + copyIfPresent(metrics, body, "fps") + } + return metrics +} + +func usageToMap(usage clients.Usage) map[string]any { + out := map[string]any{} + if usage.InputTokens > 0 { + out["inputTokens"] = usage.InputTokens + out["promptTokens"] = usage.InputTokens + } + if usage.OutputTokens > 0 { + out["outputTokens"] = usage.OutputTokens + out["completionTokens"] = usage.OutputTokens + } + if usage.TotalTokens > 0 { + out["totalTokens"] = usage.TotalTokens + } + return out +} + +func summarizeBillings(billings []any, simulated bool) map[string]any { + amountByCurrency := map[string]float64{} + for _, raw := range billings { + line, _ := raw.(map[string]any) + if line == nil { + continue + } + currency := strings.TrimSpace(stringFromAny(line["currency"])) + if currency == "" { + currency = "resource" + } + amountByCurrency[currency] = roundPrice(amountByCurrency[currency] + floatFromAny(line["amount"])) + } + currency := "" + totalAmount := 0.0 + for key, amount := range amountByCurrency { + if currency == "" { + currency = key + } else if currency != key { + currency = "mixed" + } + totalAmount += amount + } + if currency == "" { + currency = "resource" + } + totalAmount = roundPrice(totalAmount) + return map[string]any{ + "lineCount": len(billings), + "totalAmount": totalAmount, + "amountByCurrency": amountByCurrency, + "currency": currency, + "simulated": simulated, + "finalCharge": map[string]any{ + "amount": totalAmount, + "currency": currency, + "simulated": simulated, + }, + } +} + +func failureMetrics(err error, simulated bool) (string, map[string]any, time.Time, time.Time, int64) { + meta := clients.ErrorResponseMetadata(err) + metrics := map[string]any{ + "simulated": simulated, + } + if err != nil { + metrics["error"] = err.Error() + metrics["retryable"] = clients.IsRetryable(err) + } + if meta.StatusCode > 0 { + metrics["statusCode"] = meta.StatusCode + } + if meta.RequestID != "" { + metrics["requestId"] = meta.RequestID + } + if meta.ResponseDurationMS > 0 { + metrics["responseDurationMs"] = meta.ResponseDurationMS + } + return meta.RequestID, metrics, meta.ResponseStartedAt, meta.ResponseFinishedAt, meta.ResponseDurationMS +} + +func messageCount(body map[string]any) int { + messages, _ := body["messages"].([]any) + return len(messages) +} + +func requestedCount(body map[string]any) int { + count := int(floatFromAny(body["n"])) + if count <= 0 { + return 1 + } + return count +} + +func outputDataCount(result map[string]any) int { + data, _ := result["data"].([]any) + return len(data) +} + +func imageInputCount(body map[string]any) int { + count := 0 + for _, key := range []string{"image", "image_url", "imageUrl"} { + if stringFromMap(body, key) != "" { + count++ + } + } + for _, key := range []string{"image_urls", "imageUrls", "images"} { + if values, ok := body[key].([]any); ok { + count += len(values) + } + } + return count +} + +func hasAnyString(body map[string]any, keys ...string) bool { + for _, key := range keys { + if stringFromMap(body, key) != "" { + return true + } + } + return false +} + +func copyIfPresent(target map[string]any, body map[string]any, key string) { + if value, ok := body[key]; ok && value != nil { + target[key] = value + } +} diff --git a/apps/api/internal/runner/service.go b/apps/api/internal/runner/service.go index 3fd0614..9ab77af 100644 --- a/apps/api/internal/runner/service.go +++ b/apps/api/internal/runner/service.go @@ -41,18 +41,26 @@ func New(cfg config.Config, db *store.Store, logger *slog.Logger) *Service { } func (s *Service) Execute(ctx context.Context, task store.GatewayTask, user *auth.User) (Result, error) { + return s.execute(ctx, task, user, nil) +} + +func (s *Service) ExecuteStream(ctx context.Context, task store.GatewayTask, user *auth.User, onDelta clients.StreamDelta) (Result, error) { + return s.execute(ctx, task, user, onDelta) +} + +func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *auth.User, onDelta clients.StreamDelta) (Result, error) { modelType := modelTypeFromKind(task.Kind) body := normalizeRequest(task.Kind, task.Request) if err := validateRequest(task.Kind, body); err != nil { - failed, finishErr := s.failTask(ctx, task.ID, "bad_request", err.Error(), task.RunMode == "simulation") + failed, finishErr := s.failTask(ctx, task.ID, "bad_request", err.Error(), task.RunMode == "simulation", err) if finishErr != nil { return Result{}, finishErr } return Result{Task: failed, Output: failed.Result}, err } - candidates, err := s.store.ListModelCandidates(ctx, task.Model, modelType) + candidates, err := s.store.ListModelCandidates(ctx, task.Model, modelType, user) if err != nil { - failed, finishErr := s.failTask(ctx, task.ID, "no_model_candidate", err.Error(), task.RunMode == "simulation") + failed, finishErr := s.failTask(ctx, task.ID, "no_model_candidate", err.Error(), task.RunMode == "simulation", err) if finishErr != nil { return Result{}, finishErr } @@ -72,14 +80,35 @@ func (s *Service) Execute(ctx context.Context, task store.GatewayTask, user *aut break } attemptNo := index + 1 - response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo) + response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta) if err == nil { billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate)) - finished, finishErr := s.store.FinishTaskSuccess(ctx, task.ID, response.Result, billings) + record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate)) + finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{ + TaskID: task.ID, + Result: response.Result, + Billings: billings, + RequestID: record.RequestID, + ResolvedModel: record.ResolvedModel, + Usage: record.Usage, + Metrics: record.Metrics, + BillingSummary: record.BillingSummary, + FinalChargeAmount: record.FinalChargeAmount, + ResponseStartedAt: record.ResponseStartedAt, + ResponseFinishedAt: record.ResponseFinishedAt, + ResponseDurationMS: record.ResponseDurationMS, + }) if finishErr != nil { return Result{}, finishErr } - if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{"result": response.Result, "billings": billings}, isSimulation(task, candidate)); err != nil { + if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{ + "result": response.Result, + "billings": billings, + "usage": record.Usage, + "metrics": record.Metrics, + "billingSummary": record.BillingSummary, + "requestId": record.RequestID, + }, isSimulation(task, candidate)); err != nil { return Result{}, err } return Result{Task: finished, Output: response.Result}, nil @@ -98,14 +127,14 @@ func (s *Service) Execute(ctx context.Context, task store.GatewayTask, user *aut if lastErr != nil { message = lastErr.Error() } - failed, err := s.failTask(ctx, task.ID, code, message, task.RunMode == "simulation") + failed, err := s.failTask(ctx, task.ID, code, message, task.RunMode == "simulation", lastErr) if err != nil { return Result{}, err } return Result{Task: failed, Output: failed.Result}, lastErr } -func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, attemptNo int) (clients.Response, error) { +func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) { simulated := isSimulation(task, candidate) if err := s.emit(ctx, task.ID, "task.attempt.started", "running", "submitting", 0.25, "client attempt started", map[string]any{"attempt": attemptNo, "clientId": candidate.ClientID}, simulated); err != nil { return clients.Response{}, err @@ -127,7 +156,14 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user reservations := s.rateLimitReservations(ctx, user, candidate, body) limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, reservations) if err != nil { - _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{AttemptID: attemptID, Status: "failed", Retryable: false, ErrorCode: "rate_limit", ErrorMessage: err.Error()}) + _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ + AttemptID: attemptID, + Status: "failed", + Retryable: false, + Metrics: map[string]any{"error": err.Error(), "candidateModel": candidate.ModelName, "clientId": candidate.ClientID}, + ErrorCode: "rate_limit", + ErrorMessage: err.Error(), + }) return clients.Response{}, &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: false} } defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs) @@ -138,34 +174,77 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user defer s.store.RecordClientRelease(context.WithoutCancel(ctx), candidate.ClientID, "") client := s.clientFor(candidate, simulated) + callStartedAt := time.Now() response, err := client.Run(ctx, clients.Request{ - Kind: task.Kind, - ModelType: candidate.ModelType, - Model: task.Model, - Body: body, - Candidate: candidate, - Stream: boolFromMap(body, "stream"), + Kind: task.Kind, + ModelType: candidate.ModelType, + Model: task.Model, + Body: body, + Candidate: candidate, + Stream: boolFromMap(body, "stream"), + StreamDelta: onDelta, }) + callFinishedAt := time.Now() + if response.ResponseStartedAt.IsZero() { + response.ResponseStartedAt = callStartedAt + } + if response.ResponseFinishedAt.IsZero() { + response.ResponseFinishedAt = callFinishedAt + } + if response.ResponseDurationMS == 0 { + response.ResponseDurationMS = response.ResponseFinishedAt.Sub(response.ResponseStartedAt).Milliseconds() + if response.ResponseDurationMS < 0 { + response.ResponseDurationMS = 0 + } + } if err != nil { retryable := clients.IsRetryable(err) + requestID, metrics, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(err, simulated) + if responseStartedAt.IsZero() { + responseStartedAt = callStartedAt + } + if responseFinishedAt.IsZero() { + responseFinishedAt = callFinishedAt + } + if responseDurationMS == 0 { + responseDurationMS = responseFinishedAt.Sub(responseStartedAt).Milliseconds() + if responseDurationMS < 0 { + responseDurationMS = 0 + } + } _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ - AttemptID: attemptID, - Status: "failed", - Retryable: retryable, - ErrorCode: clients.ErrorCode(err), - ErrorMessage: err.Error(), + AttemptID: attemptID, + Status: "failed", + Retryable: retryable, + RequestID: requestID, + Metrics: metrics, + ResponseStartedAt: responseStartedAt, + ResponseFinishedAt: responseFinishedAt, + ResponseDurationMS: responseDurationMS, + ErrorCode: clients.ErrorCode(err), + ErrorMessage: err.Error(), }) - _ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable}, simulated) + _ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "metrics": metrics}, simulated) return clients.Response{}, err } uploadedResult, err := s.uploadGeneratedAssets(ctx, response.Result) if err != nil { + metrics := taskMetrics(task, user, body, candidate, response, simulated) + metrics["error"] = err.Error() + metrics["retryable"] = clients.IsRetryable(err) _ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ - AttemptID: attemptID, - Status: "failed", - Retryable: clients.IsRetryable(err), - ErrorCode: clients.ErrorCode(err), - ErrorMessage: err.Error(), + AttemptID: attemptID, + Status: "failed", + Retryable: clients.IsRetryable(err), + RequestID: response.RequestID, + Usage: usageToMap(response.Usage), + Metrics: metrics, + ResponseSnapshot: response.Result, + ResponseStartedAt: response.ResponseStartedAt, + ResponseFinishedAt: response.ResponseFinishedAt, + ResponseDurationMS: response.ResponseDurationMS, + ErrorCode: clients.ErrorCode(err), + ErrorMessage: err.Error(), }) return clients.Response{}, err } @@ -176,9 +255,15 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user } } if err := s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{ - AttemptID: attemptID, - Status: "succeeded", - ResponseSnapshot: response.Result, + AttemptID: attemptID, + Status: "succeeded", + RequestID: response.RequestID, + Usage: usageToMap(response.Usage), + Metrics: taskMetrics(task, user, body, candidate, response, simulated), + ResponseSnapshot: response.Result, + ResponseStartedAt: response.ResponseStartedAt, + ResponseFinishedAt: response.ResponseFinishedAt, + ResponseDurationMS: response.ResponseDurationMS, }); err != nil { return clients.Response{}, err } @@ -189,19 +274,32 @@ func (s *Service) clientFor(candidate store.RuntimeModelCandidate, simulated boo if simulated { return s.clients["simulation"] } - key := strings.ToLower(candidate.Provider) + key := strings.ToLower(strings.TrimSpace(candidate.SpecType)) + if key == "" { + key = strings.ToLower(strings.TrimSpace(candidate.Provider)) + } if client, ok := s.clients[key]; ok { return client } return s.clients["openai"] } -func (s *Service) failTask(ctx context.Context, taskID string, code string, message string, simulated bool) (store.GatewayTask, error) { - failed, err := s.store.FinishTaskFailure(ctx, taskID, code, message) +func (s *Service) failTask(ctx context.Context, taskID string, code string, message string, simulated bool, cause error) (store.GatewayTask, error) { + requestID, metrics, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(cause, simulated) + failed, err := s.store.FinishTaskFailure(ctx, store.FinishTaskFailureInput{ + TaskID: taskID, + Code: code, + Message: message, + RequestID: requestID, + Metrics: metrics, + ResponseStartedAt: responseStartedAt, + ResponseFinishedAt: responseFinishedAt, + ResponseDurationMS: responseDurationMS, + }) if err != nil { return store.GatewayTask{}, err } - if eventErr := s.emit(ctx, taskID, "task.failed", "failed", "failed", 1, message, map[string]any{"code": code}, simulated); eventErr != nil { + if eventErr := s.emit(ctx, taskID, "task.failed", "failed", "failed", 1, message, map[string]any{"code": code, "requestId": requestID, "metrics": metrics}, simulated); eventErr != nil { return store.GatewayTask{}, eventErr } return failed, nil @@ -221,14 +319,23 @@ func (s *Service) emit(ctx context.Context, taskID string, eventType string, sta func modelTypeFromKind(kind string) string { switch kind { case "chat.completions", "responses": - return "chat" + return "text_generate" case "images.generations", "images.edits": - return "image" + if kind == "images.edits" { + return "image_edit" + } + return "image_generate" + case "videos.generations": + return "video_generate" default: return "task" } } +func isTextGenerationKind(kind string) bool { + return kind == "chat.completions" || kind == "responses" +} + func isSimulation(task store.GatewayTask, candidate store.RuntimeModelCandidate) bool { if task.RunMode == "simulation" { return true diff --git a/apps/api/internal/store/postgres.go b/apps/api/internal/store/postgres.go index ae250c7..8f48d41 100644 --- a/apps/api/internal/store/postgres.go +++ b/apps/api/internal/store/postgres.go @@ -25,6 +25,7 @@ var ( ErrInvalidCredentials = errors.New("invalid account or password") ErrInvalidInvitation = errors.New("invalid or expired invitation code") ErrLocalUserRequired = errors.New("local gateway user is required") + ErrProtectedDefault = errors.New("protected default resource cannot be deleted") ErrUserAlreadyExists = errors.New("user already exists") ErrWeakPassword = errors.New("password must be at least 8 characters") ) @@ -54,6 +55,7 @@ type Platform struct { Provider string `json:"provider"` PlatformKey string `json:"platformKey"` Name string `json:"name"` + InternalName string `json:"internalName,omitempty"` BaseURL string `json:"baseUrl,omitempty"` AuthType string `json:"authType"` Status string `json:"status"` @@ -62,6 +64,9 @@ type Platform struct { DefaultDiscountFactor float64 `json:"defaultDiscountFactor"` PricingRuleSetID string `json:"pricingRuleSetId,omitempty"` Config map[string]any `json:"config,omitempty"` + CredentialsPreview map[string]any `json:"credentialsPreview,omitempty"` + RetryPolicy map[string]any `json:"retryPolicy,omitempty"` + RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } @@ -70,10 +75,13 @@ type CreatePlatformInput struct { Provider string `json:"provider"` PlatformKey string `json:"platformKey"` Name string `json:"name"` + InternalName string `json:"internalName"` BaseURL string `json:"baseUrl"` AuthType string `json:"authType"` Credentials map[string]any `json:"credentials"` Config map[string]any `json:"config"` + RetryPolicy map[string]any `json:"retryPolicy"` + RateLimitPolicy map[string]any `json:"rateLimitPolicy"` DefaultPricingMode string `json:"defaultPricingMode"` DefaultDiscountFactor float64 `json:"defaultDiscountFactor"` PricingRuleSetID string `json:"pricingRuleSetId"` @@ -106,6 +114,11 @@ type APIKey struct { UpdatedAt time.Time `json:"updatedAt"` } +type PlayableAPIKey struct { + APIKey + Secret string `json:"secret"` +} + type CreatedAPIKey struct { APIKey APIKey `json:"apiKey"` Secret string `json:"secret"` @@ -128,11 +141,32 @@ type PlatformModel struct { PricingRuleSetID string `json:"pricingRuleSetId,omitempty"` BillingConfigOverride map[string]any `json:"billingConfigOverride,omitempty"` BillingConfig map[string]any `json:"billingConfig,omitempty"` + PermissionConfig map[string]any `json:"permissionConfig,omitempty"` + RetryPolicy map[string]any `json:"retryPolicy,omitempty"` + RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` + RuntimePolicySetID string `json:"runtimePolicySetId,omitempty"` + RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride,omitempty"` Enabled bool `json:"enabled"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } +type AccessRule struct { + ID string `json:"id"` + SubjectType string `json:"subjectType"` + SubjectID string `json:"subjectId"` + ResourceType string `json:"resourceType"` + ResourceID string `json:"resourceId"` + Effect string `json:"effect"` + Priority int `json:"priority"` + MinPermissionLevel int `json:"minPermissionLevel"` + Conditions map[string]any `json:"conditions,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` + Status string `json:"status"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + type CatalogProvider struct { ID string `json:"id"` ProviderKey string `json:"providerKey"` @@ -140,6 +174,8 @@ type CatalogProvider struct { DisplayName string `json:"displayName"` ProviderType string `json:"providerType"` IconPath string `json:"iconPath,omitempty"` + DefaultBaseURL string `json:"defaultBaseUrl,omitempty"` + DefaultAuthType string `json:"defaultAuthType,omitempty"` Source string `json:"source,omitempty"` CapabilitySchema map[string]any `json:"capabilitySchema,omitempty"` DefaultRateLimitPolicy map[string]any `json:"defaultRateLimitPolicy,omitempty"` @@ -154,18 +190,40 @@ type BaseModel struct { ProviderKey string `json:"providerKey"` CanonicalModelKey string `json:"canonicalModelKey"` ProviderModelName string `json:"providerModelName"` - ModelType string `json:"modelType"` - DisplayName string `json:"displayName"` + ModelType StringList `json:"modelType"` + ModelAlias string `json:"modelAlias"` + DisplayName string `json:"-"` Capabilities map[string]any `json:"capabilities,omitempty"` BaseBillingConfig map[string]any `json:"baseBillingConfig,omitempty"` DefaultRateLimitPolicy map[string]any `json:"defaultRateLimitPolicy,omitempty"` + PricingRuleSetID string `json:"pricingRuleSetId,omitempty"` + RuntimePolicySetID string `json:"runtimePolicySetId,omitempty"` + RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride,omitempty"` Metadata map[string]any `json:"metadata,omitempty"` + CatalogType string `json:"catalogType"` + DefaultSnapshot map[string]any `json:"defaultSnapshot,omitempty"` + CustomizedAt string `json:"customizedAt,omitempty"` PricingVersion int `json:"pricingVersion"` Status string `json:"status"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } +type RuntimePolicySet struct { + ID string `json:"id"` + PolicyKey string `json:"policyKey"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"` + RetryPolicy map[string]any `json:"retryPolicy,omitempty"` + AutoDisablePolicy map[string]any `json:"autoDisablePolicy,omitempty"` + DegradePolicy map[string]any `json:"degradePolicy,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` + Status string `json:"status"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` +} + type PricingRule struct { ID string `json:"id"` RuleSetID string `json:"ruleSetId,omitempty"` @@ -298,27 +356,54 @@ type CreateTaskInput struct { } type GatewayTask struct { - ID string `json:"id"` - Kind string `json:"kind"` - RunMode string `json:"runMode"` - UserID string `json:"userId"` - GatewayUserID string `json:"gatewayUserId,omitempty"` - UserSource string `json:"userSource,omitempty"` - GatewayTenantID string `json:"gatewayTenantId,omitempty"` - TenantID string `json:"tenantId,omitempty"` - TenantKey string `json:"tenantKey,omitempty"` - UserGroupID string `json:"userGroupId,omitempty"` - UserGroupKey string `json:"userGroupKey,omitempty"` - Model string `json:"model"` - Request map[string]any `json:"request,omitempty"` - Status string `json:"status"` - Result map[string]any `json:"result,omitempty"` - Billings []any `json:"billings,omitempty"` - Error string `json:"error,omitempty"` - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` + ID string `json:"id"` + Kind string `json:"kind"` + RunMode string `json:"runMode"` + UserID string `json:"userId"` + GatewayUserID string `json:"gatewayUserId,omitempty"` + UserSource string `json:"userSource,omitempty"` + GatewayTenantID string `json:"gatewayTenantId,omitempty"` + TenantID string `json:"tenantId,omitempty"` + TenantKey string `json:"tenantKey,omitempty"` + APIKeyID string `json:"apiKeyId,omitempty"` + APIKeyName string `json:"apiKeyName,omitempty"` + APIKeyPrefix string `json:"apiKeyPrefix,omitempty"` + UserGroupID string `json:"userGroupId,omitempty"` + UserGroupKey string `json:"userGroupKey,omitempty"` + Model string `json:"model"` + ModelType string `json:"modelType,omitempty"` + RequestedModel string `json:"requestedModel,omitempty"` + ResolvedModel string `json:"resolvedModel,omitempty"` + RequestID string `json:"requestId,omitempty"` + Request map[string]any `json:"request,omitempty"` + Status string `json:"status"` + Result map[string]any `json:"result,omitempty"` + Billings []any `json:"billings,omitempty"` + Usage map[string]any `json:"usage"` + Metrics map[string]any `json:"metrics"` + BillingSummary map[string]any `json:"billingSummary"` + FinalChargeAmount float64 `json:"finalChargeAmount"` + ResponseStartedAt string `json:"responseStartedAt,omitempty"` + ResponseFinishedAt string `json:"responseFinishedAt,omitempty"` + ResponseDurationMS int64 `json:"responseDurationMs"` + FinishedAt string `json:"finishedAt,omitempty"` + Error string `json:"error,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` } +const gatewayTaskColumns = ` +id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source, +COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), +COALESCE(api_key_id, ''), COALESCE(api_key_name, ''), COALESCE(api_key_prefix, ''), +COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, +COALESCE(model_type, ''), COALESCE(requested_model, ''), COALESCE(resolved_model, ''), COALESCE(request_id, ''), +request, status, COALESCE(result, '{}'::jsonb), COALESCE(billings, '[]'::jsonb), +COALESCE(usage, '{}'::jsonb), COALESCE(metrics, '{}'::jsonb), COALESCE(billing_summary, '{}'::jsonb), +COALESCE(final_charge_amount, 0)::float8, COALESCE(response_started_at::text, ''), +COALESCE(response_finished_at::text, ''), COALESCE(response_duration_ms, 0), COALESCE(error, ''), +created_at, updated_at, COALESCE(finished_at::text, '')` + type TaskEvent struct { ID string `json:"id"` TaskID string `json:"taskId"` @@ -335,9 +420,9 @@ type TaskEvent struct { func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) { rows, err := s.pool.Query(ctx, ` -SELECT id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_type, status, priority, +SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), - config, created_at, updated_at + config, credentials, retry_policy, rate_limit_policy, created_at, updated_at FROM integration_platforms ORDER BY priority ASC, created_at DESC`) if err != nil { @@ -349,11 +434,15 @@ ORDER BY priority ASC, created_at DESC`) for rows.Next() { var platform Platform var configBytes []byte + var credentialsBytes []byte + var retryPolicyBytes []byte + var rateLimitPolicyBytes []byte if err := rows.Scan( &platform.ID, &platform.Provider, &platform.PlatformKey, &platform.Name, + &platform.InternalName, &platform.BaseURL, &platform.AuthType, &platform.Status, @@ -362,20 +451,28 @@ ORDER BY priority ASC, created_at DESC`) &platform.DefaultDiscountFactor, &platform.PricingRuleSetID, &configBytes, + &credentialsBytes, + &retryPolicyBytes, + &rateLimitPolicyBytes, &platform.CreatedAt, &platform.UpdatedAt, ); err != nil { return nil, err } platform.Config = decodeObject(configBytes) + platform.CredentialsPreview = maskCredentialsPreview(credentialsBytes) + platform.RetryPolicy = decodeObject(retryPolicyBytes) + platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes) platforms = append(platforms, platform) } return platforms, rows.Err() } func (s *Store) CreatePlatform(ctx context.Context, input CreatePlatformInput) (Platform, error) { - credentials, _ := json.Marshal(input.Credentials) - config, _ := json.Marshal(input.Config) + credentials, _ := json.Marshal(emptyObjectIfNil(input.Credentials)) + config, _ := json.Marshal(emptyObjectIfNil(input.Config)) + retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy)) + rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy)) if input.DefaultPricingMode == "" { input.DefaultPricingMode = "inherit_discount" } @@ -387,17 +484,31 @@ func (s *Store) CreatePlatform(ctx context.Context, input CreatePlatformInput) ( } var platform Platform var configBytes []byte + var credentialsResultBytes []byte + var retryPolicyBytes []byte + var rateLimitPolicyBytes []byte err := s.pool.QueryRow(ctx, ` -INSERT INTO integration_platforms (provider, platform_key, name, base_url, auth_type, credentials, config, default_pricing_mode, default_discount_factor, pricing_rule_set_id, priority) -VALUES ($1, COALESCE(NULLIF($2, ''), 'platform_' || replace(gen_random_uuid()::text, '-', '')), $3, $4, $5, $6, $7, $8, $9, NULLIF($10, '')::uuid, $11) -RETURNING id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_type, status, priority, - default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), config, created_at, updated_at`, - input.Provider, input.PlatformKey, input.Name, input.BaseURL, input.AuthType, credentials, config, input.DefaultPricingMode, input.DefaultDiscountFactor, input.PricingRuleSetID, input.Priority, +INSERT INTO integration_platforms ( + provider, platform_key, name, internal_name, base_url, auth_type, credentials, config, + default_pricing_mode, default_discount_factor, pricing_rule_set_id, + priority, retry_policy, rate_limit_policy +) +VALUES ( + $1, COALESCE(NULLIF($2, ''), 'platform_' || replace(gen_random_uuid()::text, '-', '')), $3, NULLIF($4, ''), $5, $6, $7, $8, + $9, $10, NULLIF($11, '')::uuid, $12, $13, $14 +) +RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, + default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), + config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`, + input.Provider, input.PlatformKey, input.Name, strings.TrimSpace(input.InternalName), input.BaseURL, input.AuthType, credentials, config, + input.DefaultPricingMode, input.DefaultDiscountFactor, input.PricingRuleSetID, input.Priority, + string(retryPolicy), string(rateLimitPolicy), ).Scan( &platform.ID, &platform.Provider, &platform.PlatformKey, &platform.Name, + &platform.InternalName, &platform.BaseURL, &platform.AuthType, &platform.Status, @@ -406,6 +517,9 @@ RETURNING id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_t &platform.DefaultDiscountFactor, &platform.PricingRuleSetID, &configBytes, + &credentialsResultBytes, + &retryPolicyBytes, + &rateLimitPolicyBytes, &platform.CreatedAt, &platform.UpdatedAt, ) @@ -413,19 +527,143 @@ RETURNING id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_t return Platform{}, err } platform.Config = decodeObject(configBytes) + platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes) + platform.RetryPolicy = decodeObject(retryPolicyBytes) + platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes) return platform, nil } +func (s *Store) UpdatePlatform(ctx context.Context, id string, input CreatePlatformInput) (Platform, error) { + var credentials any + if input.Credentials != nil { + credentialsBytes, _ := json.Marshal(emptyObjectIfNil(input.Credentials)) + credentials = string(credentialsBytes) + } + config, _ := json.Marshal(emptyObjectIfNil(input.Config)) + retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy)) + rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy)) + if input.DefaultPricingMode == "" { + input.DefaultPricingMode = "inherit_discount" + } + if input.DefaultDiscountFactor == 0 { + input.DefaultDiscountFactor = 1 + } + if input.Priority == 0 { + input.Priority = 100 + } + var platform Platform + var configBytes []byte + var credentialsResultBytes []byte + var retryPolicyBytes []byte + var rateLimitPolicyBytes []byte + err := s.pool.QueryRow(ctx, ` +UPDATE integration_platforms +SET provider = $2, + platform_key = COALESCE(NULLIF($3, ''), platform_key), + name = $4, + internal_name = NULLIF($5, ''), + base_url = $6, + auth_type = $7, + credentials = CASE + WHEN $8::jsonb IS NULL THEN credentials + WHEN $8::jsonb = '{}'::jsonb THEN '{}'::jsonb + ELSE credentials || $8::jsonb + END, + config = $9, + default_pricing_mode = $10, + default_discount_factor = $11, + pricing_rule_set_id = NULLIF($12, '')::uuid, + priority = $13, + retry_policy = $14, + rate_limit_policy = $15, + updated_at = now() +WHERE id = $1::uuid +RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority, + default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), + config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`, + id, + input.Provider, + input.PlatformKey, + input.Name, + strings.TrimSpace(input.InternalName), + input.BaseURL, + input.AuthType, + credentials, + string(config), + input.DefaultPricingMode, + input.DefaultDiscountFactor, + input.PricingRuleSetID, + input.Priority, + string(retryPolicy), + string(rateLimitPolicy), + ).Scan( + &platform.ID, + &platform.Provider, + &platform.PlatformKey, + &platform.Name, + &platform.InternalName, + &platform.BaseURL, + &platform.AuthType, + &platform.Status, + &platform.Priority, + &platform.DefaultPricingMode, + &platform.DefaultDiscountFactor, + &platform.PricingRuleSetID, + &configBytes, + &credentialsResultBytes, + &retryPolicyBytes, + &rateLimitPolicyBytes, + &platform.CreatedAt, + &platform.UpdatedAt, + ) + if err != nil { + return Platform{}, err + } + platform.Config = decodeObject(configBytes) + platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes) + platform.RetryPolicy = decodeObject(retryPolicyBytes) + platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes) + return platform, nil +} + +func (s *Store) DeletePlatform(ctx context.Context, id string) error { + result, err := s.pool.Exec(ctx, `DELETE FROM integration_platforms WHERE id = $1::uuid`, id) + if err != nil { + return err + } + if result.RowsAffected() == 0 { + return pgx.ErrNoRows + } + return nil +} + func (s *Store) ListModels(ctx context.Context) ([]PlatformModel, error) { + return s.listModels(ctx, "") +} + +func (s *Store) ListPlatformModels(ctx context.Context, platformID string) ([]PlatformModel, error) { + return s.listModels(ctx, strings.TrimSpace(platformID)) +} + +func (s *Store) listModels(ctx context.Context, platformID string) ([]PlatformModel, error) { + args := []any{} + where := "" + if platformID != "" { + where = "WHERE m.platform_id = $1::uuid" + args = append(args, platformID) + } + rows, err := s.pool.Query(ctx, ` SELECT m.id::text, m.platform_id::text, COALESCE(m.base_model_id::text, ''), p.provider, p.name, m.model_name, COALESCE(m.model_alias, ''), m.model_type, m.display_name, m.capability_override, m.capabilities, m.pricing_mode, COALESCE(m.discount_factor, 0)::float8, COALESCE(m.pricing_rule_set_id::text, ''), m.billing_config_override, m.billing_config, + m.permission_config, m.retry_policy, m.rate_limit_policy, COALESCE(m.runtime_policy_set_id::text, ''), m.runtime_policy_override, m.enabled, m.created_at, m.updated_at FROM platform_models m JOIN integration_platforms p ON p.id = m.platform_id -ORDER BY m.model_type ASC, m.model_name ASC`) +`+where+` +ORDER BY m.model_type ASC, m.model_name ASC`, args...) if err != nil { return nil, err } @@ -438,6 +676,10 @@ ORDER BY m.model_type ASC, m.model_name ASC`) var capabilities []byte var billingConfigOverride []byte var billingConfig []byte + var permissionConfig []byte + var retryPolicy []byte + var rateLimitPolicy []byte + var runtimePolicyOverride []byte if err := rows.Scan( &model.ID, &model.PlatformID, @@ -455,6 +697,11 @@ ORDER BY m.model_type ASC, m.model_name ASC`) &model.PricingRuleSetID, &billingConfigOverride, &billingConfig, + &permissionConfig, + &retryPolicy, + &rateLimitPolicy, + &model.RuntimePolicySetID, + &runtimePolicyOverride, &model.Enabled, &model.CreatedAt, &model.UpdatedAt, @@ -465,6 +712,10 @@ ORDER BY m.model_type ASC, m.model_name ASC`) model.Capabilities = decodeObject(capabilities) model.BillingConfigOverride = decodeObject(billingConfigOverride) model.BillingConfig = decodeObject(billingConfig) + model.PermissionConfig = decodeObject(permissionConfig) + model.RetryPolicy = decodeObject(retryPolicy) + model.RateLimitPolicy = decodeObject(rateLimitPolicy) + model.RuntimePolicyOverride = decodeObject(runtimePolicyOverride) models = append(models, model) } return models, rows.Err() @@ -731,6 +982,60 @@ ORDER BY created_at DESC`, gatewayUserID) return items, rows.Err() } +func (s *Store) ListPlayableAPIKeys(ctx context.Context, user *auth.User) ([]PlayableAPIKey, error) { + items, err := s.listRecoverableAPIKeys(ctx, user) + if err != nil { + return nil, err + } + if len(items) > 0 { + return items, nil + } + created, err := s.CreateAPIKey(ctx, CreateAPIKeyInput{ + Name: "Playground API Key", + Scopes: []string{"chat", "image", "video"}, + }, user) + if err != nil { + return nil, err + } + return []PlayableAPIKey{{APIKey: created.APIKey, Secret: created.Secret}}, nil +} + +func (s *Store) listRecoverableAPIKeys(ctx context.Context, user *auth.User) ([]PlayableAPIKey, error) { + gatewayUserID := localGatewayUserID(user) + if gatewayUserID == "" { + return nil, ErrLocalUserRequired + } + rows, err := s.pool.Query(ctx, ` +SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, + COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), + key_prefix, name, scopes, COALESCE(user_group_id::text, ''), + rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''), + COALESCE(last_used_at::text, ''), created_at, updated_at, COALESCE(key_secret, '') +FROM gateway_api_keys +WHERE gateway_user_id = $1::uuid + AND status = 'active' + AND deleted_at IS NULL + AND COALESCE(key_secret, '') <> '' + AND (expires_at IS NULL OR expires_at > now()) +ORDER BY created_at DESC`, gatewayUserID) + if err != nil { + return nil, err + } + defer rows.Close() + + items := make([]PlayableAPIKey, 0) + for rows.Next() { + var item PlayableAPIKey + apiKey, err := scanAPIKeyWithSecret(rows, &item.Secret) + if err != nil { + return nil, err + } + item.APIKey = apiKey + items = append(items, item) + } + return items, rows.Err() +} + func (s *Store) CreateAPIKey(ctx context.Context, input CreateAPIKeyInput, user *auth.User) (CreatedAPIKey, error) { gatewayUserID := localGatewayUserID(user) if gatewayUserID == "" { @@ -764,17 +1069,17 @@ func (s *Store) CreateAPIKey(ctx context.Context, input CreateAPIKeyInput, user err = s.pool.QueryRow(ctx, ` INSERT INTO gateway_api_keys ( gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id, - key_prefix, key_hash, name, scopes, expires_at + key_prefix, key_secret, key_hash, name, scopes, expires_at ) VALUES (NULLIF($1, '')::uuid, $2::uuid, NULLIF($3, ''), NULLIF($4, ''), NULLIF($5, ''), - $6, $7, $8, $9::jsonb, NULLIF($10, '')::timestamptz) + $6, $7, $8, $9, $10::jsonb, NULLIF($11, '')::timestamptz) RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text, COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''), key_prefix, name, scopes, COALESCE(user_group_id::text, ''), rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''), COALESCE(last_used_at::text, ''), created_at, updated_at`, user.GatewayTenantID, gatewayUserID, user.TenantID, user.TenantKey, user.ID, - apiKeyPrefix(secret), string(keyHash), name, string(scopesJSON), strings.TrimSpace(input.ExpiresAt), + apiKeyPrefix(secret), secret, string(keyHash), name, string(scopesJSON), strings.TrimSpace(input.ExpiresAt), ).Scan( &item.ID, &item.GatewayTenantID, @@ -856,7 +1161,7 @@ func (s *Store) VerifyLocalAPIKey(ctx context.Context, secret string) (*auth.Use return nil, auth.ErrUnauthorized } rows, err := s.pool.Query(ctx, ` -SELECT k.id::text, k.key_hash, COALESCE(k.user_group_id::text, ''), +SELECT k.id::text, k.key_hash, k.key_prefix, k.name, COALESCE(k.user_group_id::text, ''), u.id::text, u.username, u.roles, COALESCE(u.gateway_tenant_id::text, ''), COALESCE(u.tenant_id, ''), COALESCE(u.tenant_key, '') FROM gateway_api_keys k @@ -875,6 +1180,8 @@ WHERE k.key_prefix = $1 for rows.Next() { var apiKeyID string var hash string + var keyPrefix string + var keyName string var userGroupID string var gatewayUserID string var username string @@ -882,7 +1189,7 @@ WHERE k.key_prefix = $1 var gatewayTenantID string var tenantID string var tenantKey string - if err := rows.Scan(&apiKeyID, &hash, &userGroupID, &gatewayUserID, &username, &rolesBytes, &gatewayTenantID, &tenantID, &tenantKey); err != nil { + if err := rows.Scan(&apiKeyID, &hash, &keyPrefix, &keyName, &userGroupID, &gatewayUserID, &username, &rolesBytes, &gatewayTenantID, &tenantID, &tenantKey); err != nil { return nil, err } if bcrypt.CompareHashAndPassword([]byte(hash), []byte(secret)) != nil { @@ -902,7 +1209,8 @@ WHERE k.key_prefix = $1 GatewayUserID: gatewayUserID, UserGroupID: userGroupID, APIKeyID: apiKeyID, - APIKeyName: prefix, + APIKeyName: keyName, + APIKeyPrefix: keyPrefix, }, nil } if err := rows.Err(); err != nil { @@ -953,7 +1261,7 @@ SELECT NOT EXISTS ( return GatewayUser{}, err } if isBootstrapUser { - role = "admin" + role = "manager" } if err := tx.QueryRow(ctx, ` INSERT INTO gateway_tenants (tenant_key, source, external_tenant_id, name) @@ -1197,21 +1505,16 @@ func (s *Store) CreateTask(ctx context.Context, input CreateTaskInput, user *aut } defer tx.Rollback(ctx) - var task GatewayTask - var requestBytes []byte - var resultBytes []byte - var billingsBytes []byte - err = tx.QueryRow(ctx, ` + task, err := scanGatewayTask(tx.QueryRow(ctx, ` INSERT INTO gateway_tasks ( kind, run_mode, user_id, gateway_user_id, user_source, gateway_tenant_id, tenant_id, tenant_key, - api_key_id, user_group_id, user_group_key, model, request, status, result, billings, finished_at + api_key_id, api_key_name, api_key_prefix, user_group_id, user_group_key, + model, requested_model, request, status, result, billings, finished_at ) - VALUES ($1, $2, $3, NULLIF($4, '')::uuid, COALESCE(NULLIF($5, ''), 'gateway'), NULLIF($6, '')::uuid, NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, ''), NULLIF($10, '')::uuid, NULLIF($11, ''), $12, $13, $14, $15::jsonb, $16::jsonb, CASE WHEN $17 THEN now() ELSE NULL END) - RETURNING id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source, - COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), - COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at`, - input.Kind, runMode, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, status, resultBody, billingsBody, false, - ).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt) + VALUES ($1, $2, $3, NULLIF($4, '')::uuid, COALESCE(NULLIF($5, ''), 'gateway'), NULLIF($6, '')::uuid, NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, ''), NULLIF($10, ''), NULLIF($11, ''), NULLIF($12, '')::uuid, NULLIF($13, ''), $14, $14, $15, $16, $17::jsonb, $18::jsonb, CASE WHEN $19 THEN now() ELSE NULL END) + RETURNING `+gatewayTaskColumns, + input.Kind, runMode, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.APIKeyName, user.APIKeyPrefix, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, status, resultBody, billingsBody, false, + )) if err != nil { return GatewayTask{}, err } @@ -1229,30 +1532,77 @@ VALUES ($1::uuid, $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, NULLIF($7, ''), $8 if err := tx.Commit(ctx); err != nil { return GatewayTask{}, err } - task.Request = decodeObject(requestBytes) - task.Result = decodeObject(resultBytes) - task.Billings = decodeArray(billingsBytes) return task, nil } func (s *Store) GetTask(ctx context.Context, taskID string) (GatewayTask, error) { + task, err := scanGatewayTask(s.pool.QueryRow(ctx, ` +SELECT `+gatewayTaskColumns+` + FROM gateway_tasks + WHERE id=$1`, taskID, + )) + if err != nil { + return GatewayTask{}, err + } + return task, nil +} + +type taskScanner interface { + Scan(dest ...any) error +} + +func scanGatewayTask(scanner taskScanner) (GatewayTask, error) { var task GatewayTask var requestBytes []byte var resultBytes []byte var billingsBytes []byte - err := s.pool.QueryRow(ctx, ` -SELECT id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source, - COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), - COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at - FROM gateway_tasks - WHERE id=$1`, taskID, - ).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt) - if err != nil { + var usageBytes []byte + var metricsBytes []byte + var billingSummaryBytes []byte + if err := scanner.Scan( + &task.ID, + &task.Kind, + &task.RunMode, + &task.UserID, + &task.GatewayUserID, + &task.UserSource, + &task.GatewayTenantID, + &task.TenantID, + &task.TenantKey, + &task.APIKeyID, + &task.APIKeyName, + &task.APIKeyPrefix, + &task.UserGroupID, + &task.UserGroupKey, + &task.Model, + &task.ModelType, + &task.RequestedModel, + &task.ResolvedModel, + &task.RequestID, + &requestBytes, + &task.Status, + &resultBytes, + &billingsBytes, + &usageBytes, + &metricsBytes, + &billingSummaryBytes, + &task.FinalChargeAmount, + &task.ResponseStartedAt, + &task.ResponseFinishedAt, + &task.ResponseDurationMS, + &task.Error, + &task.CreatedAt, + &task.UpdatedAt, + &task.FinishedAt, + ); err != nil { return GatewayTask{}, err } task.Request = decodeObject(requestBytes) task.Result = decodeObject(resultBytes) task.Billings = decodeArray(billingsBytes) + task.Usage = decodeObject(usageBytes) + task.Metrics = decodeObject(metricsBytes) + task.BillingSummary = decodeObject(billingSummaryBytes) return task, nil } @@ -1342,6 +1692,39 @@ func scanAPIKey(scanner apiKeyScanner) (APIKey, error) { return item, nil } +func scanAPIKeyWithSecret(scanner apiKeyScanner, secret *string) (APIKey, error) { + var item APIKey + var scopesBytes []byte + var rateLimitPolicy []byte + var quotaPolicy []byte + if err := scanner.Scan( + &item.ID, + &item.GatewayTenantID, + &item.GatewayUserID, + &item.TenantID, + &item.TenantKey, + &item.UserID, + &item.KeyPrefix, + &item.Name, + &scopesBytes, + &item.UserGroupID, + &rateLimitPolicy, + "aPolicy, + &item.Status, + &item.ExpiresAt, + &item.LastUsedAt, + &item.CreatedAt, + &item.UpdatedAt, + secret, + ); err != nil { + return APIKey{}, err + } + item.Scopes = decodeStringArray(scopesBytes) + item.RateLimitPolicy = decodeObject(rateLimitPolicy) + item.QuotaPolicy = decodeObject(quotaPolicy) + return item, nil +} + func localGatewayUserID(user *auth.User) string { if user == nil { return "" @@ -1477,6 +1860,50 @@ func decodeObject(bytes []byte) map[string]any { return out } +func maskCredentialsPreview(bytes []byte) map[string]any { + credentials := decodeObject(bytes) + if len(credentials) == 0 { + return nil + } + out := make(map[string]any, len(credentials)) + for key, value := range credentials { + out[key] = maskCredentialValue(value) + } + return out +} + +func maskCredentialValue(value any) any { + switch typed := value.(type) { + case string: + return maskSecret(typed) + case map[string]any: + out := make(map[string]any, len(typed)) + for key, nested := range typed { + out[key] = maskCredentialValue(nested) + } + return out + case []any: + out := make([]any, 0, len(typed)) + for _, nested := range typed { + out = append(out, maskCredentialValue(nested)) + } + return out + default: + return value + } +} + +func maskSecret(value string) string { + value = strings.TrimSpace(value) + if value == "" { + return "" + } + if len(value) <= 6 { + return strings.Repeat("*", len(value)) + } + return value[:3] + strings.Repeat("*", len(value)-6) + value[len(value)-3:] +} + func decodeArray(bytes []byte) []any { if len(bytes) == 0 { return nil diff --git a/apps/api/internal/store/runtime_types.go b/apps/api/internal/store/runtime_types.go index aadd022..8dfa0a6 100644 --- a/apps/api/internal/store/runtime_types.go +++ b/apps/api/internal/store/runtime_types.go @@ -1,6 +1,9 @@ package store -import "errors" +import ( + "errors" + "time" +) var ( ErrNoModelCandidate = errors.New("no enabled platform model matches request") @@ -25,6 +28,8 @@ type CreatePlatformModelInput struct { PermissionConfig map[string]any `json:"permissionConfig"` RetryPolicy map[string]any `json:"retryPolicy"` RateLimitPolicy map[string]any `json:"rateLimitPolicy"` + RuntimePolicySetID string `json:"runtimePolicySetId"` + RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride"` Enabled bool `json:"enabled"` } @@ -33,6 +38,7 @@ type RuntimeModelCandidate struct { PlatformKey string PlatformName string Provider string + SpecType string BaseURL string AuthType string Credentials map[string]any @@ -55,12 +61,15 @@ type RuntimeModelCandidate struct { BaseBillingConfig map[string]any BillingConfig map[string]any BillingConfigOverride map[string]any + PermissionConfig map[string]any PricingMode string DiscountFactor float64 PlatformPricingRuleSetID string ModelPricingRuleSetID string ModelRetryPolicy map[string]any ModelRateLimitPolicy map[string]any + RuntimePolicySetID string + RuntimePolicyOverride map[string]any ClientID string QueueKey string } @@ -92,10 +101,42 @@ type CreateTaskAttemptInput struct { } type FinishTaskAttemptInput struct { - AttemptID string - Status string - Retryable bool - ResponseSnapshot map[string]any - ErrorCode string - ErrorMessage string + AttemptID string + Status string + Retryable bool + RequestID string + Usage map[string]any + Metrics map[string]any + ResponseSnapshot map[string]any + ResponseStartedAt time.Time + ResponseFinishedAt time.Time + ResponseDurationMS int64 + ErrorCode string + ErrorMessage string +} + +type FinishTaskSuccessInput struct { + TaskID string + Result map[string]any + Billings []any + RequestID string + ResolvedModel string + Usage map[string]any + Metrics map[string]any + BillingSummary map[string]any + FinalChargeAmount float64 + ResponseStartedAt time.Time + ResponseFinishedAt time.Time + ResponseDurationMS int64 +} + +type FinishTaskFailureInput struct { + TaskID string + Code string + Message string + RequestID string + Metrics map[string]any + ResponseStartedAt time.Time + ResponseFinishedAt time.Time + ResponseDurationMS int64 } diff --git a/apps/api/internal/store/tasks_runtime.go b/apps/api/internal/store/tasks_runtime.go index e79133b..2d622ac 100644 --- a/apps/api/internal/store/tasks_runtime.go +++ b/apps/api/internal/store/tasks_runtime.go @@ -3,6 +3,7 @@ package store import ( "context" "encoding/json" + "time" ) func (s *Store) MarkTaskRunning(ctx context.Context, taskID string, modelType string, normalizedRequest map[string]any) error { @@ -62,57 +63,118 @@ WHERE id = $1::uuid`, input.TaskID, input.AttemptNo); err != nil { func (s *Store) FinishTaskAttempt(ctx context.Context, input FinishTaskAttemptInput) error { responseJSON, _ := json.Marshal(emptyObjectIfNil(input.ResponseSnapshot)) + usageJSON, _ := json.Marshal(emptyObjectIfNil(input.Usage)) + metricsJSON, _ := json.Marshal(emptyObjectIfNil(input.Metrics)) _, err := s.pool.Exec(ctx, ` UPDATE gateway_task_attempts SET status = $2, retryable = $3, - response_snapshot = $4::jsonb, - error_code = NULLIF($5, ''), - error_message = NULLIF($6, ''), + request_id = NULLIF($4, ''), + usage = $5::jsonb, + metrics = $6::jsonb, + response_snapshot = $7::jsonb, + response_started_at = $8::timestamptz, + response_finished_at = $9::timestamptz, + response_duration_ms = $10, + error_code = NULLIF($11, ''), + error_message = NULLIF($12, ''), finished_at = now() WHERE id = $1::uuid`, input.AttemptID, input.Status, input.Retryable, + input.RequestID, + string(usageJSON), + string(metricsJSON), string(responseJSON), + nullableTime(input.ResponseStartedAt), + nullableTime(input.ResponseFinishedAt), + input.ResponseDurationMS, input.ErrorCode, input.ErrorMessage, ) return err } -func (s *Store) FinishTaskSuccess(ctx context.Context, taskID string, result map[string]any, billings []any) (GatewayTask, error) { - resultJSON, _ := json.Marshal(emptyObjectIfNil(result)) - billingsJSON, _ := json.Marshal(billings) +func (s *Store) FinishTaskSuccess(ctx context.Context, input FinishTaskSuccessInput) (GatewayTask, error) { + resultJSON, _ := json.Marshal(emptyObjectIfNil(input.Result)) + billingsJSON, _ := json.Marshal(input.Billings) + usageJSON, _ := json.Marshal(emptyObjectIfNil(input.Usage)) + metricsJSON, _ := json.Marshal(emptyObjectIfNil(input.Metrics)) + billingSummaryJSON, _ := json.Marshal(emptyObjectIfNil(input.BillingSummary)) if _, err := s.pool.Exec(ctx, ` UPDATE gateway_tasks SET status = 'succeeded', result = $2::jsonb, billings = $3::jsonb, + request_id = NULLIF($4, ''), + resolved_model = NULLIF($5, ''), + usage = $6::jsonb, + metrics = $7::jsonb, + billing_summary = $8::jsonb, + final_charge_amount = $9, + response_started_at = $10::timestamptz, + response_finished_at = $11::timestamptz, + response_duration_ms = $12, error = NULL, error_code = NULL, error_message = NULL, finished_at = now(), updated_at = now() -WHERE id = $1::uuid`, taskID, string(resultJSON), string(billingsJSON)); err != nil { +WHERE id = $1::uuid`, + input.TaskID, + string(resultJSON), + string(billingsJSON), + input.RequestID, + input.ResolvedModel, + string(usageJSON), + string(metricsJSON), + string(billingSummaryJSON), + input.FinalChargeAmount, + nullableTime(input.ResponseStartedAt), + nullableTime(input.ResponseFinishedAt), + input.ResponseDurationMS, + ); err != nil { return GatewayTask{}, err } - return s.GetTask(ctx, taskID) + return s.GetTask(ctx, input.TaskID) } -func (s *Store) FinishTaskFailure(ctx context.Context, taskID string, code string, message string) (GatewayTask, error) { +func (s *Store) FinishTaskFailure(ctx context.Context, input FinishTaskFailureInput) (GatewayTask, error) { + metricsJSON, _ := json.Marshal(emptyObjectIfNil(input.Metrics)) if _, err := s.pool.Exec(ctx, ` UPDATE gateway_tasks SET status = 'failed', error = NULLIF($2, ''), error_code = NULLIF($3, ''), error_message = NULLIF($2, ''), + request_id = NULLIF($4, ''), + metrics = $5::jsonb, + response_started_at = $6::timestamptz, + response_finished_at = $7::timestamptz, + response_duration_ms = $8, finished_at = now(), updated_at = now() -WHERE id = $1::uuid`, taskID, message, code); err != nil { +WHERE id = $1::uuid`, + input.TaskID, + input.Message, + input.Code, + input.RequestID, + string(metricsJSON), + nullableTime(input.ResponseStartedAt), + nullableTime(input.ResponseFinishedAt), + input.ResponseDurationMS, + ); err != nil { return GatewayTask{}, err } - return s.GetTask(ctx, taskID) + return s.GetTask(ctx, input.TaskID) +} + +func nullableTime(value time.Time) any { + if value.IsZero() { + return nil + } + return value } func (s *Store) AddTaskEvent(ctx context.Context, taskID string, eventType string, status string, phase string, progress float64, message string, payload map[string]any, simulated bool) (TaskEvent, error) { diff --git a/apps/api/migrations/0017_task_record_enrichment.sql b/apps/api/migrations/0017_task_record_enrichment.sql new file mode 100644 index 0000000..d119e9d --- /dev/null +++ b/apps/api/migrations/0017_task_record_enrichment.sql @@ -0,0 +1,72 @@ +ALTER TABLE IF EXISTS gateway_tasks + ADD COLUMN IF NOT EXISTS api_key_name text, + ADD COLUMN IF NOT EXISTS api_key_prefix text, + ADD COLUMN IF NOT EXISTS requested_model text, + ADD COLUMN IF NOT EXISTS resolved_model text, + ADD COLUMN IF NOT EXISTS request_id text, + ADD COLUMN IF NOT EXISTS usage jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS metrics jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS billing_summary jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS final_charge_amount numeric NOT NULL DEFAULT 0, + ADD COLUMN IF NOT EXISTS response_started_at timestamptz, + ADD COLUMN IF NOT EXISTS response_finished_at timestamptz, + ADD COLUMN IF NOT EXISTS response_duration_ms bigint; + +ALTER TABLE IF EXISTS gateway_task_attempts + ADD COLUMN IF NOT EXISTS request_id text, + ADD COLUMN IF NOT EXISTS usage jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS metrics jsonb NOT NULL DEFAULT '{}'::jsonb, + ADD COLUMN IF NOT EXISTS response_started_at timestamptz, + ADD COLUMN IF NOT EXISTS response_finished_at timestamptz, + ADD COLUMN IF NOT EXISTS response_duration_ms bigint; + +CREATE INDEX IF NOT EXISTS idx_gateway_tasks_request_id + ON gateway_tasks(request_id) + WHERE request_id IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_gateway_tasks_api_key_created + ON gateway_tasks(api_key_id, created_at DESC) + WHERE api_key_id IS NOT NULL; + +UPDATE gateway_tasks +SET requested_model = model +WHERE requested_model IS NULL; + +UPDATE gateway_tasks AS t +SET api_key_name = COALESCE(t.api_key_name, k.name), + api_key_prefix = COALESCE(t.api_key_prefix, k.key_prefix) +FROM gateway_api_keys AS k +WHERE t.api_key_id = k.id::text; + +WITH billing_totals AS ( + SELECT + t.id, + COUNT(line.value) AS line_count, + COALESCE(SUM( + CASE + WHEN jsonb_typeof(line.value) = 'object' + AND line.value ? 'amount' + AND line.value->>'amount' ~ '^-?[0-9]+(\.[0-9]+)?$' + THEN (line.value->>'amount')::numeric + ELSE 0 + END + ), 0) AS total_amount + FROM gateway_tasks AS t + LEFT JOIN LATERAL jsonb_array_elements(COALESCE(t.billings, '[]'::jsonb)) AS line(value) ON true + GROUP BY t.id +) +UPDATE gateway_tasks AS t +SET final_charge_amount = billing_totals.total_amount, + billing_summary = jsonb_build_object( + 'lineCount', billing_totals.line_count, + 'totalAmount', billing_totals.total_amount, + 'currency', 'resource', + 'finalCharge', jsonb_build_object( + 'amount', billing_totals.total_amount, + 'currency', 'resource', + 'simulated', COALESCE(t.run_mode = 'simulation', false) + ) + ) +FROM billing_totals +WHERE t.id = billing_totals.id + AND COALESCE(t.billing_summary, '{}'::jsonb) = '{}'::jsonb; diff --git a/apps/web/src/pages/WorkspacePage.tsx b/apps/web/src/pages/WorkspacePage.tsx index b840c36..95ef81a 100644 --- a/apps/web/src/pages/WorkspacePage.tsx +++ b/apps/web/src/pages/WorkspacePage.tsx @@ -2,7 +2,6 @@ import type { FormEvent, ReactNode } from 'react'; import { CreditCard, KeyRound, ListChecks, UserRound } from 'lucide-react'; import type { ConsoleData } from '../app-state'; import { EntityTable } from '../components/EntityTable'; -import { PageHeader } from '../components/PageHeader'; import { Badge, Button, Card, CardContent, CardHeader, CardTitle, Input, Label, Tabs } from '../components/ui'; import type { ApiKeyForm, LoadState, WorkspaceSection } from '../types'; @@ -22,10 +21,10 @@ export function WorkspacePage(props: { onApiKeyFormChange: (value: ApiKeyForm) => void; onSectionChange: (value: WorkspaceSection) => void; onSubmitApiKey: (event: FormEvent) => void; + onUseApiKeyForPlayground: (apiKeyId?: string) => void; }) { return (
-
@@ -106,7 +105,9 @@ function ApiKeyPanel(props: { state: LoadState; onApiKeyFormChange: (value: ApiKeyForm) => void; onSubmitApiKey: (event: FormEvent) => void; + onUseApiKeyForPlayground: (apiKeyId?: string) => void; }) { + const latestUsableKey = props.apiKeySecret ? props.data.apiKeys[0] : undefined; return (
@@ -123,7 +124,14 @@ function ApiKeyPanel(props: { 创建 - {props.apiKeySecret && {props.apiKeySecret}} + {props.apiKeySecret && ( +
+ {props.apiKeySecret} + +
+ )}
@@ -145,6 +153,9 @@ function ApiKeyPanel(props: { function TaskPanel(props: { data: ConsoleData }) { const task = props.data.taskResult; + const usage = task?.usage ?? {}; + const tokenText = usage.totalTokens ? `${usage.totalTokens}` : '-'; + const chargeText = task?.finalChargeAmount ? `${task.finalChargeAmount}` : '-'; return ( @@ -156,7 +167,15 @@ function TaskPanel(props: { data: ConsoleData }) { {task.status} {task.kind} {task.model} -
{JSON.stringify(task.result ?? {}, null, 2)}
+
+ + + + + + +
+
{JSON.stringify({ result: task.result, usage: task.usage, billings: task.billings, billingSummary: task.billingSummary, metrics: task.metrics }, null, 2)}
) : (
diff --git a/apps/web/src/styles.css b/apps/web/src/styles.css index e7f81d3..3021a96 100644 --- a/apps/web/src/styles.css +++ b/apps/web/src/styles.css @@ -407,6 +407,16 @@ strong { gap: 10px; } +.infoGrid { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(160px, 1fr)); + gap: 10px; +} + +.infoGrid.compact .infoItem { + padding: 10px; +} + .spanTwo { grid-column: 1 / -1; } diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 51b2cbd..7575d8a 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -16,6 +16,7 @@ export interface AuthUser { apiKeyId?: string; apiKeySecret?: string; apiKeyName?: string; + apiKeyPrefix?: string; } export interface AuthResponse { @@ -56,6 +57,7 @@ export interface IntegrationPlatform { provider: string; platformKey: string; name: string; + internalName?: string; baseUrl?: string; authType: string; status: 'enabled' | 'disabled' | string; @@ -64,6 +66,9 @@ export interface IntegrationPlatform { defaultDiscountFactor: number; pricingRuleSetId?: string; config?: Record; + credentialsPreview?: Record; + retryPolicy?: RateLimitPolicy | Record; + rateLimitPolicy?: RateLimitPolicy; createdAt: string; updatedAt: string; } @@ -75,6 +80,7 @@ export type RateLimitMetric = | 'tpm_input' | 'tpm_output' | 'rpm' + | 'rps' | 'concurrent' | 'queue_size'; @@ -85,6 +91,8 @@ export interface CatalogProvider { displayName: string; providerType: string; iconPath?: string; + defaultBaseUrl?: string; + defaultAuthType?: string; source?: 'gateway' | 'server-main' | 'sync' | 'server-main.integration-platform' | string; capabilitySchema?: Record; defaultRateLimitPolicy?: RateLimitPolicy; @@ -100,6 +108,8 @@ export interface CatalogProviderUpsertRequest { displayName: string; providerType?: string; iconPath?: string; + defaultBaseUrl?: string; + defaultAuthType?: string; source?: 'gateway' | 'server-main' | 'sync' | 'server-main.integration-platform' | string; capabilitySchema?: Record; defaultRateLimitPolicy?: RateLimitPolicy; @@ -112,12 +122,19 @@ export interface BaseModelCatalogItem { providerKey: string; canonicalModelKey: string; providerModelName: string; - modelType: 'chat' | 'image' | 'video' | 'audio' | 'embedding' | 'music' | 'digital_human' | 'model_3d' | string; - displayName: string; + modelType: string[]; + modelAlias: string; + displayName?: string; capabilities?: Record; baseBillingConfig?: BillingConfig; defaultRateLimitPolicy?: RateLimitPolicy; + pricingRuleSetId?: string; + runtimePolicySetId?: string; + runtimePolicyOverride?: RuntimePolicyOverride; metadata?: Record; + catalogType?: 'system' | 'custom' | string; + defaultSnapshot?: Record; + customizedAt?: string; pricingVersion: number; status: 'active' | 'deprecated' | 'hidden' | string; createdAt: string; @@ -128,12 +145,18 @@ export interface BaseModelUpsertRequest { providerKey: string; canonicalModelKey?: string; providerModelName: string; - modelType: string; + modelType: string[]; + modelAlias?: string; displayName?: string; capabilities?: Record; baseBillingConfig?: BillingConfig; defaultRateLimitPolicy?: RateLimitPolicy; + pricingRuleSetId?: string; + runtimePolicySetId?: string; + runtimePolicyOverride?: RuntimePolicyOverride; metadata?: Record; + catalogType?: 'system' | 'custom' | string; + defaultSnapshot?: Record; pricingVersion?: number; status?: 'active' | 'deprecated' | 'hidden' | string; } @@ -213,6 +236,40 @@ export interface PricingRuleSetUpsertRequest { rules: PricingRuleInput[]; } +export interface RuntimePolicySet { + id: string; + policyKey: string; + name: string; + description?: string; + rateLimitPolicy?: RateLimitPolicy | Record; + retryPolicy?: Record; + autoDisablePolicy?: Record; + degradePolicy?: Record; + metadata?: Record; + status: 'active' | 'disabled' | string; + createdAt: string; + updatedAt: string; +} + +export interface RuntimePolicySetUpsertRequest { + policyKey: string; + name: string; + description?: string; + rateLimitPolicy?: RateLimitPolicy | Record; + retryPolicy?: Record; + autoDisablePolicy?: Record; + degradePolicy?: Record; + metadata?: Record; + status?: 'active' | 'disabled' | string; +} + +export interface RuntimePolicyOverride { + rateLimitPolicy?: RateLimitPolicy | Record; + retryPolicy?: Record; + autoDisablePolicy?: Record; + degradePolicy?: Record; +} + export interface GatewayUser { id: string; userKey: string; @@ -238,6 +295,26 @@ export interface GatewayUser { updatedAt: string; } +export interface GatewayUserUpsertRequest { + userKey?: string; + source?: 'gateway' | 'server-main' | 'sync' | string; + externalUserId?: string; + username: string; + displayName?: string; + email?: string; + phone?: string; + avatarUrl?: string; + password?: string; + gatewayTenantId?: string; + tenantId?: string; + tenantKey?: string; + defaultUserGroupId?: string; + roles?: string[]; + authProfile?: Record; + metadata?: Record; + status?: 'active' | 'disabled' | 'locked' | 'deleted' | string; +} + export interface GatewayTenant { id: string; tenantKey: string; @@ -258,6 +335,21 @@ export interface GatewayTenant { updatedAt: string; } +export interface GatewayTenantUpsertRequest { + tenantKey: string; + source?: 'gateway' | 'server-main' | 'sync' | string; + externalTenantId?: string; + name: string; + description?: string; + defaultUserGroupId?: string; + planKey?: string; + billingProfile?: Record; + rateLimitPolicy?: RateLimitPolicy; + authPolicy?: Record; + metadata?: Record; + status?: 'active' | 'disabled' | 'locked' | 'deleted' | string; +} + export interface UserGroup { id: string; groupKey: string; @@ -269,11 +361,26 @@ export interface UserGroup { billingDiscountPolicy?: Record; rateLimitPolicy?: RateLimitPolicy; quotaPolicy?: Record; + metadata?: Record; status: 'active' | 'disabled' | string; createdAt: string; updatedAt: string; } +export interface UserGroupUpsertRequest { + groupKey: string; + name: string; + description?: string; + source?: 'gateway' | 'server-main' | 'sync' | string; + priority?: number; + rechargeDiscountPolicy?: Record; + billingDiscountPolicy?: Record; + rateLimitPolicy?: RateLimitPolicy; + quotaPolicy?: Record; + metadata?: Record; + status?: 'active' | 'disabled' | string; +} + export interface UserGroupMembership { id: string; groupId: string; @@ -288,6 +395,57 @@ export interface UserGroupMembership { updatedAt: string; } +export type GatewayAccessSubjectType = 'user_group' | 'tenant' | 'user' | 'api_key'; +export type GatewayAccessResourceType = 'platform' | 'platform_model' | 'base_model'; +export type GatewayAccessEffect = 'allow' | 'deny'; + +export interface GatewayAccessRule { + id: string; + subjectType: GatewayAccessSubjectType | string; + subjectId: string; + resourceType: GatewayAccessResourceType | string; + resourceId: string; + effect: GatewayAccessEffect | string; + priority: number; + minPermissionLevel: number; + conditions?: Record; + metadata?: Record; + status: 'active' | 'disabled' | string; + createdAt: string; + updatedAt: string; +} + +export interface GatewayAccessRuleUpsertRequest { + subjectType: GatewayAccessSubjectType | string; + subjectId: string; + resourceType: GatewayAccessResourceType | string; + resourceId: string; + effect: GatewayAccessEffect | string; + priority?: number; + minPermissionLevel?: number; + conditions?: Record; + metadata?: Record; + status?: 'active' | 'disabled' | string; +} + +export interface GatewayAccessRuleResourceRequest { + resourceType: GatewayAccessResourceType | string; + resourceId: string; + priority?: number; + minPermissionLevel?: number; + conditions?: Record; + metadata?: Record; + status?: 'active' | 'disabled' | string; +} + +export interface GatewayAccessRuleBatchRequest { + subjectType: GatewayAccessSubjectType | string; + subjectId: string; + effect: GatewayAccessEffect | string; + upsertResources?: GatewayAccessRuleResourceRequest[]; + deleteResources?: GatewayAccessRuleResourceRequest[]; +} + export interface GatewayApiKey { id: string; gatewayTenantId?: string; @@ -313,6 +471,10 @@ export interface CreatedGatewayApiKey { secret: string; } +export interface PlayableGatewayApiKey extends GatewayApiKey { + secret: string; +} + export interface GatewayWalletAccount { id: string; gatewayTenantId?: string; @@ -406,6 +568,11 @@ export interface PlatformModel { pricingRuleSetId?: string; billingConfigOverride?: BillingConfig; billingConfig?: BillingConfig; + permissionConfig?: Record; + retryPolicy?: Record; + rateLimitPolicy?: RateLimitPolicy; + runtimePolicySetId?: string; + runtimePolicyOverride?: RuntimePolicyOverride; enabled: boolean; createdAt: string; updatedAt: string; @@ -433,14 +600,29 @@ export interface GatewayTask { gatewayTenantId?: string; tenantId?: string; tenantKey?: string; + apiKeyId?: string; + apiKeyName?: string; + apiKeyPrefix?: string; userGroupId?: string; userGroupKey?: string; userGroupPolicySnapshot?: Record; model: string; + modelType?: string; + requestedModel?: string; + resolvedModel?: string; + requestId?: string; request?: Record; status: 'queued' | 'running' | 'succeeded' | 'failed' | 'cancelled' | string; result?: Record; billings?: unknown[]; + usage?: Record; + metrics?: Record; + billingSummary?: Record; + finalChargeAmount?: number; + responseStartedAt?: string; + responseFinishedAt?: string; + responseDurationMs?: number; + finishedAt?: string; error?: string; createdAt: string; updatedAt: string;