easyai-ai-gateway/apps/api/internal/store/request_assets.go

131 lines
3.8 KiB
Go

package store
import (
"context"
"database/sql"
"time"
"github.com/jackc/pgx/v5"
)
type RequestAsset struct {
ID string `json:"id"`
SHA256 string `json:"sha256"`
ContentType string `json:"contentType"`
ByteSize int64 `json:"byteSize"`
URL string `json:"url"`
StorageProvider string `json:"storageProvider"`
LocalPath string `json:"localPath,omitempty"`
ExpiresAt *time.Time `json:"expiresAt,omitempty"`
ExpiredAt *time.Time `json:"expiredAt,omitempty"`
RefCount int `json:"refCount"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type RequestAssetInput struct {
SHA256 string
ContentType string
ByteSize int64
URL string
StorageProvider string
LocalPath string
ExpiresAt *time.Time
}
func (s *Store) FindRequestAsset(ctx context.Context, sha256 string, contentType string) (RequestAsset, bool, error) {
asset, err := scanRequestAsset(s.pool.QueryRow(ctx, `
SELECT id::text, sha256, content_type, byte_size, url, storage_provider,
COALESCE(local_path, ''), expires_at, expired_at, ref_count, created_at, updated_at
FROM gateway_request_assets
WHERE sha256 = $1 AND content_type = $2`, sha256, contentType))
if err != nil {
if err == pgx.ErrNoRows {
return RequestAsset{}, false, nil
}
return RequestAsset{}, false, err
}
return asset, true, nil
}
func (s *Store) UpsertRequestAsset(ctx context.Context, input RequestAssetInput) (RequestAsset, error) {
return scanRequestAsset(s.pool.QueryRow(ctx, `
INSERT INTO gateway_request_assets (
sha256, content_type, byte_size, url, storage_provider, local_path, expires_at, expired_at, ref_count
)
VALUES ($1, $2, $3, $4, $5, NULLIF($6, ''), $7, NULL, 1)
ON CONFLICT (sha256, content_type) DO UPDATE
SET byte_size = EXCLUDED.byte_size,
url = EXCLUDED.url,
storage_provider = EXCLUDED.storage_provider,
local_path = EXCLUDED.local_path,
expires_at = EXCLUDED.expires_at,
expired_at = NULL,
ref_count = gateway_request_assets.ref_count + 1,
updated_at = now()
RETURNING id::text, sha256, content_type, byte_size, url, storage_provider,
COALESCE(local_path, ''), expires_at, expired_at, ref_count, created_at, updated_at`,
input.SHA256,
input.ContentType,
input.ByteSize,
input.URL,
input.StorageProvider,
input.LocalPath,
input.ExpiresAt,
))
}
func (s *Store) IncrementRequestAssetRefCount(ctx context.Context, sha256 string, contentType string) error {
_, err := s.pool.Exec(ctx, `
UPDATE gateway_request_assets
SET ref_count = ref_count + 1,
updated_at = now()
WHERE sha256 = $1 AND content_type = $2`, sha256, contentType)
return err
}
func (s *Store) MarkRequestAssetExpiredByLocalPath(ctx context.Context, localPath string, expiredAt time.Time) error {
if localPath == "" {
return nil
}
_, err := s.pool.Exec(ctx, `
UPDATE gateway_request_assets
SET expired_at = COALESCE(expired_at, $2),
updated_at = now()
WHERE local_path = $1
AND storage_provider = 'local_static'
AND expired_at IS NULL`, localPath, expiredAt)
return err
}
func scanRequestAsset(scanner interface{ Scan(dest ...any) error }) (RequestAsset, error) {
var asset RequestAsset
var localPath string
var expiresAt sql.NullTime
var expiredAt sql.NullTime
if err := scanner.Scan(
&asset.ID,
&asset.SHA256,
&asset.ContentType,
&asset.ByteSize,
&asset.URL,
&asset.StorageProvider,
&localPath,
&expiresAt,
&expiredAt,
&asset.RefCount,
&asset.CreatedAt,
&asset.UpdatedAt,
); err != nil {
return RequestAsset{}, err
}
asset.LocalPath = localPath
if expiresAt.Valid {
asset.ExpiresAt = &expiresAt.Time
}
if expiredAt.Valid {
asset.ExpiredAt = &expiredAt.Time
}
return asset, nil
}