diff --git a/app/assets/database/queries/__init__.py b/app/assets/database/queries/__init__.py index 874ae3bf2..3a5b51b8a 100644 --- a/app/assets/database/queries/__init__.py +++ b/app/assets/database/queries/__init__.py @@ -1,29 +1,30 @@ # Re-export public API from query modules -# Maintains backward compatibility with old flat queries.py imports +# Pure atomic database queries only - no business logic or orchestration from app.assets.database.queries.asset import ( asset_exists_by_hash, get_asset_by_hash, + upsert_asset, ) from app.assets.database.queries.asset_info import ( asset_info_exists_for_asset_id, get_asset_info_by_id, + insert_asset_info, + get_or_create_asset_info, + update_asset_info_timestamps, list_asset_infos_page, fetch_asset_info_asset_and_tags, fetch_asset_info_and_asset, touch_asset_info_by_id, - create_asset_info_for_existing_asset, replace_asset_info_metadata_projection, - ingest_fs_asset, - update_asset_info_full, delete_asset_info_by_id, set_asset_info_preview, ) from app.assets.database.queries.cache_state import ( list_cache_states_by_asset_id, - pick_best_live_path, + upsert_cache_state, prune_orphaned_assets, fast_db_consistency_pass, ) @@ -43,22 +44,23 @@ __all__ = [ # asset.py "asset_exists_by_hash", "get_asset_by_hash", + "upsert_asset", # asset_info.py "asset_info_exists_for_asset_id", "get_asset_info_by_id", + "insert_asset_info", + "get_or_create_asset_info", + "update_asset_info_timestamps", "list_asset_infos_page", "fetch_asset_info_asset_and_tags", "fetch_asset_info_and_asset", "touch_asset_info_by_id", - "create_asset_info_for_existing_asset", "replace_asset_info_metadata_projection", - "ingest_fs_asset", - "update_asset_info_full", "delete_asset_info_by_id", "set_asset_info_preview", # cache_state.py "list_cache_states_by_asset_id", - "pick_best_live_path", + "upsert_cache_state", "prune_orphaned_assets", "fast_db_consistency_pass", # tags.py diff --git a/app/assets/database/queries/asset.py b/app/assets/database/queries/asset.py index 5d00991a0..5a7fe88ee 100644 --- a/app/assets/database/queries/asset.py +++ b/app/assets/database/queries/asset.py @@ -29,3 +29,46 @@ def get_asset_by_hash( return ( session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1)) ).scalars().first() + + +def upsert_asset( + session: Session, + *, + asset_hash: str, + size_bytes: int, + mime_type: str | None = None, +) -> tuple[Asset, bool, bool]: + """Upsert an Asset by hash. Returns (asset, created, updated).""" + from sqlalchemy.dialects import sqlite + + vals = {"hash": asset_hash, "size_bytes": int(size_bytes)} + if mime_type: + vals["mime_type"] = mime_type + + ins = ( + sqlite.insert(Asset) + .values(**vals) + .on_conflict_do_nothing(index_elements=[Asset.hash]) + ) + res = session.execute(ins) + created = int(res.rowcount or 0) > 0 + + asset = session.execute( + select(Asset).where(Asset.hash == asset_hash).limit(1) + ).scalars().first() + if not asset: + raise RuntimeError("Asset row not found after upsert.") + + updated = False + if not created: + changed = False + if asset.size_bytes != int(size_bytes) and int(size_bytes) > 0: + asset.size_bytes = int(size_bytes) + changed = True + if mime_type and asset.mime_type != mime_type: + asset.mime_type = mime_type + changed = True + if changed: + updated = True + + return asset, created, updated diff --git a/app/assets/database/queries/asset_info.py b/app/assets/database/queries/asset_info.py index f238138b3..703b62ee2 100644 --- a/app/assets/database/queries/asset_info.py +++ b/app/assets/database/queries/asset_info.py @@ -1,24 +1,22 @@ -import os -import logging +""" +Pure atomic database queries for AssetInfo operations. + +This module contains only atomic DB operations - no business logic, +no filesystem operations, no orchestration across multiple tables. +""" from collections import defaultdict from datetime import datetime -from typing import Any, Sequence +from typing import Sequence import sqlalchemy as sa from sqlalchemy import select, delete, exists -from sqlalchemy.dialects import sqlite from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session, contains_eager, noload from app.assets.database.models import ( - Asset, AssetInfo, AssetCacheState, AssetInfoMeta, AssetInfoTag, Tag + Asset, AssetInfo, AssetInfoMeta, AssetInfoTag, Tag ) -from app.assets.helpers import ( - compute_relative_filename, escape_like_prefix, normalize_tags, project_kv, utcnow -) -from app.assets.database.queries.asset import get_asset_by_hash -from app.assets.database.queries.cache_state import list_cache_states_by_asset_id, pick_best_live_path -from app.assets.database.queries.tags import ensure_tags_exist, set_asset_info_tags, remove_missing_tag_for_asset_id +from app.assets.helpers import escape_like_prefix, normalize_tags, project_kv, utcnow def _visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement: @@ -131,6 +129,83 @@ def get_asset_info_by_id( return session.get(AssetInfo, asset_info_id) +def insert_asset_info( + session: Session, + *, + asset_id: str, + owner_id: str, + name: str, + preview_id: str | None = None, +) -> AssetInfo | None: + """Insert a new AssetInfo. Returns None if unique constraint violated.""" + now = utcnow() + try: + with session.begin_nested(): + info = AssetInfo( + owner_id=owner_id, + name=name, + asset_id=asset_id, + preview_id=preview_id, + created_at=now, + updated_at=now, + last_access_time=now, + ) + session.add(info) + session.flush() + return info + except IntegrityError: + return None + + +def get_or_create_asset_info( + session: Session, + *, + asset_id: str, + owner_id: str, + name: str, + preview_id: str | None = None, +) -> tuple[AssetInfo, bool]: + """Get existing or create new AssetInfo. Returns (info, created).""" + info = insert_asset_info( + session, + asset_id=asset_id, + owner_id=owner_id, + name=name, + preview_id=preview_id, + ) + if info: + return info, True + + existing = session.execute( + select(AssetInfo) + .where( + AssetInfo.asset_id == asset_id, + AssetInfo.name == name, + AssetInfo.owner_id == owner_id, + ) + .limit(1) + ).unique().scalar_one_or_none() + if not existing: + raise RuntimeError("Failed to find AssetInfo after insert conflict.") + return existing, False + + +def update_asset_info_timestamps( + session: Session, + *, + asset_info: AssetInfo, + preview_id: str | None = None, +) -> None: + """Update timestamps and optionally preview_id on existing AssetInfo.""" + now = utcnow() + if preview_id and asset_info.preview_id != preview_id: + asset_info.preview_id = preview_id + asset_info.updated_at = now + if asset_info.last_access_time < now: + asset_info.last_access_time = now + session.flush() + + def list_asset_infos_page( session: Session, owner_id: str = "", @@ -273,79 +348,6 @@ def touch_asset_info_by_id( session.execute(stmt.values(last_access_time=ts)) -def create_asset_info_for_existing_asset( - session: Session, - *, - asset_hash: str, - name: str, - user_metadata: dict | None = None, - tags: Sequence[str] | None = None, - tag_origin: str = "manual", - owner_id: str = "", -) -> AssetInfo: - """Create or return an existing AssetInfo for an Asset identified by asset_hash.""" - now = utcnow() - asset = get_asset_by_hash(session, asset_hash=asset_hash) - if not asset: - raise ValueError(f"Unknown asset hash {asset_hash}") - - info = AssetInfo( - owner_id=owner_id, - name=name, - asset_id=asset.id, - preview_id=None, - created_at=now, - updated_at=now, - last_access_time=now, - ) - try: - with session.begin_nested(): - session.add(info) - session.flush() - except IntegrityError: - existing = ( - session.execute( - select(AssetInfo) - .options(noload(AssetInfo.tags)) - .where( - AssetInfo.asset_id == asset.id, - AssetInfo.name == name, - AssetInfo.owner_id == owner_id, - ) - .limit(1) - ) - ).unique().scalars().first() - if not existing: - raise RuntimeError("AssetInfo upsert failed to find existing row after conflict.") - return existing - - new_meta = dict(user_metadata or {}) - computed_filename = None - try: - p = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset.id)) - if p: - computed_filename = compute_relative_filename(p) - except Exception: - computed_filename = None - if computed_filename: - new_meta["filename"] = computed_filename - if new_meta: - replace_asset_info_metadata_projection( - session, - asset_info_id=info.id, - user_metadata=new_meta, - ) - - if tags is not None: - set_asset_info_tags( - session, - asset_info_id=info.id, - tags=tags, - origin=tag_origin, - ) - return info - - def replace_asset_info_metadata_projection( session: Session, *, @@ -385,277 +387,6 @@ def replace_asset_info_metadata_projection( session.flush() -def ingest_fs_asset( - session: Session, - *, - asset_hash: str, - abs_path: str, - size_bytes: int, - mtime_ns: int, - mime_type: str | None = None, - info_name: str | None = None, - owner_id: str = "", - preview_id: str | None = None, - user_metadata: dict | None = None, - tags: Sequence[str] = (), - tag_origin: str = "manual", - require_existing_tags: bool = False, -) -> dict: - """ - Idempotently upsert: - - Asset by content hash (create if missing) - - AssetCacheState(file_path) pointing to asset_id - - Optionally AssetInfo + tag links and metadata projection - Returns flags and ids. - """ - locator = os.path.abspath(abs_path) - now = utcnow() - - if preview_id: - if not session.get(Asset, preview_id): - preview_id = None - - out: dict[str, Any] = { - "asset_created": False, - "asset_updated": False, - "state_created": False, - "state_updated": False, - "asset_info_id": None, - } - - asset = ( - session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1)) - ).scalars().first() - if not asset: - vals = { - "hash": asset_hash, - "size_bytes": int(size_bytes), - "mime_type": mime_type, - "created_at": now, - } - res = session.execute( - sqlite.insert(Asset) - .values(**vals) - .on_conflict_do_nothing(index_elements=[Asset.hash]) - ) - if int(res.rowcount or 0) > 0: - out["asset_created"] = True - asset = ( - session.execute( - select(Asset).where(Asset.hash == asset_hash).limit(1) - ) - ).scalars().first() - if not asset: - raise RuntimeError("Asset row not found after upsert.") - else: - changed = False - if asset.size_bytes != int(size_bytes) and int(size_bytes) > 0: - asset.size_bytes = int(size_bytes) - changed = True - if mime_type and asset.mime_type != mime_type: - asset.mime_type = mime_type - changed = True - if changed: - out["asset_updated"] = True - - vals = { - "asset_id": asset.id, - "file_path": locator, - "mtime_ns": int(mtime_ns), - } - ins = ( - sqlite.insert(AssetCacheState) - .values(**vals) - .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path]) - ) - - res = session.execute(ins) - if int(res.rowcount or 0) > 0: - out["state_created"] = True - else: - upd = ( - sa.update(AssetCacheState) - .where(AssetCacheState.file_path == locator) - .where( - sa.or_( - AssetCacheState.asset_id != asset.id, - AssetCacheState.mtime_ns.is_(None), - AssetCacheState.mtime_ns != int(mtime_ns), - ) - ) - .values(asset_id=asset.id, mtime_ns=int(mtime_ns)) - ) - res2 = session.execute(upd) - if int(res2.rowcount or 0) > 0: - out["state_updated"] = True - - if info_name: - try: - with session.begin_nested(): - info = AssetInfo( - owner_id=owner_id, - name=info_name, - asset_id=asset.id, - preview_id=preview_id, - created_at=now, - updated_at=now, - last_access_time=now, - ) - session.add(info) - session.flush() - out["asset_info_id"] = info.id - except IntegrityError: - pass - - existing_info = ( - session.execute( - select(AssetInfo) - .where( - AssetInfo.asset_id == asset.id, - AssetInfo.name == info_name, - (AssetInfo.owner_id == owner_id), - ) - .limit(1) - ) - ).unique().scalar_one_or_none() - if not existing_info: - raise RuntimeError("Failed to update or insert AssetInfo.") - - if preview_id and existing_info.preview_id != preview_id: - existing_info.preview_id = preview_id - - existing_info.updated_at = now - if existing_info.last_access_time < now: - existing_info.last_access_time = now - session.flush() - out["asset_info_id"] = existing_info.id - - norm = [t.strip().lower() for t in (tags or []) if (t or "").strip()] - if norm and out["asset_info_id"] is not None: - if not require_existing_tags: - ensure_tags_exist(session, norm, tag_type="user") - - existing_tag_names = set( - name for (name,) in (session.execute(select(Tag.name).where(Tag.name.in_(norm)))).all() - ) - missing = [t for t in norm if t not in existing_tag_names] - if missing and require_existing_tags: - raise ValueError(f"Unknown tags: {missing}") - - existing_links = set( - tag_name - for (tag_name,) in ( - session.execute( - select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == out["asset_info_id"]) - ) - ).all() - ) - to_add = [t for t in norm if t in existing_tag_names and t not in existing_links] - if to_add: - session.add_all( - [ - AssetInfoTag( - asset_info_id=out["asset_info_id"], - tag_name=t, - origin=tag_origin, - added_at=now, - ) - for t in to_add - ] - ) - session.flush() - - if out["asset_info_id"] is not None: - primary_path = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset.id)) - computed_filename = compute_relative_filename(primary_path) if primary_path else None - - current_meta = existing_info.user_metadata or {} - new_meta = dict(current_meta) - if user_metadata is not None: - for k, v in user_metadata.items(): - new_meta[k] = v - if computed_filename: - new_meta["filename"] = computed_filename - - if new_meta != current_meta: - replace_asset_info_metadata_projection( - session, - asset_info_id=out["asset_info_id"], - user_metadata=new_meta, - ) - - try: - remove_missing_tag_for_asset_id(session, asset_id=asset.id) - except Exception: - logging.exception("Failed to clear 'missing' tag for asset %s", asset.id) - return out - - -def update_asset_info_full( - session: Session, - *, - asset_info_id: str, - name: str | None = None, - tags: Sequence[str] | None = None, - user_metadata: dict | None = None, - tag_origin: str = "manual", - asset_info_row: Any = None, -) -> AssetInfo: - if not asset_info_row: - info = session.get(AssetInfo, asset_info_id) - if not info: - raise ValueError(f"AssetInfo {asset_info_id} not found") - else: - info = asset_info_row - - touched = False - if name is not None and name != info.name: - info.name = name - touched = True - - computed_filename = None - try: - p = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=info.asset_id)) - if p: - computed_filename = compute_relative_filename(p) - except Exception: - computed_filename = None - - if user_metadata is not None: - new_meta = dict(user_metadata) - if computed_filename: - new_meta["filename"] = computed_filename - replace_asset_info_metadata_projection( - session, asset_info_id=asset_info_id, user_metadata=new_meta - ) - touched = True - else: - if computed_filename: - current_meta = info.user_metadata or {} - if current_meta.get("filename") != computed_filename: - new_meta = dict(current_meta) - new_meta["filename"] = computed_filename - replace_asset_info_metadata_projection( - session, asset_info_id=asset_info_id, user_metadata=new_meta - ) - touched = True - - if tags is not None: - set_asset_info_tags( - session, - asset_info_id=asset_info_id, - tags=tags, - origin=tag_origin, - ) - touched = True - - if touched and user_metadata is None: - info.updated_at = utcnow() - session.flush() - - return info - - def delete_asset_info_by_id( session: Session, *, diff --git a/app/assets/database/queries/cache_state.py b/app/assets/database/queries/cache_state.py index 6ed58717a..4ca3cc888 100644 --- a/app/assets/database/queries/cache_state.py +++ b/app/assets/database/queries/cache_state.py @@ -21,20 +21,47 @@ def list_cache_states_by_asset_id( ).scalars().all() -def pick_best_live_path(states: Sequence[AssetCacheState]) -> str: - """ - Return the best on-disk path among cache states: - 1) Prefer a path that exists with needs_verify == False (already verified). - 2) Otherwise, pick the first path that exists. - 3) Otherwise return empty string. - """ - alive = [s for s in states if getattr(s, "file_path", None) and os.path.isfile(s.file_path)] - if not alive: - return "" - for s in alive: - if not getattr(s, "needs_verify", False): - return s.file_path - return alive[0].file_path +def upsert_cache_state( + session: Session, + *, + asset_id: str, + file_path: str, + mtime_ns: int, +) -> tuple[bool, bool]: + """Upsert a cache state by file_path. Returns (created, updated).""" + from sqlalchemy.dialects import sqlite + + vals = { + "asset_id": asset_id, + "file_path": file_path, + "mtime_ns": int(mtime_ns), + } + ins = ( + sqlite.insert(AssetCacheState) + .values(**vals) + .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path]) + ) + res = session.execute(ins) + created = int(res.rowcount or 0) > 0 + + if created: + return True, False + + upd = ( + sa.update(AssetCacheState) + .where(AssetCacheState.file_path == file_path) + .where( + sa.or_( + AssetCacheState.asset_id != asset_id, + AssetCacheState.mtime_ns.is_(None), + AssetCacheState.mtime_ns != int(mtime_ns), + ) + ) + .values(asset_id=asset_id, mtime_ns=int(mtime_ns)) + ) + res2 = session.execute(upd) + updated = int(res2.rowcount or 0) > 0 + return False, updated def prune_orphaned_assets(session: Session, roots: tuple[str, ...], prefixes_for_root_fn) -> int: diff --git a/app/assets/helpers.py b/app/assets/helpers.py index 5030b123a..847597f6e 100644 --- a/app/assets/helpers.py +++ b/app/assets/helpers.py @@ -4,11 +4,27 @@ from decimal import Decimal from aiohttp import web from datetime import datetime, timezone from pathlib import Path -from typing import Literal, Any +from typing import Literal, Any, Sequence import folder_paths +def pick_best_live_path(states: Sequence) -> str: + """ + Return the best on-disk path among cache states: + 1) Prefer a path that exists with needs_verify == False (already verified). + 2) Otherwise, pick the first path that exists. + 3) Otherwise return empty string. + """ + alive = [s for s in states if getattr(s, "file_path", None) and os.path.isfile(s.file_path)] + if not alive: + return "" + for s in alive: + if not getattr(s, "needs_verify", False): + return s.file_path + return alive[0].file_path + + RootType = Literal["models", "input", "output"] ALLOWED_ROOTS: tuple[RootType, ...] = ("models", "input", "output") diff --git a/app/assets/manager.py b/app/assets/manager.py index a68c8c8ae..e79b3b6a7 100644 --- a/app/assets/manager.py +++ b/app/assets/manager.py @@ -1,3 +1,12 @@ +""" +Asset manager - thin API adapter layer. + +This module transforms API schemas to/from service layer calls. +It should NOT contain business logic or direct SQLAlchemy usage. + +Architecture: + API Routes -> manager.py (schema transformation) -> services/ (business logic) -> queries/ (DB ops) +""" import os import mimetypes import contextlib @@ -7,27 +16,31 @@ from app.database.db import create_session from app.assets.api import schemas_out, schemas_in from app.assets.database.queries import ( asset_exists_by_hash, - asset_info_exists_for_asset_id, + fetch_asset_info_and_asset, + fetch_asset_info_asset_and_tags, get_asset_by_hash, get_asset_info_by_id, - fetch_asset_info_asset_and_tags, - fetch_asset_info_and_asset, - create_asset_info_for_existing_asset, - touch_asset_info_by_id, - update_asset_info_full, - delete_asset_info_by_id, - list_cache_states_by_asset_id, - list_asset_infos_page, - list_tags_with_usage, get_asset_tags, - add_tags_to_asset_info, - remove_tags_from_asset_info, - pick_best_live_path, - ingest_fs_asset, - set_asset_info_preview, + list_asset_infos_page, + list_cache_states_by_asset_id, + touch_asset_info_by_id, ) -from app.assets.helpers import resolve_destination_from_tags, ensure_within_base -from app.assets.database.models import Asset +from app.assets.helpers import ( + ensure_within_base, + pick_best_live_path, + resolve_destination_from_tags, +) +from app.assets.services import ( + apply_tags, + delete_asset_reference as svc_delete_asset_reference, + get_asset_detail, + ingest_file_from_path, + register_existing_asset, + remove_tags, + set_asset_preview as svc_set_asset_preview, + update_asset_metadata, +) +from app.assets.services.tagging import list_tags as svc_list_tags def _safe_sort_field(requested: str | None) -> str: @@ -52,9 +65,6 @@ def _safe_filename(name: str | None, fallback: str) -> str: def asset_exists(*, asset_hash: str) -> bool: - """ - Check if an asset with a given hash exists in database. - """ with create_session() as session: return asset_exists_by_hash(session, asset_hash=asset_hash) @@ -118,12 +128,13 @@ def get_asset( asset_info_id: str, owner_id: str = "", ) -> schemas_out.AssetDetail: - with create_session() as session: - res = fetch_asset_info_asset_and_tags(session, asset_info_id=asset_info_id, owner_id=owner_id) - if not res: - raise ValueError(f"AssetInfo {asset_info_id} not found") - info, asset, tag_names = res - preview_id = info.preview_id + result = get_asset_detail(asset_info_id=asset_info_id, owner_id=owner_id) + if not result: + raise ValueError(f"AssetInfo {asset_info_id} not found") + + info = result["info"] + asset = result["asset"] + tag_names = result["tags"] return schemas_out.AssetDetail( id=info.id, @@ -133,7 +144,7 @@ def get_asset( mime_type=asset.mime_type if asset else None, tags=tag_names, user_metadata=info.user_metadata or {}, - preview_id=preview_id, + preview_id=info.preview_id, created_at=info.created_at, last_access_time=info.last_access_time, ) @@ -171,11 +182,7 @@ def upload_asset_from_temp_path( owner_id: str = "", expected_asset_hash: str | None = None, ) -> schemas_out.AssetCreated: - """ - Create new asset or update existing asset from a temporary file path. - """ try: - # NOTE: blake3 is not required right now, so this will fail if blake3 is not installed in local environment import app.assets.hashing as hashing digest = hashing.blake3_hash(temp_path) except Exception as e: @@ -185,40 +192,43 @@ def upload_asset_from_temp_path( if expected_asset_hash and asset_hash != expected_asset_hash.strip().lower(): raise ValueError("HASH_MISMATCH") + # Check if asset already exists by hash with create_session() as session: existing = get_asset_by_hash(session, asset_hash=asset_hash) - if existing is not None: - with contextlib.suppress(Exception): - if temp_path and os.path.exists(temp_path): - os.remove(temp_path) - display_name = _safe_filename(spec.name or (client_filename or ""), fallback=digest) - info = create_asset_info_for_existing_asset( - session, - asset_hash=asset_hash, - name=display_name, - user_metadata=spec.user_metadata or {}, - tags=spec.tags or [], - tag_origin="manual", - owner_id=owner_id, - ) - tag_names = get_asset_tags(session, asset_info_id=info.id) - session.commit() + if existing is not None: + with contextlib.suppress(Exception): + if temp_path and os.path.exists(temp_path): + os.remove(temp_path) - return schemas_out.AssetCreated( - id=info.id, - name=info.name, - asset_hash=existing.hash, - size=int(existing.size_bytes) if existing.size_bytes is not None else None, - mime_type=existing.mime_type, - tags=tag_names, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - created_new=False, - ) + display_name = _safe_filename(spec.name or (client_filename or ""), fallback=digest) + result = register_existing_asset( + asset_hash=asset_hash, + name=display_name, + user_metadata=spec.user_metadata or {}, + tags=spec.tags or [], + tag_origin="manual", + owner_id=owner_id, + ) + info = result["info"] + asset = result["asset"] + tag_names = result["tags"] + return schemas_out.AssetCreated( + id=info.id, + name=info.name, + asset_hash=asset.hash, + size=int(asset.size_bytes) if asset.size_bytes is not None else None, + mime_type=asset.mime_type, + tags=tag_names, + user_metadata=info.user_metadata or {}, + preview_id=info.preview_id, + created_at=info.created_at, + last_access_time=info.last_access_time, + created_new=False, + ) + + # New asset - move file to destination base_dir, subdirs = resolve_destination_from_tags(spec.tags) dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir os.makedirs(dest_dir, exist_ok=True) @@ -246,47 +256,44 @@ def upload_asset_from_temp_path( except OSError as e: raise RuntimeError(f"failed to stat destination file: {e}") - with create_session() as session: - result = ingest_fs_asset( - session, - asset_hash=asset_hash, - abs_path=dest_abs, - size_bytes=size_bytes, - mtime_ns=mtime_ns, - mime_type=content_type, - info_name=_safe_filename(spec.name or (client_filename or ""), fallback=digest), - owner_id=owner_id, - preview_id=None, - user_metadata=spec.user_metadata or {}, - tags=spec.tags, - tag_origin="manual", - require_existing_tags=False, - ) - info_id = result["asset_info_id"] - if not info_id: - raise RuntimeError("failed to create asset metadata") + result = ingest_file_from_path( + asset_hash=asset_hash, + abs_path=dest_abs, + size_bytes=size_bytes, + mtime_ns=mtime_ns, + mime_type=content_type, + info_name=_safe_filename(spec.name or (client_filename or ""), fallback=digest), + owner_id=owner_id, + preview_id=None, + user_metadata=spec.user_metadata or {}, + tags=spec.tags, + tag_origin="manual", + require_existing_tags=False, + ) + info_id = result["asset_info_id"] + if not info_id: + raise RuntimeError("failed to create asset metadata") + with create_session() as session: pair = fetch_asset_info_and_asset(session, asset_info_id=info_id, owner_id=owner_id) if not pair: raise RuntimeError("inconsistent DB state after ingest") info, asset = pair tag_names = get_asset_tags(session, asset_info_id=info.id) - created_result = schemas_out.AssetCreated( - id=info.id, - name=info.name, - asset_hash=asset.hash, - size=int(asset.size_bytes), - mime_type=asset.mime_type, - tags=tag_names, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - created_new=result["asset_created"], - ) - session.commit() - return created_result + return schemas_out.AssetCreated( + id=info.id, + name=info.name, + asset_hash=asset.hash, + size=int(asset.size_bytes), + mime_type=asset.mime_type, + tags=tag_names, + user_metadata=info.user_metadata or {}, + preview_id=info.preview_id, + created_at=info.created_at, + last_access_time=info.last_access_time, + created_new=result["asset_created"], + ) def update_asset( @@ -297,35 +304,26 @@ def update_asset( user_metadata: dict | None = None, owner_id: str = "", ) -> schemas_out.AssetUpdated: - with create_session() as session: - info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) - if not info_row: - raise ValueError(f"AssetInfo {asset_info_id} not found") - if info_row.owner_id and info_row.owner_id != owner_id: - raise PermissionError("not owner") + result = update_asset_metadata( + asset_info_id=asset_info_id, + name=name, + tags=tags, + user_metadata=user_metadata, + tag_origin="manual", + owner_id=owner_id, + ) + info = result["info"] + asset = result["asset"] + tag_names = result["tags"] - info = update_asset_info_full( - session, - asset_info_id=asset_info_id, - name=name, - tags=tags, - user_metadata=user_metadata, - tag_origin="manual", - asset_info_row=info_row, - ) - - tag_names = get_asset_tags(session, asset_info_id=asset_info_id) - result = schemas_out.AssetUpdated( - id=info.id, - name=info.name, - asset_hash=info.asset.hash if info.asset else None, - tags=tag_names, - user_metadata=info.user_metadata or {}, - updated_at=info.updated_at, - ) - session.commit() - - return result + return schemas_out.AssetUpdated( + id=info.id, + name=info.name, + asset_hash=asset.hash if asset else None, + tags=tag_names, + user_metadata=info.user_metadata or {}, + updated_at=info.updated_at, + ) def set_asset_preview( @@ -334,71 +332,35 @@ def set_asset_preview( preview_asset_id: str | None = None, owner_id: str = "", ) -> schemas_out.AssetDetail: - with create_session() as session: - info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) - if not info_row: - raise ValueError(f"AssetInfo {asset_info_id} not found") - if info_row.owner_id and info_row.owner_id != owner_id: - raise PermissionError("not owner") + result = svc_set_asset_preview( + asset_info_id=asset_info_id, + preview_asset_id=preview_asset_id, + owner_id=owner_id, + ) + info = result["info"] + asset = result["asset"] + tags = result["tags"] - set_asset_info_preview( - session, - asset_info_id=asset_info_id, - preview_asset_id=preview_asset_id, - ) - - res = fetch_asset_info_asset_and_tags(session, asset_info_id=asset_info_id, owner_id=owner_id) - if not res: - raise RuntimeError("State changed during preview update") - info, asset, tags = res - result = schemas_out.AssetDetail( - id=info.id, - name=info.name, - asset_hash=asset.hash if asset else None, - size=int(asset.size_bytes) if asset and asset.size_bytes is not None else None, - mime_type=asset.mime_type if asset else None, - tags=tags, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - ) - session.commit() - - return result + return schemas_out.AssetDetail( + id=info.id, + name=info.name, + asset_hash=asset.hash if asset else None, + size=int(asset.size_bytes) if asset and asset.size_bytes is not None else None, + mime_type=asset.mime_type if asset else None, + tags=tags, + user_metadata=info.user_metadata or {}, + preview_id=info.preview_id, + created_at=info.created_at, + last_access_time=info.last_access_time, + ) def delete_asset_reference(*, asset_info_id: str, owner_id: str, delete_content_if_orphan: bool = True) -> bool: - with create_session() as session: - info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) - asset_id = info_row.asset_id if info_row else None - deleted = delete_asset_info_by_id(session, asset_info_id=asset_info_id, owner_id=owner_id) - if not deleted: - session.commit() - return False - - if not delete_content_if_orphan or not asset_id: - session.commit() - return True - - still_exists = asset_info_exists_for_asset_id(session, asset_id=asset_id) - if still_exists: - session.commit() - return True - - states = list_cache_states_by_asset_id(session, asset_id=asset_id) - file_paths = [s.file_path for s in (states or []) if getattr(s, "file_path", None)] - - asset_row = session.get(Asset, asset_id) - if asset_row is not None: - session.delete(asset_row) - - session.commit() - for p in file_paths: - with contextlib.suppress(Exception): - if p and os.path.isfile(p): - os.remove(p) - return True + return svc_delete_asset_reference( + asset_info_id=asset_info_id, + owner_id=owner_id, + delete_content_if_orphan=delete_content_if_orphan, + ) def create_asset_from_hash( @@ -410,37 +372,37 @@ def create_asset_from_hash( owner_id: str = "", ) -> schemas_out.AssetCreated | None: canonical = hash_str.strip().lower() + with create_session() as session: asset = get_asset_by_hash(session, asset_hash=canonical) if not asset: return None - info = create_asset_info_for_existing_asset( - session, - asset_hash=canonical, - name=_safe_filename(name, fallback=canonical.split(":", 1)[1]), - user_metadata=user_metadata or {}, - tags=tags or [], - tag_origin="manual", - owner_id=owner_id, - ) - tag_names = get_asset_tags(session, asset_info_id=info.id) - result = schemas_out.AssetCreated( - id=info.id, - name=info.name, - asset_hash=asset.hash, - size=int(asset.size_bytes), - mime_type=asset.mime_type, - tags=tag_names, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - created_new=False, - ) - session.commit() + result = register_existing_asset( + asset_hash=canonical, + name=_safe_filename(name, fallback=canonical.split(":", 1)[1] if ":" in canonical else canonical), + user_metadata=user_metadata or {}, + tags=tags or [], + tag_origin="manual", + owner_id=owner_id, + ) + info = result["info"] + asset = result["asset"] + tag_names = result["tags"] - return result + return schemas_out.AssetCreated( + id=info.id, + name=info.name, + asset_hash=asset.hash, + size=int(asset.size_bytes), + mime_type=asset.mime_type, + tags=tag_names, + user_metadata=info.user_metadata or {}, + preview_id=info.preview_id, + created_at=info.created_at, + last_access_time=info.last_access_time, + created_new=result["created"], + ) def add_tags_to_asset( @@ -450,21 +412,12 @@ def add_tags_to_asset( origin: str = "manual", owner_id: str = "", ) -> schemas_out.TagsAdd: - with create_session() as session: - info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) - if not info_row: - raise ValueError(f"AssetInfo {asset_info_id} not found") - if info_row.owner_id and info_row.owner_id != owner_id: - raise PermissionError("not owner") - data = add_tags_to_asset_info( - session, - asset_info_id=asset_info_id, - tags=tags, - origin=origin, - create_if_missing=True, - asset_info_row=info_row, - ) - session.commit() + data = apply_tags( + asset_info_id=asset_info_id, + tags=tags, + origin=origin, + owner_id=owner_id, + ) return schemas_out.TagsAdd(**data) @@ -474,19 +427,11 @@ def remove_tags_from_asset( tags: list[str], owner_id: str = "", ) -> schemas_out.TagsRemove: - with create_session() as session: - info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) - if not info_row: - raise ValueError(f"AssetInfo {asset_info_id} not found") - if info_row.owner_id and info_row.owner_id != owner_id: - raise PermissionError("not owner") - - data = remove_tags_from_asset_info( - session, - asset_info_id=asset_info_id, - tags=tags, - ) - session.commit() + data = remove_tags( + asset_info_id=asset_info_id, + tags=tags, + owner_id=owner_id, + ) return schemas_out.TagsRemove(**data) @@ -498,19 +443,14 @@ def list_tags( include_zero: bool = True, owner_id: str = "", ) -> schemas_out.TagsList: - limit = max(1, min(1000, limit)) - offset = max(0, offset) - - with create_session() as session: - rows, total = list_tags_with_usage( - session, - prefix=prefix, - limit=limit, - offset=offset, - include_zero=include_zero, - order=order, - owner_id=owner_id, - ) + rows, total = svc_list_tags( + prefix=prefix, + limit=limit, + offset=offset, + order=order, + include_zero=include_zero, + owner_id=owner_id, + ) tags = [schemas_out.TagUsage(name=name, count=count, type=tag_type) for (name, tag_type, count) in rows] return schemas_out.TagsList(tags=tags, total=total, has_more=(offset + len(tags)) < total) diff --git a/app/assets/services/__init__.py b/app/assets/services/__init__.py new file mode 100644 index 000000000..c5535bac3 --- /dev/null +++ b/app/assets/services/__init__.py @@ -0,0 +1,34 @@ +# Asset services layer +# Business logic that orchestrates database queries and filesystem operations +# Services own session lifecycle via create_session() + +from app.assets.services.ingest import ( + ingest_file_from_path, + register_existing_asset, +) +from app.assets.services.asset_management import ( + get_asset_detail, + update_asset_metadata, + delete_asset_reference, + set_asset_preview, +) +from app.assets.services.tagging import ( + apply_tags, + remove_tags, + list_tags, +) + +__all__ = [ + # ingest.py + "ingest_file_from_path", + "register_existing_asset", + # asset_management.py + "get_asset_detail", + "update_asset_metadata", + "delete_asset_reference", + "set_asset_preview", + # tagging.py + "apply_tags", + "remove_tags", + "list_tags", +] diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py new file mode 100644 index 000000000..50a6c68d0 --- /dev/null +++ b/app/assets/services/asset_management.py @@ -0,0 +1,229 @@ +""" +Asset management services - CRUD operations on assets. + +Business logic for: +- get_asset_detail: Fetch full asset details with tags +- update_asset_metadata: Update name, tags, and/or metadata +- delete_asset_reference: Delete AssetInfo and optionally orphaned content +- set_asset_preview: Set or clear preview on an asset +""" +import contextlib +import os +from typing import Sequence + +from app.database.db import create_session +from app.assets.helpers import ( + compute_relative_filename, + pick_best_live_path, + utcnow, +) +from app.assets.database.queries import ( + asset_info_exists_for_asset_id, + delete_asset_info_by_id, + fetch_asset_info_and_asset, + fetch_asset_info_asset_and_tags, + get_asset_info_by_id, + list_cache_states_by_asset_id, + replace_asset_info_metadata_projection, + set_asset_info_preview, + set_asset_info_tags, +) + + +def get_asset_detail( + *, + asset_info_id: str, + owner_id: str = "", +) -> dict | None: + """ + Fetch full asset details including tags. + Returns dict with info, asset, and tags, or None if not found. + """ + with create_session() as session: + result = fetch_asset_info_asset_and_tags( + session, + asset_info_id=asset_info_id, + owner_id=owner_id, + ) + if not result: + return None + + info, asset, tags = result + return { + "info": info, + "asset": asset, + "tags": tags, + } + + +def update_asset_metadata( + *, + asset_info_id: str, + name: str | None = None, + tags: Sequence[str] | None = None, + user_metadata: dict | None = None, + tag_origin: str = "manual", + owner_id: str = "", +) -> dict: + """ + Update name, tags, and/or metadata on an AssetInfo. + Returns updated info dict with tags. + """ + with create_session() as session: + info = get_asset_info_by_id(session, asset_info_id=asset_info_id) + if not info: + raise ValueError(f"AssetInfo {asset_info_id} not found") + if info.owner_id and info.owner_id != owner_id: + raise PermissionError("not owner") + + touched = False + if name is not None and name != info.name: + info.name = name + touched = True + + # Compute filename from best live path + computed_filename = _compute_filename_for_asset(session, info.asset_id) + + if user_metadata is not None: + new_meta = dict(user_metadata) + if computed_filename: + new_meta["filename"] = computed_filename + replace_asset_info_metadata_projection( + session, asset_info_id=asset_info_id, user_metadata=new_meta + ) + touched = True + else: + if computed_filename: + current_meta = info.user_metadata or {} + if current_meta.get("filename") != computed_filename: + new_meta = dict(current_meta) + new_meta["filename"] = computed_filename + replace_asset_info_metadata_projection( + session, asset_info_id=asset_info_id, user_metadata=new_meta + ) + touched = True + + if tags is not None: + set_asset_info_tags( + session, + asset_info_id=asset_info_id, + tags=tags, + origin=tag_origin, + ) + touched = True + + if touched and user_metadata is None: + info.updated_at = utcnow() + session.flush() + + # Fetch updated info with tags + result = fetch_asset_info_asset_and_tags( + session, + asset_info_id=asset_info_id, + owner_id=owner_id, + ) + session.commit() + + if not result: + raise RuntimeError("State changed during update") + + info, asset, tag_list = result + return { + "info": info, + "asset": asset, + "tags": tag_list, + } + + +def delete_asset_reference( + *, + asset_info_id: str, + owner_id: str, + delete_content_if_orphan: bool = True, +) -> bool: + """ + Delete an AssetInfo reference. + If delete_content_if_orphan is True and no other AssetInfos reference the asset, + also delete the Asset and its cached files. + """ + with create_session() as session: + info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) + asset_id = info_row.asset_id if info_row else None + + deleted = delete_asset_info_by_id(session, asset_info_id=asset_info_id, owner_id=owner_id) + if not deleted: + session.commit() + return False + + if not delete_content_if_orphan or not asset_id: + session.commit() + return True + + still_exists = asset_info_exists_for_asset_id(session, asset_id=asset_id) + if still_exists: + session.commit() + return True + + # Orphaned asset - delete it and its files + states = list_cache_states_by_asset_id(session, asset_id=asset_id) + file_paths = [s.file_path for s in (states or []) if getattr(s, "file_path", None)] + + from app.assets.database.models import Asset + asset_row = session.get(Asset, asset_id) + if asset_row is not None: + session.delete(asset_row) + + session.commit() + + # Delete files after commit + for p in file_paths: + with contextlib.suppress(Exception): + if p and os.path.isfile(p): + os.remove(p) + + return True + + +def set_asset_preview( + *, + asset_info_id: str, + preview_asset_id: str | None = None, + owner_id: str = "", +) -> dict: + """ + Set or clear preview_id on an AssetInfo. + Returns updated asset detail dict. + """ + with create_session() as session: + info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) + if not info_row: + raise ValueError(f"AssetInfo {asset_info_id} not found") + if info_row.owner_id and info_row.owner_id != owner_id: + raise PermissionError("not owner") + + set_asset_info_preview( + session, + asset_info_id=asset_info_id, + preview_asset_id=preview_asset_id, + ) + + result = fetch_asset_info_asset_and_tags( + session, asset_info_id=asset_info_id, owner_id=owner_id + ) + if not result: + raise RuntimeError("State changed during preview update") + + info, asset, tags = result + session.commit() + + return { + "info": info, + "asset": asset, + "tags": tags, + } + + +def _compute_filename_for_asset(session, asset_id: str) -> str | None: + """Compute the relative filename for an asset from its cache states.""" + primary_path = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset_id)) + return compute_relative_filename(primary_path) if primary_path else None diff --git a/app/assets/services/ingest.py b/app/assets/services/ingest.py new file mode 100644 index 000000000..7d621e467 --- /dev/null +++ b/app/assets/services/ingest.py @@ -0,0 +1,255 @@ +""" +Ingest services - handles ingesting files into the asset database. + +Business logic for: +- ingest_file_from_path: Ingest a file from filesystem path (upsert asset, cache state, info) +- register_existing_asset: Create AssetInfo for an asset that already exists by hash +""" +import logging +import os +from typing import Sequence + +from app.database.db import create_session +from app.assets.helpers import ( + compute_relative_filename, + normalize_tags, + pick_best_live_path, + utcnow, +) +from app.assets.database.queries import ( + get_asset_by_hash, + get_or_create_asset_info, + list_cache_states_by_asset_id, + remove_missing_tag_for_asset_id, + replace_asset_info_metadata_projection, + set_asset_info_tags, + update_asset_info_timestamps, + upsert_asset, + upsert_cache_state, + add_tags_to_asset_info, + ensure_tags_exist, + get_asset_tags, +) + + +def ingest_file_from_path( + *, + abs_path: str, + asset_hash: str, + size_bytes: int, + mtime_ns: int, + mime_type: str | None = None, + info_name: str | None = None, + owner_id: str = "", + preview_id: str | None = None, + user_metadata: dict | None = None, + tags: Sequence[str] = (), + tag_origin: str = "manual", + require_existing_tags: bool = False, +) -> dict: + """ + Idempotently upsert: + - Asset by content hash (create if missing) + - AssetCacheState(file_path) pointing to asset_id + - Optionally AssetInfo + tag links and metadata projection + Returns flags and ids. + """ + locator = os.path.abspath(abs_path) + + out: dict = { + "asset_created": False, + "asset_updated": False, + "state_created": False, + "state_updated": False, + "asset_info_id": None, + } + + with create_session() as session: + # Validate preview_id if provided + if preview_id: + from app.assets.database.models import Asset + if not session.get(Asset, preview_id): + preview_id = None + + # 1. Upsert Asset + asset, created, updated = upsert_asset( + session, + asset_hash=asset_hash, + size_bytes=size_bytes, + mime_type=mime_type, + ) + out["asset_created"] = created + out["asset_updated"] = updated + + # 2. Upsert CacheState + state_created, state_updated = upsert_cache_state( + session, + asset_id=asset.id, + file_path=locator, + mtime_ns=mtime_ns, + ) + out["state_created"] = state_created + out["state_updated"] = state_updated + + # 3. Optionally create/update AssetInfo + if info_name: + info, info_created = get_or_create_asset_info( + session, + asset_id=asset.id, + owner_id=owner_id, + name=info_name, + preview_id=preview_id, + ) + if info_created: + out["asset_info_id"] = info.id + else: + update_asset_info_timestamps(session, asset_info=info, preview_id=preview_id) + out["asset_info_id"] = info.id + + # 4. Handle tags + norm = normalize_tags(list(tags)) + if norm and out["asset_info_id"]: + if require_existing_tags: + _validate_tags_exist(session, norm) + add_tags_to_asset_info( + session, + asset_info_id=out["asset_info_id"], + tags=norm, + origin=tag_origin, + create_if_missing=not require_existing_tags, + ) + + # 5. Update metadata with computed filename + if out["asset_info_id"]: + _update_metadata_with_filename( + session, + asset_info_id=out["asset_info_id"], + asset_id=asset.id, + info=info, + user_metadata=user_metadata, + ) + + # 6. Remove missing tag + try: + remove_missing_tag_for_asset_id(session, asset_id=asset.id) + except Exception: + logging.exception("Failed to clear 'missing' tag for asset %s", asset.id) + + session.commit() + + return out + + +def register_existing_asset( + *, + asset_hash: str, + name: str, + user_metadata: dict | None = None, + tags: list[str] | None = None, + tag_origin: str = "manual", + owner_id: str = "", +) -> dict: + """ + Create or return existing AssetInfo for an asset that already exists by hash. + + Returns dict with asset and info details, or raises ValueError if hash not found. + """ + with create_session() as session: + asset = get_asset_by_hash(session, asset_hash=asset_hash) + if not asset: + raise ValueError(f"No asset with hash {asset_hash}") + + info, info_created = get_or_create_asset_info( + session, + asset_id=asset.id, + owner_id=owner_id, + name=name, + preview_id=None, + ) + + if not info_created: + # Return existing info + tag_names = get_asset_tags(session, asset_info_id=info.id) + session.commit() + return { + "info": info, + "asset": asset, + "tags": tag_names, + "created": False, + } + + # New info - apply metadata and tags + new_meta = dict(user_metadata or {}) + computed_filename = _compute_filename_for_asset(session, asset.id) + if computed_filename: + new_meta["filename"] = computed_filename + + if new_meta: + replace_asset_info_metadata_projection( + session, + asset_info_id=info.id, + user_metadata=new_meta, + ) + + if tags is not None: + set_asset_info_tags( + session, + asset_info_id=info.id, + tags=tags, + origin=tag_origin, + ) + + tag_names = get_asset_tags(session, asset_info_id=info.id) + session.commit() + + return { + "info": info, + "asset": asset, + "tags": tag_names, + "created": True, + } + + +def _validate_tags_exist(session, tags: list[str]) -> None: + """Raise ValueError if any tags don't exist.""" + from sqlalchemy import select + from app.assets.database.models import Tag + existing_tag_names = set( + name for (name,) in session.execute(select(Tag.name).where(Tag.name.in_(tags))).all() + ) + missing = [t for t in tags if t not in existing_tag_names] + if missing: + raise ValueError(f"Unknown tags: {missing}") + + +def _compute_filename_for_asset(session, asset_id: str) -> str | None: + """Compute the relative filename for an asset from its cache states.""" + primary_path = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset_id)) + return compute_relative_filename(primary_path) if primary_path else None + + +def _update_metadata_with_filename( + session, + *, + asset_info_id: str, + asset_id: str, + info, + user_metadata: dict | None, +) -> None: + """Update metadata projection with computed filename.""" + computed_filename = _compute_filename_for_asset(session, asset_id) + + current_meta = info.user_metadata or {} + new_meta = dict(current_meta) + if user_metadata: + for k, v in user_metadata.items(): + new_meta[k] = v + if computed_filename: + new_meta["filename"] = computed_filename + + if new_meta != current_meta: + replace_asset_info_metadata_projection( + session, + asset_info_id=asset_info_id, + user_metadata=new_meta, + ) diff --git a/app/assets/services/tagging.py b/app/assets/services/tagging.py new file mode 100644 index 000000000..135b460f3 --- /dev/null +++ b/app/assets/services/tagging.py @@ -0,0 +1,102 @@ +""" +Tagging services - manage tags on assets. + +Business logic for: +- apply_tags: Add tags to an asset +- remove_tags: Remove tags from an asset +- list_tags: List tags with usage counts +""" +from app.database.db import create_session +from app.assets.database.queries import ( + add_tags_to_asset_info, + get_asset_info_by_id, + list_tags_with_usage, + remove_tags_from_asset_info, +) + + +def apply_tags( + *, + asset_info_id: str, + tags: list[str], + origin: str = "manual", + owner_id: str = "", +) -> dict: + """ + Add tags to an asset. + Returns dict with added, already_present, and total_tags lists. + """ + with create_session() as session: + info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) + if not info_row: + raise ValueError(f"AssetInfo {asset_info_id} not found") + if info_row.owner_id and info_row.owner_id != owner_id: + raise PermissionError("not owner") + + data = add_tags_to_asset_info( + session, + asset_info_id=asset_info_id, + tags=tags, + origin=origin, + create_if_missing=True, + asset_info_row=info_row, + ) + session.commit() + + return data + + +def remove_tags( + *, + asset_info_id: str, + tags: list[str], + owner_id: str = "", +) -> dict: + """ + Remove tags from an asset. + Returns dict with removed, not_present, and total_tags lists. + """ + with create_session() as session: + info_row = get_asset_info_by_id(session, asset_info_id=asset_info_id) + if not info_row: + raise ValueError(f"AssetInfo {asset_info_id} not found") + if info_row.owner_id and info_row.owner_id != owner_id: + raise PermissionError("not owner") + + data = remove_tags_from_asset_info( + session, + asset_info_id=asset_info_id, + tags=tags, + ) + session.commit() + + return data + + +def list_tags( + prefix: str | None = None, + limit: int = 100, + offset: int = 0, + order: str = "count_desc", + include_zero: bool = True, + owner_id: str = "", +) -> tuple[list[tuple[str, str, int]], int]: + """ + List tags with usage counts. + Returns (rows, total) where rows are (name, tag_type, count) tuples. + """ + limit = max(1, min(1000, limit)) + offset = max(0, offset) + + with create_session() as session: + rows, total = list_tags_with_usage( + session, + prefix=prefix, + limit=limit, + offset=offset, + include_zero=include_zero, + order=order, + owner_id=owner_id, + ) + + return rows, total diff --git a/tests-unit/assets_test/queries/test_cache_state.py b/tests-unit/assets_test/queries/test_cache_state.py index 6e7d768eb..bd5ea4a92 100644 --- a/tests-unit/assets_test/queries/test_cache_state.py +++ b/tests-unit/assets_test/queries/test_cache_state.py @@ -2,10 +2,8 @@ from sqlalchemy.orm import Session from app.assets.database.models import Asset, AssetCacheState -from app.assets.database.queries import ( - list_cache_states_by_asset_id, - pick_best_live_path, -) +from app.assets.database.queries import list_cache_states_by_asset_id +from app.assets.helpers import pick_best_live_path def _make_asset(session: Session, hash_val: str | None = None, size: int = 1024) -> Asset: diff --git a/tests-unit/assets_test/services/__init__.py b/tests-unit/assets_test/services/__init__.py new file mode 100644 index 000000000..d0213422e --- /dev/null +++ b/tests-unit/assets_test/services/__init__.py @@ -0,0 +1 @@ +# Service layer tests diff --git a/tests-unit/assets_test/services/conftest.py b/tests-unit/assets_test/services/conftest.py new file mode 100644 index 000000000..d48e2fbe9 --- /dev/null +++ b/tests-unit/assets_test/services/conftest.py @@ -0,0 +1,49 @@ +import os +import tempfile +from pathlib import Path +from unittest.mock import patch + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import Session + +from app.assets.database.models import Base + + +@pytest.fixture +def db_engine(): + """In-memory SQLite engine for fast unit tests.""" + engine = create_engine("sqlite:///:memory:") + Base.metadata.create_all(engine) + return engine + + +@pytest.fixture +def session(db_engine): + """Session fixture for tests that need direct DB access.""" + with Session(db_engine) as sess: + yield sess + + +@pytest.fixture +def mock_create_session(db_engine): + """Patch create_session to use our in-memory database.""" + from contextlib import contextmanager + from sqlalchemy.orm import Session as SASession + + @contextmanager + def _create_session(): + with SASession(db_engine) as sess: + yield sess + + with patch("app.assets.services.ingest.create_session", _create_session), \ + patch("app.assets.services.asset_management.create_session", _create_session), \ + patch("app.assets.services.tagging.create_session", _create_session): + yield _create_session + + +@pytest.fixture +def temp_dir(): + """Temporary directory for file operations.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) diff --git a/tests-unit/assets_test/services/test_asset_management.py b/tests-unit/assets_test/services/test_asset_management.py new file mode 100644 index 000000000..bf2b33194 --- /dev/null +++ b/tests-unit/assets_test/services/test_asset_management.py @@ -0,0 +1,264 @@ +"""Tests for asset_management services.""" +import pytest +from sqlalchemy.orm import Session + +from app.assets.database.models import Asset, AssetInfo, Tag +from app.assets.database.queries import ensure_tags_exist, add_tags_to_asset_info +from app.assets.helpers import utcnow +from app.assets.services import ( + get_asset_detail, + update_asset_metadata, + delete_asset_reference, + set_asset_preview, +) + + +def _make_asset(session: Session, hash_val: str = "blake3:test", size: int = 1024) -> Asset: + asset = Asset(hash=hash_val, size_bytes=size, mime_type="application/octet-stream") + session.add(asset) + session.flush() + return asset + + +def _make_asset_info( + session: Session, + asset: Asset, + name: str = "test", + owner_id: str = "", +) -> AssetInfo: + now = utcnow() + info = AssetInfo( + owner_id=owner_id, + name=name, + asset_id=asset.id, + created_at=now, + updated_at=now, + last_access_time=now, + ) + session.add(info) + session.flush() + return info + + +class TestGetAssetDetail: + def test_returns_none_for_nonexistent(self, mock_create_session): + result = get_asset_detail(asset_info_id="nonexistent") + assert result is None + + def test_returns_asset_with_tags(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, name="test.bin") + ensure_tags_exist(session, ["alpha", "beta"]) + add_tags_to_asset_info(session, asset_info_id=info.id, tags=["alpha", "beta"]) + session.commit() + + result = get_asset_detail(asset_info_id=info.id) + + assert result is not None + assert result["info"].id == info.id + assert result["asset"].id == asset.id + assert set(result["tags"]) == {"alpha", "beta"} + + def test_respects_owner_visibility(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, owner_id="user1") + session.commit() + + # Wrong owner cannot see + result = get_asset_detail(asset_info_id=info.id, owner_id="user2") + assert result is None + + # Correct owner can see + result = get_asset_detail(asset_info_id=info.id, owner_id="user1") + assert result is not None + + +class TestUpdateAssetMetadata: + def test_updates_name(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, name="old_name.bin") + info_id = info.id + session.commit() + + update_asset_metadata( + asset_info_id=info_id, + name="new_name.bin", + ) + + # Verify by re-fetching from DB + session.expire_all() + updated_info = session.get(AssetInfo, info_id) + assert updated_info.name == "new_name.bin" + + def test_updates_tags(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + ensure_tags_exist(session, ["old"]) + add_tags_to_asset_info(session, asset_info_id=info.id, tags=["old"]) + session.commit() + + result = update_asset_metadata( + asset_info_id=info.id, + tags=["new1", "new2"], + ) + + assert set(result["tags"]) == {"new1", "new2"} + assert "old" not in result["tags"] + + def test_updates_user_metadata(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + info_id = info.id + session.commit() + + update_asset_metadata( + asset_info_id=info_id, + user_metadata={"key": "value", "num": 42}, + ) + + # Verify by re-fetching from DB + session.expire_all() + updated_info = session.get(AssetInfo, info_id) + assert updated_info.user_metadata["key"] == "value" + assert updated_info.user_metadata["num"] == 42 + + def test_raises_for_nonexistent(self, mock_create_session): + with pytest.raises(ValueError, match="not found"): + update_asset_metadata(asset_info_id="nonexistent", name="fail") + + def test_raises_for_wrong_owner(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, owner_id="user1") + session.commit() + + with pytest.raises(PermissionError, match="not owner"): + update_asset_metadata( + asset_info_id=info.id, + name="new", + owner_id="user2", + ) + + +class TestDeleteAssetReference: + def test_deletes_asset_info(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + info_id = info.id + session.commit() + + result = delete_asset_reference( + asset_info_id=info_id, + owner_id="", + delete_content_if_orphan=False, + ) + + assert result is True + assert session.get(AssetInfo, info_id) is None + + def test_returns_false_for_nonexistent(self, mock_create_session): + result = delete_asset_reference( + asset_info_id="nonexistent", + owner_id="", + ) + assert result is False + + def test_returns_false_for_wrong_owner(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, owner_id="user1") + info_id = info.id + session.commit() + + result = delete_asset_reference( + asset_info_id=info_id, + owner_id="user2", + ) + + assert result is False + assert session.get(AssetInfo, info_id) is not None + + def test_keeps_asset_if_other_infos_exist(self, mock_create_session, session: Session): + asset = _make_asset(session) + info1 = _make_asset_info(session, asset, name="info1") + info2 = _make_asset_info(session, asset, name="info2") + asset_id = asset.id + session.commit() + + delete_asset_reference( + asset_info_id=info1.id, + owner_id="", + delete_content_if_orphan=True, + ) + + # Asset should still exist + assert session.get(Asset, asset_id) is not None + + def test_deletes_orphaned_asset(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + asset_id = asset.id + info_id = info.id + session.commit() + + delete_asset_reference( + asset_info_id=info_id, + owner_id="", + delete_content_if_orphan=True, + ) + + # Both info and asset should be gone + assert session.get(AssetInfo, info_id) is None + assert session.get(Asset, asset_id) is None + + +class TestSetAssetPreview: + def test_sets_preview(self, mock_create_session, session: Session): + asset = _make_asset(session, hash_val="blake3:main") + preview_asset = _make_asset(session, hash_val="blake3:preview") + info = _make_asset_info(session, asset) + info_id = info.id + preview_id = preview_asset.id + session.commit() + + set_asset_preview( + asset_info_id=info_id, + preview_asset_id=preview_id, + ) + + # Verify by re-fetching from DB + session.expire_all() + updated_info = session.get(AssetInfo, info_id) + assert updated_info.preview_id == preview_id + + def test_clears_preview(self, mock_create_session, session: Session): + asset = _make_asset(session) + preview_asset = _make_asset(session, hash_val="blake3:preview") + info = _make_asset_info(session, asset) + info.preview_id = preview_asset.id + info_id = info.id + session.commit() + + set_asset_preview( + asset_info_id=info_id, + preview_asset_id=None, + ) + + # Verify by re-fetching from DB + session.expire_all() + updated_info = session.get(AssetInfo, info_id) + assert updated_info.preview_id is None + + def test_raises_for_nonexistent_info(self, mock_create_session): + with pytest.raises(ValueError, match="not found"): + set_asset_preview(asset_info_id="nonexistent") + + def test_raises_for_wrong_owner(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, owner_id="user1") + session.commit() + + with pytest.raises(PermissionError, match="not owner"): + set_asset_preview( + asset_info_id=info.id, + preview_asset_id=None, + owner_id="user2", + ) diff --git a/tests-unit/assets_test/services/test_ingest.py b/tests-unit/assets_test/services/test_ingest.py new file mode 100644 index 000000000..1b8fda1b5 --- /dev/null +++ b/tests-unit/assets_test/services/test_ingest.py @@ -0,0 +1,228 @@ +"""Tests for ingest services.""" +import os +from pathlib import Path + +import pytest +from sqlalchemy.orm import Session + +from app.assets.database.models import Asset, AssetCacheState, AssetInfo, Tag +from app.assets.database.queries import ensure_tags_exist, get_asset_tags +from app.assets.services import ingest_file_from_path, register_existing_asset + + +class TestIngestFileFromPath: + def test_creates_asset_and_cache_state(self, mock_create_session, temp_dir: Path, session: Session): + file_path = temp_dir / "test_file.bin" + file_path.write_bytes(b"test content") + + result = ingest_file_from_path( + abs_path=str(file_path), + asset_hash="blake3:abc123", + size_bytes=12, + mtime_ns=1234567890000000000, + mime_type="application/octet-stream", + ) + + assert result["asset_created"] is True + assert result["state_created"] is True + assert result["asset_info_id"] is None # no info_name provided + + # Verify DB state + assets = session.query(Asset).all() + assert len(assets) == 1 + assert assets[0].hash == "blake3:abc123" + + states = session.query(AssetCacheState).all() + assert len(states) == 1 + assert states[0].file_path == str(file_path) + + def test_creates_asset_info_when_name_provided(self, mock_create_session, temp_dir: Path, session: Session): + file_path = temp_dir / "model.safetensors" + file_path.write_bytes(b"model data") + + result = ingest_file_from_path( + abs_path=str(file_path), + asset_hash="blake3:def456", + size_bytes=10, + mtime_ns=1234567890000000000, + mime_type="application/octet-stream", + info_name="My Model", + owner_id="user1", + ) + + assert result["asset_created"] is True + assert result["asset_info_id"] is not None + + info = session.query(AssetInfo).first() + assert info is not None + assert info.name == "My Model" + assert info.owner_id == "user1" + + def test_creates_tags_when_provided(self, mock_create_session, temp_dir: Path, session: Session): + file_path = temp_dir / "tagged.bin" + file_path.write_bytes(b"data") + + result = ingest_file_from_path( + abs_path=str(file_path), + asset_hash="blake3:ghi789", + size_bytes=4, + mtime_ns=1234567890000000000, + info_name="Tagged Asset", + tags=["models", "checkpoints"], + ) + + assert result["asset_info_id"] is not None + + # Verify tags were created and linked + tags = session.query(Tag).all() + tag_names = {t.name for t in tags} + assert "models" in tag_names + assert "checkpoints" in tag_names + + asset_tags = get_asset_tags(session, asset_info_id=result["asset_info_id"]) + assert set(asset_tags) == {"models", "checkpoints"} + + def test_idempotent_upsert(self, mock_create_session, temp_dir: Path, session: Session): + file_path = temp_dir / "dup.bin" + file_path.write_bytes(b"content") + + # First ingest + r1 = ingest_file_from_path( + abs_path=str(file_path), + asset_hash="blake3:repeat", + size_bytes=7, + mtime_ns=1234567890000000000, + ) + assert r1["asset_created"] is True + + # Second ingest with same hash - should update, not create + r2 = ingest_file_from_path( + abs_path=str(file_path), + asset_hash="blake3:repeat", + size_bytes=7, + mtime_ns=1234567890000000001, # different mtime + ) + assert r2["asset_created"] is False + assert r2["state_updated"] is True or r2["state_created"] is False + + # Still only one asset + assets = session.query(Asset).all() + assert len(assets) == 1 + + def test_validates_preview_id(self, mock_create_session, temp_dir: Path, session: Session): + file_path = temp_dir / "with_preview.bin" + file_path.write_bytes(b"data") + + # Create a preview asset first + preview_asset = Asset(hash="blake3:preview", size_bytes=100) + session.add(preview_asset) + session.commit() + preview_id = preview_asset.id + + result = ingest_file_from_path( + abs_path=str(file_path), + asset_hash="blake3:main", + size_bytes=4, + mtime_ns=1234567890000000000, + info_name="With Preview", + preview_id=preview_id, + ) + + assert result["asset_info_id"] is not None + info = session.query(AssetInfo).filter_by(id=result["asset_info_id"]).first() + assert info.preview_id == preview_id + + def test_invalid_preview_id_is_cleared(self, mock_create_session, temp_dir: Path, session: Session): + file_path = temp_dir / "bad_preview.bin" + file_path.write_bytes(b"data") + + result = ingest_file_from_path( + abs_path=str(file_path), + asset_hash="blake3:badpreview", + size_bytes=4, + mtime_ns=1234567890000000000, + info_name="Bad Preview", + preview_id="nonexistent-uuid", + ) + + assert result["asset_info_id"] is not None + info = session.query(AssetInfo).filter_by(id=result["asset_info_id"]).first() + assert info.preview_id is None + + +class TestRegisterExistingAsset: + def test_creates_info_for_existing_asset(self, mock_create_session, session: Session): + # Create existing asset + asset = Asset(hash="blake3:existing", size_bytes=1024, mime_type="image/png") + session.add(asset) + session.commit() + + result = register_existing_asset( + asset_hash="blake3:existing", + name="Registered Asset", + user_metadata={"key": "value"}, + tags=["models"], + ) + + assert result["created"] is True + assert "models" in result["tags"] + + # Verify by re-fetching from DB + session.expire_all() + infos = session.query(AssetInfo).filter_by(name="Registered Asset").all() + assert len(infos) == 1 + + def test_returns_existing_info(self, mock_create_session, session: Session): + # Create asset and info + asset = Asset(hash="blake3:withinfo", size_bytes=512) + session.add(asset) + session.flush() + + from app.assets.helpers import utcnow + info = AssetInfo( + owner_id="", + name="Existing Info", + asset_id=asset.id, + created_at=utcnow(), + updated_at=utcnow(), + last_access_time=utcnow(), + ) + session.add(info) + session.flush() # Flush to get the ID + info_id = info.id + session.commit() + + result = register_existing_asset( + asset_hash="blake3:withinfo", + name="Existing Info", + owner_id="", + ) + + assert result["created"] is False + + # Verify only one AssetInfo exists for this name + session.expire_all() + infos = session.query(AssetInfo).filter_by(name="Existing Info").all() + assert len(infos) == 1 + assert infos[0].id == info_id + + def test_raises_for_nonexistent_hash(self, mock_create_session): + with pytest.raises(ValueError, match="No asset with hash"): + register_existing_asset( + asset_hash="blake3:doesnotexist", + name="Fail", + ) + + def test_applies_tags_to_new_info(self, mock_create_session, session: Session): + asset = Asset(hash="blake3:tagged", size_bytes=256) + session.add(asset) + session.commit() + + result = register_existing_asset( + asset_hash="blake3:tagged", + name="Tagged Info", + tags=["alpha", "beta"], + ) + + assert result["created"] is True + assert set(result["tags"]) == {"alpha", "beta"} diff --git a/tests-unit/assets_test/services/test_tagging.py b/tests-unit/assets_test/services/test_tagging.py new file mode 100644 index 000000000..625cddc98 --- /dev/null +++ b/tests-unit/assets_test/services/test_tagging.py @@ -0,0 +1,197 @@ +"""Tests for tagging services.""" +import pytest +from sqlalchemy.orm import Session + +from app.assets.database.models import Asset, AssetInfo, Tag +from app.assets.database.queries import ensure_tags_exist, add_tags_to_asset_info +from app.assets.helpers import utcnow +from app.assets.services import apply_tags, remove_tags, list_tags + + +def _make_asset(session: Session, hash_val: str = "blake3:test") -> Asset: + asset = Asset(hash=hash_val, size_bytes=1024) + session.add(asset) + session.flush() + return asset + + +def _make_asset_info( + session: Session, + asset: Asset, + name: str = "test", + owner_id: str = "", +) -> AssetInfo: + now = utcnow() + info = AssetInfo( + owner_id=owner_id, + name=name, + asset_id=asset.id, + created_at=now, + updated_at=now, + last_access_time=now, + ) + session.add(info) + session.flush() + return info + + +class TestApplyTags: + def test_adds_new_tags(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + session.commit() + + result = apply_tags( + asset_info_id=info.id, + tags=["alpha", "beta"], + ) + + assert set(result["added"]) == {"alpha", "beta"} + assert result["already_present"] == [] + assert set(result["total_tags"]) == {"alpha", "beta"} + + def test_reports_already_present(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + ensure_tags_exist(session, ["existing"]) + add_tags_to_asset_info(session, asset_info_id=info.id, tags=["existing"]) + session.commit() + + result = apply_tags( + asset_info_id=info.id, + tags=["existing", "new"], + ) + + assert result["added"] == ["new"] + assert result["already_present"] == ["existing"] + + def test_raises_for_nonexistent_info(self, mock_create_session): + with pytest.raises(ValueError, match="not found"): + apply_tags(asset_info_id="nonexistent", tags=["x"]) + + def test_raises_for_wrong_owner(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, owner_id="user1") + session.commit() + + with pytest.raises(PermissionError, match="not owner"): + apply_tags( + asset_info_id=info.id, + tags=["new"], + owner_id="user2", + ) + + +class TestRemoveTags: + def test_removes_tags(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + ensure_tags_exist(session, ["a", "b", "c"]) + add_tags_to_asset_info(session, asset_info_id=info.id, tags=["a", "b", "c"]) + session.commit() + + result = remove_tags( + asset_info_id=info.id, + tags=["a", "b"], + ) + + assert set(result["removed"]) == {"a", "b"} + assert result["not_present"] == [] + assert result["total_tags"] == ["c"] + + def test_reports_not_present(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset) + ensure_tags_exist(session, ["present"]) + add_tags_to_asset_info(session, asset_info_id=info.id, tags=["present"]) + session.commit() + + result = remove_tags( + asset_info_id=info.id, + tags=["present", "absent"], + ) + + assert result["removed"] == ["present"] + assert result["not_present"] == ["absent"] + + def test_raises_for_nonexistent_info(self, mock_create_session): + with pytest.raises(ValueError, match="not found"): + remove_tags(asset_info_id="nonexistent", tags=["x"]) + + def test_raises_for_wrong_owner(self, mock_create_session, session: Session): + asset = _make_asset(session) + info = _make_asset_info(session, asset, owner_id="user1") + session.commit() + + with pytest.raises(PermissionError, match="not owner"): + remove_tags( + asset_info_id=info.id, + tags=["x"], + owner_id="user2", + ) + + +class TestListTags: + def test_returns_tags_with_counts(self, mock_create_session, session: Session): + ensure_tags_exist(session, ["used", "unused"]) + asset = _make_asset(session) + info = _make_asset_info(session, asset) + add_tags_to_asset_info(session, asset_info_id=info.id, tags=["used"]) + session.commit() + + rows, total = list_tags() + + tag_dict = {name: count for name, _, count in rows} + assert tag_dict["used"] == 1 + assert tag_dict["unused"] == 0 + assert total == 2 + + def test_excludes_zero_counts(self, mock_create_session, session: Session): + ensure_tags_exist(session, ["used", "unused"]) + asset = _make_asset(session) + info = _make_asset_info(session, asset) + add_tags_to_asset_info(session, asset_info_id=info.id, tags=["used"]) + session.commit() + + rows, total = list_tags(include_zero=False) + + tag_names = {name for name, _, _ in rows} + assert "used" in tag_names + assert "unused" not in tag_names + + def test_prefix_filter(self, mock_create_session, session: Session): + ensure_tags_exist(session, ["alpha", "beta", "alphabet"]) + session.commit() + + rows, _ = list_tags(prefix="alph") + + tag_names = {name for name, _, _ in rows} + assert tag_names == {"alpha", "alphabet"} + + def test_order_by_name(self, mock_create_session, session: Session): + ensure_tags_exist(session, ["zebra", "alpha", "middle"]) + session.commit() + + rows, _ = list_tags(order="name_asc") + + names = [name for name, _, _ in rows] + assert names == ["alpha", "middle", "zebra"] + + def test_pagination(self, mock_create_session, session: Session): + ensure_tags_exist(session, ["a", "b", "c", "d", "e"]) + session.commit() + + rows, total = list_tags(limit=2, offset=1, order="name_asc") + + assert total == 5 + assert len(rows) == 2 + names = [name for name, _, _ in rows] + assert names == ["b", "c"] + + def test_clamps_limit(self, mock_create_session, session: Session): + ensure_tags_exist(session, ["a"]) + session.commit() + + # Service should clamp limit to max 1000 + rows, _ = list_tags(limit=2000) + assert len(rows) <= 1000