ComfyUI/app/assets/database/queries/asset_reference.py
Luke Mino-Altherr 2bd4d82b4f
feat(assets): align local API with cloud spec (#12863)
* feat(assets): align local API with cloud spec

Unify response models, add missing fields, and align input schemas with
the cloud OpenAPI spec at cloud.comfy.org/openapi.

- Replace AssetSummary/AssetDetail/AssetUpdated with single Asset model
- Add is_immutable, metadata (system_metadata), prompt_id fields
- Support mime_type and preview_id in update endpoint
- Make CreateFromHashBody.name optional, add mime_type, require >=1 tag
- Add id/mime_type/preview_id to upload, relax tags to optional
- Rename total_tags → tags in tag add/remove responses
- Add GET /api/assets/tags/refine histogram endpoint
- Add DB migration for system_metadata and prompt_id columns

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Fix review issues: tags validation, size nullability, type annotation, hash mismatch check, and add tag histogram tests

- Remove contradictory min_length=1 from CreateFromHashBody.tags default
- Restore size field to int|None=None for proper null semantics
- Add Union type annotation to _build_asset_response result param
- Add hash mismatch validation on idempotent upload path (409 HASH_MISMATCH)
- Add unit tests for list_tag_histogram service function

Amp-Thread-ID: https://ampcode.com/threads/T-019cd993-f43c-704e-b3d7-6cfc3d4d4a80
Co-authored-by: Amp <amp@ampcode.com>

* Add preview_url to /assets API response using /api/view endpoint

For input and output assets, generate a preview_url pointing to the
existing /api/view endpoint using the asset's filename and tag-derived
type (input/output). Handles subdirectories via subfolder param and
URL-encodes filenames with spaces, unicode, and special characters.

This aligns the OSS backend response with the frontend AssetCard
expectation for thumbnail rendering.

Amp-Thread-ID: https://ampcode.com/threads/T-019cda3f-5c2c-751a-a906-ac6c9153ac5c
Co-authored-by: Amp <amp@ampcode.com>

* chore: remove unused imports from asset_reference queries

Amp-Thread-ID: https://ampcode.com/threads/T-019cda7d-cb21-77b4-a51b-b965af60208c
Co-authored-by: Amp <amp@ampcode.com>

* feat: resolve blake3 hashes in /view endpoint via asset database

Amp-Thread-ID: https://ampcode.com/threads/T-019cda7d-cb21-77b4-a51b-b965af60208c
Co-authored-by: Amp <amp@ampcode.com>

* Register uploaded images in asset database when --enable-assets is set

Add register_file_in_place() service function to ingest module for
registering already-saved files without moving them. Call it from the
/upload/image endpoint to return asset metadata in the response.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce023-3384-7560-bacf-de40b0de0dd2
Co-authored-by: Amp <amp@ampcode.com>

* Exclude None fields from asset API JSON responses

Add exclude_none=True to model_dump() calls across asset routes to
keep response payloads clean by omitting unset optional fields.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce023-3384-7560-bacf-de40b0de0dd2
Co-authored-by: Amp <amp@ampcode.com>

* Add comment explaining why /view resolves blake3 hashes

Amp-Thread-ID: https://ampcode.com/threads/T-019ce023-3384-7560-bacf-de40b0de0dd2
Co-authored-by: Amp <amp@ampcode.com>

* Move blake3 hash resolution to asset_management service

Extract resolve_hash_to_path() into asset_management.py and remove
_resolve_blake3_to_path from server.py. Also revert loopback origin
check to original logic.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce023-3384-7560-bacf-de40b0de0dd2
Co-authored-by: Amp <amp@ampcode.com>

* Require at least one tag in UploadAssetSpec

Enforce non-empty tags at the Pydantic validation layer so uploads
with no tags are rejected with a 400 before reaching ingest. Adds
test_upload_empty_tags_rejected to cover this case.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce377-8bde-7048-bc28-a9df063409f9
Co-authored-by: Amp <amp@ampcode.com>
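The project enforces this with a Pydantic `min_length=1` constraint; the same intent can be shown with a stdlib-only sketch (the dataclass below is illustrative, not the real `UploadAssetSpec`):

```python
from dataclasses import dataclass, field

@dataclass
class UploadAssetSpec:
    """Sketch of the validation-layer rule: uploads with no tags are
    rejected before reaching ingest. The real model is Pydantic-based."""
    name: str
    tags: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        # Equivalent of min_length=1 on the tags field.
        if not self.tags:
            raise ValueError("at least one tag is required")
```

In the real route this surfaces as a 400 response rather than a raw `ValueError`.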

* Add owner_id check to resolve_hash_to_path

Filter asset references by owner visibility so the /view endpoint
only resolves hashes for assets the requesting user can access.
Adds table-driven tests for owner visibility cases.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce377-8bde-7048-bc28-a9df063409f9
Co-authored-by: Amp <amp@ampcode.com>

* Make ReferenceData.created_at and updated_at required

Remove None defaults and type: ignore comments. Move fields before
optional fields to satisfy dataclass ordering.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce377-8bde-7048-bc28-a9df063409f9
Co-authored-by: Amp <amp@ampcode.com>

* Fix double commit in create_from_hash

Move mime_type update into _register_existing_asset so it shares a
single transaction with reference creation. Log a warning when the
hash is not found instead of silently returning None.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce377-8bde-7048-bc28-a9df063409f9
Co-authored-by: Amp <amp@ampcode.com>

* Add exclude_none=True to create/upload responses

Align with get/update/list endpoints for consistent JSON output.

Amp-Thread-ID: https://ampcode.com/threads/T-019ce377-8bde-7048-bc28-a9df063409f9
Co-authored-by: Amp <amp@ampcode.com>

* Change preview_id to reference asset by reference ID, not content ID

Clients receive preview_id in API responses but could not dereference it
through public routes (which use reference IDs). Now preview_id is a
self-referential FK to asset_references.id so the value is directly
usable in the public API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Filter soft-deleted and missing refs from visibility queries

list_references_by_asset_id and list_tags_with_usage were not filtering
out deleted_at/is_missing refs, allowing /view?filename=blake3:... to
serve files through hidden references and inflating tag usage counts.
Add list_all_file_paths_by_asset_id for orphan cleanup which
intentionally needs unfiltered access.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Pass preview_id and mime_type through all asset creation fast paths

The duplicate-content upload path and hash-based creation paths were
silently dropping preview_id and mime_type. This wires both fields
through _register_existing_asset, create_from_hash, and all route
call sites so behavior is consistent regardless of whether the asset
content already exists.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Remove unimplemented client-provided ID from upload API

The `id` field on UploadAssetSpec was advertised for idempotent creation
but never actually honored when creating new references. Remove it
rather than implementing the feature.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Make asset mime_type immutable after first ingest

Prevents cross-tenant metadata mutation when multiple references share
the same content-addressed Asset row. mime_type can now only be set when
NULL (first ingest); subsequent attempts to change it are silently ignored.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Use resolved content_type from asset lookup in /view endpoint

The /view endpoint was discarding the content_type computed by
resolve_hash_to_path() and re-guessing from the filename, which
produced wrong results for extensionless files or mismatched extensions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Merge system+user metadata into filter projection

Extract rebuild_metadata_projection() to build AssetReferenceMeta rows
from {**system_metadata, **user_metadata}, so system-generated metadata
is queryable via metadata_filter and user keys override system keys.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
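A tiny worked example of the merge rule quoted above (the metadata values are made up; the merge expression is the one from the commit):

```python
system_metadata = {"width": 512, "height": 512, "source": "scanner"}
user_metadata = {"source": "user-upload", "rating": 5}

# User keys override system keys of the same name; everything else merges.
merged = {**system_metadata, **user_metadata}
```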

* Standardize tag ordering to alphabetical across all endpoints

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Derive subfolder tags from path in register_file_in_place

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Reject client-provided id, fix preview URLs, rename tags→total_tags

- Reject 'id' field in multipart upload with 400 UNSUPPORTED_FIELD
  instead of silently ignoring it
- Build preview URL from the preview asset's own metadata rather than
  the parent asset's
- Rename 'tags' to 'total_tags' in TagsAdd/TagsRemove response schemas
  for clarity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: SQLite migration 0003 FK drop fails on file-backed DBs (MB-2)

Add naming_convention to Base.metadata so Alembic batch-mode reflection
can match unnamed FK constraints created by migration 0002. Pass
naming_convention and render_as_batch=True through env.py online config.

Add migration roundtrip tests (upgrade/downgrade/cycle from baseline).

Amp-Thread-ID: https://ampcode.com/threads/T-019ce466-1683-7471-b6e1-bb078223cda0
Co-authored-by: Amp <amp@ampcode.com>
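A naming convention of the kind this fix adds looks like the conventional SQLAlchemy pattern below; the project's exact convention may differ. The dict is passed to `sa.MetaData(naming_convention=...)` on `Base.metadata` and through Alembic's `env.py` so batch-mode reflection can address otherwise-unnamed constraints by a deterministic name.

```python
# Conventional SQLAlchemy naming_convention (illustrative; see the
# SQLAlchemy docs for the available %(...)s tokens).
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}
```

Without it, SQLite's batch-mode table rebuild cannot locate the anonymous FK created by migration 0002, which is why the drop failed only on file-backed databases.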

* Fix missing tag count for is_missing references and update test for total_tags field

- Allow is_missing=True references to be counted in list_tags_with_usage
  when the tag is 'missing', so the missing tag count reflects all
  references that have been tagged as missing
- Add update_is_missing_by_asset_id query helper for bulk updates by asset
- Update test_add_and_remove_tags to use 'total_tags' matching the API schema

Amp-Thread-ID: https://ampcode.com/threads/T-019ce482-05e7-7324-a1b0-a56a929cc7ef
Co-authored-by: Amp <amp@ampcode.com>

* Remove unused imports in scanner.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Rename prompt_id to job_id on asset_references

Rename the column in the DB model, migration, and service schemas.
The API response emits both job_id and prompt_id (deprecated alias)
for backward compatibility with the cloud API.

Amp-Thread-ID: https://ampcode.com/threads/T-019cef41-60b0-752a-aa3c-ed7f20fda2f7
Co-authored-by: Amp <amp@ampcode.com>
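The backward-compat response shape can be sketched like this; the field names come from the commit, but the builder function itself is illustrative:

```python
def serialize_reference(ref: dict) -> dict:
    """Sketch: the stored column is job_id, but the response also
    carries prompt_id as a deprecated alias for cloud-API compat."""
    payload = {"id": ref["id"], "name": ref["name"]}
    job_id = ref.get("job_id")
    if job_id is not None:
        payload["job_id"] = job_id
        payload["prompt_id"] = job_id  # deprecated alias, same value
    return payload

resp = serialize_reference({"id": "ref-1", "name": "out.png", "job_id": "job-42"})
```

Emitting both fields lets existing cloud-API clients keep reading `prompt_id` while new clients migrate to `job_id`.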

* Add index on asset_references.preview_id for FK cascade performance

Amp-Thread-ID: https://ampcode.com/threads/T-019cef45-a4d2-7548-86d2-d46bcd3db419
Co-authored-by: Amp <amp@ampcode.com>

* Add clarifying comments for Asset/AssetReference naming and preview_id

Amp-Thread-ID: https://ampcode.com/threads/T-019cef49-f94e-7348-bf23-9a19ebf65e0d
Co-authored-by: Amp <amp@ampcode.com>

* Disallow all-null meta rows: add CHECK constraint, skip null values on write

- convert_metadata_to_rows returns [] for None values instead of an all-null row
- Remove dead None branch from _scalar_to_row
- Simplify null filter in common.py to just check for row absence
- Add CHECK constraint ck_asset_reference_meta_has_value to model and migration 0003

Amp-Thread-ID: https://ampcode.com/threads/T-019cef4e-5240-7749-bb25-1f17fcf9c09c
Co-authored-by: Amp <amp@ampcode.com>

* Remove dead None guards on result.asset in upload handler

register_file_in_place guarantees a non-None asset, so the
'if result.asset else None' checks were unreachable.

Amp-Thread-ID: https://ampcode.com/threads/T-019cef5b-4cf8-723c-8a98-8fb8f333c133
Co-authored-by: Amp <amp@ampcode.com>

* Remove mime_type from asset update API

Clients can no longer modify mime_type after asset creation via the
PUT /api/assets/{id} endpoint. This reduces the risk of mime_type
spoofing. The internal update_asset_hash_and_mime function remains
available for server-side use (e.g., enrichment).

Amp-Thread-ID: https://ampcode.com/threads/T-019cef5d-8d61-75cc-a1c6-2841ac395648
Co-authored-by: Amp <amp@ampcode.com>

* Fix migration constraint naming double-prefix and NULL in mixed metadata lists

- Use fully-rendered constraint names in migration 0003 to avoid the
  naming convention doubling the ck_ prefix on batch operations.
- Add table_args to downgrade so SQLite batch mode can find the CHECK
  constraint (not exposed by SQLite reflection).
- Fix model CheckConstraint name to use bare 'has_value' (convention
  auto-prefixes).
- Skip None items when converting metadata lists to rows, preventing
  all-NULL rows that violate the has_value check constraint.

Amp-Thread-ID: https://ampcode.com/threads/T-019cef87-94f9-7172-a6af-c6282290ce4f
Co-authored-by: Amp <amp@ampcode.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Amp <amp@ampcode.com>
2026-03-16 12:34:04 -07:00


"""Query functions for the unified AssetReference table.
This module replaces the separate asset_info.py and cache_state.py query modules,
providing a unified interface for the merged asset_references table.
"""
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from typing import NamedTuple, Sequence
import sqlalchemy as sa
from sqlalchemy import delete, select
from sqlalchemy.dialects import sqlite
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, noload
from app.assets.database.models import (
Asset,
AssetReference,
AssetReferenceMeta,
AssetReferenceTag,
Tag,
)
from app.assets.database.queries.common import (
MAX_BIND_PARAMS,
apply_metadata_filter,
apply_tag_filters,
build_prefix_like_conditions,
build_visible_owner_clause,
calculate_rows_per_statement,
iter_chunks,
)
from app.assets.helpers import escape_sql_like_string, get_utc_now
def _check_is_scalar(v):
if v is None:
return True
if isinstance(v, bool):
return True
if isinstance(v, (int, float, Decimal, str)):
return True
return False
def _scalar_to_row(key: str, ordinal: int, value) -> dict:
"""Convert a scalar value to a typed projection row."""
if isinstance(value, bool):
return {"key": key, "ordinal": ordinal, "val_bool": bool(value)}
if isinstance(value, (int, float, Decimal)):
num = value if isinstance(value, Decimal) else Decimal(str(value))
return {"key": key, "ordinal": ordinal, "val_num": num}
if isinstance(value, str):
return {"key": key, "ordinal": ordinal, "val_str": value}
return {"key": key, "ordinal": ordinal, "val_json": value}
def convert_metadata_to_rows(key: str, value) -> list[dict]:
"""Turn a metadata key/value into typed projection rows."""
if value is None:
return []
if _check_is_scalar(value):
return [_scalar_to_row(key, 0, value)]
if isinstance(value, list):
if all(_check_is_scalar(x) for x in value):
return [_scalar_to_row(key, i, x) for i, x in enumerate(value) if x is not None]
return [{"key": key, "ordinal": i, "val_json": x} for i, x in enumerate(value) if x is not None]
return [{"key": key, "ordinal": 0, "val_json": value}]
def get_reference_by_id(
session: Session,
reference_id: str,
) -> AssetReference | None:
return session.get(AssetReference, reference_id)
def get_reference_with_owner_check(
session: Session,
reference_id: str,
owner_id: str,
) -> AssetReference:
"""Fetch a reference and verify ownership.
Raises:
ValueError: if reference not found or soft-deleted
PermissionError: if owner_id doesn't match
"""
ref = get_reference_by_id(session, reference_id=reference_id)
if not ref or ref.deleted_at is not None:
raise ValueError(f"AssetReference {reference_id} not found")
if ref.owner_id and ref.owner_id != owner_id:
raise PermissionError("not owner")
return ref
def get_reference_by_file_path(
session: Session,
file_path: str,
) -> AssetReference | None:
"""Get a reference by its file path."""
return (
session.execute(
select(AssetReference).where(AssetReference.file_path == file_path).limit(1)
)
.scalars()
.first()
)
def reference_exists_for_asset_id(
session: Session,
asset_id: str,
) -> bool:
q = (
select(sa.literal(True))
.select_from(AssetReference)
.where(AssetReference.asset_id == asset_id)
.where(AssetReference.deleted_at.is_(None))
.limit(1)
)
return session.execute(q).first() is not None
def reference_exists(
session: Session,
reference_id: str,
) -> bool:
"""Return True if a reference with the given ID exists (not soft-deleted)."""
q = (
select(sa.literal(True))
.select_from(AssetReference)
.where(AssetReference.id == reference_id)
.where(AssetReference.deleted_at.is_(None))
.limit(1)
)
return session.execute(q).first() is not None
def insert_reference(
session: Session,
asset_id: str,
name: str,
owner_id: str = "",
file_path: str | None = None,
mtime_ns: int | None = None,
preview_id: str | None = None,
) -> AssetReference | None:
"""Insert a new AssetReference. Returns None if unique constraint violated."""
now = get_utc_now()
try:
with session.begin_nested():
ref = AssetReference(
asset_id=asset_id,
name=name,
owner_id=owner_id,
file_path=file_path,
mtime_ns=mtime_ns,
preview_id=preview_id,
created_at=now,
updated_at=now,
last_access_time=now,
)
session.add(ref)
session.flush()
return ref
except IntegrityError:
return None
def get_or_create_reference(
session: Session,
asset_id: str,
name: str,
owner_id: str = "",
file_path: str | None = None,
mtime_ns: int | None = None,
preview_id: str | None = None,
) -> tuple[AssetReference, bool]:
"""Get existing or create new AssetReference.
For filesystem references (file_path is set), uniqueness is by file_path.
For API references (file_path is None), we look for matching
asset_id + owner_id + name.
Returns (reference, created).
"""
ref = insert_reference(
session,
asset_id=asset_id,
name=name,
owner_id=owner_id,
file_path=file_path,
mtime_ns=mtime_ns,
preview_id=preview_id,
)
if ref:
return ref, True
# Find existing - priority to file_path match, then name match
if file_path:
existing = get_reference_by_file_path(session, file_path)
else:
existing = (
session.execute(
select(AssetReference)
.where(
AssetReference.asset_id == asset_id,
AssetReference.name == name,
AssetReference.owner_id == owner_id,
AssetReference.file_path.is_(None),
)
.limit(1)
)
.unique()
.scalar_one_or_none()
)
if not existing:
raise RuntimeError("Failed to find AssetReference after insert conflict.")
return existing, False
def update_reference_timestamps(
session: Session,
reference: AssetReference,
preview_id: str | None = None,
) -> None:
"""Update timestamps and optionally preview_id on existing AssetReference."""
now = get_utc_now()
if preview_id and reference.preview_id != preview_id:
reference.preview_id = preview_id
reference.updated_at = now
def list_references_page(
session: Session,
owner_id: str = "",
limit: int = 100,
offset: int = 0,
name_contains: str | None = None,
include_tags: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
metadata_filter: dict | None = None,
sort: str | None = None,
order: str | None = None,
) -> tuple[list[AssetReference], dict[str, list[str]], int]:
"""List references with pagination, filtering, and sorting.
Returns (references, tag_map, total_count).
"""
base = (
select(AssetReference)
.join(Asset, Asset.id == AssetReference.asset_id)
.where(build_visible_owner_clause(owner_id))
.where(AssetReference.is_missing == False) # noqa: E712
.where(AssetReference.deleted_at.is_(None))
.options(noload(AssetReference.tags))
)
if name_contains:
escaped, esc = escape_sql_like_string(name_contains)
base = base.where(AssetReference.name.ilike(f"%{escaped}%", escape=esc))
base = apply_tag_filters(base, include_tags, exclude_tags)
base = apply_metadata_filter(base, metadata_filter)
sort = (sort or "created_at").lower()
order = (order or "desc").lower()
sort_map = {
"name": AssetReference.name,
"created_at": AssetReference.created_at,
"updated_at": AssetReference.updated_at,
"last_access_time": AssetReference.last_access_time,
"size": Asset.size_bytes,
}
sort_col = sort_map.get(sort, AssetReference.created_at)
sort_exp = sort_col.desc() if order == "desc" else sort_col.asc()
base = base.order_by(sort_exp).limit(limit).offset(offset)
count_stmt = (
select(sa.func.count())
.select_from(AssetReference)
.join(Asset, Asset.id == AssetReference.asset_id)
.where(build_visible_owner_clause(owner_id))
.where(AssetReference.is_missing == False) # noqa: E712
.where(AssetReference.deleted_at.is_(None))
)
if name_contains:
escaped, esc = escape_sql_like_string(name_contains)
count_stmt = count_stmt.where(
AssetReference.name.ilike(f"%{escaped}%", escape=esc)
)
count_stmt = apply_tag_filters(count_stmt, include_tags, exclude_tags)
count_stmt = apply_metadata_filter(count_stmt, metadata_filter)
total = int(session.execute(count_stmt).scalar_one() or 0)
refs = session.execute(base).unique().scalars().all()
id_list: list[str] = [r.id for r in refs]
tag_map: dict[str, list[str]] = defaultdict(list)
if id_list:
rows = session.execute(
select(AssetReferenceTag.asset_reference_id, Tag.name)
.join(Tag, Tag.name == AssetReferenceTag.tag_name)
.where(AssetReferenceTag.asset_reference_id.in_(id_list))
.order_by(AssetReferenceTag.tag_name.asc())
)
for ref_id, tag_name in rows.all():
tag_map[ref_id].append(tag_name)
return list(refs), tag_map, total
def fetch_reference_asset_and_tags(
session: Session,
reference_id: str,
owner_id: str = "",
) -> tuple[AssetReference, Asset, list[str]] | None:
stmt = (
select(AssetReference, Asset, Tag.name)
.join(Asset, Asset.id == AssetReference.asset_id)
.join(
AssetReferenceTag,
AssetReferenceTag.asset_reference_id == AssetReference.id,
isouter=True,
)
.join(Tag, Tag.name == AssetReferenceTag.tag_name, isouter=True)
.where(
AssetReference.id == reference_id,
AssetReference.deleted_at.is_(None),
build_visible_owner_clause(owner_id),
)
.options(noload(AssetReference.tags))
.order_by(Tag.name.asc())
)
rows = session.execute(stmt).all()
if not rows:
return None
first_ref, first_asset, _ = rows[0]
tags: list[str] = []
seen: set[str] = set()
for _ref, _asset, tag_name in rows:
if tag_name and tag_name not in seen:
seen.add(tag_name)
tags.append(tag_name)
return first_ref, first_asset, tags
def fetch_reference_and_asset(
session: Session,
reference_id: str,
owner_id: str = "",
) -> tuple[AssetReference, Asset] | None:
stmt = (
select(AssetReference, Asset)
.join(Asset, Asset.id == AssetReference.asset_id)
.where(
AssetReference.id == reference_id,
AssetReference.deleted_at.is_(None),
build_visible_owner_clause(owner_id),
)
.limit(1)
.options(noload(AssetReference.tags))
)
pair = session.execute(stmt).first()
if not pair:
return None
return pair[0], pair[1]
def update_reference_access_time(
session: Session,
reference_id: str,
ts: datetime | None = None,
only_if_newer: bool = True,
) -> None:
ts = ts or get_utc_now()
stmt = sa.update(AssetReference).where(AssetReference.id == reference_id)
if only_if_newer:
stmt = stmt.where(
sa.or_(
AssetReference.last_access_time.is_(None),
AssetReference.last_access_time < ts,
)
)
session.execute(stmt.values(last_access_time=ts))
def update_reference_name(
session: Session,
reference_id: str,
name: str,
) -> None:
"""Update the name of an AssetReference."""
now = get_utc_now()
session.execute(
sa.update(AssetReference)
.where(AssetReference.id == reference_id)
.values(name=name, updated_at=now)
)
def update_reference_updated_at(
session: Session,
reference_id: str,
ts: datetime | None = None,
) -> None:
"""Update the updated_at timestamp of an AssetReference."""
ts = ts or get_utc_now()
session.execute(
sa.update(AssetReference)
.where(AssetReference.id == reference_id)
.values(updated_at=ts)
)
def rebuild_metadata_projection(session: Session, ref: AssetReference) -> None:
"""Delete and rebuild AssetReferenceMeta rows from merged system+user metadata.
The merged dict is ``{**system_metadata, **user_metadata}`` so user keys
override system keys of the same name.
"""
session.execute(
delete(AssetReferenceMeta).where(
AssetReferenceMeta.asset_reference_id == ref.id
)
)
session.flush()
merged = {**(ref.system_metadata or {}), **(ref.user_metadata or {})}
if not merged:
return
rows: list[AssetReferenceMeta] = []
for k, v in merged.items():
for r in convert_metadata_to_rows(k, v):
rows.append(
AssetReferenceMeta(
asset_reference_id=ref.id,
key=r["key"],
ordinal=int(r["ordinal"]),
val_str=r.get("val_str"),
val_num=r.get("val_num"),
val_bool=r.get("val_bool"),
val_json=r.get("val_json"),
)
)
if rows:
session.add_all(rows)
session.flush()
def set_reference_metadata(
session: Session,
reference_id: str,
user_metadata: dict | None = None,
) -> None:
ref = session.get(AssetReference, reference_id)
if not ref:
raise ValueError(f"AssetReference {reference_id} not found")
ref.user_metadata = user_metadata or {}
ref.updated_at = get_utc_now()
session.flush()
rebuild_metadata_projection(session, ref)
def set_reference_system_metadata(
session: Session,
reference_id: str,
system_metadata: dict | None = None,
) -> None:
"""Set system_metadata on a reference and rebuild the merged projection."""
ref = session.get(AssetReference, reference_id)
if not ref:
raise ValueError(f"AssetReference {reference_id} not found")
ref.system_metadata = system_metadata or {}
ref.updated_at = get_utc_now()
session.flush()
rebuild_metadata_projection(session, ref)
def delete_reference_by_id(
session: Session,
reference_id: str,
owner_id: str,
) -> bool:
stmt = sa.delete(AssetReference).where(
AssetReference.id == reference_id,
build_visible_owner_clause(owner_id),
)
return int(session.execute(stmt).rowcount or 0) > 0
def soft_delete_reference_by_id(
session: Session,
reference_id: str,
owner_id: str,
) -> bool:
"""Mark a reference as soft-deleted by setting deleted_at timestamp.
Returns True if the reference was found and marked deleted.
"""
now = get_utc_now()
stmt = (
sa.update(AssetReference)
.where(
AssetReference.id == reference_id,
AssetReference.deleted_at.is_(None),
build_visible_owner_clause(owner_id),
)
.values(deleted_at=now)
)
return int(session.execute(stmt).rowcount or 0) > 0
def set_reference_preview(
session: Session,
reference_id: str,
preview_reference_id: str | None = None,
) -> None:
"""Set or clear preview_id and bump updated_at. Raises on unknown IDs."""
ref = session.get(AssetReference, reference_id)
if not ref:
raise ValueError(f"AssetReference {reference_id} not found")
if preview_reference_id is None:
ref.preview_id = None
else:
if not session.get(AssetReference, preview_reference_id):
raise ValueError(f"Preview AssetReference {preview_reference_id} not found")
ref.preview_id = preview_reference_id
ref.updated_at = get_utc_now()
session.flush()
class CacheStateRow(NamedTuple):
"""Row from reference query with cache state data."""
reference_id: str
file_path: str
mtime_ns: int | None
needs_verify: bool
asset_id: str
asset_hash: str | None
size_bytes: int | None
def list_references_by_asset_id(
session: Session,
asset_id: str,
) -> Sequence[AssetReference]:
return (
session.execute(
select(AssetReference)
.where(AssetReference.asset_id == asset_id)
.where(AssetReference.is_missing == False) # noqa: E712
.where(AssetReference.deleted_at.is_(None))
.order_by(AssetReference.id.asc())
)
.scalars()
.all()
)
def list_all_file_paths_by_asset_id(
session: Session,
asset_id: str,
) -> list[str]:
"""Return every file_path for an asset, including soft-deleted/missing refs.
Used for orphan cleanup where all on-disk files must be removed.
"""
return list(
session.execute(
select(AssetReference.file_path)
.where(AssetReference.asset_id == asset_id)
.where(AssetReference.file_path.isnot(None))
)
.scalars()
.all()
)
def upsert_reference(
session: Session,
asset_id: str,
file_path: str,
name: str,
mtime_ns: int,
owner_id: str = "",
) -> tuple[bool, bool]:
"""Upsert a reference by file_path. Returns (created, updated).
Also restores references that were previously marked as missing.
"""
now = get_utc_now()
vals = {
"asset_id": asset_id,
"file_path": file_path,
"name": name,
"owner_id": owner_id,
"mtime_ns": int(mtime_ns),
"is_missing": False,
"created_at": now,
"updated_at": now,
"last_access_time": now,
}
ins = (
sqlite.insert(AssetReference)
.values(**vals)
.on_conflict_do_nothing(index_elements=[AssetReference.file_path])
)
res = session.execute(ins)
created = int(res.rowcount or 0) > 0
if created:
return True, False
upd = (
sa.update(AssetReference)
.where(AssetReference.file_path == file_path)
.where(
sa.or_(
AssetReference.asset_id != asset_id,
AssetReference.mtime_ns.is_(None),
AssetReference.mtime_ns != int(mtime_ns),
AssetReference.is_missing == True, # noqa: E712
AssetReference.deleted_at.isnot(None),
)
)
.values(
asset_id=asset_id, mtime_ns=int(mtime_ns), is_missing=False,
deleted_at=None, updated_at=now,
)
)
res2 = session.execute(upd)
updated = int(res2.rowcount or 0) > 0
return False, updated
def mark_references_missing_outside_prefixes(
session: Session,
valid_prefixes: list[str],
) -> int:
"""Mark references as missing when file_path doesn't match any valid prefix.
Returns number of references marked as missing.
"""
if not valid_prefixes:
return 0
conds = build_prefix_like_conditions(valid_prefixes)
matches_valid_prefix = sa.or_(*conds)
result = session.execute(
sa.update(AssetReference)
.where(AssetReference.file_path.isnot(None))
.where(AssetReference.deleted_at.is_(None))
.where(~matches_valid_prefix)
.where(AssetReference.is_missing == False) # noqa: E712
.values(is_missing=True)
)
return result.rowcount
def restore_references_by_paths(session: Session, file_paths: list[str]) -> int:
"""Restore references that were previously marked as missing.
Returns number of references restored.
"""
if not file_paths:
return 0
total = 0
for chunk in iter_chunks(file_paths, MAX_BIND_PARAMS):
result = session.execute(
sa.update(AssetReference)
.where(AssetReference.file_path.in_(chunk))
.where(AssetReference.is_missing == True) # noqa: E712
.where(AssetReference.deleted_at.is_(None))
.values(is_missing=False)
)
total += result.rowcount
return total
def get_unreferenced_unhashed_asset_ids(session: Session) -> list[str]:
"""Get IDs of unhashed assets (hash=None) with no active references.
An asset is considered unreferenced if it has no references,
or all its references are marked as missing.
Returns list of asset IDs that are unreferenced.
"""
active_ref_exists = (
sa.select(sa.literal(1))
.where(AssetReference.asset_id == Asset.id)
.where(AssetReference.is_missing == False) # noqa: E712
.where(AssetReference.deleted_at.is_(None))
.correlate(Asset)
.exists()
)
unreferenced_subq = sa.select(Asset.id).where(
Asset.hash.is_(None), ~active_ref_exists
)
return [row[0] for row in session.execute(unreferenced_subq).all()]
def delete_assets_by_ids(session: Session, asset_ids: list[str]) -> int:
"""Delete assets and their references by ID.
Returns number of assets deleted.
"""
if not asset_ids:
return 0
total = 0
for chunk in iter_chunks(asset_ids, MAX_BIND_PARAMS):
session.execute(
sa.delete(AssetReference).where(AssetReference.asset_id.in_(chunk))
)
result = session.execute(sa.delete(Asset).where(Asset.id.in_(chunk)))
total += result.rowcount
return total
def get_references_for_prefixes(
session: Session,
prefixes: list[str],
*,
include_missing: bool = False,
) -> list[CacheStateRow]:
"""Get all references with file paths matching any of the given prefixes.
Args:
session: Database session
prefixes: List of absolute directory prefixes to match
include_missing: If False (default), exclude references marked as missing
Returns:
List of cache state rows with joined asset data
"""
if not prefixes:
return []
conds = build_prefix_like_conditions(prefixes)
query = (
sa.select(
AssetReference.id,
AssetReference.file_path,
AssetReference.mtime_ns,
AssetReference.needs_verify,
AssetReference.asset_id,
Asset.hash,
Asset.size_bytes,
)
.join(Asset, Asset.id == AssetReference.asset_id)
.where(AssetReference.file_path.isnot(None))
.where(AssetReference.deleted_at.is_(None))
.where(sa.or_(*conds))
)
if not include_missing:
query = query.where(AssetReference.is_missing == False) # noqa: E712
rows = session.execute(
query.order_by(AssetReference.asset_id.asc(), AssetReference.id.asc())
).all()
return [
CacheStateRow(
reference_id=row[0],
file_path=row[1],
mtime_ns=row[2],
needs_verify=row[3],
asset_id=row[4],
asset_hash=row[5],
size_bytes=int(row[6]) if row[6] is not None else None,
)
for row in rows
]
def bulk_update_needs_verify(
session: Session, reference_ids: list[str], value: bool
) -> int:
"""Set needs_verify flag for multiple references.
Returns: Number of rows updated
"""
if not reference_ids:
return 0
total = 0
for chunk in iter_chunks(reference_ids, MAX_BIND_PARAMS):
result = session.execute(
sa.update(AssetReference)
.where(AssetReference.id.in_(chunk))
.values(needs_verify=value)
)
total += result.rowcount
return total


def bulk_update_is_missing(
    session: Session, reference_ids: list[str], value: bool
) -> int:
    """Set is_missing flag for multiple references.

    Returns: Number of rows updated
    """
if not reference_ids:
return 0
total = 0
for chunk in iter_chunks(reference_ids, MAX_BIND_PARAMS):
result = session.execute(
sa.update(AssetReference)
.where(AssetReference.id.in_(chunk))
.values(is_missing=value)
)
total += result.rowcount
return total


def update_is_missing_by_asset_id(
    session: Session, asset_id: str, value: bool
) -> int:
    """Set is_missing flag for all live (non-deleted) references of an asset.

    Returns: Number of rows updated
    """
result = session.execute(
sa.update(AssetReference)
.where(AssetReference.asset_id == asset_id)
.where(AssetReference.deleted_at.is_(None))
.values(is_missing=value)
)
return result.rowcount


def delete_references_by_ids(session: Session, reference_ids: list[str]) -> int:
    """Delete references by their IDs.

    Returns: Number of rows deleted
    """
if not reference_ids:
return 0
total = 0
for chunk in iter_chunks(reference_ids, MAX_BIND_PARAMS):
result = session.execute(
sa.delete(AssetReference).where(AssetReference.id.in_(chunk))
)
total += result.rowcount
return total


def delete_orphaned_seed_asset(session: Session, asset_id: str) -> bool:
    """Delete a seed asset (hash is None) and its references.

    Returns: True if asset was deleted, False if not found or has a hash
    """
asset = session.get(Asset, asset_id)
if not asset:
return False
if asset.hash is not None:
return False
session.execute(
sa.delete(AssetReference).where(AssetReference.asset_id == asset_id)
)
session.delete(asset)
return True
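
# Hypothetical cleanup sketch: after deleting a file's last reference, the
# now-orphaned seed asset (one that never got a hash) can be pruned in the
# same session. ``ref_id`` and ``asset_id`` are illustrative values.
#
#     if delete_references_by_ids(session, [ref_id]):
#         delete_orphaned_seed_asset(session, asset_id)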


class UnenrichedReferenceRow(NamedTuple):
"""Row for references needing enrichment."""
reference_id: str
asset_id: str
file_path: str
enrichment_level: int


def get_unenriched_references(
    session: Session,
    prefixes: list[str],
    max_level: int = 0,
    limit: int = 1000,
) -> list[UnenrichedReferenceRow]:
    """Get references that need enrichment (enrichment_level <= max_level).

    Args:
        session: Database session
        prefixes: List of absolute directory prefixes to scan
        max_level: Maximum enrichment level to include (0=stubs, 1=metadata done)
        limit: Maximum number of rows to return

    Returns:
        List of unenriched reference rows with file paths
    """
if not prefixes:
return []
conds = build_prefix_like_conditions(prefixes)
query = (
sa.select(
AssetReference.id,
AssetReference.asset_id,
AssetReference.file_path,
AssetReference.enrichment_level,
)
.where(AssetReference.file_path.isnot(None))
.where(AssetReference.deleted_at.is_(None))
.where(sa.or_(*conds))
.where(AssetReference.is_missing == False) # noqa: E712
.where(AssetReference.enrichment_level <= max_level)
.order_by(AssetReference.id.asc())
.limit(limit)
)
rows = session.execute(query).all()
return [
UnenrichedReferenceRow(
reference_id=row[0],
asset_id=row[1],
file_path=row[2],
enrichment_level=row[3],
)
for row in rows
]
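
# Hypothetical enrichment-loop sketch (``session_scope`` and ``extract_metadata``
# are illustrative stand-ins): drain stub references in batches and record the
# new level so the next poll skips them.
#
#     with session_scope() as session:
#         batch = get_unenriched_references(session, ["/data/models"], max_level=0)
#         for ref in batch:
#             extract_metadata(ref.file_path)
#         bulk_update_enrichment_level(
#             session, [r.reference_id for r in batch], level=1
#         )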


def bulk_update_enrichment_level(
    session: Session,
    reference_ids: list[str],
    level: int,
) -> int:
    """Update enrichment level for multiple references.

    Returns: Number of rows updated
    """
    if not reference_ids:
        return 0
    total = 0
    # Chunk the IN clause to stay under the bind-parameter limit, matching the
    # other bulk_* helpers in this module.
    for chunk in iter_chunks(reference_ids, MAX_BIND_PARAMS):
        result = session.execute(
            sa.update(AssetReference)
            .where(AssetReference.id.in_(chunk))
            .values(enrichment_level=level)
        )
        total += result.rowcount
    return total


def bulk_insert_references_ignore_conflicts(
    session: Session,
    rows: list[dict],
) -> None:
    """Bulk insert reference rows with ON CONFLICT DO NOTHING on file_path.

    Each dict should have: id, asset_id, file_path, name, owner_id, mtime_ns, etc.
    The is_missing field is automatically set to False for new inserts.
    """
if not rows:
return
enriched_rows = [{**row, "is_missing": False} for row in rows]
ins = sqlite.insert(AssetReference).on_conflict_do_nothing(
index_elements=[AssetReference.file_path]
)
    # 14 = bind parameters per inserted row; sizes each executemany batch to
    # stay under the statement's bind-parameter limit.
    for chunk in iter_chunks(enriched_rows, calculate_rows_per_statement(14)):
session.execute(ins, chunk)


def get_references_by_paths_and_asset_ids(
    session: Session,
    path_to_asset: dict[str, str],
) -> set[str]:
    """Query references to find paths where our asset_id won the insert.

    Args:
        path_to_asset: Mapping of file_path -> asset_id we tried to insert

    Returns:
        Set of file_paths where our asset_id is present
    """
if not path_to_asset:
return set()
pairs = list(path_to_asset.items())
winners: set[str] = set()
# Each pair uses 2 bind params, so chunk at MAX_BIND_PARAMS // 2
for chunk in iter_chunks(pairs, MAX_BIND_PARAMS // 2):
pairwise = sa.tuple_(AssetReference.file_path, AssetReference.asset_id).in_(
chunk
)
        result = session.execute(
            sa.select(AssetReference.file_path).where(pairwise)
        )
winners.update(result.scalars().all())
return winners
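
# Hypothetical insert-then-verify sketch: because the bulk insert ignores
# file_path conflicts, a caller can check which of its rows actually won.
# The row payload below is abbreviated and illustrative.
#
#     rows = [{"id": rid, "asset_id": aid, "file_path": path, "name": name}]
#     bulk_insert_references_ignore_conflicts(session, rows)
#     won = get_references_by_paths_and_asset_ids(
#         session, {r["file_path"]: r["asset_id"] for r in rows}
#     )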


def get_reference_ids_by_ids(
    session: Session,
    reference_ids: list[str],
) -> set[str]:
    """Return the subset of the given reference IDs that exist in the database."""
if not reference_ids:
return set()
found: set[str] = set()
for chunk in iter_chunks(reference_ids, MAX_BIND_PARAMS):
        result = session.execute(
            sa.select(AssetReference.id).where(AssetReference.id.in_(chunk))
        )
found.update(result.scalars().all())
return found