279 lines
7.3 KiB
Go
279 lines
7.3 KiB
Go
package httpapi
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/easyai/easyai-ai-gateway/apps/api/internal/clients"
|
|
)
|
|
|
|
// prepareCompatibleStream sets the Server-Sent Events response headers
// (Content-Type, Cache-Control, Connection) on w.
//
// It returns w as an http.Flusher, or nil when w does not implement
// http.Flusher.
func prepareCompatibleStream(w http.ResponseWriter) http.Flusher {
	sseHeaders := [][2]string{
		{"Content-Type", "text/event-stream"},
		{"Cache-Control", "no-cache"},
		{"Connection", "keep-alive"},
	}
	for _, kv := range sseHeaders {
		w.Header().Set(kv[0], kv[1])
	}
	if flusher, ok := w.(http.Flusher); ok {
		return flusher
	}
	return nil
}
|
|
|
|
// compatibleStreamWriter turns gateway stream events into client-facing SSE
// frames. With kind "responses" it emits Responses-style events; any other
// kind produces chat.completion.chunk frames plus a final "data: [DONE]".
type compatibleStreamWriter struct {
	// Configuration fixed at construction time.
	kind, model  string // kind selects the event format; model is the fallback model name
	includeUsage bool   // whether usage chunks are forwarded/emitted to the client

	// Metadata stamped on every chat chunk; may be replaced by values
	// captured from upstream chunks or the final output.
	id                string
	created           int64
	systemFingerprint any

	// Progress flags that stop the role, finish and usage chunks from
	// being emitted more than once.
	sentRole, sentFinish, sentUsage bool
}
|
|
|
|
// newCompatibleStreamWriter builds a writer for the given stream kind and
// fallback model. The chunk id starts as the placeholder "chatcmpl-stream",
// and the creation time starts as the current Unix time. Both may later be
// overwritten by captured upstream/output metadata.
func newCompatibleStreamWriter(kind string, model string, includeUsage bool) *compatibleStreamWriter {
	s := new(compatibleStreamWriter)
	s.kind, s.model, s.includeUsage = kind, model, includeUsage
	s.id = "chatcmpl-stream"
	s.created = time.Now().Unix()
	return s
}
|
|
|
|
// writeDelta emits one incremental stream event to w.
//
//   - kind "responses": only non-empty event.Text is sent, as a
//     "response.output_text.delta" SSE event. Reasoning content is not
//     emitted on this path.
//   - chat kinds: if event.Event looks like a chat.completion.chunk, it is
//     forwarded through writeChatChunk. Otherwise the reasoning content and
//     the text are each sent as their own chunk, reasoning first. An
//     assistant role chunk is emitted beforehand if none has been sent yet.
//     Events with neither text nor reasoning produce no output.
func (s *compatibleStreamWriter) writeDelta(w http.ResponseWriter, event clients.StreamDeltaEvent) {
	switch {
	case s.kind == "responses":
		if event.Text == "" {
			return
		}
		sendSSE(w, "response.output_text.delta", map[string]any{"type": "response.output_text.delta", "delta": event.Text})

	case event.Event != nil && isChatCompletionChunk(event.Event):
		s.writeChatChunk(w, event.Event)

	case event.Text != "" || event.ReasoningContent != "":
		s.ensureRoleChunk(w)
		// Order matters: reasoning is streamed before visible content.
		parts := []struct{ key, value string }{
			{"reasoning_content", event.ReasoningContent},
			{"content", event.Text},
		}
		for _, part := range parts {
			if part.value == "" {
				continue
			}
			choice := map[string]any{"index": 0, "delta": map[string]any{part.key: part.value}, "finish_reason": nil}
			s.writeChatData(w, s.chatChunk([]any{choice}, nil))
		}
	}
}
|
|
|
|
// writeDone closes the stream using the final aggregated output.
//
// For kind "responses" it sends a single "response.completed" SSE event
// carrying output and writes nothing else. For chat kinds it:
//
//  1. adopts id/model/created/system_fingerprint from output,
//  2. guarantees that an assistant role chunk was sent,
//  3. sends a finish chunk if no upstream chunk carried a finish_reason,
//  4. sends a usage-only chunk when usage was requested, not yet sent, and
//     output["usage"] is a non-empty map, and
//  5. writes the terminating "data: [DONE]" frame.
func (s *compatibleStreamWriter) writeDone(w http.ResponseWriter, output map[string]any) {
	if s.kind == "responses" {
		sendSSE(w, "response.completed", map[string]any{"type": "response.completed", "response": output})
		return
	}

	s.captureOutputMetadata(output)
	s.ensureRoleChunk(w) // no-op when a role chunk already went out

	if !s.sentFinish {
		finish := map[string]any{"index": 0, "delta": map[string]any{}, "finish_reason": finishReasonFromOutput(output)}
		s.writeChatData(w, s.chatChunk([]any{finish}, nil))
		s.sentFinish = true
	}

	if s.includeUsage && !s.sentUsage {
		// A failed assertion leaves usage nil, which has length 0.
		usage, _ := output["usage"].(map[string]any)
		if len(usage) > 0 {
			s.writeChatData(w, s.chatChunk([]any{}, usage))
			s.sentUsage = true
		}
	}

	s.writeDoneMarker(w)
}
|
|
|
|
// writeChatChunk forwards an upstream chat.completion.chunk event to w.
//
// The chunk is normalized and its metadata (id, model, created,
// system_fingerprint) is adopted by the writer. The forwarded copy is then
// re-stamped with the writer's metadata, so every frame in the stream agrees.
//
// Usage handling:
//   - A usage-only chunk (no choices) is re-emitted only when includeUsage is
//     set; otherwise it is dropped.
//   - When usage rides along on a chunk that also has choices, it is
//     forwarded and marked as sent if includeUsage is set, so that writeDone
//     does not emit it a second time. When includeUsage is not set, usage is
//     blanked (JSON null), matching the usage-only branch.
//
// The role/finish flags are updated so writeDone does not emit duplicate
// role or finish chunks.
func (s *compatibleStreamWriter) writeChatChunk(w http.ResponseWriter, chunk map[string]any) {
	chunk = clients.NormalizeChatCompletionStreamEvent(chunk)
	s.captureChunkMetadata(chunk)

	choices, _ := chunk["choices"].([]any)
	usage, hasUsage := chunk["usage"].(map[string]any)

	// Usage-only trailer chunk.
	if len(choices) == 0 && hasUsage {
		if !s.includeUsage {
			return
		}
		s.writeChatData(w, s.chatChunk([]any{}, usage))
		s.sentUsage = true
		return
	}

	hasRole := chunkHasRole(choices)
	if len(choices) > 0 && !hasRole {
		// Clients expect the first content chunk to be preceded by a role
		// delta; ensureRoleChunk is a no-op once one has been sent.
		s.ensureRoleChunk(w)
	}
	if hasRole {
		s.sentRole = true
	}
	if chunkHasFinishReason(choices) {
		s.sentFinish = true
	}

	normalized := cloneMap(chunk)
	normalized["id"] = s.id
	normalized["object"] = "chat.completion.chunk"
	normalized["created"] = s.created
	normalized["model"] = firstString(normalized["model"], s.model)
	normalized["system_fingerprint"] = s.systemFingerprint

	// Some upstreams attach usage to a chunk that still carries choices.
	if hasUsage && len(usage) > 0 {
		if s.includeUsage {
			s.sentUsage = true // delivered here; writeDone must not repeat it
		} else {
			normalized["usage"] = nil // the client did not ask for usage
		}
	}

	s.writeChatData(w, normalized)
}
|
|
|
|
// ensureRoleChunk emits a single chat chunk whose delta is
// {"role": "assistant"}, unless a role chunk has already been sent. It records
// that the role was sent.
func (s *compatibleStreamWriter) ensureRoleChunk(w http.ResponseWriter) {
	if !s.sentRole {
		roleChoice := map[string]any{"index": 0, "delta": map[string]any{"role": "assistant"}, "finish_reason": nil}
		s.writeChatData(w, s.chatChunk([]any{roleChoice}, nil))
		s.sentRole = true
	}
}
|
|
|
|
func (s *compatibleStreamWriter) chatChunk(choices []any, usage map[string]any) map[string]any {
|
|
chunk := map[string]any{
|
|
"id": s.id,
|
|
"object": "chat.completion.chunk",
|
|
"created": s.created,
|
|
"model": s.model,
|
|
"system_fingerprint": s.systemFingerprint,
|
|
"choices": choices,
|
|
}
|
|
if usage != nil {
|
|
chunk["usage"] = usage
|
|
} else {
|
|
chunk["usage"] = nil
|
|
}
|
|
return chunk
|
|
}
|
|
|
|
// writeChatData serializes payload as JSON and writes it to w as one SSE
// "data:" frame.
//
// If payload cannot be marshaled (e.g. it contains a NaN float or an
// unsupported type), the frame is skipped. Writing "data: " with an empty
// body would hand clients an unparseable event.
func (s *compatibleStreamWriter) writeChatData(w http.ResponseWriter, payload map[string]any) {
	encoded, err := json.Marshal(payload)
	if err != nil {
		// Best-effort streaming: drop the unencodable frame rather than
		// corrupt the event stream.
		return
	}
	// Write errors (e.g. a disconnected client) are intentionally ignored;
	// there is no way to report them to the client mid-stream.
	_, _ = fmt.Fprintf(w, "data: %s\n\n", encoded)
}
|
|
|
|
func (s *compatibleStreamWriter) writeDoneMarker(w http.ResponseWriter) {
|
|
_, _ = fmt.Fprint(w, "data: [DONE]\n\n")
|
|
}
|
|
|
|
func (s *compatibleStreamWriter) captureChunkMetadata(chunk map[string]any) {
|
|
if id := firstString(chunk["id"], ""); id != "" {
|
|
s.id = id
|
|
}
|
|
if model := firstString(chunk["model"], ""); model != "" {
|
|
s.model = model
|
|
}
|
|
if created := int64FromAny(chunk["created"]); created > 0 {
|
|
s.created = created
|
|
}
|
|
if value, ok := chunk["system_fingerprint"]; ok {
|
|
s.systemFingerprint = value
|
|
}
|
|
}
|
|
|
|
func (s *compatibleStreamWriter) captureOutputMetadata(output map[string]any) {
|
|
if id := firstString(output["id"], ""); id != "" {
|
|
s.id = id
|
|
}
|
|
if model := firstString(output["model"], ""); model != "" {
|
|
s.model = model
|
|
}
|
|
if created := int64FromAny(output["created"]); created > 0 {
|
|
s.created = created
|
|
}
|
|
if value, ok := output["system_fingerprint"]; ok {
|
|
s.systemFingerprint = value
|
|
}
|
|
}
|
|
|
|
// writeCompatibleStream replays an already-complete output as an SSE stream.
//
// It sets the SSE headers, emits the output's text as a single delta, and
// then finishes the stream via writeDone. Usage is always included when
// output carries it.
func writeCompatibleStream(w http.ResponseWriter, kind string, model string, output map[string]any) {
	prepareCompatibleStream(w)
	writer := newCompatibleStreamWriter(kind, model, true)

	// Adopt the output's id/model/created/system_fingerprint before emitting
	// anything, so the role and content chunks carry the same metadata as
	// the finish and usage chunks. Otherwise the early chunks would use the
	// placeholder id "chatcmpl-stream" and the request model.
	writer.captureOutputMetadata(output)

	content := extractOutputText(output)
	if content == "" {
		// NOTE(review): the literal text "done" is streamed when the output
		// has no text content (e.g. possibly tool-call-only outputs);
		// confirm clients expect this placeholder.
		content = "done"
	}
	writer.writeDelta(w, clients.StreamDeltaEvent{Text: content})
	writer.writeDone(w, output)
}
|
|
|
|
// firstString returns value if it is a non-empty string; otherwise it
// returns fallback.
func firstString(value any, fallback string) string {
	text, isString := value.(string)
	if !isString || text == "" {
		return fallback
	}
	return text
}
|
|
|
|
// int64FromAny converts a decoded numeric value to int64 and returns 0 for
// unsupported types or unparsable numbers.
//
// Accepted types:
//   - int64, int, int32;
//   - float64, float32, truncated toward zero;
//   - json.Number, which a json.Decoder produces when UseNumber is enabled.
//     It is parsed as an integer first and falls back to float truncation.
func int64FromAny(value any) int64 {
	switch typed := value.(type) {
	case int64:
		return typed
	case int:
		return int64(typed)
	case int32:
		return int64(typed)
	case float64:
		return int64(typed)
	case float32:
		return int64(typed)
	case json.Number:
		if n, err := typed.Int64(); err == nil {
			return n
		}
		if f, err := typed.Float64(); err == nil {
			return int64(f)
		}
		return 0
	default:
		return 0
	}
}
|
|
|
|
// isChatCompletionChunk reports whether event looks like a chat completion
// stream chunk. That is the case when its "object" is "chat.completion.chunk"
// or when its "choices" value has dynamic type []any.
func isChatCompletionChunk(event map[string]any) bool {
	if object, _ := event["object"].(string); object == "chat.completion.chunk" {
		return true
	}
	switch event["choices"].(type) {
	case []any:
		return true
	default:
		return false
	}
}
|
|
|
|
// chunkHasRole reports whether any choice has a non-empty string at
// delta.role. Malformed choices are treated as having no role.
func chunkHasRole(choices []any) bool {
	for i := range choices {
		choice, _ := choices[i].(map[string]any)
		delta, _ := choice["delta"].(map[string]any)
		// A failed assertion yields "", which is treated the same as an
		// empty role.
		role, _ := delta["role"].(string)
		if role != "" {
			return true
		}
	}
	return false
}
|
|
|
|
// chunkHasFinishReason reports whether any choice carries a non-empty string
// finish_reason. A nil or non-string finish_reason counts as absent.
func chunkHasFinishReason(choices []any) bool {
	found := false
	for i := 0; i < len(choices) && !found; i++ {
		choice, _ := choices[i].(map[string]any)
		reason, _ := choice["finish_reason"].(string)
		found = reason != ""
	}
	return found
}
|
|
|
|
// finishReasonFromOutput returns the first non-empty string finish_reason
// found in output["choices"], or "stop" if there is none.
func finishReasonFromOutput(output map[string]any) string {
	const defaultReason = "stop"
	choices, _ := output["choices"].([]any)
	for i := range choices {
		choice, _ := choices[i].(map[string]any)
		if reason, _ := choice["finish_reason"].(string); reason != "" {
			return reason
		}
	}
	return defaultReason
}
|
|
|
|
// cloneMap returns a shallow copy of value. The result is always a non-nil,
// writable map, even when value is nil. Nested maps and slices are shared
// with the original.
func cloneMap(value map[string]any) map[string]any {
	out := make(map[string]any, len(value))
	for k, v := range value {
		out[k] = v
	}
	return out
}
|
|
|
|
// extractOutputText returns the text of a completed output.
//
// If output["output_text"] is a string, it is returned as-is, even when
// empty. Otherwise the first choice whose message.content is a string
// supplies the result. The function returns "" when neither is found;
// non-string content (e.g. a parts array) is skipped.
func extractOutputText(output map[string]any) string {
	if direct, ok := output["output_text"].(string); ok {
		return direct
	}
	choices, _ := output["choices"].([]any)
	for i := range choices {
		choice, _ := choices[i].(map[string]any)
		message, _ := choice["message"].(map[string]any)
		content, isText := message["content"].(string)
		if isText {
			return content
		}
	}
	return ""
}
|