easyai-ai-gateway/apps/api/internal/store/file_storage_channels.go

500 lines
14 KiB
Go

package store
import (
"context"
"encoding/json"
"strings"
"time"
"github.com/jackc/pgx/v5"
)
// defaultServerMainUploadURL is the upload URL that
// normalizeFileStorageChannelInput fills in for a "server_main_openapi"
// channel whose upload URL is empty.
const defaultServerMainUploadURL = "http://127.0.0.1:3001/v1/files/upload"
// Scene names that can appear in the "scenes" entry of a channel's config.
// Both are part of the list returned by defaultFileStorageScenes.
const (
FileStorageSceneUpload = "upload"
FileStorageSceneImageResult = "image_result"
)
// Canonical result upload policy values. These are the only values
// NormalizeFileStorageResultUploadPolicy returns.
const (
FileStorageResultUploadPolicyDefault = "default"
FileStorageResultUploadPolicyUploadAll = "upload_all"
FileStorageResultUploadPolicyUploadNone = "upload_none"
)
// SystemSettingFileStorage is the system_settings.setting_key under which the
// file storage settings are read and written.
const SystemSettingFileStorage = "file_storage"
// fileStorageChannelColumns is the column list shared by every query whose
// rows are read by scanFileStorageChannel. The order of the columns must stay
// in step with the Scan destinations in that function. Nullable text and
// timestamp columns are coalesced to "" so they can be scanned into strings.
const fileStorageChannelColumns = `
id::text, channel_key, name, provider, COALESCE(upload_url, ''), credentials,
config, retry_policy, priority, status, COALESCE(last_error, ''),
COALESCE(last_failed_at::text, ''), COALESCE(last_succeeded_at::text, ''),
created_at, updated_at`
// FileStorageChannel is one row of file_storage_channels as produced by
// scanFileStorageChannel, with the stored credentials, config and retry
// policy JSON decoded into fields.
type FileStorageChannel struct {
ID string `json:"id"`
ChannelKey string `json:"channelKey"`
Name string `json:"name"`
Provider string `json:"provider"`
// UploadURL is "" when the upload_url column is NULL.
UploadURL string `json:"uploadUrl,omitempty"`
// APIKey is the "apiKey" entry of the stored credentials. It is never
// serialized to JSON.
APIKey string `json:"-"`
// CredentialsPreview is the result of maskCredentialsPreview applied to the
// stored credentials.
CredentialsPreview map[string]any `json:"credentialsPreview,omitempty"`
// Scenes comes from the "scenes" entry of the stored config; the default
// scene list is used when that entry is missing, not an array, or empty
// after normalization.
Scenes []string `json:"scenes,omitempty"`
// Config is the stored config without the "scenes" and
// "resultUploadPolicy" keys; nil when nothing else is stored.
Config map[string]any `json:"config,omitempty"`
RetryPolicy map[string]any `json:"retryPolicy,omitempty"`
Priority int `json:"priority"`
Status string `json:"status"`
// LastError, LastFailedAt and LastSucceededAt are "" when the underlying
// column is NULL; the two timestamps are the database's text rendering.
LastError string `json:"lastError,omitempty"`
LastFailedAt string `json:"lastFailedAt,omitempty"`
LastSucceededAt string `json:"lastSucceededAt,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// FileStorageChannelInput is the payload accepted by CreateFileStorageChannel
// and UpdateFileStorageChannel. Both run it through
// normalizeFileStorageChannelInput before touching the database.
type FileStorageChannelInput struct {
ChannelKey string `json:"channelKey"`
Name string `json:"name"`
// Provider defaults to "server_main_openapi" when empty.
Provider string `json:"provider"`
UploadURL string `json:"uploadUrl"`
// APIKey is a pointer so that "not supplied" can be told apart from "set
// to empty": on update a nil value keeps the stored credentials, while a
// non-nil value replaces them (an empty key stores an empty object).
APIKey *string `json:"apiKey"`
// Scenes is normalized and stored under the "scenes" key of the config;
// an empty list is replaced by the default scenes.
Scenes []string `json:"scenes"`
Config map[string]any `json:"config"`
// RetryPolicy is replaced by a built-in default policy when empty.
RetryPolicy map[string]any `json:"retryPolicy"`
// Priority values <= 0 are normalized to 100.
Priority int `json:"priority"`
// Status defaults to "disabled" when empty.
Status string `json:"status"`
}
// FileStorageSettings is the decoded value of the SystemSettingFileStorage
// row. ResultUploadPolicy holds one of the FileStorageResultUploadPolicy*
// values when produced by this package.
type FileStorageSettings struct {
ResultUploadPolicy string `json:"resultUploadPolicy"`
}
// FileStorageSettingsInput is the payload accepted by
// UpdateFileStorageSettings. ResultUploadPolicy is passed through
// NormalizeFileStorageResultUploadPolicy before being stored.
type FileStorageSettingsInput struct {
ResultUploadPolicy string `json:"resultUploadPolicy"`
}
// fileStorageChannelScanner is the single method scanFileStorageChannel needs
// from its argument. In this file it is satisfied both by the value returned
// from QueryRow and by the rows iterator returned from Query.
type fileStorageChannelScanner interface {
Scan(dest ...any) error
}
// ListFileStorageChannels returns every channel whose deleted_at is NULL,
// regardless of status, ordered by ascending priority and then by creation
// time. The returned slice is non-nil even when no rows match.
func (s *Store) ListFileStorageChannels(ctx context.Context) ([]FileStorageChannel, error) {
	const query = `
SELECT ` + fileStorageChannelColumns + `
FROM file_storage_channels
WHERE deleted_at IS NULL
ORDER BY priority ASC, created_at ASC`

	rows, queryErr := s.pool.Query(ctx, query)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	channels := []FileStorageChannel{}
	for rows.Next() {
		channel, scanErr := scanFileStorageChannel(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		channels = append(channels, channel)
	}
	// An iteration error is reported together with the rows read so far.
	return channels, rows.Err()
}
// ListEnabledFileStorageChannels returns all enabled, non-deleted channels
// without filtering by scene.
func (s *Store) ListEnabledFileStorageChannels(ctx context.Context) ([]FileStorageChannel, error) {
	// The empty scene disables the scene predicate in the shared query.
	const anyScene = ""
	return s.listEnabledFileStorageChannels(ctx, anyScene)
}
// ListEnabledFileStorageChannelsForScene returns the enabled, non-deleted
// channels that serve the given scene. The scene is trimmed and lower-cased
// first; a scene that normalizes to "" results in no scene filtering at all.
func (s *Store) ListEnabledFileStorageChannelsForScene(ctx context.Context, scene string) ([]FileStorageChannel, error) {
	normalized := normalizeFileStorageScene(scene)
	return s.listEnabledFileStorageChannels(ctx, normalized)
}
// listEnabledFileStorageChannels returns the non-deleted channels with status
// 'enabled', ordered by ascending priority and then by creation time.
//
// A channel passes the scene predicate when any of the following holds:
// scene is "", its config has no "scenes" key, the "scenes" value is not a
// JSON array, or the array contains scene. The returned slice is non-nil even
// when no rows match.
func (s *Store) listEnabledFileStorageChannels(ctx context.Context, scene string) ([]FileStorageChannel, error) {
	const query = `
SELECT ` + fileStorageChannelColumns + `
FROM file_storage_channels
WHERE deleted_at IS NULL
AND status = 'enabled'
AND (
$1 = ''
OR NOT (config ? 'scenes')
OR jsonb_typeof(config->'scenes') <> 'array'
OR (config->'scenes') ? $1
)
ORDER BY priority ASC, created_at ASC`

	rows, queryErr := s.pool.Query(ctx, query, scene)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	channels := []FileStorageChannel{}
	for rows.Next() {
		channel, scanErr := scanFileStorageChannel(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		channels = append(channels, channel)
	}
	// An iteration error is reported together with the rows read so far.
	return channels, rows.Err()
}
// GetFileStorageChannel loads the non-deleted channel with the given id. The
// id is cast to uuid by the query; whatever error the row scan produces
// (including the one for "no matching row") is returned unchanged.
func (s *Store) GetFileStorageChannel(ctx context.Context, id string) (FileStorageChannel, error) {
	const query = `
SELECT ` + fileStorageChannelColumns + `
FROM file_storage_channels
WHERE id = $1::uuid
AND deleted_at IS NULL`

	row := s.pool.QueryRow(ctx, query, id)
	return scanFileStorageChannel(row)
}
// CreateFileStorageChannel inserts a new channel and returns the stored row.
//
// The input is normalized first (see normalizeFileStorageChannelInput). A nil
// or empty APIKey stores an empty credentials object, the scenes are written
// into the "scenes" key of the config, and an empty retry policy is replaced
// by the built-in default. An empty upload URL is stored as NULL.
//
// An error is returned when any of the JSON columns cannot be encoded (for
// example a config value that encoding/json does not support) or when the
// INSERT or the scan of the returned row fails.
func (s *Store) CreateFileStorageChannel(ctx context.Context, input FileStorageChannelInput) (FileStorageChannel, error) {
	input = normalizeFileStorageChannelInput(input)

	// Config and RetryPolicy are caller-supplied map[string]any values, so
	// encoding can fail; surface that instead of sending nil bytes to the DB.
	credentials, err := json.Marshal(credentialsFromFileStorageInput(input))
	if err != nil {
		return FileStorageChannel{}, err
	}
	config, err := json.Marshal(configFromFileStorageInput(input))
	if err != nil {
		return FileStorageChannel{}, err
	}
	retryPolicy, err := json.Marshal(defaultFileStorageRetryPolicyIfEmpty(input.RetryPolicy))
	if err != nil {
		return FileStorageChannel{}, err
	}

	return scanFileStorageChannel(s.pool.QueryRow(ctx, `
INSERT INTO file_storage_channels (
channel_key, name, provider, upload_url, credentials, config, retry_policy, priority, status
)
VALUES ($1, $2, $3, NULLIF($4, ''), $5, $6, $7, $8, $9)
RETURNING `+fileStorageChannelColumns,
		input.ChannelKey,
		input.Name,
		input.Provider,
		input.UploadURL,
		credentials,
		config,
		retryPolicy,
		input.Priority,
		input.Status,
	))
}
// UpdateFileStorageChannel overwrites the non-deleted channel with the given
// id and returns the stored row.
//
// The input is normalized first (see normalizeFileStorageChannelInput). The
// stored credentials are only replaced when input.APIKey is non-nil; a
// non-nil but empty key replaces them with an empty object. Config, retry
// policy and all scalar columns are always overwritten, and updated_at is set
// to now().
//
// An error is returned when any of the JSON columns cannot be encoded (for
// example a config value that encoding/json does not support) or when the
// UPDATE or the scan of the returned row fails, which includes the case where
// no non-deleted row has this id.
func (s *Store) UpdateFileStorageChannel(ctx context.Context, id string, input FileStorageChannelInput) (FileStorageChannel, error) {
	input = normalizeFileStorageChannelInput(input)
	replaceCredentials := input.APIKey != nil

	// Config and RetryPolicy are caller-supplied map[string]any values, so
	// encoding can fail; surface that instead of sending nil bytes to the DB.
	credentials, err := json.Marshal(credentialsFromFileStorageInput(input))
	if err != nil {
		return FileStorageChannel{}, err
	}
	config, err := json.Marshal(configFromFileStorageInput(input))
	if err != nil {
		return FileStorageChannel{}, err
	}
	retryPolicy, err := json.Marshal(defaultFileStorageRetryPolicyIfEmpty(input.RetryPolicy))
	if err != nil {
		return FileStorageChannel{}, err
	}

	return scanFileStorageChannel(s.pool.QueryRow(ctx, `
UPDATE file_storage_channels
SET channel_key = $2,
name = $3,
provider = $4,
upload_url = NULLIF($5, ''),
credentials = CASE WHEN $6::boolean THEN $7 ELSE credentials END,
config = $8,
retry_policy = $9,
priority = $10,
status = $11,
updated_at = now()
WHERE id = $1::uuid
AND deleted_at IS NULL
RETURNING `+fileStorageChannelColumns,
		id,
		input.ChannelKey,
		input.Name,
		input.Provider,
		input.UploadURL,
		replaceCredentials,
		credentials,
		config,
		retryPolicy,
		input.Priority,
		input.Status,
	))
}
// DeleteFileStorageChannel marks the channel as deleted by stamping
// deleted_at and also sets its status to 'disabled'; the row itself is kept.
// It returns pgx.ErrNoRows when no non-deleted row has the given id.
func (s *Store) DeleteFileStorageChannel(ctx context.Context, id string) error {
	const statement = `
UPDATE file_storage_channels
SET deleted_at = now(),
status = 'disabled',
updated_at = now()
WHERE id = $1::uuid
AND deleted_at IS NULL`

	tag, err := s.pool.Exec(ctx, statement, id)
	switch {
	case err != nil:
		return err
	case tag.RowsAffected() == 0:
		// Nothing matched: unknown id or already deleted.
		return pgx.ErrNoRows
	default:
		return nil
	}
}
// MarkFileStorageChannelFailure records a failure on the channel: last_error
// is set to the trimmed message (NULL when that is empty) and last_failed_at
// and updated_at are set to now(). A blank id is a no-op. An id that matches
// no non-deleted row is not reported as an error.
func (s *Store) MarkFileStorageChannelFailure(ctx context.Context, id string, message string) error {
	if len(strings.TrimSpace(id)) == 0 {
		return nil
	}

	const statement = `
UPDATE file_storage_channels
SET last_error = NULLIF($2, ''),
last_failed_at = now(),
updated_at = now()
WHERE id = $1::uuid
AND deleted_at IS NULL`

	trimmedMessage := strings.TrimSpace(message)
	if _, err := s.pool.Exec(ctx, statement, id, trimmedMessage); err != nil {
		return err
	}
	return nil
}
// MarkFileStorageChannelSuccess records a success on the channel: last_error
// is cleared and last_succeeded_at and updated_at are set to now(). A blank
// id is a no-op. An id that matches no non-deleted row is not reported as an
// error.
func (s *Store) MarkFileStorageChannelSuccess(ctx context.Context, id string) error {
	if len(strings.TrimSpace(id)) == 0 {
		return nil
	}

	const statement = `
UPDATE file_storage_channels
SET last_error = NULL,
last_succeeded_at = now(),
updated_at = now()
WHERE id = $1::uuid
AND deleted_at IS NULL`

	if _, err := s.pool.Exec(ctx, statement, id); err != nil {
		return err
	}
	return nil
}
// scanFileStorageChannel reads one row laid out as fileStorageChannelColumns
// and builds a FileStorageChannel from it. The three JSON columns are scanned
// as raw bytes and decoded here:
//   - credentials feed APIKey (the "apiKey" entry) and CredentialsPreview,
//   - config is split into Scenes and the remaining Config entries,
//   - retry_policy is decoded as-is.
//
// On a scan error the zero FileStorageChannel is returned with that error.
func scanFileStorageChannel(scanner fileStorageChannelScanner) (FileStorageChannel, error) {
	var (
		channel        FileStorageChannel
		rawCredentials []byte
		rawConfig      []byte
		rawRetryPolicy []byte
	)

	// Destination order mirrors the column order of fileStorageChannelColumns.
	dest := []any{
		&channel.ID,
		&channel.ChannelKey,
		&channel.Name,
		&channel.Provider,
		&channel.UploadURL,
		&rawCredentials,
		&rawConfig,
		&rawRetryPolicy,
		&channel.Priority,
		&channel.Status,
		&channel.LastError,
		&channel.LastFailedAt,
		&channel.LastSucceededAt,
		&channel.CreatedAt,
		&channel.UpdatedAt,
	}
	if err := scanner.Scan(dest...); err != nil {
		return FileStorageChannel{}, err
	}

	channel.APIKey = stringFromObject(decodeObject(rawCredentials), "apiKey")
	channel.CredentialsPreview = maskCredentialsPreview(rawCredentials)

	decodedConfig := decodeObject(rawConfig)
	channel.Scenes = fileStorageScenesFromConfig(decodedConfig)
	channel.Config = fileStorageConfigWithoutManagedFields(decodedConfig)

	channel.RetryPolicy = decodeObject(rawRetryPolicy)
	return channel, nil
}
// normalizeFileStorageChannelInput returns a cleaned-up copy of input:
//   - ChannelKey, Name, UploadURL and a non-nil APIKey are trimmed,
//   - Provider and Status are trimmed and lower-cased,
//   - Scenes are normalized (falling back to the default scenes),
//   - an empty Provider becomes "server_main_openapi",
//   - a "server_main_openapi" channel without an upload URL gets
//     defaultServerMainUploadURL,
//   - an empty Status becomes "disabled",
//   - a Priority <= 0 becomes 100.
//
// The string the caller's APIKey points at is not modified; the copy points
// at a new trimmed string.
func normalizeFileStorageChannelInput(input FileStorageChannelInput) FileStorageChannelInput {
	const serverMainProvider = "server_main_openapi"

	input.ChannelKey = strings.TrimSpace(input.ChannelKey)
	input.Name = strings.TrimSpace(input.Name)
	input.UploadURL = strings.TrimSpace(input.UploadURL)
	input.Scenes = normalizeFileStorageScenes(input.Scenes)
	if input.APIKey != nil {
		trimmedKey := strings.TrimSpace(*input.APIKey)
		input.APIKey = &trimmedKey
	}

	if provider := strings.ToLower(strings.TrimSpace(input.Provider)); provider != "" {
		input.Provider = provider
	} else {
		input.Provider = serverMainProvider
	}
	if input.UploadURL == "" && input.Provider == serverMainProvider {
		input.UploadURL = defaultServerMainUploadURL
	}

	if status := strings.ToLower(strings.TrimSpace(input.Status)); status != "" {
		input.Status = status
	} else {
		input.Status = "disabled"
	}

	if input.Priority < 1 {
		input.Priority = 100
	}
	return input
}
// credentialsFromFileStorageInput builds the object stored in the credentials
// column: {"apiKey": <key>} when the input carries a non-empty key, otherwise
// an empty (non-nil) object.
func credentialsFromFileStorageInput(input FileStorageChannelInput) map[string]any {
	if apiKey := fileStorageInputAPIKey(input); apiKey != "" {
		return map[string]any{"apiKey": apiKey}
	}
	return map[string]any{}
}
// fileStorageInputAPIKey returns the trimmed API key carried by input, or ""
// when no key was supplied (nil pointer).
func fileStorageInputAPIKey(input FileStorageChannelInput) string {
	if key := input.APIKey; key != nil {
		return strings.TrimSpace(*key)
	}
	return ""
}
// configFromFileStorageInput builds the object stored in the config column: a
// shallow copy of input.Config with the "scenes" key set to the normalized
// scene list. Any "scenes" entry the caller put into Config is overwritten,
// and the caller's map is left untouched.
func configFromFileStorageInput(input FileStorageChannelInput) map[string]any {
	source := emptyObjectIfNil(input.Config)
	merged := make(map[string]any, len(source)+1)
	for name, entry := range source {
		merged[name] = entry
	}
	merged["scenes"] = normalizeFileStorageScenes(input.Scenes)
	return merged
}
// fileStorageConfigWithoutManagedFields returns a copy of config without the
// "scenes" and "resultUploadPolicy" keys. It returns nil when no other keys
// remain (including for a nil or empty config). The input map is not
// modified.
func fileStorageConfigWithoutManagedFields(config map[string]any) map[string]any {
	var filtered map[string]any
	for key, value := range config {
		switch key {
		case "scenes", "resultUploadPolicy":
			continue
		}
		// Allocate lazily so that "nothing kept" naturally yields nil.
		if filtered == nil {
			filtered = map[string]any{}
		}
		filtered[key] = value
	}
	return filtered
}
// DefaultFileStorageSettings returns the settings used when nothing is
// stored: the result upload policy is FileStorageResultUploadPolicyDefault.
func DefaultFileStorageSettings() FileStorageSettings {
	var settings FileStorageSettings
	settings.ResultUploadPolicy = FileStorageResultUploadPolicyDefault
	return settings
}
// GetFileStorageSettings loads the file storage settings stored under
// SystemSettingFileStorage in system_settings.
//
// DefaultFileStorageSettings is returned, without error, when there is
// nothing stored yet: either the row does not exist (IsNotFound) or the query
// fails with an error matched by IsUndefinedDatabaseObject. The latter is the
// same condition UpdateFileStorageSettings answers by creating the
// system_settings table, so reads behave consistently before the first write.
// NOTE(review): this relies on IsUndefinedDatabaseObject identifying a
// missing table — confirm it does not also match unrelated schema errors.
//
// Any other error is returned with zero-valued settings.
func (s *Store) GetFileStorageSettings(ctx context.Context) (FileStorageSettings, error) {
	var value []byte
	err := s.pool.QueryRow(ctx, `
SELECT value
FROM system_settings
WHERE setting_key = $1`, SystemSettingFileStorage).Scan(&value)

	switch {
	case err == nil:
		return fileStorageSettingsFromValue(decodeObject(value)), nil
	case IsNotFound(err), IsUndefinedDatabaseObject(err):
		return DefaultFileStorageSettings(), nil
	default:
		return FileStorageSettings{}, err
	}
}
// UpdateFileStorageSettings normalizes the requested result upload policy,
// upserts it under SystemSettingFileStorage and returns the settings decoded
// from the value the database reports back.
//
// If the first upsert fails with an error matched by
// IsUndefinedDatabaseObject, the system_settings table is created and the
// upsert is attempted once more.
func (s *Store) UpdateFileStorageSettings(ctx context.Context, input FileStorageSettingsInput) (FileStorageSettings, error) {
	policy := NormalizeFileStorageResultUploadPolicy(input.ResultUploadPolicy)
	// The marshal error is ignored: the payload is a struct with a single
	// string field, which encoding/json can always encode.
	payload, _ := json.Marshal(FileStorageSettings{ResultUploadPolicy: policy})

	var stored []byte
	upsertErr := s.upsertFileStorageSettings(ctx, payload, &stored)
	if upsertErr != nil && IsUndefinedDatabaseObject(upsertErr) {
		if ensureErr := s.ensureSystemSettingsTable(ctx); ensureErr != nil {
			return FileStorageSettings{}, ensureErr
		}
		upsertErr = s.upsertFileStorageSettings(ctx, payload, &stored)
	}
	if upsertErr != nil {
		return FileStorageSettings{}, upsertErr
	}

	decoded := decodeObject(stored)
	return fileStorageSettingsFromValue(decoded), nil
}
// upsertFileStorageSettings inserts value under SystemSettingFileStorage, or
// overwrites the existing row's value and bumps updated_at on a key conflict.
// The value column returned by the statement is scanned into saved.
func (s *Store) upsertFileStorageSettings(ctx context.Context, value []byte, saved *[]byte) error {
	const statement = `
INSERT INTO system_settings (setting_key, value)
VALUES ($1, $2)
ON CONFLICT (setting_key)
DO UPDATE SET value = EXCLUDED.value, updated_at = now()
RETURNING value`

	row := s.pool.QueryRow(ctx, statement, SystemSettingFileStorage, value)
	return row.Scan(saved)
}
// ensureSystemSettingsTable creates the system_settings table when it does
// not exist yet; an existing table is left as it is.
func (s *Store) ensureSystemSettingsTable(ctx context.Context) error {
	const ddl = `
CREATE TABLE IF NOT EXISTS system_settings (
setting_key text PRIMARY KEY,
value jsonb NOT NULL DEFAULT '{}'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
)`

	if _, err := s.pool.Exec(ctx, ddl); err != nil {
		return err
	}
	return nil
}
// fileStorageSettingsFromValue turns a decoded settings object into
// FileStorageSettings. A nil object yields the defaults; otherwise the
// "resultUploadPolicy" entry is normalized, with a missing or non-string
// entry treated as "".
func fileStorageSettingsFromValue(value map[string]any) FileStorageSettings {
	settings := DefaultFileStorageSettings()
	if value != nil {
		rawPolicy := stringFromAny(value["resultUploadPolicy"])
		settings.ResultUploadPolicy = NormalizeFileStorageResultUploadPolicy(rawPolicy)
	}
	return settings
}
func NormalizeFileStorageResultUploadPolicy(policy string) string {
normalized := strings.ToLower(strings.TrimSpace(policy))
normalized = strings.ReplaceAll(normalized, "-", "_")
switch normalized {
case "", "default", "non_link_only", "inline_only", "nonlink_only", "non_link":
return FileStorageResultUploadPolicyDefault
case "upload_all", "all", "always", "all_upload":
return FileStorageResultUploadPolicyUploadAll
case "upload_none", "none", "never", "disabled", "no_upload", "skip", "skip_all":
return FileStorageResultUploadPolicyUploadNone
default:
return FileStorageResultUploadPolicyDefault
}
}
// fileStorageScenesFromConfig extracts the scene list from a decoded config
// object. The default scenes are returned when config is nil, has no
// "scenes" key, or holds something other than a JSON array there. Non-string
// array elements are skipped, and the remaining names are normalized, which
// again falls back to the defaults if nothing usable is left.
func fileStorageScenesFromConfig(config map[string]any) []string {
	// Indexing a nil map and asserting on a missing key both fail the
	// assertion, so a single check covers every fallback case.
	rawScenes, isList := config["scenes"].([]any)
	if !isList {
		return defaultFileStorageScenes()
	}

	names := make([]string, 0, len(rawScenes))
	for _, entry := range rawScenes {
		name, isString := entry.(string)
		if !isString {
			continue
		}
		names = append(names, name)
	}
	return normalizeFileStorageScenes(names)
}
// normalizeFileStorageScenes trims and lower-cases every scene, drops blanks
// and duplicates while keeping first-seen order, and returns the default
// scenes when nothing is left.
func normalizeFileStorageScenes(scenes []string) []string {
	unique := make([]string, 0, len(scenes))
	visited := make(map[string]struct{}, len(scenes))
	for _, raw := range scenes {
		name := normalizeFileStorageScene(raw)
		if name == "" {
			continue
		}
		if _, duplicate := visited[name]; duplicate {
			continue
		}
		visited[name] = struct{}{}
		unique = append(unique, name)
	}
	if len(unique) > 0 {
		return unique
	}
	return defaultFileStorageScenes()
}
// normalizeFileStorageScene returns scene with surrounding whitespace removed
// and all letters lower-cased.
func normalizeFileStorageScene(scene string) string {
	trimmed := strings.TrimSpace(scene)
	return strings.ToLower(trimmed)
}
// defaultFileStorageScenes returns a fresh slice holding the scenes a channel
// gets when none are configured: upload first, then image result.
func defaultFileStorageScenes() []string {
	scenes := make([]string, 0, 2)
	scenes = append(scenes, FileStorageSceneUpload, FileStorageSceneImageResult)
	return scenes
}
// defaultFileStorageRetryPolicyIfEmpty returns policy itself when it has at
// least one entry. For a nil or empty policy it returns a new default policy:
// enabled, 3 retries, back-off steps of 60/120/180 seconds, and strategy
// "exponential".
func defaultFileStorageRetryPolicyIfEmpty(policy map[string]any) map[string]any {
	if len(policy) == 0 {
		return map[string]any{
			"enabled":        true,
			"maxRetries":     3,
			"backoffSeconds": []any{60, 120, 180},
			"strategy":       "exponential",
		}
	}
	return policy
}
// stringFromObject returns the trimmed string stored under key in value. It
// returns "" when value is nil, the key is absent, or the entry is not a
// string.
func stringFromObject(value map[string]any, key string) string {
	// Indexing a nil map is safe and yields nil, which fails the assertion.
	text, isString := value[key].(string)
	if !isString {
		return ""
	}
	return strings.TrimSpace(text)
}
// stringFromAny returns value trimmed of surrounding whitespace when it holds
// a string, and "" for every other dynamic type (including nil).
func stringFromAny(value any) string {
	if text, isString := value.(string); isString {
		return strings.TrimSpace(text)
	}
	return ""
}