131 lines
3.8 KiB
Go
131 lines
3.8 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
type RequestAsset struct {
|
|
ID string `json:"id"`
|
|
SHA256 string `json:"sha256"`
|
|
ContentType string `json:"contentType"`
|
|
ByteSize int64 `json:"byteSize"`
|
|
URL string `json:"url"`
|
|
StorageProvider string `json:"storageProvider"`
|
|
LocalPath string `json:"localPath,omitempty"`
|
|
ExpiresAt *time.Time `json:"expiresAt,omitempty"`
|
|
ExpiredAt *time.Time `json:"expiredAt,omitempty"`
|
|
RefCount int `json:"refCount"`
|
|
CreatedAt time.Time `json:"createdAt"`
|
|
UpdatedAt time.Time `json:"updatedAt"`
|
|
}
|
|
|
|
type RequestAssetInput struct {
|
|
SHA256 string
|
|
ContentType string
|
|
ByteSize int64
|
|
URL string
|
|
StorageProvider string
|
|
LocalPath string
|
|
ExpiresAt *time.Time
|
|
}
|
|
|
|
func (s *Store) FindRequestAsset(ctx context.Context, sha256 string, contentType string) (RequestAsset, bool, error) {
|
|
asset, err := scanRequestAsset(s.pool.QueryRow(ctx, `
|
|
SELECT id::text, sha256, content_type, byte_size, url, storage_provider,
|
|
COALESCE(local_path, ''), expires_at, expired_at, ref_count, created_at, updated_at
|
|
FROM gateway_request_assets
|
|
WHERE sha256 = $1 AND content_type = $2`, sha256, contentType))
|
|
if err != nil {
|
|
if err == pgx.ErrNoRows {
|
|
return RequestAsset{}, false, nil
|
|
}
|
|
return RequestAsset{}, false, err
|
|
}
|
|
return asset, true, nil
|
|
}
|
|
|
|
func (s *Store) UpsertRequestAsset(ctx context.Context, input RequestAssetInput) (RequestAsset, error) {
|
|
return scanRequestAsset(s.pool.QueryRow(ctx, `
|
|
INSERT INTO gateway_request_assets (
|
|
sha256, content_type, byte_size, url, storage_provider, local_path, expires_at, expired_at, ref_count
|
|
)
|
|
VALUES ($1, $2, $3, $4, $5, NULLIF($6, ''), $7, NULL, 1)
|
|
ON CONFLICT (sha256, content_type) DO UPDATE
|
|
SET byte_size = EXCLUDED.byte_size,
|
|
url = EXCLUDED.url,
|
|
storage_provider = EXCLUDED.storage_provider,
|
|
local_path = EXCLUDED.local_path,
|
|
expires_at = EXCLUDED.expires_at,
|
|
expired_at = NULL,
|
|
ref_count = gateway_request_assets.ref_count + 1,
|
|
updated_at = now()
|
|
RETURNING id::text, sha256, content_type, byte_size, url, storage_provider,
|
|
COALESCE(local_path, ''), expires_at, expired_at, ref_count, created_at, updated_at`,
|
|
input.SHA256,
|
|
input.ContentType,
|
|
input.ByteSize,
|
|
input.URL,
|
|
input.StorageProvider,
|
|
input.LocalPath,
|
|
input.ExpiresAt,
|
|
))
|
|
}
|
|
|
|
func (s *Store) IncrementRequestAssetRefCount(ctx context.Context, sha256 string, contentType string) error {
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE gateway_request_assets
|
|
SET ref_count = ref_count + 1,
|
|
updated_at = now()
|
|
WHERE sha256 = $1 AND content_type = $2`, sha256, contentType)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) MarkRequestAssetExpiredByLocalPath(ctx context.Context, localPath string, expiredAt time.Time) error {
|
|
if localPath == "" {
|
|
return nil
|
|
}
|
|
_, err := s.pool.Exec(ctx, `
|
|
UPDATE gateway_request_assets
|
|
SET expired_at = COALESCE(expired_at, $2),
|
|
updated_at = now()
|
|
WHERE local_path = $1
|
|
AND storage_provider = 'local_static'
|
|
AND expired_at IS NULL`, localPath, expiredAt)
|
|
return err
|
|
}
|
|
|
|
func scanRequestAsset(scanner interface{ Scan(dest ...any) error }) (RequestAsset, error) {
|
|
var asset RequestAsset
|
|
var localPath string
|
|
var expiresAt sql.NullTime
|
|
var expiredAt sql.NullTime
|
|
if err := scanner.Scan(
|
|
&asset.ID,
|
|
&asset.SHA256,
|
|
&asset.ContentType,
|
|
&asset.ByteSize,
|
|
&asset.URL,
|
|
&asset.StorageProvider,
|
|
&localPath,
|
|
&expiresAt,
|
|
&expiredAt,
|
|
&asset.RefCount,
|
|
&asset.CreatedAt,
|
|
&asset.UpdatedAt,
|
|
); err != nil {
|
|
return RequestAsset{}, err
|
|
}
|
|
asset.LocalPath = localPath
|
|
if expiresAt.Valid {
|
|
asset.ExpiresAt = &expiresAt.Time
|
|
}
|
|
if expiredAt.Valid {
|
|
asset.ExpiredAt = &expiredAt.Time
|
|
}
|
|
return asset, nil
|
|
}
|