easyai-ai-gateway/apps/api/internal/httpapi/streaming.go

279 lines
7.3 KiB
Go

package httpapi
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/easyai/easyai-ai-gateway/apps/api/internal/clients"
)
// prepareCompatibleStream sets the response headers used for a Server-Sent
// Events stream (event-stream content type, no caching, keep-alive) and
// returns w viewed as an http.Flusher.
//
// The returned flusher is nil when w does not implement http.Flusher, so
// callers must check it before use.
func prepareCompatibleStream(w http.ResponseWriter) http.Flusher {
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")

	if flusher, ok := w.(http.Flusher); ok {
		return flusher
	}
	return nil
}
// compatibleStreamWriter carries the per-stream state needed to emit a
// client-compatible event stream.
type compatibleStreamWriter struct {
	// kind selects the wire format: "responses" emits response.* events,
	// any other value emits chat.completion.chunk data frames.
	// model is the model name stamped on generated chunks.
	kind, model string
	// includeUsage controls whether usage-only chunks are written.
	includeUsage bool
	// id and created are stamped on every chunk; they start with local
	// defaults and are replaced by upstream values once those are seen.
	id      string
	created int64
	// systemFingerprint is echoed on every chunk; nil until one is captured.
	systemFingerprint any
	// sentRole, sentFinish and sentUsage record which mandatory pieces of
	// the chat stream have already been written, so they are not repeated.
	sentRole, sentFinish, sentUsage bool
}
// newCompatibleStreamWriter returns a writer for a new stream of the given
// kind and model. The chunk id starts as the placeholder "chatcmpl-stream"
// and the creation time as the current Unix time; both are overwritten later
// if metadata is captured from upstream data.
func newCompatibleStreamWriter(kind string, model string, includeUsage bool) *compatibleStreamWriter {
	writer := new(compatibleStreamWriter)
	writer.kind = kind
	writer.model = model
	writer.includeUsage = includeUsage
	writer.id = "chatcmpl-stream"
	writer.created = time.Now().Unix()
	return writer
}
// writeDelta writes one incremental stream event to w.
//
// For the "responses" kind only non-empty text is forwarded, as a
// response.output_text.delta event; reasoning content and raw events are not
// emitted on that path. For every other kind, an upstream event that already
// looks like a chat completion chunk is passed to writeChatChunk; otherwise
// chunks are synthesised from the reasoning and text fields, preceded by the
// assistant role chunk if it has not been sent yet.
func (s *compatibleStreamWriter) writeDelta(w http.ResponseWriter, event clients.StreamDeltaEvent) {
	if s.kind == "responses" {
		if event.Text == "" {
			return
		}
		sendSSE(w, "response.output_text.delta", map[string]any{"type": "response.output_text.delta", "delta": event.Text})
		return
	}

	if event.Event != nil && isChatCompletionChunk(event.Event) {
		s.writeChatChunk(w, event.Event)
		return
	}

	hasReasoning := event.ReasoningContent != ""
	hasText := event.Text != ""
	if !hasReasoning && !hasText {
		return
	}

	s.ensureRoleChunk(w)

	// writeChoice wraps a delta in a single unfinished choice and emits it.
	writeChoice := func(delta map[string]any) {
		choice := map[string]any{"index": 0, "delta": delta, "finish_reason": nil}
		s.writeChatData(w, s.chatChunk([]any{choice}, nil))
	}
	// Reasoning is written before visible content when both are present.
	if hasReasoning {
		writeChoice(map[string]any{"reasoning_content": event.ReasoningContent})
	}
	if hasText {
		writeChoice(map[string]any{"content": event.Text})
	}
}
// writeDone terminates the stream using the final output object.
//
// The "responses" kind emits a single response.completed event carrying
// output. Any other kind first refreshes id/model/created/fingerprint from
// output, then writes whichever of the role chunk, finish chunk and usage
// chunk have not been sent yet, and finally the "[DONE]" marker.
func (s *compatibleStreamWriter) writeDone(w http.ResponseWriter, output map[string]any) {
	if s.kind == "responses" {
		sendSSE(w, "response.completed", map[string]any{"type": "response.completed", "response": output})
		return
	}

	s.captureOutputMetadata(output)

	// ensureRoleChunk is a no-op when the role chunk was already written.
	s.ensureRoleChunk(w)

	if !s.sentFinish {
		finalChoice := map[string]any{
			"index":         0,
			"delta":         map[string]any{},
			"finish_reason": finishReasonFromOutput(output),
		}
		s.writeChatData(w, s.chatChunk([]any{finalChoice}, nil))
		s.sentFinish = true
	}

	if s.includeUsage && !s.sentUsage {
		// A missing or wrongly typed "usage" yields a nil map, whose length is 0.
		usage, _ := output["usage"].(map[string]any)
		if len(usage) > 0 {
			s.writeChatData(w, s.chatChunk([]any{}, usage))
			s.sentUsage = true
		}
	}

	s.writeDoneMarker(w)
}
// writeChatChunk forwards an upstream chat completion chunk to w after
// passing it through clients.NormalizeChatCompletionStreamEvent and stamping
// it with this stream's id, created time, model and system fingerprint.
//
// A chunk with usage but no choices is treated as a usage-only chunk: it is
// rewritten via chatChunk when includeUsage is set and dropped otherwise.
// For chunks with choices, the assistant role chunk is emitted first if no
// role has been sent and the chunk does not carry one itself.
func (s *compatibleStreamWriter) writeChatChunk(w http.ResponseWriter, chunk map[string]any) {
	chunk = clients.NormalizeChatCompletionStreamEvent(chunk)
	s.captureChunkMetadata(chunk)

	choices, _ := chunk["choices"].([]any)

	if usage, ok := chunk["usage"].(map[string]any); ok && len(choices) == 0 {
		if s.includeUsage {
			s.writeChatData(w, s.chatChunk([]any{}, usage))
			s.sentUsage = true
		}
		return
	}

	switch {
	case chunkHasRole(choices):
		// The upstream chunk announces the role itself.
		s.sentRole = true
	case len(choices) > 0:
		// No role in this chunk; ensureRoleChunk only writes if none was sent.
		s.ensureRoleChunk(w)
	}
	if chunkHasFinishReason(choices) {
		s.sentFinish = true
	}

	// Work on a shallow copy so the caller's map is not modified.
	stamped := cloneMap(chunk)
	stamped["object"] = "chat.completion.chunk"
	stamped["id"] = s.id
	stamped["created"] = s.created
	stamped["system_fingerprint"] = s.systemFingerprint
	stamped["model"] = firstString(stamped["model"], s.model)
	s.writeChatData(w, stamped)
}
// ensureRoleChunk writes the opening chunk that announces the "assistant"
// role, unless a role has already been sent on this stream.
func (s *compatibleStreamWriter) ensureRoleChunk(w http.ResponseWriter) {
	if !s.sentRole {
		roleChoice := map[string]any{
			"index":         0,
			"delta":         map[string]any{"role": "assistant"},
			"finish_reason": nil,
		}
		s.writeChatData(w, s.chatChunk([]any{roleChoice}, nil))
		s.sentRole = true
	}
}
// chatChunk builds a chat.completion.chunk payload from the stream's current
// id, created time, model and system fingerprint plus the given choices.
// The "usage" key is always present: it holds usage when that is non-nil and
// an untyped nil otherwise.
func (s *compatibleStreamWriter) chatChunk(choices []any, usage map[string]any) map[string]any {
	payload := map[string]any{
		"object":             "chat.completion.chunk",
		"id":                 s.id,
		"model":              s.model,
		"created":            s.created,
		"system_fingerprint": s.systemFingerprint,
		"choices":            choices,
		"usage":              nil,
	}
	if usage != nil {
		payload["usage"] = usage
	}
	return payload
}
// writeChatData serialises payload as JSON and writes it to w as a single
// SSE "data:" frame terminated by a blank line.
//
// If payload cannot be marshalled, nothing is written: an empty "data:" frame
// would still be delivered to the client as an event whose body is not valid
// JSON, which is worse than omitting the frame.
func (s *compatibleStreamWriter) writeChatData(w http.ResponseWriter, payload map[string]any) {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return
	}
	// A write error is deliberately ignored: this method has no error return
	// and each frame is written on a best-effort basis.
	_, _ = fmt.Fprintf(w, "data: %s\n\n", encoded)
}
// writeDoneMarker writes the terminating "data: [DONE]" frame to w.
// The write result is intentionally ignored (best effort).
func (s *compatibleStreamWriter) writeDoneMarker(w http.ResponseWriter) {
	_, _ = w.Write([]byte("data: [DONE]\n\n"))
}
// captureChunkMetadata copies stream-level metadata from an upstream chunk
// into the writer so later chunks are stamped with it.
//
// id and model are taken only when they are non-empty strings, created only
// when it converts to a positive integer, and system_fingerprint whenever the
// key is present (including when its value is nil).
func (s *compatibleStreamWriter) captureChunkMetadata(chunk map[string]any) {
	if upstreamID, _ := chunk["id"].(string); upstreamID != "" {
		s.id = upstreamID
	}
	if upstreamModel, _ := chunk["model"].(string); upstreamModel != "" {
		s.model = upstreamModel
	}
	if timestamp := int64FromAny(chunk["created"]); timestamp > 0 {
		s.created = timestamp
	}
	fingerprint, present := chunk["system_fingerprint"]
	if present {
		s.systemFingerprint = fingerprint
	}
}
// captureOutputMetadata copies id, model, created and system_fingerprint from
// the final output object into the writer.
//
// It reads the same top-level keys under the same rules as
// captureChunkMetadata, so it delegates to that method.
func (s *compatibleStreamWriter) captureOutputMetadata(output map[string]any) {
	s.captureChunkMetadata(output)
}
// writeCompatibleStream replays an already complete output as a short event
// stream: headers, one delta carrying the extracted text, then the closing
// events. Usage reporting is always enabled on this path.
//
// When no text can be extracted from output, the literal "done" is sent as
// the delta text instead.
func writeCompatibleStream(w http.ResponseWriter, kind string, model string, output map[string]any) {
	prepareCompatibleStream(w)

	stream := newCompatibleStreamWriter(kind, model, true)

	delta := clients.StreamDeltaEvent{Text: extractOutputText(output)}
	if delta.Text == "" {
		delta.Text = "done"
	}

	stream.writeDelta(w, delta)
	stream.writeDone(w, output)
}
// firstString returns value when it is a non-empty string and fallback in
// every other case (wrong type, nil, or empty string).
func firstString(value any, fallback string) string {
	text, isString := value.(string)
	if !isString || text == "" {
		return fallback
	}
	return text
}
// int64FromAny converts a loosely typed numeric value to int64.
//
// Supported inputs are int64, int, float64 (truncated toward zero) and
// json.Number (integer form, or a float form that is then truncated).
// Anything else, including strings and nil, yields 0. NaN and float values
// outside the int64 range also yield 0, because converting them directly is
// implementation-dependent in Go and could produce a bogus positive value.
func int64FromAny(value any) int64 {
	switch typed := value.(type) {
	case int64:
		return typed
	case int:
		return int64(typed)
	case float64:
		// typed != typed is true only for NaN; the bounds are exact powers
		// of two, so they are representable as float64 without rounding.
		if typed != typed || typed >= 1<<63 || typed < -(1<<63) {
			return 0
		}
		return int64(typed)
	case json.Number:
		// Produced by decoders configured with UseNumber.
		if n, err := typed.Int64(); err == nil {
			return n
		}
		if f, err := typed.Float64(); err == nil {
			return int64FromAny(f)
		}
		return 0
	default:
		return 0
	}
}
// isChatCompletionChunk reports whether event looks like a chat completion
// stream chunk: either its "object" field is "chat.completion.chunk" or it
// has a "choices" field holding a []any.
func isChatCompletionChunk(event map[string]any) bool {
	if kind, ok := event["object"].(string); ok && kind == "chat.completion.chunk" {
		return true
	}
	switch event["choices"].(type) {
	case []any:
		return true
	default:
		return false
	}
}
// chunkHasRole reports whether any choice carries a non-empty string
// "role" inside its "delta" object. Malformed choices are skipped.
func chunkHasRole(choices []any) bool {
	for i := range choices {
		choice, ok := choices[i].(map[string]any)
		if !ok {
			continue
		}
		delta, ok := choice["delta"].(map[string]any)
		if !ok {
			continue
		}
		if role, _ := delta["role"].(string); role != "" {
			return true
		}
	}
	return false
}
// chunkHasFinishReason reports whether any choice has a non-empty string
// "finish_reason". Choices that are not JSON objects are skipped.
func chunkHasFinishReason(choices []any) bool {
	for i := range choices {
		choice, ok := choices[i].(map[string]any)
		if !ok {
			continue
		}
		if reason, _ := choice["finish_reason"].(string); reason != "" {
			return true
		}
	}
	return false
}
// finishReasonFromOutput returns the first non-empty string "finish_reason"
// found among output's choices, or "stop" when there is none.
func finishReasonFromOutput(output map[string]any) string {
	const fallback = "stop"

	choices, ok := output["choices"].([]any)
	if !ok {
		return fallback
	}
	for i := range choices {
		choice, ok := choices[i].(map[string]any)
		if !ok {
			continue
		}
		if reason, _ := choice["finish_reason"].(string); reason != "" {
			return reason
		}
	}
	return fallback
}
// cloneMap returns a shallow copy of value. The result is always a non-nil,
// writable map, even when value is nil; nested values are shared with the
// original, not copied.
func cloneMap(value map[string]any) map[string]any {
	clone := make(map[string]any, len(value))
	for key := range value {
		clone[key] = value[key]
	}
	return clone
}
// extractOutputText pulls the response text out of a completed output.
//
// A string "output_text" field wins and is returned as is, even when empty.
// Otherwise the first choice whose message has a string "content" supplies
// the text. An empty string is returned when neither is found.
func extractOutputText(output map[string]any) string {
	if direct, ok := output["output_text"].(string); ok {
		return direct
	}

	choices, _ := output["choices"].([]any)
	for i := range choices {
		choice, ok := choices[i].(map[string]any)
		if !ok {
			continue
		}
		message, ok := choice["message"].(map[string]any)
		if !ok {
			continue
		}
		if content, ok := message["content"].(string); ok {
			return content
		}
	}
	return ""
}