feat: enrich task record details

This commit is contained in:
wangbo 2026-05-10 22:33:58 +08:00
parent 205a4b625e
commit 53f8edfb67
19 changed files with 1781 additions and 165 deletions

View File

@ -39,6 +39,7 @@ type User struct {
APIKeyID string `json:"apiKeyId,omitempty"`
APIKeySecret string `json:"apiKeySecret,omitempty"`
APIKeyName string `json:"apiKeyName,omitempty"`
APIKeyPrefix string `json:"apiKeyPrefix,omitempty"`
}
type contextKey string
@ -135,6 +136,7 @@ func (a *Authenticator) verifyJWT(tokenString string) (*User, error) {
APIKeyID: stringClaim(claims, "apiKeyId"),
APIKeySecret: stringClaim(claims, "apiKeySecret"),
APIKeyName: stringClaim(claims, "apiKeyName"),
APIKeyPrefix: stringClaim(claims, "apiKeyPrefix"),
}
if user.Source == "" {
user.Source = "gateway"
@ -162,6 +164,9 @@ func (a *Authenticator) SignJWT(user *User, ttl time.Duration) (string, error) {
"userGroupId": user.UserGroupID,
"userGroupKey": user.UserGroupKey,
"userGroupKeys": user.UserGroupKeys,
"apiKeyId": user.APIKeyID,
"apiKeyName": user.APIKeyName,
"apiKeyPrefix": user.APIKeyPrefix,
"iat": now.Unix(),
"exp": now.Add(ttl).Unix(),
}
@ -235,6 +240,31 @@ func hasPermission(roles []string, required Permission) bool {
return granted[required]
}
func PermissionLevel(roles []string) int {
level := 0
for _, role := range roles {
switch role {
case "admin", "manager":
if level < 4 {
level = 4
}
case "operator":
if level < 3 {
level = 3
}
case "creator":
if level < 2 {
level = 2
}
case "user":
if level < 1 {
level = 1
}
}
}
return level
}
func permissionsForRole(role string) []Permission {
switch role {
case "admin", "manager":

View File

@ -17,6 +17,7 @@ func TestOpenAIClientChatContract(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotPath = r.URL.Path
gotAuth = r.Header.Get("Authorization")
w.Header().Set("X-Request-Id", "req-chat-test")
var body map[string]any
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Fatalf("decode request: %v", err)
@ -53,6 +54,55 @@ func TestOpenAIClientChatContract(t *testing.T) {
if response.Usage.TotalTokens != 5 || response.Result["id"] != "chatcmpl-test" {
t.Fatalf("unexpected response: %+v", response)
}
if response.RequestID != "req-chat-test" || response.ResponseStartedAt.IsZero() || response.ResponseFinishedAt.IsZero() {
t.Fatalf("response metadata was not captured: %+v", response)
}
}
func TestOpenAIClientChatStreamContract(t *testing.T) {
var gotStream bool
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var body map[string]any
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Fatalf("decode request: %v", err)
}
gotStream, _ = body["stream"].(bool)
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"model\":\"deepseek-v4-flash\",\"choices\":[{\"delta\":{\"content\":\"hello\"}}]}\n\n"))
_, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"model\":\"deepseek-v4-flash\",\"choices\":[{\"delta\":{\"content\":\" world\"},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":1,\"completion_tokens\":2,\"total_tokens\":3}}\n\n"))
_, _ = w.Write([]byte("data: [DONE]\n\n"))
}))
defer server.Close()
response, err := (OpenAIClient{HTTPClient: server.Client()}).Run(context.Background(), Request{
Kind: "chat.completions",
Model: "DeepSeek-V4-Flash",
Body: map[string]any{
"model": "DeepSeek-V4-Flash",
"messages": []any{map[string]any{"role": "user", "content": "ping"}},
"stream": true,
},
Candidate: store.RuntimeModelCandidate{
BaseURL: server.URL,
ModelName: "deepseek-v4-flash",
Credentials: map[string]any{"apiKey": "test-key"},
},
})
if err != nil {
t.Fatalf("run openai stream client: %v", err)
}
if !gotStream {
t.Fatalf("expected upstream stream request")
}
if response.Usage.TotalTokens != 3 {
t.Fatalf("unexpected usage: %+v", response.Usage)
}
choices, _ := response.Result["choices"].([]any)
choice, _ := choices[0].(map[string]any)
message, _ := choice["message"].(map[string]any)
if message["content"] != "hello world" {
t.Fatalf("unexpected stream response: %+v", response.Result)
}
}
func TestGeminiClientChatContract(t *testing.T) {
@ -111,6 +161,14 @@ func TestGeminiClientChatContract(t *testing.T) {
}
}
func TestGeminiURLAcceptsVersionedBaseURL(t *testing.T) {
got := geminiURL("https://generativelanguage.googleapis.com/v1beta", "gemini-2.5-flash", "test-key")
want := "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key=test-key"
if got != want {
t.Fatalf("unexpected gemini url: %s", got)
}
}
func extractText(result map[string]any) string {
choices, _ := result["choices"].([]any)
choice, _ := choices[0].(map[string]any)

View File

@ -8,6 +8,7 @@ import (
"net/http"
"net/url"
"strings"
"time"
)
type GeminiClient struct {
@ -30,11 +31,26 @@ func (c GeminiClient) Run(ctx context.Context, request Request) (Response, error
if err != nil {
return Response{}, &ClientError{Code: "network", Message: err.Error(), Retryable: true}
}
responseStartedAt := time.Now()
requestID := requestIDFromHTTPResponse(resp)
result, err := decodeHTTPResponse(resp)
responseFinishedAt := time.Now()
if err != nil {
return Response{}, err
return Response{}, annotateResponseError(err, requestID, responseStartedAt, responseFinishedAt)
}
return Response{Result: geminiResult(request, result), Usage: geminiUsage(result), Progress: providerProgress(request)}, nil
output := geminiResult(request, result)
if requestID == "" {
requestID = requestIDFromResult(output)
}
return Response{
Result: output,
RequestID: requestID,
Usage: geminiUsage(result),
Progress: providerProgress(request),
ResponseStartedAt: responseStartedAt,
ResponseFinishedAt: responseFinishedAt,
ResponseDurationMS: responseDurationMS(responseStartedAt, responseFinishedAt),
}, nil
}
func geminiURL(baseURL string, model string, apiKey string) string {
@ -42,6 +58,9 @@ func geminiURL(baseURL string, model string, apiKey string) string {
if base == "" {
base = "https://generativelanguage.googleapis.com"
}
if strings.HasSuffix(base, "/v1beta") {
base = strings.TrimSuffix(base, "/v1beta")
}
escapedModel := url.PathEscape(model)
return fmt.Sprintf("%s/v1beta/models/%s:generateContent?key=%s", base, escapedModel, url.QueryEscape(apiKey))
}

View File

@ -1,6 +1,8 @@
package clients
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
@ -48,6 +50,7 @@ func decodeHTTPResponse(resp *http.Response) (map[string]any, error) {
Code: statusCodeName(resp.StatusCode),
Message: errorMessage(raw, resp.Status),
StatusCode: resp.StatusCode,
RequestID: requestIDFromHTTPResponse(resp),
Retryable: HTTPRetryable(resp.StatusCode),
}
}
@ -61,6 +64,137 @@ func decodeHTTPResponse(resp *http.Response) (map[string]any, error) {
return out, nil
}
func decodeOpenAIStreamResponse(resp *http.Response, onDelta StreamDelta) (map[string]any, error) {
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
raw, _ := io.ReadAll(io.LimitReader(resp.Body, 16*1024*1024))
return nil, &ClientError{
Code: statusCodeName(resp.StatusCode),
Message: errorMessage(raw, resp.Status),
StatusCode: resp.StatusCode,
RequestID: requestIDFromHTTPResponse(resp),
Retryable: HTTPRetryable(resp.StatusCode),
}
}
if result, ok, err := decodeOpenAIStreamReader(resp.Body, onDelta); ok || err != nil {
return result, err
}
return map[string]any{}, nil
}
func decodeOpenAIStreamReader(reader io.Reader, onDelta StreamDelta) (map[string]any, bool, error) {
scanner := bufio.NewScanner(reader)
scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
rawLines := make([]string, 0)
parts := make([]string, 0)
var last map[string]any
var usage Usage
for scanner.Scan() {
rawLine := scanner.Text()
rawLines = append(rawLines, rawLine)
line := strings.TrimSpace(rawLine)
if !strings.HasPrefix(line, "data:") {
continue
}
payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if payload == "" || payload == "[DONE]" {
continue
}
var event map[string]any
if err := json.Unmarshal([]byte(payload), &event); err != nil {
continue
}
last = event
if text := streamEventText(event); text != "" {
parts = append(parts, text)
if onDelta != nil {
if err := onDelta(text); err != nil {
return nil, true, err
}
}
}
if eventUsage := usageFromOpenAI(event); eventUsage.TotalTokens > 0 {
usage = eventUsage
}
}
if err := scanner.Err(); err != nil {
return nil, true, &ClientError{Code: "stream_read_error", Message: err.Error(), Retryable: true}
}
if last == nil {
raw := []byte(strings.Join(rawLines, "\n"))
if len(raw) == 0 {
return map[string]any{}, true, nil
}
var out map[string]any
if err := json.Unmarshal(raw, &out); err != nil {
return nil, false, nil
}
return out, true, nil
}
return buildOpenAIStreamResult(last, parts, usage), true, nil
}
func decodeOpenAIStream(raw []byte) (map[string]any, bool) {
if !bytes.Contains(raw, []byte("data:")) {
return nil, false
}
result, ok, err := decodeOpenAIStreamReader(bytes.NewReader(raw), nil)
return result, ok && err == nil
}
func buildOpenAIStreamResult(last map[string]any, parts []string, usage Usage) map[string]any {
if len(parts) == 0 {
return last
}
var out map[string]any
out = map[string]any{
"id": stringFromAny(firstPresent(last["id"], "chatcmpl-stream")),
"object": "chat.completion",
"model": stringFromAny(last["model"]),
"choices": []any{map[string]any{
"index": 0,
"message": map[string]any{
"role": "assistant",
"content": strings.Join(parts, ""),
},
"finish_reason": "stop",
}},
}
if usage.TotalTokens > 0 {
out["usage"] = map[string]any{
"prompt_tokens": usage.InputTokens,
"completion_tokens": usage.OutputTokens,
"total_tokens": usage.TotalTokens,
}
}
return out
}
func streamEventText(event map[string]any) string {
if choices, ok := event["choices"].([]any); ok {
for _, rawChoice := range choices {
choice, _ := rawChoice.(map[string]any)
if delta, ok := choice["delta"].(map[string]any); ok {
if content, ok := delta["content"].(string); ok {
return content
}
}
if message, ok := choice["message"].(map[string]any); ok {
if content, ok := message["content"].(string); ok {
return content
}
}
}
}
if delta, ok := event["delta"].(string); ok {
return delta
}
if text, ok := event["output_text"].(string); ok {
return text
}
return ""
}
func usageFromOpenAI(result map[string]any) Usage {
usage, _ := result["usage"].(map[string]any)
input := intFromAny(firstPresent(usage["prompt_tokens"], usage["input_tokens"]))
@ -72,6 +206,34 @@ func usageFromOpenAI(result map[string]any) Usage {
return Usage{InputTokens: input, OutputTokens: output, TotalTokens: total}
}
func requestIDFromHTTPResponse(resp *http.Response) string {
if resp == nil {
return ""
}
for _, key := range []string{
"x-request-id",
"x-requestid",
"request-id",
"x-amzn-requestid",
"x-amz-request-id",
"cf-ray",
} {
if value := strings.TrimSpace(resp.Header.Get(key)); value != "" {
return value
}
}
return ""
}
func requestIDFromResult(result map[string]any) string {
for _, key := range []string{"request_id", "requestId", "id", "response_id", "responseId"} {
if value := strings.TrimSpace(stringFromAny(result[key])); value != "" {
return value
}
}
return ""
}
func intFromAny(value any) int {
switch typed := value.(type) {
case float64:
@ -85,6 +247,13 @@ func intFromAny(value any) int {
}
}
func stringFromAny(value any) string {
if text, ok := value.(string); ok {
return text
}
return ""
}
func firstPresent(values ...any) any {
for _, value := range values {
if value != nil {

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"net/http"
"strings"
"time"
)
type OpenAIClient struct {
@ -23,6 +24,7 @@ func (c OpenAIClient) Run(ctx context.Context, request Request) (Response, error
}
body := cloneBody(request.Body)
body["model"] = request.Candidate.ModelName
stream := request.Stream || boolValue(body, "stream")
raw, _ := json.Marshal(body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, joinURL(request.Candidate.BaseURL, endpoint), bytes.NewReader(raw))
if err != nil {
@ -34,11 +36,36 @@ func (c OpenAIClient) Run(ctx context.Context, request Request) (Response, error
if err != nil {
return Response{}, &ClientError{Code: "network", Message: err.Error(), Retryable: true}
}
result, err := decodeHTTPResponse(resp)
responseStartedAt := time.Now()
requestID := requestIDFromHTTPResponse(resp)
result, err := decodeOpenAIResponse(resp, stream, request.StreamDelta)
responseFinishedAt := time.Now()
if err != nil {
return Response{}, err
return Response{}, annotateResponseError(err, requestID, responseStartedAt, responseFinishedAt)
}
return Response{Result: result, Usage: usageFromOpenAI(result), Progress: providerProgress(request)}, nil
if requestID == "" {
requestID = requestIDFromResult(result)
}
return Response{
Result: result,
RequestID: requestID,
Usage: usageFromOpenAI(result),
Progress: providerProgress(request),
ResponseStartedAt: responseStartedAt,
ResponseFinishedAt: responseFinishedAt,
ResponseDurationMS: responseDurationMS(responseStartedAt, responseFinishedAt),
}, nil
}
func decodeOpenAIResponse(resp *http.Response, stream bool, onDelta StreamDelta) (map[string]any, error) {
if stream {
result, err := decodeOpenAIStreamResponse(resp, onDelta)
if err == nil {
return result, nil
}
return nil, err
}
return decodeHTTPResponse(resp)
}
func openAIEndpoint(kind string) string {

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"
)
type SimulationClient struct{}
@ -17,10 +18,17 @@ func (c SimulationClient) Run(ctx context.Context, request Request) (Response, e
if profile == "fatal_failure" || profile == "non_retryable_failure" {
return Response{}, &ClientError{Code: "bad_request", Message: "simulated non-retryable failure", Retryable: false}
}
responseStartedAt := time.Now()
result := simulatedResult(request)
responseFinishedAt := time.Now()
return Response{
Result: simulatedResult(request),
Usage: simulatedUsage(request),
Progress: simulatedProgress(request),
Result: result,
RequestID: requestIDFromResult(result),
Usage: simulatedUsage(request),
Progress: simulatedProgress(request),
ResponseStartedAt: responseStartedAt,
ResponseFinishedAt: responseFinishedAt,
ResponseDurationMS: responseDurationMS(responseStartedAt, responseFinishedAt),
}, nil
}

View File

@ -4,23 +4,30 @@ import (
"context"
"errors"
"net/http"
"time"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
)
type Request struct {
Kind string
ModelType string
Model string
Body map[string]any
Candidate store.RuntimeModelCandidate
Stream bool
Kind string
ModelType string
Model string
Body map[string]any
Candidate store.RuntimeModelCandidate
Stream bool
StreamDelta StreamDelta
}
type Response struct {
Result map[string]any
Usage Usage
Progress []Progress
Result map[string]any
RequestID string
Usage Usage
Progress []Progress
Metrics map[string]any
ResponseStartedAt time.Time
ResponseFinishedAt time.Time
ResponseDurationMS int64
}
type Usage struct {
@ -36,15 +43,21 @@ type Progress struct {
Payload map[string]any
}
type StreamDelta func(text string) error
type Client interface {
Run(ctx context.Context, request Request) (Response, error)
}
type ClientError struct {
Code string
Message string
StatusCode int
Retryable bool
Code string
Message string
StatusCode int
RequestID string
ResponseStartedAt time.Time
ResponseFinishedAt time.Time
ResponseDurationMS int64
Retryable bool
}
func (e *ClientError) Error() string {
@ -67,6 +80,59 @@ func ErrorCode(err error) string {
return "client_error"
}
type ResponseMetadata struct {
RequestID string
ResponseStartedAt time.Time
ResponseFinishedAt time.Time
ResponseDurationMS int64
StatusCode int
}
func ErrorResponseMetadata(err error) ResponseMetadata {
var clientErr *ClientError
if errors.As(err, &clientErr) {
return ResponseMetadata{
RequestID: clientErr.RequestID,
ResponseStartedAt: clientErr.ResponseStartedAt,
ResponseFinishedAt: clientErr.ResponseFinishedAt,
ResponseDurationMS: clientErr.ResponseDurationMS,
StatusCode: clientErr.StatusCode,
}
}
return ResponseMetadata{}
}
func annotateResponseError(err error, requestID string, startedAt time.Time, finishedAt time.Time) error {
var clientErr *ClientError
if !errors.As(err, &clientErr) {
return err
}
if clientErr.RequestID == "" {
clientErr.RequestID = requestID
}
if clientErr.ResponseStartedAt.IsZero() {
clientErr.ResponseStartedAt = startedAt
}
if clientErr.ResponseFinishedAt.IsZero() {
clientErr.ResponseFinishedAt = finishedAt
}
if clientErr.ResponseDurationMS == 0 {
clientErr.ResponseDurationMS = responseDurationMS(clientErr.ResponseStartedAt, clientErr.ResponseFinishedAt)
}
return err
}
func HTTPRetryable(status int) bool {
return status == http.StatusTooManyRequests || status == http.StatusRequestTimeout || status >= 500
}
func responseDurationMS(startedAt time.Time, finishedAt time.Time) int64 {
if startedAt.IsZero() || finishedAt.IsZero() {
return 0
}
duration := finishedAt.Sub(startedAt).Milliseconds()
if duration < 0 {
return 0
}
return duration
}

View File

@ -93,6 +93,7 @@ func TestCoreLocalFlow(t *testing.T) {
Secret string `json:"secret"`
APIKey struct {
ID string `json:"id"`
Name string `json:"name"`
KeyPrefix string `json:"keyPrefix"`
Status string `json:"status"`
} `json:"apiKey"`
@ -187,10 +188,10 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil {
var baseModels struct {
Items []struct {
ID string `json:"id"`
CanonicalModelKey string `json:"canonicalModelKey"`
ProviderModelName string `json:"providerModelName"`
ModelType string `json:"modelType"`
ID string `json:"id"`
CanonicalModelKey string `json:"canonicalModelKey"`
ProviderModelName string `json:"providerModelName"`
ModelType []string `json:"modelType"`
} `json:"items"`
}
doJSON(t, server.URL, http.MethodGet, "/api/v1/catalog/base-models", apiKeyResponse.Secret, nil, http.StatusOK, &baseModels)
@ -202,36 +203,47 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil {
"providerKey": "openai",
"canonicalModelKey": "openai:smoke-base-" + suffixText,
"providerModelName": "smoke-base-" + suffixText,
"modelType": "text_generate",
"displayName": "Smoke Base Model",
"modelType": []string{"text_generate"},
"modelAlias": "Smoke Base Model",
"capabilities": map[string]any{"originalTypes": []string{"text_generate"}},
"metadata": map[string]any{"source": "test"},
}
var createdBaseModel struct {
ID string `json:"id"`
CanonicalModelKey string `json:"canonicalModelKey"`
DisplayName string `json:"displayName"`
ModelAlias string `json:"modelAlias"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/catalog/base-models", apiKeyResponse.Secret, baseModelInput, http.StatusCreated, &createdBaseModel)
if createdBaseModel.ID == "" || createdBaseModel.CanonicalModelKey != baseModelInput["canonicalModelKey"] {
t.Fatalf("unexpected created base model: %+v", createdBaseModel)
}
baseModelInput["displayName"] = "Smoke Base Model Updated"
baseModelInput["modelAlias"] = "Smoke Base Model Updated"
var updatedBaseModel struct {
DisplayName string `json:"displayName"`
ModelAlias string `json:"modelAlias"`
}
doJSON(t, server.URL, http.MethodPatch, "/api/v1/catalog/base-models/"+createdBaseModel.ID, apiKeyResponse.Secret, baseModelInput, http.StatusOK, &updatedBaseModel)
if updatedBaseModel.DisplayName != "Smoke Base Model Updated" {
if updatedBaseModel.ModelAlias != "Smoke Base Model Updated" {
t.Fatalf("unexpected updated base model: %+v", updatedBaseModel)
}
doJSON(t, server.URL, http.MethodDelete, "/api/v1/catalog/base-models/"+createdBaseModel.ID, apiKeyResponse.Secret, nil, http.StatusNoContent, nil)
var taskResponse struct {
Task struct {
ID string `json:"id"`
Status string `json:"status"`
RunMode string `json:"runMode"`
Result map[string]any `json:"result"`
ID string `json:"id"`
Status string `json:"status"`
RunMode string `json:"runMode"`
APIKeyID string `json:"apiKeyId"`
APIKeyName string `json:"apiKeyName"`
RequestID string `json:"requestId"`
ResolvedModel string `json:"resolvedModel"`
Usage map[string]any `json:"usage"`
Metrics map[string]any `json:"metrics"`
BillingSummary map[string]any `json:"billingSummary"`
FinalChargeAmount float64 `json:"finalChargeAmount"`
ResponseStartedAt string `json:"responseStartedAt"`
ResponseFinishedAt string `json:"responseFinishedAt"`
ResponseDurationMS int64 `json:"responseDurationMs"`
Result map[string]any `json:"result"`
} `json:"task"`
}
doJSON(t, server.URL, http.MethodPost, "/api/v1/chat/completions", apiKeyResponse.Secret, map[string]any{
@ -243,6 +255,18 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil {
if taskResponse.Task.ID == "" || taskResponse.Task.Status != "succeeded" || taskResponse.Task.RunMode != "simulation" {
t.Fatalf("unexpected task response: %+v", taskResponse.Task)
}
if taskResponse.Task.APIKeyID != apiKeyResponse.APIKey.ID || taskResponse.Task.APIKeyName != apiKeyResponse.APIKey.Name {
t.Fatalf("task should record full api key identity: %+v key=%+v", taskResponse.Task, apiKeyResponse.APIKey)
}
if taskResponse.Task.RequestID == "" || taskResponse.Task.ResolvedModel == "" || taskResponse.Task.ResponseStartedAt == "" || taskResponse.Task.ResponseFinishedAt == "" {
t.Fatalf("task should record provider request and response timing: %+v", taskResponse.Task)
}
if taskResponse.Task.Usage["totalTokens"] == nil || taskResponse.Task.FinalChargeAmount <= 0 {
t.Fatalf("task should record token usage and final charge: %+v", taskResponse.Task)
}
if taskResponse.Task.BillingSummary["finalCharge"] == nil || taskResponse.Task.Metrics["requestedModel"] == nil {
t.Fatalf("task should record billing summary and task metrics: %+v", taskResponse.Task)
}
var compatChat map[string]any
doJSON(t, server.URL, http.MethodPost, "/v1/chat/completions", apiKeyResponse.Secret, map[string]any{
@ -348,14 +372,22 @@ VALUES ($1, 5, '{"purpose":"core-flow"}'::jsonb)`, inviteCode); err != nil {
}
var taskDetail struct {
ID string `json:"id"`
Status string `json:"status"`
Result map[string]any `json:"result"`
ID string `json:"id"`
Status string `json:"status"`
APIKeyName string `json:"apiKeyName"`
RequestID string `json:"requestId"`
Usage map[string]any `json:"usage"`
BillingSummary map[string]any `json:"billingSummary"`
FinalChargeAmount float64 `json:"finalChargeAmount"`
Result map[string]any `json:"result"`
}
doJSON(t, server.URL, http.MethodGet, "/api/v1/tasks/"+taskResponse.Task.ID, apiKeyResponse.Secret, nil, http.StatusOK, &taskDetail)
if taskDetail.Status != "succeeded" || taskDetail.Result["id"] == "" {
t.Fatalf("unexpected task detail: %+v", taskDetail)
}
if taskDetail.APIKeyName != apiKeyResponse.APIKey.Name || taskDetail.RequestID == "" || taskDetail.Usage["totalTokens"] == nil || taskDetail.FinalChargeAmount <= 0 {
t.Fatalf("task detail should expose enriched record fields: %+v", taskDetail)
}
req, err := http.NewRequest(http.MethodGet, server.URL+"/api/v1/tasks/"+taskResponse.Task.ID+"/events", nil)
if err != nil {

View File

@ -10,6 +10,11 @@ func stringFromMap(values map[string]any, key string) string {
return strings.TrimSpace(value)
}
func stringFromAny(value any) string {
text, _ := value.(string)
return strings.TrimSpace(text)
}
func boolFromMap(values map[string]any, key string) bool {
value, _ := values[key].(bool)
return value

View File

@ -16,7 +16,7 @@ type EstimateResult struct {
}
func (s *Service) Estimate(ctx context.Context, kind string, model string, body map[string]any, user *auth.User) (EstimateResult, error) {
candidates, err := s.store.ListModelCandidates(ctx, model, modelTypeFromKind(kind))
candidates, err := s.store.ListModelCandidates(ctx, model, modelTypeFromKind(kind), user)
if err != nil {
return EstimateResult{}, err
}
@ -40,15 +40,15 @@ func (s *Service) Estimate(ctx context.Context, kind string, model string, body
func (s *Service) billings(ctx context.Context, user *auth.User, kind string, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, simulated bool) []any {
config := effectiveBillingConfig(candidate)
discount := effectiveDiscount(ctx, s.store, user, candidate)
if modelTypeFromKind(kind) == "chat" {
if isTextGenerationKind(kind) {
inputTokens := response.Usage.InputTokens
outputTokens := response.Usage.OutputTokens
if inputTokens == 0 && outputTokens == 0 {
inputTokens = estimateRequestTokens(body)
outputTokens = 1
}
inputAmount := roundPrice(float64(inputTokens) / 1000 * price(config, "textInputPer1k") * discount)
outputAmount := roundPrice(float64(outputTokens) / 1000 * price(config, "textOutputPer1k") * discount)
inputAmount := roundPrice(float64(inputTokens) / 1000 * resourcePrice(config, "text", "textInputPer1k", "inputTokenPrice", "basePrice") * discount)
outputAmount := roundPrice(float64(outputTokens) / 1000 * resourcePrice(config, "text", "textOutputPer1k", "outputTokenPrice", "basePrice") * discount)
return []any{
billingLine(candidate, "text_input", "1k_tokens", inputTokens, inputAmount, discount, simulated),
billingLine(candidate, "text_output", "1k_tokens", outputTokens, outputAmount, discount, simulated),
@ -59,13 +59,19 @@ func (s *Service) billings(ctx context.Context, user *auth.User, kind string, bo
count = 1
}
resource := "image"
unit := "image"
baseKey := "imageBase"
if kind == "images.edits" {
resource = "image_edit"
baseKey = "editBase"
}
amount := float64(count) * price(config, baseKey) * weighted(config, "qualityWeights", stringFromMap(body, "quality")) * weighted(config, "sizeWeights", stringFromMap(body, "size")) * discount
return []any{billingLine(candidate, resource, "image", count, roundPrice(amount), discount, simulated)}
if kind == "videos.generations" {
resource = "video"
unit = "video"
baseKey = "videoBase"
}
amount := float64(count) * resourcePrice(config, resource, baseKey, "basePrice") * resourceWeight(config, resource, "qualityWeights", stringFromMap(body, "quality")) * resourceWeight(config, resource, "sizeWeights", stringFromMap(body, "size")) * resourceWeight(config, resource, "resolutionWeights", firstNonEmptyString(stringFromMap(body, "resolution"), stringFromMap(body, "size"))) * discount
return []any{billingLine(candidate, resource, unit, count, roundPrice(amount), discount, simulated)}
}
func effectiveBillingConfig(candidate store.RuntimeModelCandidate) map[string]any {
@ -121,6 +127,28 @@ func price(config map[string]any, key string) float64 {
return 0
}
func resourcePrice(config map[string]any, resource string, keys ...string) float64 {
for _, key := range keys {
if value := price(config, key); value > 0 {
return value
}
}
if resourceConfig, ok := config[resource].(map[string]any); ok {
for _, key := range keys {
if value := floatFromAny(resourceConfig[key]); value > 0 {
return value
}
}
if value := floatFromAny(resourceConfig["basePrice"]); value > 0 {
return value
}
}
if resource == "image_edit" {
return resourcePrice(config, "image", keys...)
}
return 0
}
func weighted(config map[string]any, key string, name string) float64 {
if strings.TrimSpace(name) == "" {
return 1
@ -132,6 +160,39 @@ func weighted(config map[string]any, key string, name string) float64 {
return 1
}
func resourceWeight(config map[string]any, resource string, key string, name string) float64 {
if value := weighted(config, key, name); value != 1 {
return value
}
if strings.TrimSpace(name) == "" {
return 1
}
resourceConfig, _ := config[resource].(map[string]any)
if len(resourceConfig) == 0 && resource == "image_edit" {
resourceConfig, _ = config["image"].(map[string]any)
}
if weights, ok := resourceConfig["dynamicWeight"].(map[string]any); ok {
if value := floatFromAny(weights[name]); value > 0 {
return value
}
}
if weights, ok := resourceConfig[key].(map[string]any); ok {
if value := floatFromAny(weights[name]); value > 0 {
return value
}
}
return 1
}
func firstNonEmptyString(values ...string) string {
for _, value := range values {
if strings.TrimSpace(value) != "" {
return strings.TrimSpace(value)
}
}
return ""
}
func roundPrice(value float64) float64 {
return math.Round(value*1000000) / 1000000
}

View File

@ -0,0 +1,221 @@
package runner
import (
"strings"
"time"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/auth"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/clients"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/store"
)
type taskRecordDetails struct {
RequestID string
ResolvedModel string
Usage map[string]any
Metrics map[string]any
BillingSummary map[string]any
FinalChargeAmount float64
ResponseStartedAt time.Time
ResponseFinishedAt time.Time
ResponseDurationMS int64
}
func buildSuccessRecord(task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, billings []any, simulated bool) taskRecordDetails {
usage := usageToMap(response.Usage)
metrics := taskMetrics(task, user, body, candidate, response, simulated)
summary := summarizeBillings(billings, simulated)
finalAmount := floatFromAny(summary["totalAmount"])
return taskRecordDetails{
RequestID: response.RequestID,
ResolvedModel: candidate.ModelName,
Usage: usage,
Metrics: metrics,
BillingSummary: summary,
FinalChargeAmount: finalAmount,
ResponseStartedAt: response.ResponseStartedAt,
ResponseFinishedAt: response.ResponseFinishedAt,
ResponseDurationMS: response.ResponseDurationMS,
}
}
func taskMetrics(task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, response clients.Response, simulated bool) map[string]any {
metrics := map[string]any{
"kind": task.Kind,
"runMode": task.RunMode,
"requestedModel": task.Model,
"resolvedModel": candidate.ModelName,
"modelAlias": candidate.ModelAlias,
"providerModel": candidate.ProviderModelName,
"canonicalModel": candidate.CanonicalModelKey,
"modelType": candidate.ModelType,
"provider": candidate.Provider,
"platformId": candidate.PlatformID,
"platformName": candidate.PlatformName,
"platformModelId": candidate.PlatformModelID,
"clientId": candidate.ClientID,
"queueKey": candidate.QueueKey,
"requestId": response.RequestID,
"simulated": simulated,
}
if user != nil {
metrics["apiKeyId"] = user.APIKeyID
metrics["apiKeyName"] = user.APIKeyName
metrics["apiKeyPrefix"] = user.APIKeyPrefix
}
if response.ResponseDurationMS > 0 {
metrics["responseDurationMs"] = response.ResponseDurationMS
}
switch task.Kind {
case "chat.completions", "responses":
metrics["stream"] = boolFromMap(body, "stream")
metrics["messageCount"] = messageCount(body)
copyIfPresent(metrics, body, "temperature")
copyIfPresent(metrics, body, "max_tokens")
copyIfPresent(metrics, body, "max_output_tokens")
case "images.generations", "images.edits":
metrics["imageCount"] = requestedCount(body)
metrics["outputImageCount"] = outputDataCount(response.Result)
metrics["inputImageCount"] = imageInputCount(body)
metrics["hasMask"] = stringFromMap(body, "mask") != ""
copyIfPresent(metrics, body, "size")
copyIfPresent(metrics, body, "quality")
copyIfPresent(metrics, body, "style")
case "videos.generations":
metrics["hasReferenceImage"] = imageInputCount(body) > 0
metrics["hasReferenceVideo"] = hasAnyString(body, "video", "video_url", "videoUrl", "reference_video", "referenceVideo")
copyIfPresent(metrics, body, "duration")
copyIfPresent(metrics, body, "resolution")
copyIfPresent(metrics, body, "size")
copyIfPresent(metrics, body, "aspect_ratio")
copyIfPresent(metrics, body, "aspectRatio")
copyIfPresent(metrics, body, "fps")
}
return metrics
}
func usageToMap(usage clients.Usage) map[string]any {
out := map[string]any{}
if usage.InputTokens > 0 {
out["inputTokens"] = usage.InputTokens
out["promptTokens"] = usage.InputTokens
}
if usage.OutputTokens > 0 {
out["outputTokens"] = usage.OutputTokens
out["completionTokens"] = usage.OutputTokens
}
if usage.TotalTokens > 0 {
out["totalTokens"] = usage.TotalTokens
}
return out
}
func summarizeBillings(billings []any, simulated bool) map[string]any {
amountByCurrency := map[string]float64{}
for _, raw := range billings {
line, _ := raw.(map[string]any)
if line == nil {
continue
}
currency := strings.TrimSpace(stringFromAny(line["currency"]))
if currency == "" {
currency = "resource"
}
amountByCurrency[currency] = roundPrice(amountByCurrency[currency] + floatFromAny(line["amount"]))
}
currency := ""
totalAmount := 0.0
for key, amount := range amountByCurrency {
if currency == "" {
currency = key
} else if currency != key {
currency = "mixed"
}
totalAmount += amount
}
if currency == "" {
currency = "resource"
}
totalAmount = roundPrice(totalAmount)
return map[string]any{
"lineCount": len(billings),
"totalAmount": totalAmount,
"amountByCurrency": amountByCurrency,
"currency": currency,
"simulated": simulated,
"finalCharge": map[string]any{
"amount": totalAmount,
"currency": currency,
"simulated": simulated,
},
}
}
func failureMetrics(err error, simulated bool) (string, map[string]any, time.Time, time.Time, int64) {
meta := clients.ErrorResponseMetadata(err)
metrics := map[string]any{
"simulated": simulated,
}
if err != nil {
metrics["error"] = err.Error()
metrics["retryable"] = clients.IsRetryable(err)
}
if meta.StatusCode > 0 {
metrics["statusCode"] = meta.StatusCode
}
if meta.RequestID != "" {
metrics["requestId"] = meta.RequestID
}
if meta.ResponseDurationMS > 0 {
metrics["responseDurationMs"] = meta.ResponseDurationMS
}
return meta.RequestID, metrics, meta.ResponseStartedAt, meta.ResponseFinishedAt, meta.ResponseDurationMS
}
func messageCount(body map[string]any) int {
messages, _ := body["messages"].([]any)
return len(messages)
}
func requestedCount(body map[string]any) int {
count := int(floatFromAny(body["n"]))
if count <= 0 {
return 1
}
return count
}
func outputDataCount(result map[string]any) int {
data, _ := result["data"].([]any)
return len(data)
}
func imageInputCount(body map[string]any) int {
count := 0
for _, key := range []string{"image", "image_url", "imageUrl"} {
if stringFromMap(body, key) != "" {
count++
}
}
for _, key := range []string{"image_urls", "imageUrls", "images"} {
if values, ok := body[key].([]any); ok {
count += len(values)
}
}
return count
}
func hasAnyString(body map[string]any, keys ...string) bool {
for _, key := range keys {
if stringFromMap(body, key) != "" {
return true
}
}
return false
}
func copyIfPresent(target map[string]any, body map[string]any, key string) {
if value, ok := body[key]; ok && value != nil {
target[key] = value
}
}

View File

@ -41,18 +41,26 @@ func New(cfg config.Config, db *store.Store, logger *slog.Logger) *Service {
}
func (s *Service) Execute(ctx context.Context, task store.GatewayTask, user *auth.User) (Result, error) {
return s.execute(ctx, task, user, nil)
}
func (s *Service) ExecuteStream(ctx context.Context, task store.GatewayTask, user *auth.User, onDelta clients.StreamDelta) (Result, error) {
return s.execute(ctx, task, user, onDelta)
}
func (s *Service) execute(ctx context.Context, task store.GatewayTask, user *auth.User, onDelta clients.StreamDelta) (Result, error) {
modelType := modelTypeFromKind(task.Kind)
body := normalizeRequest(task.Kind, task.Request)
if err := validateRequest(task.Kind, body); err != nil {
failed, finishErr := s.failTask(ctx, task.ID, "bad_request", err.Error(), task.RunMode == "simulation")
failed, finishErr := s.failTask(ctx, task.ID, "bad_request", err.Error(), task.RunMode == "simulation", err)
if finishErr != nil {
return Result{}, finishErr
}
return Result{Task: failed, Output: failed.Result}, err
}
candidates, err := s.store.ListModelCandidates(ctx, task.Model, modelType)
candidates, err := s.store.ListModelCandidates(ctx, task.Model, modelType, user)
if err != nil {
failed, finishErr := s.failTask(ctx, task.ID, "no_model_candidate", err.Error(), task.RunMode == "simulation")
failed, finishErr := s.failTask(ctx, task.ID, "no_model_candidate", err.Error(), task.RunMode == "simulation", err)
if finishErr != nil {
return Result{}, finishErr
}
@ -72,14 +80,35 @@ func (s *Service) Execute(ctx context.Context, task store.GatewayTask, user *aut
break
}
attemptNo := index + 1
response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo)
response, err := s.runCandidate(ctx, task, user, body, candidate, attemptNo, onDelta)
if err == nil {
billings := s.billings(ctx, user, task.Kind, body, candidate, response, isSimulation(task, candidate))
finished, finishErr := s.store.FinishTaskSuccess(ctx, task.ID, response.Result, billings)
record := buildSuccessRecord(task, user, body, candidate, response, billings, isSimulation(task, candidate))
finished, finishErr := s.store.FinishTaskSuccess(ctx, store.FinishTaskSuccessInput{
TaskID: task.ID,
Result: response.Result,
Billings: billings,
RequestID: record.RequestID,
ResolvedModel: record.ResolvedModel,
Usage: record.Usage,
Metrics: record.Metrics,
BillingSummary: record.BillingSummary,
FinalChargeAmount: record.FinalChargeAmount,
ResponseStartedAt: record.ResponseStartedAt,
ResponseFinishedAt: record.ResponseFinishedAt,
ResponseDurationMS: record.ResponseDurationMS,
})
if finishErr != nil {
return Result{}, finishErr
}
if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{"result": response.Result, "billings": billings}, isSimulation(task, candidate)); err != nil {
if err := s.emit(ctx, task.ID, "task.completed", "succeeded", "completed", 1, "task completed", map[string]any{
"result": response.Result,
"billings": billings,
"usage": record.Usage,
"metrics": record.Metrics,
"billingSummary": record.BillingSummary,
"requestId": record.RequestID,
}, isSimulation(task, candidate)); err != nil {
return Result{}, err
}
return Result{Task: finished, Output: response.Result}, nil
@ -98,14 +127,14 @@ func (s *Service) Execute(ctx context.Context, task store.GatewayTask, user *aut
if lastErr != nil {
message = lastErr.Error()
}
failed, err := s.failTask(ctx, task.ID, code, message, task.RunMode == "simulation")
failed, err := s.failTask(ctx, task.ID, code, message, task.RunMode == "simulation", lastErr)
if err != nil {
return Result{}, err
}
return Result{Task: failed, Output: failed.Result}, lastErr
}
func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, attemptNo int) (clients.Response, error) {
func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user *auth.User, body map[string]any, candidate store.RuntimeModelCandidate, attemptNo int, onDelta clients.StreamDelta) (clients.Response, error) {
simulated := isSimulation(task, candidate)
if err := s.emit(ctx, task.ID, "task.attempt.started", "running", "submitting", 0.25, "client attempt started", map[string]any{"attempt": attemptNo, "clientId": candidate.ClientID}, simulated); err != nil {
return clients.Response{}, err
@ -127,7 +156,14 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
reservations := s.rateLimitReservations(ctx, user, candidate, body)
limitResult, err := s.store.ReserveRateLimits(ctx, task.ID, reservations)
if err != nil {
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{AttemptID: attemptID, Status: "failed", Retryable: false, ErrorCode: "rate_limit", ErrorMessage: err.Error()})
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
AttemptID: attemptID,
Status: "failed",
Retryable: false,
Metrics: map[string]any{"error": err.Error(), "candidateModel": candidate.ModelName, "clientId": candidate.ClientID},
ErrorCode: "rate_limit",
ErrorMessage: err.Error(),
})
return clients.Response{}, &clients.ClientError{Code: "rate_limit", Message: err.Error(), Retryable: false}
}
defer s.store.ReleaseConcurrencyLeases(context.WithoutCancel(ctx), limitResult.LeaseIDs)
@ -138,34 +174,77 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
defer s.store.RecordClientRelease(context.WithoutCancel(ctx), candidate.ClientID, "")
client := s.clientFor(candidate, simulated)
callStartedAt := time.Now()
response, err := client.Run(ctx, clients.Request{
Kind: task.Kind,
ModelType: candidate.ModelType,
Model: task.Model,
Body: body,
Candidate: candidate,
Stream: boolFromMap(body, "stream"),
Kind: task.Kind,
ModelType: candidate.ModelType,
Model: task.Model,
Body: body,
Candidate: candidate,
Stream: boolFromMap(body, "stream"),
StreamDelta: onDelta,
})
callFinishedAt := time.Now()
if response.ResponseStartedAt.IsZero() {
response.ResponseStartedAt = callStartedAt
}
if response.ResponseFinishedAt.IsZero() {
response.ResponseFinishedAt = callFinishedAt
}
if response.ResponseDurationMS == 0 {
response.ResponseDurationMS = response.ResponseFinishedAt.Sub(response.ResponseStartedAt).Milliseconds()
if response.ResponseDurationMS < 0 {
response.ResponseDurationMS = 0
}
}
if err != nil {
retryable := clients.IsRetryable(err)
requestID, metrics, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(err, simulated)
if responseStartedAt.IsZero() {
responseStartedAt = callStartedAt
}
if responseFinishedAt.IsZero() {
responseFinishedAt = callFinishedAt
}
if responseDurationMS == 0 {
responseDurationMS = responseFinishedAt.Sub(responseStartedAt).Milliseconds()
if responseDurationMS < 0 {
responseDurationMS = 0
}
}
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
AttemptID: attemptID,
Status: "failed",
Retryable: retryable,
ErrorCode: clients.ErrorCode(err),
ErrorMessage: err.Error(),
AttemptID: attemptID,
Status: "failed",
Retryable: retryable,
RequestID: requestID,
Metrics: metrics,
ResponseStartedAt: responseStartedAt,
ResponseFinishedAt: responseFinishedAt,
ResponseDurationMS: responseDurationMS,
ErrorCode: clients.ErrorCode(err),
ErrorMessage: err.Error(),
})
_ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable}, simulated)
_ = s.emit(ctx, task.ID, "task.attempt.failed", "running", "attempt_failed", 0.45, err.Error(), map[string]any{"attempt": attemptNo, "retryable": retryable, "requestId": requestID, "metrics": metrics}, simulated)
return clients.Response{}, err
}
uploadedResult, err := s.uploadGeneratedAssets(ctx, response.Result)
if err != nil {
metrics := taskMetrics(task, user, body, candidate, response, simulated)
metrics["error"] = err.Error()
metrics["retryable"] = clients.IsRetryable(err)
_ = s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
AttemptID: attemptID,
Status: "failed",
Retryable: clients.IsRetryable(err),
ErrorCode: clients.ErrorCode(err),
ErrorMessage: err.Error(),
AttemptID: attemptID,
Status: "failed",
Retryable: clients.IsRetryable(err),
RequestID: response.RequestID,
Usage: usageToMap(response.Usage),
Metrics: metrics,
ResponseSnapshot: response.Result,
ResponseStartedAt: response.ResponseStartedAt,
ResponseFinishedAt: response.ResponseFinishedAt,
ResponseDurationMS: response.ResponseDurationMS,
ErrorCode: clients.ErrorCode(err),
ErrorMessage: err.Error(),
})
return clients.Response{}, err
}
@ -176,9 +255,15 @@ func (s *Service) runCandidate(ctx context.Context, task store.GatewayTask, user
}
}
if err := s.store.FinishTaskAttempt(ctx, store.FinishTaskAttemptInput{
AttemptID: attemptID,
Status: "succeeded",
ResponseSnapshot: response.Result,
AttemptID: attemptID,
Status: "succeeded",
RequestID: response.RequestID,
Usage: usageToMap(response.Usage),
Metrics: taskMetrics(task, user, body, candidate, response, simulated),
ResponseSnapshot: response.Result,
ResponseStartedAt: response.ResponseStartedAt,
ResponseFinishedAt: response.ResponseFinishedAt,
ResponseDurationMS: response.ResponseDurationMS,
}); err != nil {
return clients.Response{}, err
}
@ -189,19 +274,32 @@ func (s *Service) clientFor(candidate store.RuntimeModelCandidate, simulated boo
if simulated {
return s.clients["simulation"]
}
key := strings.ToLower(candidate.Provider)
key := strings.ToLower(strings.TrimSpace(candidate.SpecType))
if key == "" {
key = strings.ToLower(strings.TrimSpace(candidate.Provider))
}
if client, ok := s.clients[key]; ok {
return client
}
return s.clients["openai"]
}
func (s *Service) failTask(ctx context.Context, taskID string, code string, message string, simulated bool) (store.GatewayTask, error) {
failed, err := s.store.FinishTaskFailure(ctx, taskID, code, message)
func (s *Service) failTask(ctx context.Context, taskID string, code string, message string, simulated bool, cause error) (store.GatewayTask, error) {
requestID, metrics, responseStartedAt, responseFinishedAt, responseDurationMS := failureMetrics(cause, simulated)
failed, err := s.store.FinishTaskFailure(ctx, store.FinishTaskFailureInput{
TaskID: taskID,
Code: code,
Message: message,
RequestID: requestID,
Metrics: metrics,
ResponseStartedAt: responseStartedAt,
ResponseFinishedAt: responseFinishedAt,
ResponseDurationMS: responseDurationMS,
})
if err != nil {
return store.GatewayTask{}, err
}
if eventErr := s.emit(ctx, taskID, "task.failed", "failed", "failed", 1, message, map[string]any{"code": code}, simulated); eventErr != nil {
if eventErr := s.emit(ctx, taskID, "task.failed", "failed", "failed", 1, message, map[string]any{"code": code, "requestId": requestID, "metrics": metrics}, simulated); eventErr != nil {
return store.GatewayTask{}, eventErr
}
return failed, nil
@ -221,14 +319,23 @@ func (s *Service) emit(ctx context.Context, taskID string, eventType string, sta
func modelTypeFromKind(kind string) string {
switch kind {
case "chat.completions", "responses":
return "chat"
return "text_generate"
case "images.generations", "images.edits":
return "image"
if kind == "images.edits" {
return "image_edit"
}
return "image_generate"
case "videos.generations":
return "video_generate"
default:
return "task"
}
}
func isTextGenerationKind(kind string) bool {
return kind == "chat.completions" || kind == "responses"
}
func isSimulation(task store.GatewayTask, candidate store.RuntimeModelCandidate) bool {
if task.RunMode == "simulation" {
return true

View File

@ -25,6 +25,7 @@ var (
ErrInvalidCredentials = errors.New("invalid account or password")
ErrInvalidInvitation = errors.New("invalid or expired invitation code")
ErrLocalUserRequired = errors.New("local gateway user is required")
ErrProtectedDefault = errors.New("protected default resource cannot be deleted")
ErrUserAlreadyExists = errors.New("user already exists")
ErrWeakPassword = errors.New("password must be at least 8 characters")
)
@ -54,6 +55,7 @@ type Platform struct {
Provider string `json:"provider"`
PlatformKey string `json:"platformKey"`
Name string `json:"name"`
InternalName string `json:"internalName,omitempty"`
BaseURL string `json:"baseUrl,omitempty"`
AuthType string `json:"authType"`
Status string `json:"status"`
@ -62,6 +64,9 @@ type Platform struct {
DefaultDiscountFactor float64 `json:"defaultDiscountFactor"`
PricingRuleSetID string `json:"pricingRuleSetId,omitempty"`
Config map[string]any `json:"config,omitempty"`
CredentialsPreview map[string]any `json:"credentialsPreview,omitempty"`
RetryPolicy map[string]any `json:"retryPolicy,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
@ -70,10 +75,13 @@ type CreatePlatformInput struct {
Provider string `json:"provider"`
PlatformKey string `json:"platformKey"`
Name string `json:"name"`
InternalName string `json:"internalName"`
BaseURL string `json:"baseUrl"`
AuthType string `json:"authType"`
Credentials map[string]any `json:"credentials"`
Config map[string]any `json:"config"`
RetryPolicy map[string]any `json:"retryPolicy"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy"`
DefaultPricingMode string `json:"defaultPricingMode"`
DefaultDiscountFactor float64 `json:"defaultDiscountFactor"`
PricingRuleSetID string `json:"pricingRuleSetId"`
@ -106,6 +114,11 @@ type APIKey struct {
UpdatedAt time.Time `json:"updatedAt"`
}
type PlayableAPIKey struct {
APIKey
Secret string `json:"secret"`
}
type CreatedAPIKey struct {
APIKey APIKey `json:"apiKey"`
Secret string `json:"secret"`
@ -128,11 +141,32 @@ type PlatformModel struct {
PricingRuleSetID string `json:"pricingRuleSetId,omitempty"`
BillingConfigOverride map[string]any `json:"billingConfigOverride,omitempty"`
BillingConfig map[string]any `json:"billingConfig,omitempty"`
PermissionConfig map[string]any `json:"permissionConfig,omitempty"`
RetryPolicy map[string]any `json:"retryPolicy,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
RuntimePolicySetID string `json:"runtimePolicySetId,omitempty"`
RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride,omitempty"`
Enabled bool `json:"enabled"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type AccessRule struct {
ID string `json:"id"`
SubjectType string `json:"subjectType"`
SubjectID string `json:"subjectId"`
ResourceType string `json:"resourceType"`
ResourceID string `json:"resourceId"`
Effect string `json:"effect"`
Priority int `json:"priority"`
MinPermissionLevel int `json:"minPermissionLevel"`
Conditions map[string]any `json:"conditions,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type CatalogProvider struct {
ID string `json:"id"`
ProviderKey string `json:"providerKey"`
@ -140,6 +174,8 @@ type CatalogProvider struct {
DisplayName string `json:"displayName"`
ProviderType string `json:"providerType"`
IconPath string `json:"iconPath,omitempty"`
DefaultBaseURL string `json:"defaultBaseUrl,omitempty"`
DefaultAuthType string `json:"defaultAuthType,omitempty"`
Source string `json:"source,omitempty"`
CapabilitySchema map[string]any `json:"capabilitySchema,omitempty"`
DefaultRateLimitPolicy map[string]any `json:"defaultRateLimitPolicy,omitempty"`
@ -154,18 +190,40 @@ type BaseModel struct {
ProviderKey string `json:"providerKey"`
CanonicalModelKey string `json:"canonicalModelKey"`
ProviderModelName string `json:"providerModelName"`
ModelType string `json:"modelType"`
DisplayName string `json:"displayName"`
ModelType StringList `json:"modelType"`
ModelAlias string `json:"modelAlias"`
DisplayName string `json:"-"`
Capabilities map[string]any `json:"capabilities,omitempty"`
BaseBillingConfig map[string]any `json:"baseBillingConfig,omitempty"`
DefaultRateLimitPolicy map[string]any `json:"defaultRateLimitPolicy,omitempty"`
PricingRuleSetID string `json:"pricingRuleSetId,omitempty"`
RuntimePolicySetID string `json:"runtimePolicySetId,omitempty"`
RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
CatalogType string `json:"catalogType"`
DefaultSnapshot map[string]any `json:"defaultSnapshot,omitempty"`
CustomizedAt string `json:"customizedAt,omitempty"`
PricingVersion int `json:"pricingVersion"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type RuntimePolicySet struct {
ID string `json:"id"`
PolicyKey string `json:"policyKey"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy,omitempty"`
RetryPolicy map[string]any `json:"retryPolicy,omitempty"`
AutoDisablePolicy map[string]any `json:"autoDisablePolicy,omitempty"`
DegradePolicy map[string]any `json:"degradePolicy,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type PricingRule struct {
ID string `json:"id"`
RuleSetID string `json:"ruleSetId,omitempty"`
@ -298,27 +356,54 @@ type CreateTaskInput struct {
}
type GatewayTask struct {
ID string `json:"id"`
Kind string `json:"kind"`
RunMode string `json:"runMode"`
UserID string `json:"userId"`
GatewayUserID string `json:"gatewayUserId,omitempty"`
UserSource string `json:"userSource,omitempty"`
GatewayTenantID string `json:"gatewayTenantId,omitempty"`
TenantID string `json:"tenantId,omitempty"`
TenantKey string `json:"tenantKey,omitempty"`
UserGroupID string `json:"userGroupId,omitempty"`
UserGroupKey string `json:"userGroupKey,omitempty"`
Model string `json:"model"`
Request map[string]any `json:"request,omitempty"`
Status string `json:"status"`
Result map[string]any `json:"result,omitempty"`
Billings []any `json:"billings,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
ID string `json:"id"`
Kind string `json:"kind"`
RunMode string `json:"runMode"`
UserID string `json:"userId"`
GatewayUserID string `json:"gatewayUserId,omitempty"`
UserSource string `json:"userSource,omitempty"`
GatewayTenantID string `json:"gatewayTenantId,omitempty"`
TenantID string `json:"tenantId,omitempty"`
TenantKey string `json:"tenantKey,omitempty"`
APIKeyID string `json:"apiKeyId,omitempty"`
APIKeyName string `json:"apiKeyName,omitempty"`
APIKeyPrefix string `json:"apiKeyPrefix,omitempty"`
UserGroupID string `json:"userGroupId,omitempty"`
UserGroupKey string `json:"userGroupKey,omitempty"`
Model string `json:"model"`
ModelType string `json:"modelType,omitempty"`
RequestedModel string `json:"requestedModel,omitempty"`
ResolvedModel string `json:"resolvedModel,omitempty"`
RequestID string `json:"requestId,omitempty"`
Request map[string]any `json:"request,omitempty"`
Status string `json:"status"`
Result map[string]any `json:"result,omitempty"`
Billings []any `json:"billings,omitempty"`
Usage map[string]any `json:"usage"`
Metrics map[string]any `json:"metrics"`
BillingSummary map[string]any `json:"billingSummary"`
FinalChargeAmount float64 `json:"finalChargeAmount"`
ResponseStartedAt string `json:"responseStartedAt,omitempty"`
ResponseFinishedAt string `json:"responseFinishedAt,omitempty"`
ResponseDurationMS int64 `json:"responseDurationMs"`
FinishedAt string `json:"finishedAt,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
const gatewayTaskColumns = `
id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source,
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(api_key_id, ''), COALESCE(api_key_name, ''), COALESCE(api_key_prefix, ''),
COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model,
COALESCE(model_type, ''), COALESCE(requested_model, ''), COALESCE(resolved_model, ''), COALESCE(request_id, ''),
request, status, COALESCE(result, '{}'::jsonb), COALESCE(billings, '[]'::jsonb),
COALESCE(usage, '{}'::jsonb), COALESCE(metrics, '{}'::jsonb), COALESCE(billing_summary, '{}'::jsonb),
COALESCE(final_charge_amount, 0)::float8, COALESCE(response_started_at::text, ''),
COALESCE(response_finished_at::text, ''), COALESCE(response_duration_ms, 0), COALESCE(error, ''),
created_at, updated_at, COALESCE(finished_at::text, '')`
type TaskEvent struct {
ID string `json:"id"`
TaskID string `json:"taskId"`
@ -335,9 +420,9 @@ type TaskEvent struct {
func (s *Store) ListPlatforms(ctx context.Context) ([]Platform, error) {
rows, err := s.pool.Query(ctx, `
SELECT id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_type, status, priority,
SELECT id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''),
config, created_at, updated_at
config, credentials, retry_policy, rate_limit_policy, created_at, updated_at
FROM integration_platforms
ORDER BY priority ASC, created_at DESC`)
if err != nil {
@ -349,11 +434,15 @@ ORDER BY priority ASC, created_at DESC`)
for rows.Next() {
var platform Platform
var configBytes []byte
var credentialsBytes []byte
var retryPolicyBytes []byte
var rateLimitPolicyBytes []byte
if err := rows.Scan(
&platform.ID,
&platform.Provider,
&platform.PlatformKey,
&platform.Name,
&platform.InternalName,
&platform.BaseURL,
&platform.AuthType,
&platform.Status,
@ -362,20 +451,28 @@ ORDER BY priority ASC, created_at DESC`)
&platform.DefaultDiscountFactor,
&platform.PricingRuleSetID,
&configBytes,
&credentialsBytes,
&retryPolicyBytes,
&rateLimitPolicyBytes,
&platform.CreatedAt,
&platform.UpdatedAt,
); err != nil {
return nil, err
}
platform.Config = decodeObject(configBytes)
platform.CredentialsPreview = maskCredentialsPreview(credentialsBytes)
platform.RetryPolicy = decodeObject(retryPolicyBytes)
platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes)
platforms = append(platforms, platform)
}
return platforms, rows.Err()
}
func (s *Store) CreatePlatform(ctx context.Context, input CreatePlatformInput) (Platform, error) {
credentials, _ := json.Marshal(input.Credentials)
config, _ := json.Marshal(input.Config)
credentials, _ := json.Marshal(emptyObjectIfNil(input.Credentials))
config, _ := json.Marshal(emptyObjectIfNil(input.Config))
retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy))
rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy))
if input.DefaultPricingMode == "" {
input.DefaultPricingMode = "inherit_discount"
}
@ -387,17 +484,31 @@ func (s *Store) CreatePlatform(ctx context.Context, input CreatePlatformInput) (
}
var platform Platform
var configBytes []byte
var credentialsResultBytes []byte
var retryPolicyBytes []byte
var rateLimitPolicyBytes []byte
err := s.pool.QueryRow(ctx, `
INSERT INTO integration_platforms (provider, platform_key, name, base_url, auth_type, credentials, config, default_pricing_mode, default_discount_factor, pricing_rule_set_id, priority)
VALUES ($1, COALESCE(NULLIF($2, ''), 'platform_' || replace(gen_random_uuid()::text, '-', '')), $3, $4, $5, $6, $7, $8, $9, NULLIF($10, '')::uuid, $11)
RETURNING id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_type, status, priority,
default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''), config, created_at, updated_at`,
input.Provider, input.PlatformKey, input.Name, input.BaseURL, input.AuthType, credentials, config, input.DefaultPricingMode, input.DefaultDiscountFactor, input.PricingRuleSetID, input.Priority,
INSERT INTO integration_platforms (
provider, platform_key, name, internal_name, base_url, auth_type, credentials, config,
default_pricing_mode, default_discount_factor, pricing_rule_set_id,
priority, retry_policy, rate_limit_policy
)
VALUES (
$1, COALESCE(NULLIF($2, ''), 'platform_' || replace(gen_random_uuid()::text, '-', '')), $3, NULLIF($4, ''), $5, $6, $7, $8,
$9, $10, NULLIF($11, '')::uuid, $12, $13, $14
)
RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''),
config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`,
input.Provider, input.PlatformKey, input.Name, strings.TrimSpace(input.InternalName), input.BaseURL, input.AuthType, credentials, config,
input.DefaultPricingMode, input.DefaultDiscountFactor, input.PricingRuleSetID, input.Priority,
string(retryPolicy), string(rateLimitPolicy),
).Scan(
&platform.ID,
&platform.Provider,
&platform.PlatformKey,
&platform.Name,
&platform.InternalName,
&platform.BaseURL,
&platform.AuthType,
&platform.Status,
@ -406,6 +517,9 @@ RETURNING id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_t
&platform.DefaultDiscountFactor,
&platform.PricingRuleSetID,
&configBytes,
&credentialsResultBytes,
&retryPolicyBytes,
&rateLimitPolicyBytes,
&platform.CreatedAt,
&platform.UpdatedAt,
)
@ -413,19 +527,143 @@ RETURNING id::text, provider, platform_key, name, COALESCE(base_url, ''), auth_t
return Platform{}, err
}
platform.Config = decodeObject(configBytes)
platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes)
platform.RetryPolicy = decodeObject(retryPolicyBytes)
platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes)
return platform, nil
}
func (s *Store) UpdatePlatform(ctx context.Context, id string, input CreatePlatformInput) (Platform, error) {
var credentials any
if input.Credentials != nil {
credentialsBytes, _ := json.Marshal(emptyObjectIfNil(input.Credentials))
credentials = string(credentialsBytes)
}
config, _ := json.Marshal(emptyObjectIfNil(input.Config))
retryPolicy, _ := json.Marshal(emptyObjectIfNil(input.RetryPolicy))
rateLimitPolicy, _ := json.Marshal(emptyObjectIfNil(input.RateLimitPolicy))
if input.DefaultPricingMode == "" {
input.DefaultPricingMode = "inherit_discount"
}
if input.DefaultDiscountFactor == 0 {
input.DefaultDiscountFactor = 1
}
if input.Priority == 0 {
input.Priority = 100
}
var platform Platform
var configBytes []byte
var credentialsResultBytes []byte
var retryPolicyBytes []byte
var rateLimitPolicyBytes []byte
err := s.pool.QueryRow(ctx, `
UPDATE integration_platforms
SET provider = $2,
platform_key = COALESCE(NULLIF($3, ''), platform_key),
name = $4,
internal_name = NULLIF($5, ''),
base_url = $6,
auth_type = $7,
credentials = CASE
WHEN $8::jsonb IS NULL THEN credentials
WHEN $8::jsonb = '{}'::jsonb THEN '{}'::jsonb
ELSE credentials || $8::jsonb
END,
config = $9,
default_pricing_mode = $10,
default_discount_factor = $11,
pricing_rule_set_id = NULLIF($12, '')::uuid,
priority = $13,
retry_policy = $14,
rate_limit_policy = $15,
updated_at = now()
WHERE id = $1::uuid
RETURNING id::text, provider, platform_key, name, COALESCE(internal_name, ''), COALESCE(base_url, ''), auth_type, status, priority,
default_pricing_mode, default_discount_factor::float8, COALESCE(pricing_rule_set_id::text, ''),
config, credentials, retry_policy, rate_limit_policy, created_at, updated_at`,
id,
input.Provider,
input.PlatformKey,
input.Name,
strings.TrimSpace(input.InternalName),
input.BaseURL,
input.AuthType,
credentials,
string(config),
input.DefaultPricingMode,
input.DefaultDiscountFactor,
input.PricingRuleSetID,
input.Priority,
string(retryPolicy),
string(rateLimitPolicy),
).Scan(
&platform.ID,
&platform.Provider,
&platform.PlatformKey,
&platform.Name,
&platform.InternalName,
&platform.BaseURL,
&platform.AuthType,
&platform.Status,
&platform.Priority,
&platform.DefaultPricingMode,
&platform.DefaultDiscountFactor,
&platform.PricingRuleSetID,
&configBytes,
&credentialsResultBytes,
&retryPolicyBytes,
&rateLimitPolicyBytes,
&platform.CreatedAt,
&platform.UpdatedAt,
)
if err != nil {
return Platform{}, err
}
platform.Config = decodeObject(configBytes)
platform.CredentialsPreview = maskCredentialsPreview(credentialsResultBytes)
platform.RetryPolicy = decodeObject(retryPolicyBytes)
platform.RateLimitPolicy = decodeObject(rateLimitPolicyBytes)
return platform, nil
}
func (s *Store) DeletePlatform(ctx context.Context, id string) error {
result, err := s.pool.Exec(ctx, `DELETE FROM integration_platforms WHERE id = $1::uuid`, id)
if err != nil {
return err
}
if result.RowsAffected() == 0 {
return pgx.ErrNoRows
}
return nil
}
func (s *Store) ListModels(ctx context.Context) ([]PlatformModel, error) {
return s.listModels(ctx, "")
}
func (s *Store) ListPlatformModels(ctx context.Context, platformID string) ([]PlatformModel, error) {
return s.listModels(ctx, strings.TrimSpace(platformID))
}
func (s *Store) listModels(ctx context.Context, platformID string) ([]PlatformModel, error) {
args := []any{}
where := ""
if platformID != "" {
where = "WHERE m.platform_id = $1::uuid"
args = append(args, platformID)
}
rows, err := s.pool.Query(ctx, `
SELECT m.id::text, m.platform_id::text, COALESCE(m.base_model_id::text, ''), p.provider, p.name,
m.model_name, COALESCE(m.model_alias, ''), m.model_type, m.display_name,
m.capability_override, m.capabilities, m.pricing_mode, COALESCE(m.discount_factor, 0)::float8,
COALESCE(m.pricing_rule_set_id::text, ''), m.billing_config_override, m.billing_config,
m.permission_config, m.retry_policy, m.rate_limit_policy, COALESCE(m.runtime_policy_set_id::text, ''), m.runtime_policy_override,
m.enabled, m.created_at, m.updated_at
FROM platform_models m
JOIN integration_platforms p ON p.id = m.platform_id
ORDER BY m.model_type ASC, m.model_name ASC`)
`+where+`
ORDER BY m.model_type ASC, m.model_name ASC`, args...)
if err != nil {
return nil, err
}
@ -438,6 +676,10 @@ ORDER BY m.model_type ASC, m.model_name ASC`)
var capabilities []byte
var billingConfigOverride []byte
var billingConfig []byte
var permissionConfig []byte
var retryPolicy []byte
var rateLimitPolicy []byte
var runtimePolicyOverride []byte
if err := rows.Scan(
&model.ID,
&model.PlatformID,
@ -455,6 +697,11 @@ ORDER BY m.model_type ASC, m.model_name ASC`)
&model.PricingRuleSetID,
&billingConfigOverride,
&billingConfig,
&permissionConfig,
&retryPolicy,
&rateLimitPolicy,
&model.RuntimePolicySetID,
&runtimePolicyOverride,
&model.Enabled,
&model.CreatedAt,
&model.UpdatedAt,
@ -465,6 +712,10 @@ ORDER BY m.model_type ASC, m.model_name ASC`)
model.Capabilities = decodeObject(capabilities)
model.BillingConfigOverride = decodeObject(billingConfigOverride)
model.BillingConfig = decodeObject(billingConfig)
model.PermissionConfig = decodeObject(permissionConfig)
model.RetryPolicy = decodeObject(retryPolicy)
model.RateLimitPolicy = decodeObject(rateLimitPolicy)
model.RuntimePolicyOverride = decodeObject(runtimePolicyOverride)
models = append(models, model)
}
return models, rows.Err()
@ -731,6 +982,60 @@ ORDER BY created_at DESC`, gatewayUserID)
return items, rows.Err()
}
func (s *Store) ListPlayableAPIKeys(ctx context.Context, user *auth.User) ([]PlayableAPIKey, error) {
items, err := s.listRecoverableAPIKeys(ctx, user)
if err != nil {
return nil, err
}
if len(items) > 0 {
return items, nil
}
created, err := s.CreateAPIKey(ctx, CreateAPIKeyInput{
Name: "Playground API Key",
Scopes: []string{"chat", "image", "video"},
}, user)
if err != nil {
return nil, err
}
return []PlayableAPIKey{{APIKey: created.APIKey, Secret: created.Secret}}, nil
}
func (s *Store) listRecoverableAPIKeys(ctx context.Context, user *auth.User) ([]PlayableAPIKey, error) {
gatewayUserID := localGatewayUserID(user)
if gatewayUserID == "" {
return nil, ErrLocalUserRequired
}
rows, err := s.pool.Query(ctx, `
SELECT id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at, COALESCE(key_secret, '')
FROM gateway_api_keys
WHERE gateway_user_id = $1::uuid
AND status = 'active'
AND deleted_at IS NULL
AND COALESCE(key_secret, '') <> ''
AND (expires_at IS NULL OR expires_at > now())
ORDER BY created_at DESC`, gatewayUserID)
if err != nil {
return nil, err
}
defer rows.Close()
items := make([]PlayableAPIKey, 0)
for rows.Next() {
var item PlayableAPIKey
apiKey, err := scanAPIKeyWithSecret(rows, &item.Secret)
if err != nil {
return nil, err
}
item.APIKey = apiKey
items = append(items, item)
}
return items, rows.Err()
}
func (s *Store) CreateAPIKey(ctx context.Context, input CreateAPIKeyInput, user *auth.User) (CreatedAPIKey, error) {
gatewayUserID := localGatewayUserID(user)
if gatewayUserID == "" {
@ -764,17 +1069,17 @@ func (s *Store) CreateAPIKey(ctx context.Context, input CreateAPIKeyInput, user
err = s.pool.QueryRow(ctx, `
INSERT INTO gateway_api_keys (
gateway_tenant_id, gateway_user_id, tenant_id, tenant_key, user_id,
key_prefix, key_hash, name, scopes, expires_at
key_prefix, key_secret, key_hash, name, scopes, expires_at
)
VALUES (NULLIF($1, '')::uuid, $2::uuid, NULLIF($3, ''), NULLIF($4, ''), NULLIF($5, ''),
$6, $7, $8, $9::jsonb, NULLIF($10, '')::timestamptz)
$6, $7, $8, $9, $10::jsonb, NULLIF($11, '')::timestamptz)
RETURNING id::text, COALESCE(gateway_tenant_id::text, ''), gateway_user_id::text,
COALESCE(tenant_id, ''), COALESCE(tenant_key, ''), COALESCE(user_id, ''),
key_prefix, name, scopes, COALESCE(user_group_id::text, ''),
rate_limit_policy, quota_policy, status, COALESCE(expires_at::text, ''),
COALESCE(last_used_at::text, ''), created_at, updated_at`,
user.GatewayTenantID, gatewayUserID, user.TenantID, user.TenantKey, user.ID,
apiKeyPrefix(secret), string(keyHash), name, string(scopesJSON), strings.TrimSpace(input.ExpiresAt),
apiKeyPrefix(secret), secret, string(keyHash), name, string(scopesJSON), strings.TrimSpace(input.ExpiresAt),
).Scan(
&item.ID,
&item.GatewayTenantID,
@ -856,7 +1161,7 @@ func (s *Store) VerifyLocalAPIKey(ctx context.Context, secret string) (*auth.Use
return nil, auth.ErrUnauthorized
}
rows, err := s.pool.Query(ctx, `
SELECT k.id::text, k.key_hash, COALESCE(k.user_group_id::text, ''),
SELECT k.id::text, k.key_hash, k.key_prefix, k.name, COALESCE(k.user_group_id::text, ''),
u.id::text, u.username, u.roles, COALESCE(u.gateway_tenant_id::text, ''),
COALESCE(u.tenant_id, ''), COALESCE(u.tenant_key, '')
FROM gateway_api_keys k
@ -875,6 +1180,8 @@ WHERE k.key_prefix = $1
for rows.Next() {
var apiKeyID string
var hash string
var keyPrefix string
var keyName string
var userGroupID string
var gatewayUserID string
var username string
@ -882,7 +1189,7 @@ WHERE k.key_prefix = $1
var gatewayTenantID string
var tenantID string
var tenantKey string
if err := rows.Scan(&apiKeyID, &hash, &userGroupID, &gatewayUserID, &username, &rolesBytes, &gatewayTenantID, &tenantID, &tenantKey); err != nil {
if err := rows.Scan(&apiKeyID, &hash, &keyPrefix, &keyName, &userGroupID, &gatewayUserID, &username, &rolesBytes, &gatewayTenantID, &tenantID, &tenantKey); err != nil {
return nil, err
}
if bcrypt.CompareHashAndPassword([]byte(hash), []byte(secret)) != nil {
@ -902,7 +1209,8 @@ WHERE k.key_prefix = $1
GatewayUserID: gatewayUserID,
UserGroupID: userGroupID,
APIKeyID: apiKeyID,
APIKeyName: prefix,
APIKeyName: keyName,
APIKeyPrefix: keyPrefix,
}, nil
}
if err := rows.Err(); err != nil {
@ -953,7 +1261,7 @@ SELECT NOT EXISTS (
return GatewayUser{}, err
}
if isBootstrapUser {
role = "admin"
role = "manager"
}
if err := tx.QueryRow(ctx, `
INSERT INTO gateway_tenants (tenant_key, source, external_tenant_id, name)
@ -1197,21 +1505,16 @@ func (s *Store) CreateTask(ctx context.Context, input CreateTaskInput, user *aut
}
defer tx.Rollback(ctx)
var task GatewayTask
var requestBytes []byte
var resultBytes []byte
var billingsBytes []byte
err = tx.QueryRow(ctx, `
task, err := scanGatewayTask(tx.QueryRow(ctx, `
INSERT INTO gateway_tasks (
kind, run_mode, user_id, gateway_user_id, user_source, gateway_tenant_id, tenant_id, tenant_key,
api_key_id, user_group_id, user_group_key, model, request, status, result, billings, finished_at
api_key_id, api_key_name, api_key_prefix, user_group_id, user_group_key,
model, requested_model, request, status, result, billings, finished_at
)
VALUES ($1, $2, $3, NULLIF($4, '')::uuid, COALESCE(NULLIF($5, ''), 'gateway'), NULLIF($6, '')::uuid, NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, ''), NULLIF($10, '')::uuid, NULLIF($11, ''), $12, $13, $14, $15::jsonb, $16::jsonb, CASE WHEN $17 THEN now() ELSE NULL END)
RETURNING id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source,
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at`,
input.Kind, runMode, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, status, resultBody, billingsBody, false,
).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt)
VALUES ($1, $2, $3, NULLIF($4, '')::uuid, COALESCE(NULLIF($5, ''), 'gateway'), NULLIF($6, '')::uuid, NULLIF($7, ''), NULLIF($8, ''), NULLIF($9, ''), NULLIF($10, ''), NULLIF($11, ''), NULLIF($12, '')::uuid, NULLIF($13, ''), $14, $14, $15, $16, $17::jsonb, $18::jsonb, CASE WHEN $19 THEN now() ELSE NULL END)
RETURNING `+gatewayTaskColumns,
input.Kind, runMode, user.ID, user.GatewayUserID, user.Source, user.GatewayTenantID, user.TenantID, user.TenantKey, user.APIKeyID, user.APIKeyName, user.APIKeyPrefix, user.UserGroupID, user.UserGroupKey, input.Model, requestBody, status, resultBody, billingsBody, false,
))
if err != nil {
return GatewayTask{}, err
}
@ -1229,30 +1532,77 @@ VALUES ($1::uuid, $2, $3, NULLIF($4, ''), NULLIF($5, ''), $6, NULLIF($7, ''), $8
if err := tx.Commit(ctx); err != nil {
return GatewayTask{}, err
}
task.Request = decodeObject(requestBytes)
task.Result = decodeObject(resultBytes)
task.Billings = decodeArray(billingsBytes)
return task, nil
}
func (s *Store) GetTask(ctx context.Context, taskID string) (GatewayTask, error) {
task, err := scanGatewayTask(s.pool.QueryRow(ctx, `
SELECT `+gatewayTaskColumns+`
FROM gateway_tasks
WHERE id=$1`, taskID,
))
if err != nil {
return GatewayTask{}, err
}
return task, nil
}
type taskScanner interface {
Scan(dest ...any) error
}
func scanGatewayTask(scanner taskScanner) (GatewayTask, error) {
var task GatewayTask
var requestBytes []byte
var resultBytes []byte
var billingsBytes []byte
err := s.pool.QueryRow(ctx, `
SELECT id::text, kind, run_mode, user_id, COALESCE(gateway_user_id::text, ''), user_source,
COALESCE(gateway_tenant_id::text, ''), COALESCE(tenant_id, ''), COALESCE(tenant_key, ''),
COALESCE(user_group_id::text, ''), COALESCE(user_group_key, ''), model, request, status, result, billings, COALESCE(error, ''), created_at, updated_at
FROM gateway_tasks
WHERE id=$1`, taskID,
).Scan(&task.ID, &task.Kind, &task.RunMode, &task.UserID, &task.GatewayUserID, &task.UserSource, &task.GatewayTenantID, &task.TenantID, &task.TenantKey, &task.UserGroupID, &task.UserGroupKey, &task.Model, &requestBytes, &task.Status, &resultBytes, &billingsBytes, &task.Error, &task.CreatedAt, &task.UpdatedAt)
if err != nil {
var usageBytes []byte
var metricsBytes []byte
var billingSummaryBytes []byte
if err := scanner.Scan(
&task.ID,
&task.Kind,
&task.RunMode,
&task.UserID,
&task.GatewayUserID,
&task.UserSource,
&task.GatewayTenantID,
&task.TenantID,
&task.TenantKey,
&task.APIKeyID,
&task.APIKeyName,
&task.APIKeyPrefix,
&task.UserGroupID,
&task.UserGroupKey,
&task.Model,
&task.ModelType,
&task.RequestedModel,
&task.ResolvedModel,
&task.RequestID,
&requestBytes,
&task.Status,
&resultBytes,
&billingsBytes,
&usageBytes,
&metricsBytes,
&billingSummaryBytes,
&task.FinalChargeAmount,
&task.ResponseStartedAt,
&task.ResponseFinishedAt,
&task.ResponseDurationMS,
&task.Error,
&task.CreatedAt,
&task.UpdatedAt,
&task.FinishedAt,
); err != nil {
return GatewayTask{}, err
}
task.Request = decodeObject(requestBytes)
task.Result = decodeObject(resultBytes)
task.Billings = decodeArray(billingsBytes)
task.Usage = decodeObject(usageBytes)
task.Metrics = decodeObject(metricsBytes)
task.BillingSummary = decodeObject(billingSummaryBytes)
return task, nil
}
@ -1342,6 +1692,39 @@ func scanAPIKey(scanner apiKeyScanner) (APIKey, error) {
return item, nil
}
func scanAPIKeyWithSecret(scanner apiKeyScanner, secret *string) (APIKey, error) {
var item APIKey
var scopesBytes []byte
var rateLimitPolicy []byte
var quotaPolicy []byte
if err := scanner.Scan(
&item.ID,
&item.GatewayTenantID,
&item.GatewayUserID,
&item.TenantID,
&item.TenantKey,
&item.UserID,
&item.KeyPrefix,
&item.Name,
&scopesBytes,
&item.UserGroupID,
&rateLimitPolicy,
&quotaPolicy,
&item.Status,
&item.ExpiresAt,
&item.LastUsedAt,
&item.CreatedAt,
&item.UpdatedAt,
secret,
); err != nil {
return APIKey{}, err
}
item.Scopes = decodeStringArray(scopesBytes)
item.RateLimitPolicy = decodeObject(rateLimitPolicy)
item.QuotaPolicy = decodeObject(quotaPolicy)
return item, nil
}
func localGatewayUserID(user *auth.User) string {
if user == nil {
return ""
@ -1477,6 +1860,50 @@ func decodeObject(bytes []byte) map[string]any {
return out
}
func maskCredentialsPreview(bytes []byte) map[string]any {
credentials := decodeObject(bytes)
if len(credentials) == 0 {
return nil
}
out := make(map[string]any, len(credentials))
for key, value := range credentials {
out[key] = maskCredentialValue(value)
}
return out
}
func maskCredentialValue(value any) any {
switch typed := value.(type) {
case string:
return maskSecret(typed)
case map[string]any:
out := make(map[string]any, len(typed))
for key, nested := range typed {
out[key] = maskCredentialValue(nested)
}
return out
case []any:
out := make([]any, 0, len(typed))
for _, nested := range typed {
out = append(out, maskCredentialValue(nested))
}
return out
default:
return value
}
}
func maskSecret(value string) string {
value = strings.TrimSpace(value)
if value == "" {
return ""
}
if len(value) <= 6 {
return strings.Repeat("*", len(value))
}
return value[:3] + strings.Repeat("*", len(value)-6) + value[len(value)-3:]
}
func decodeArray(bytes []byte) []any {
if len(bytes) == 0 {
return nil

View File

@ -1,6 +1,9 @@
package store
import "errors"
import (
"errors"
"time"
)
var (
ErrNoModelCandidate = errors.New("no enabled platform model matches request")
@ -25,6 +28,8 @@ type CreatePlatformModelInput struct {
PermissionConfig map[string]any `json:"permissionConfig"`
RetryPolicy map[string]any `json:"retryPolicy"`
RateLimitPolicy map[string]any `json:"rateLimitPolicy"`
RuntimePolicySetID string `json:"runtimePolicySetId"`
RuntimePolicyOverride map[string]any `json:"runtimePolicyOverride"`
Enabled bool `json:"enabled"`
}
@ -33,6 +38,7 @@ type RuntimeModelCandidate struct {
PlatformKey string
PlatformName string
Provider string
SpecType string
BaseURL string
AuthType string
Credentials map[string]any
@ -55,12 +61,15 @@ type RuntimeModelCandidate struct {
BaseBillingConfig map[string]any
BillingConfig map[string]any
BillingConfigOverride map[string]any
PermissionConfig map[string]any
PricingMode string
DiscountFactor float64
PlatformPricingRuleSetID string
ModelPricingRuleSetID string
ModelRetryPolicy map[string]any
ModelRateLimitPolicy map[string]any
RuntimePolicySetID string
RuntimePolicyOverride map[string]any
ClientID string
QueueKey string
}
@ -92,10 +101,42 @@ type CreateTaskAttemptInput struct {
}
type FinishTaskAttemptInput struct {
AttemptID string
Status string
Retryable bool
ResponseSnapshot map[string]any
ErrorCode string
ErrorMessage string
AttemptID string
Status string
Retryable bool
RequestID string
Usage map[string]any
Metrics map[string]any
ResponseSnapshot map[string]any
ResponseStartedAt time.Time
ResponseFinishedAt time.Time
ResponseDurationMS int64
ErrorCode string
ErrorMessage string
}
type FinishTaskSuccessInput struct {
TaskID string
Result map[string]any
Billings []any
RequestID string
ResolvedModel string
Usage map[string]any
Metrics map[string]any
BillingSummary map[string]any
FinalChargeAmount float64
ResponseStartedAt time.Time
ResponseFinishedAt time.Time
ResponseDurationMS int64
}
type FinishTaskFailureInput struct {
TaskID string
Code string
Message string
RequestID string
Metrics map[string]any
ResponseStartedAt time.Time
ResponseFinishedAt time.Time
ResponseDurationMS int64
}

View File

@ -3,6 +3,7 @@ package store
import (
"context"
"encoding/json"
"time"
)
func (s *Store) MarkTaskRunning(ctx context.Context, taskID string, modelType string, normalizedRequest map[string]any) error {
@ -62,57 +63,118 @@ WHERE id = $1::uuid`, input.TaskID, input.AttemptNo); err != nil {
func (s *Store) FinishTaskAttempt(ctx context.Context, input FinishTaskAttemptInput) error {
responseJSON, _ := json.Marshal(emptyObjectIfNil(input.ResponseSnapshot))
usageJSON, _ := json.Marshal(emptyObjectIfNil(input.Usage))
metricsJSON, _ := json.Marshal(emptyObjectIfNil(input.Metrics))
_, err := s.pool.Exec(ctx, `
UPDATE gateway_task_attempts
SET status = $2,
retryable = $3,
response_snapshot = $4::jsonb,
error_code = NULLIF($5, ''),
error_message = NULLIF($6, ''),
request_id = NULLIF($4, ''),
usage = $5::jsonb,
metrics = $6::jsonb,
response_snapshot = $7::jsonb,
response_started_at = $8::timestamptz,
response_finished_at = $9::timestamptz,
response_duration_ms = $10,
error_code = NULLIF($11, ''),
error_message = NULLIF($12, ''),
finished_at = now()
WHERE id = $1::uuid`,
input.AttemptID,
input.Status,
input.Retryable,
input.RequestID,
string(usageJSON),
string(metricsJSON),
string(responseJSON),
nullableTime(input.ResponseStartedAt),
nullableTime(input.ResponseFinishedAt),
input.ResponseDurationMS,
input.ErrorCode,
input.ErrorMessage,
)
return err
}
func (s *Store) FinishTaskSuccess(ctx context.Context, taskID string, result map[string]any, billings []any) (GatewayTask, error) {
resultJSON, _ := json.Marshal(emptyObjectIfNil(result))
billingsJSON, _ := json.Marshal(billings)
func (s *Store) FinishTaskSuccess(ctx context.Context, input FinishTaskSuccessInput) (GatewayTask, error) {
resultJSON, _ := json.Marshal(emptyObjectIfNil(input.Result))
billingsJSON, _ := json.Marshal(input.Billings)
usageJSON, _ := json.Marshal(emptyObjectIfNil(input.Usage))
metricsJSON, _ := json.Marshal(emptyObjectIfNil(input.Metrics))
billingSummaryJSON, _ := json.Marshal(emptyObjectIfNil(input.BillingSummary))
if _, err := s.pool.Exec(ctx, `
UPDATE gateway_tasks
SET status = 'succeeded',
result = $2::jsonb,
billings = $3::jsonb,
request_id = NULLIF($4, ''),
resolved_model = NULLIF($5, ''),
usage = $6::jsonb,
metrics = $7::jsonb,
billing_summary = $8::jsonb,
final_charge_amount = $9,
response_started_at = $10::timestamptz,
response_finished_at = $11::timestamptz,
response_duration_ms = $12,
error = NULL,
error_code = NULL,
error_message = NULL,
finished_at = now(),
updated_at = now()
WHERE id = $1::uuid`, taskID, string(resultJSON), string(billingsJSON)); err != nil {
WHERE id = $1::uuid`,
input.TaskID,
string(resultJSON),
string(billingsJSON),
input.RequestID,
input.ResolvedModel,
string(usageJSON),
string(metricsJSON),
string(billingSummaryJSON),
input.FinalChargeAmount,
nullableTime(input.ResponseStartedAt),
nullableTime(input.ResponseFinishedAt),
input.ResponseDurationMS,
); err != nil {
return GatewayTask{}, err
}
return s.GetTask(ctx, taskID)
return s.GetTask(ctx, input.TaskID)
}
func (s *Store) FinishTaskFailure(ctx context.Context, taskID string, code string, message string) (GatewayTask, error) {
func (s *Store) FinishTaskFailure(ctx context.Context, input FinishTaskFailureInput) (GatewayTask, error) {
metricsJSON, _ := json.Marshal(emptyObjectIfNil(input.Metrics))
if _, err := s.pool.Exec(ctx, `
UPDATE gateway_tasks
SET status = 'failed',
error = NULLIF($2, ''),
error_code = NULLIF($3, ''),
error_message = NULLIF($2, ''),
request_id = NULLIF($4, ''),
metrics = $5::jsonb,
response_started_at = $6::timestamptz,
response_finished_at = $7::timestamptz,
response_duration_ms = $8,
finished_at = now(),
updated_at = now()
WHERE id = $1::uuid`, taskID, message, code); err != nil {
WHERE id = $1::uuid`,
input.TaskID,
input.Message,
input.Code,
input.RequestID,
string(metricsJSON),
nullableTime(input.ResponseStartedAt),
nullableTime(input.ResponseFinishedAt),
input.ResponseDurationMS,
); err != nil {
return GatewayTask{}, err
}
return s.GetTask(ctx, taskID)
return s.GetTask(ctx, input.TaskID)
}
func nullableTime(value time.Time) any {
if value.IsZero() {
return nil
}
return value
}
func (s *Store) AddTaskEvent(ctx context.Context, taskID string, eventType string, status string, phase string, progress float64, message string, payload map[string]any, simulated bool) (TaskEvent, error) {

View File

@ -0,0 +1,72 @@
ALTER TABLE IF EXISTS gateway_tasks
ADD COLUMN IF NOT EXISTS api_key_name text,
ADD COLUMN IF NOT EXISTS api_key_prefix text,
ADD COLUMN IF NOT EXISTS requested_model text,
ADD COLUMN IF NOT EXISTS resolved_model text,
ADD COLUMN IF NOT EXISTS request_id text,
ADD COLUMN IF NOT EXISTS usage jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS metrics jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS billing_summary jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS final_charge_amount numeric NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS response_started_at timestamptz,
ADD COLUMN IF NOT EXISTS response_finished_at timestamptz,
ADD COLUMN IF NOT EXISTS response_duration_ms bigint;
ALTER TABLE IF EXISTS gateway_task_attempts
ADD COLUMN IF NOT EXISTS request_id text,
ADD COLUMN IF NOT EXISTS usage jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS metrics jsonb NOT NULL DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS response_started_at timestamptz,
ADD COLUMN IF NOT EXISTS response_finished_at timestamptz,
ADD COLUMN IF NOT EXISTS response_duration_ms bigint;
CREATE INDEX IF NOT EXISTS idx_gateway_tasks_request_id
ON gateway_tasks(request_id)
WHERE request_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_gateway_tasks_api_key_created
ON gateway_tasks(api_key_id, created_at DESC)
WHERE api_key_id IS NOT NULL;
UPDATE gateway_tasks
SET requested_model = model
WHERE requested_model IS NULL;
UPDATE gateway_tasks AS t
SET api_key_name = COALESCE(t.api_key_name, k.name),
api_key_prefix = COALESCE(t.api_key_prefix, k.key_prefix)
FROM gateway_api_keys AS k
WHERE t.api_key_id = k.id::text;
WITH billing_totals AS (
SELECT
t.id,
COUNT(line.value) AS line_count,
COALESCE(SUM(
CASE
WHEN jsonb_typeof(line.value) = 'object'
AND line.value ? 'amount'
AND line.value->>'amount' ~ '^-?[0-9]+(\.[0-9]+)?$'
THEN (line.value->>'amount')::numeric
ELSE 0
END
), 0) AS total_amount
FROM gateway_tasks AS t
LEFT JOIN LATERAL jsonb_array_elements(COALESCE(t.billings, '[]'::jsonb)) AS line(value) ON true
GROUP BY t.id
)
UPDATE gateway_tasks AS t
SET final_charge_amount = billing_totals.total_amount,
billing_summary = jsonb_build_object(
'lineCount', billing_totals.line_count,
'totalAmount', billing_totals.total_amount,
'currency', 'resource',
'finalCharge', jsonb_build_object(
'amount', billing_totals.total_amount,
'currency', 'resource',
'simulated', COALESCE(t.run_mode = 'simulation', false)
)
)
FROM billing_totals
WHERE t.id = billing_totals.id
AND COALESCE(t.billing_summary, '{}'::jsonb) = '{}'::jsonb;

View File

@ -2,7 +2,6 @@ import type { FormEvent, ReactNode } from 'react';
import { CreditCard, KeyRound, ListChecks, UserRound } from 'lucide-react';
import type { ConsoleData } from '../app-state';
import { EntityTable } from '../components/EntityTable';
import { PageHeader } from '../components/PageHeader';
import { Badge, Button, Card, CardContent, CardHeader, CardTitle, Input, Label, Tabs } from '../components/ui';
import type { ApiKeyForm, LoadState, WorkspaceSection } from '../types';
@ -22,10 +21,10 @@ export function WorkspacePage(props: {
onApiKeyFormChange: (value: ApiKeyForm) => void;
onSectionChange: (value: WorkspaceSection) => void;
onSubmitApiKey: (event: FormEvent<HTMLFormElement>) => void;
onUseApiKeyForPlayground: (apiKeyId?: string) => void;
}) {
return (
<div className="pageStack">
<PageHeader eyebrow="Workspace" title="用户工作台" description="个人资产、API Key 和任务记录。" />
<div className="subPageLayout">
<Tabs className="sideTabs" value={props.section} tabs={tabs} onValueChange={props.onSectionChange} />
<div className="subPageContent">
@ -106,7 +105,9 @@ function ApiKeyPanel(props: {
state: LoadState;
onApiKeyFormChange: (value: ApiKeyForm) => void;
onSubmitApiKey: (event: FormEvent<HTMLFormElement>) => void;
onUseApiKeyForPlayground: (apiKeyId?: string) => void;
}) {
const latestUsableKey = props.apiKeySecret ? props.data.apiKeys[0] : undefined;
return (
<section className="contentGrid two">
<Card>
@ -123,7 +124,14 @@ function ApiKeyPanel(props: {
<KeyRound size={15} />
</Button>
{props.apiKeySecret && <code className="secretBox">{props.apiKeySecret}</code>}
{props.apiKeySecret && (
<div className="createdApiKeyBox">
<code className="secretBox">{props.apiKeySecret}</code>
<Button type="button" variant="secondary" onClick={() => props.onUseApiKeyForPlayground(latestUsableKey?.id)}>
使
</Button>
</div>
)}
</form>
</CardContent>
</Card>
@ -145,6 +153,9 @@ function ApiKeyPanel(props: {
function TaskPanel(props: { data: ConsoleData }) {
const task = props.data.taskResult;
const usage = task?.usage ?? {};
const tokenText = usage.totalTokens ? `${usage.totalTokens}` : '-';
const chargeText = task?.finalChargeAmount ? `${task.finalChargeAmount}` : '-';
return (
<Card>
<CardHeader>
@ -156,7 +167,15 @@ function TaskPanel(props: { data: ConsoleData }) {
<Badge variant={task.status === 'succeeded' ? 'success' : 'secondary'}>{task.status}</Badge>
<strong>{task.kind}</strong>
<span>{task.model}</span>
<pre>{JSON.stringify(task.result ?? {}, null, 2)}</pre>
<div className="infoGrid compact">
<InfoItem label="API Key" value={task.apiKeyName || task.apiKeyId || '-'} />
<InfoItem label="RequestID" value={task.requestId || '-'} />
<InfoItem label="实际模型" value={task.resolvedModel || task.model} />
<InfoItem label="Token" value={tokenText} />
<InfoItem label="扣费" value={chargeText} />
<InfoItem label="响应耗时" value={task.responseDurationMs ? `${task.responseDurationMs}ms` : '-'} />
</div>
<pre>{JSON.stringify({ result: task.result, usage: task.usage, billings: task.billings, billingSummary: task.billingSummary, metrics: task.metrics }, null, 2)}</pre>
</div>
) : (
<div className="emptyState">

View File

@ -407,6 +407,16 @@ strong {
gap: 10px;
}
.infoGrid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(160px, 1fr));
gap: 10px;
}
.infoGrid.compact .infoItem {
padding: 10px;
}
.spanTwo {
grid-column: 1 / -1;
}

View File

@ -16,6 +16,7 @@ export interface AuthUser {
apiKeyId?: string;
apiKeySecret?: string;
apiKeyName?: string;
apiKeyPrefix?: string;
}
export interface AuthResponse {
@ -56,6 +57,7 @@ export interface IntegrationPlatform {
provider: string;
platformKey: string;
name: string;
internalName?: string;
baseUrl?: string;
authType: string;
status: 'enabled' | 'disabled' | string;
@ -64,6 +66,9 @@ export interface IntegrationPlatform {
defaultDiscountFactor: number;
pricingRuleSetId?: string;
config?: Record<string, unknown>;
credentialsPreview?: Record<string, unknown>;
retryPolicy?: RateLimitPolicy | Record<string, unknown>;
rateLimitPolicy?: RateLimitPolicy;
createdAt: string;
updatedAt: string;
}
@ -75,6 +80,7 @@ export type RateLimitMetric =
| 'tpm_input'
| 'tpm_output'
| 'rpm'
| 'rps'
| 'concurrent'
| 'queue_size';
@ -85,6 +91,8 @@ export interface CatalogProvider {
displayName: string;
providerType: string;
iconPath?: string;
defaultBaseUrl?: string;
defaultAuthType?: string;
source?: 'gateway' | 'server-main' | 'sync' | 'server-main.integration-platform' | string;
capabilitySchema?: Record<string, unknown>;
defaultRateLimitPolicy?: RateLimitPolicy;
@ -100,6 +108,8 @@ export interface CatalogProviderUpsertRequest {
displayName: string;
providerType?: string;
iconPath?: string;
defaultBaseUrl?: string;
defaultAuthType?: string;
source?: 'gateway' | 'server-main' | 'sync' | 'server-main.integration-platform' | string;
capabilitySchema?: Record<string, unknown>;
defaultRateLimitPolicy?: RateLimitPolicy;
@ -112,12 +122,19 @@ export interface BaseModelCatalogItem {
providerKey: string;
canonicalModelKey: string;
providerModelName: string;
modelType: 'chat' | 'image' | 'video' | 'audio' | 'embedding' | 'music' | 'digital_human' | 'model_3d' | string;
displayName: string;
modelType: string[];
modelAlias: string;
displayName?: string;
capabilities?: Record<string, unknown>;
baseBillingConfig?: BillingConfig;
defaultRateLimitPolicy?: RateLimitPolicy;
pricingRuleSetId?: string;
runtimePolicySetId?: string;
runtimePolicyOverride?: RuntimePolicyOverride;
metadata?: Record<string, unknown>;
catalogType?: 'system' | 'custom' | string;
defaultSnapshot?: Record<string, unknown>;
customizedAt?: string;
pricingVersion: number;
status: 'active' | 'deprecated' | 'hidden' | string;
createdAt: string;
@ -128,12 +145,18 @@ export interface BaseModelUpsertRequest {
providerKey: string;
canonicalModelKey?: string;
providerModelName: string;
modelType: string;
modelType: string[];
modelAlias?: string;
displayName?: string;
capabilities?: Record<string, unknown>;
baseBillingConfig?: BillingConfig;
defaultRateLimitPolicy?: RateLimitPolicy;
pricingRuleSetId?: string;
runtimePolicySetId?: string;
runtimePolicyOverride?: RuntimePolicyOverride;
metadata?: Record<string, unknown>;
catalogType?: 'system' | 'custom' | string;
defaultSnapshot?: Record<string, unknown>;
pricingVersion?: number;
status?: 'active' | 'deprecated' | 'hidden' | string;
}
@ -213,6 +236,40 @@ export interface PricingRuleSetUpsertRequest {
rules: PricingRuleInput[];
}
export interface RuntimePolicySet {
id: string;
policyKey: string;
name: string;
description?: string;
rateLimitPolicy?: RateLimitPolicy | Record<string, unknown>;
retryPolicy?: Record<string, unknown>;
autoDisablePolicy?: Record<string, unknown>;
degradePolicy?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status: 'active' | 'disabled' | string;
createdAt: string;
updatedAt: string;
}
export interface RuntimePolicySetUpsertRequest {
policyKey: string;
name: string;
description?: string;
rateLimitPolicy?: RateLimitPolicy | Record<string, unknown>;
retryPolicy?: Record<string, unknown>;
autoDisablePolicy?: Record<string, unknown>;
degradePolicy?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status?: 'active' | 'disabled' | string;
}
export interface RuntimePolicyOverride {
rateLimitPolicy?: RateLimitPolicy | Record<string, unknown>;
retryPolicy?: Record<string, unknown>;
autoDisablePolicy?: Record<string, unknown>;
degradePolicy?: Record<string, unknown>;
}
export interface GatewayUser {
id: string;
userKey: string;
@ -238,6 +295,26 @@ export interface GatewayUser {
updatedAt: string;
}
export interface GatewayUserUpsertRequest {
userKey?: string;
source?: 'gateway' | 'server-main' | 'sync' | string;
externalUserId?: string;
username: string;
displayName?: string;
email?: string;
phone?: string;
avatarUrl?: string;
password?: string;
gatewayTenantId?: string;
tenantId?: string;
tenantKey?: string;
defaultUserGroupId?: string;
roles?: string[];
authProfile?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status?: 'active' | 'disabled' | 'locked' | 'deleted' | string;
}
export interface GatewayTenant {
id: string;
tenantKey: string;
@ -258,6 +335,21 @@ export interface GatewayTenant {
updatedAt: string;
}
export interface GatewayTenantUpsertRequest {
tenantKey: string;
source?: 'gateway' | 'server-main' | 'sync' | string;
externalTenantId?: string;
name: string;
description?: string;
defaultUserGroupId?: string;
planKey?: string;
billingProfile?: Record<string, unknown>;
rateLimitPolicy?: RateLimitPolicy;
authPolicy?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status?: 'active' | 'disabled' | 'locked' | 'deleted' | string;
}
export interface UserGroup {
id: string;
groupKey: string;
@ -269,11 +361,26 @@ export interface UserGroup {
billingDiscountPolicy?: Record<string, unknown>;
rateLimitPolicy?: RateLimitPolicy;
quotaPolicy?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status: 'active' | 'disabled' | string;
createdAt: string;
updatedAt: string;
}
export interface UserGroupUpsertRequest {
groupKey: string;
name: string;
description?: string;
source?: 'gateway' | 'server-main' | 'sync' | string;
priority?: number;
rechargeDiscountPolicy?: Record<string, unknown>;
billingDiscountPolicy?: Record<string, unknown>;
rateLimitPolicy?: RateLimitPolicy;
quotaPolicy?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status?: 'active' | 'disabled' | string;
}
export interface UserGroupMembership {
id: string;
groupId: string;
@ -288,6 +395,57 @@ export interface UserGroupMembership {
updatedAt: string;
}
export type GatewayAccessSubjectType = 'user_group' | 'tenant' | 'user' | 'api_key';
export type GatewayAccessResourceType = 'platform' | 'platform_model' | 'base_model';
export type GatewayAccessEffect = 'allow' | 'deny';
export interface GatewayAccessRule {
id: string;
subjectType: GatewayAccessSubjectType | string;
subjectId: string;
resourceType: GatewayAccessResourceType | string;
resourceId: string;
effect: GatewayAccessEffect | string;
priority: number;
minPermissionLevel: number;
conditions?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status: 'active' | 'disabled' | string;
createdAt: string;
updatedAt: string;
}
export interface GatewayAccessRuleUpsertRequest {
subjectType: GatewayAccessSubjectType | string;
subjectId: string;
resourceType: GatewayAccessResourceType | string;
resourceId: string;
effect: GatewayAccessEffect | string;
priority?: number;
minPermissionLevel?: number;
conditions?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status?: 'active' | 'disabled' | string;
}
export interface GatewayAccessRuleResourceRequest {
resourceType: GatewayAccessResourceType | string;
resourceId: string;
priority?: number;
minPermissionLevel?: number;
conditions?: Record<string, unknown>;
metadata?: Record<string, unknown>;
status?: 'active' | 'disabled' | string;
}
export interface GatewayAccessRuleBatchRequest {
subjectType: GatewayAccessSubjectType | string;
subjectId: string;
effect: GatewayAccessEffect | string;
upsertResources?: GatewayAccessRuleResourceRequest[];
deleteResources?: GatewayAccessRuleResourceRequest[];
}
export interface GatewayApiKey {
id: string;
gatewayTenantId?: string;
@ -313,6 +471,10 @@ export interface CreatedGatewayApiKey {
secret: string;
}
export interface PlayableGatewayApiKey extends GatewayApiKey {
secret: string;
}
export interface GatewayWalletAccount {
id: string;
gatewayTenantId?: string;
@ -406,6 +568,11 @@ export interface PlatformModel {
pricingRuleSetId?: string;
billingConfigOverride?: BillingConfig;
billingConfig?: BillingConfig;
permissionConfig?: Record<string, unknown>;
retryPolicy?: Record<string, unknown>;
rateLimitPolicy?: RateLimitPolicy;
runtimePolicySetId?: string;
runtimePolicyOverride?: RuntimePolicyOverride;
enabled: boolean;
createdAt: string;
updatedAt: string;
@ -433,14 +600,29 @@ export interface GatewayTask {
gatewayTenantId?: string;
tenantId?: string;
tenantKey?: string;
apiKeyId?: string;
apiKeyName?: string;
apiKeyPrefix?: string;
userGroupId?: string;
userGroupKey?: string;
userGroupPolicySnapshot?: Record<string, unknown>;
model: string;
modelType?: string;
requestedModel?: string;
resolvedModel?: string;
requestId?: string;
request?: Record<string, unknown>;
status: 'queued' | 'running' | 'succeeded' | 'failed' | 'cancelled' | string;
result?: Record<string, unknown>;
billings?: unknown[];
usage?: Record<string, unknown>;
metrics?: Record<string, unknown>;
billingSummary?: Record<string, unknown>;
finalChargeAmount?: number;
responseStartedAt?: string;
responseFinishedAt?: string;
responseDurationMs?: number;
finishedAt?: string;
error?: string;
createdAt: string;
updatedAt: string;