easyai-ai-gateway/apps/api/internal/clients/openai.go
wangbo f550c0acd5 feat(admin): 添加网络代理配置和钱包交易功能
- 在管理面板中集成网络代理配置显示和平台代理设置
- 添加钱包摘要和交易列表API接口及数据管理
- 实现SSE流式响应中的错误处理机制
- 添加全局HTTP代理环境变量配置支持
- 更新平台表单以支持代理模式选择和自定义代理地址
- 集成钱包交易查询过滤和分页功能
- 优化API错误详情解析和显示格式
2026-05-11 23:02:10 +08:00

132 lines
3.8 KiB
Go

package clients
import (
"bytes"
"context"
"encoding/json"
"net/http"
"strings"
"time"
)
type OpenAIClient struct {
HTTPClient *http.Client
}
func (c OpenAIClient) Run(ctx context.Context, request Request) (Response, error) {
apiKey := credential(request.Candidate.Credentials, "apiKey", "api_key", "key", "token")
if apiKey == "" {
return Response{}, &ClientError{Code: "missing_credentials", Message: "openai api key is required", Retryable: false}
}
endpoint := openAIEndpoint(request.Kind)
if endpoint == "" {
return Response{}, &ClientError{Code: "unsupported_kind", Message: "unsupported openai request kind", Retryable: false}
}
body := cloneBody(request.Body)
body["model"] = upstreamModelName(request.Candidate)
stream := request.Stream || boolValue(body, "stream")
ensureOpenAIStreamUsage(body, request.Kind, stream)
raw, _ := json.Marshal(body)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, joinURL(request.Candidate.BaseURL, endpoint), bytes.NewReader(raw))
if err != nil {
return Response{}, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+apiKey)
resp, err := httpClient(request.HTTPClient, c.HTTPClient).Do(req)
if err != nil {
return Response{}, &ClientError{Code: "network", Message: err.Error(), Retryable: true}
}
responseStartedAt := time.Now()
requestID := requestIDFromHTTPResponse(resp)
result, err := decodeOpenAIResponse(resp, stream, request.StreamDelta)
responseFinishedAt := time.Now()
if err != nil {
return Response{}, annotateResponseError(err, requestID, responseStartedAt, responseFinishedAt)
}
if requestID == "" {
requestID = requestIDFromResult(result)
}
return Response{
Result: result,
RequestID: requestID,
Usage: usageFromOpenAI(result),
Progress: providerProgress(request),
ResponseStartedAt: responseStartedAt,
ResponseFinishedAt: responseFinishedAt,
ResponseDurationMS: responseDurationMS(responseStartedAt, responseFinishedAt),
}, nil
}
func decodeOpenAIResponse(resp *http.Response, stream bool, onDelta StreamDelta) (map[string]any, error) {
if stream {
result, err := decodeOpenAIStreamResponse(resp, onDelta)
if err == nil {
return result, nil
}
return nil, err
}
return decodeHTTPResponse(resp)
}
func openAIEndpoint(kind string) string {
switch kind {
case "chat.completions":
return "/chat/completions"
case "responses":
return "/responses"
case "images.generations":
return "/images/generations"
case "images.edits":
return "/images/edits"
default:
return ""
}
}
func cloneBody(body map[string]any) map[string]any {
out := map[string]any{}
for key, value := range body {
out[key] = value
}
return out
}
func ensureOpenAIStreamUsage(body map[string]any, kind string, stream bool) {
if !stream || kind != "chat.completions" {
return
}
streamOptions := map[string]any{}
if existing, ok := body["stream_options"].(map[string]any); ok {
for key, value := range existing {
streamOptions[key] = value
}
}
streamOptions["include_usage"] = true
body["stream_options"] = streamOptions
}
func joinURL(base string, path string) string {
base = strings.TrimRight(strings.TrimSpace(base), "/")
if base == "" {
base = "https://api.openai.com/v1"
}
return base + path
}
func httpClient(clients ...*http.Client) *http.Client {
for _, client := range clients {
if client != nil {
return client
}
}
return http.DefaultClient
}
func providerProgress(request Request) []Progress {
return []Progress{
{Phase: "submitting", Progress: 0.35, Message: "provider request submitted", Payload: map[string]any{"clientId": request.Candidate.ClientID}},
{Phase: "fetching_result", Progress: 0.8, Message: "provider response received", Payload: map[string]any{"provider": request.Candidate.Provider}},
}
}