import logging
import os
from typing import Sequence

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.assets.database.models import Asset, AssetInfo, Tag
from app.assets.database.queries import (
    add_tags_to_asset_info,
    get_asset_by_hash,
    get_asset_tags,
    get_or_create_asset_info,
    list_cache_states_by_asset_id,
    remove_missing_tag_for_asset_id,
    set_asset_info_metadata,
    set_asset_info_tags,
    update_asset_info_timestamps,
    upsert_asset,
    upsert_cache_state,
)
from app.assets.helpers import normalize_tags, select_best_live_path
from app.assets.services.path_utils import compute_relative_filename
from app.assets.services.schemas import (
    IngestResult,
    RegisterAssetResult,
    UserMetadata,
    extract_asset_data,
    extract_info_data,
)
from app.database.db import create_session


def ingest_file_from_path(
    abs_path: str,
    asset_hash: str,
    size_bytes: int,
    mtime_ns: int,
    mime_type: str | None = None,
    info_name: str | None = None,
    owner_id: str = "",
    preview_id: str | None = None,
    user_metadata: UserMetadata = None,
    tags: Sequence[str] = (),
    tag_origin: str = "manual",
    require_existing_tags: bool = False,
) -> IngestResult:
    """Record a file on disk as an asset and optionally attach an AssetInfo to it.

    Upserts the Asset row for ``asset_hash`` and the cache-state row for the
    file's absolute path. When ``info_name`` is given, it gets or creates the
    AssetInfo for (asset, ``owner_id``, ``info_name``). It then attaches the
    normalized ``tags`` and merges ``user_metadata`` plus a computed
    ``"filename"`` into that info's metadata. Finally it tries to clear the
    'missing' tag for the asset and commits, all in a single session.

    Args:
        abs_path: Path of the file; stored after ``os.path.abspath``.
        asset_hash: Hash used to upsert the Asset row.
        size_bytes: Passed to ``upsert_asset``.
        mtime_ns: Passed to ``upsert_cache_state``.
        mime_type: Passed to ``upsert_asset``.
        info_name: If truthy, an AssetInfo with this name is created or reused.
        owner_id: Owner of the AssetInfo.
        preview_id: Optional Asset id to use as the info's preview. It is
            silently dropped if no Asset with that id exists.
        user_metadata: Extra metadata merged into the AssetInfo's metadata.
        tags: Tags to add to the AssetInfo. They are ignored when no AssetInfo
            is involved, i.e. when ``info_name`` is falsy.
        tag_origin: Origin label recorded with the added tags.
        require_existing_tags: If True, every tag must already exist and none
            are created.

    Returns:
        IngestResult describing what was created or updated, and the AssetInfo
        id (``None`` when no info was involved).

    Raises:
        ValueError: ``require_existing_tags`` is set and some tag is unknown.
    """
    locator = os.path.abspath(abs_path)
    asset_created = False
    asset_updated = False
    state_created = False
    state_updated = False
    asset_info_id: str | None = None

    with create_session() as session:
        # Drop a preview reference that points at a non-existent Asset.
        if preview_id and session.get(Asset, preview_id) is None:
            preview_id = None

        asset, asset_created, asset_updated = upsert_asset(
            session,
            asset_hash=asset_hash,
            size_bytes=size_bytes,
            mime_type=mime_type,
        )
        state_created, state_updated = upsert_cache_state(
            session,
            asset_id=asset.id,
            file_path=locator,
            mtime_ns=mtime_ns,
        )

        info: AssetInfo | None = None
        if info_name:
            info, info_created = get_or_create_asset_info(
                session,
                asset_id=asset.id,
                owner_id=owner_id,
                name=info_name,
                preview_id=preview_id,
            )
            if not info_created:
                # An existing info is touched via update_asset_info_timestamps,
                # which also receives the (validated) preview_id.
                update_asset_info_timestamps(session, asset_info=info, preview_id=preview_id)
            asset_info_id = info.id

        norm = normalize_tags(list(tags))
        if norm and asset_info_id:
            if require_existing_tags:
                _validate_tags_exist(session, norm)
            add_tags_to_asset_info(
                session,
                asset_info_id=asset_info_id,
                tags=norm,
                origin=tag_origin,
                create_if_missing=not require_existing_tags,
            )

        if info is not None and asset_info_id:
            _update_metadata_with_filename(
                session,
                asset_info_id=asset_info_id,
                asset_id=asset.id,
                info=info,
                user_metadata=user_metadata,
            )

        # Best-effort cleanup: a failure is logged instead of aborting the ingest.
        # NOTE(review): if the failing statement leaves the session needing a
        # rollback, the commit below will still raise; confirm that
        # remove_missing_tag_for_asset_id cannot poison the transaction.
        try:
            remove_missing_tag_for_asset_id(session, asset_id=asset.id)
        except Exception:
            logging.exception("Failed to clear 'missing' tag for asset %s", asset.id)

        session.commit()

    return IngestResult(
        asset_created=asset_created,
        asset_updated=asset_updated,
        state_created=state_created,
        state_updated=state_updated,
        asset_info_id=asset_info_id,
    )


def register_existing_asset(
    asset_hash: str,
    name: str,
    user_metadata: UserMetadata = None,
    tags: list[str] | None = None,
    tag_origin: str = "manual",
    owner_id: str = "",
) -> RegisterAssetResult:
    """Get or create an AssetInfo named ``name`` for an already-stored asset.

    If an AssetInfo for (asset, ``owner_id``, ``name``) already exists, it is
    returned with ``created=False``. In that case ``user_metadata`` and
    ``tags`` are NOT applied. Otherwise the new info's metadata is set to
    ``user_metadata`` plus a computed ``"filename"`` (when one can be derived).
    Its tags are set via ``set_asset_info_tags`` when ``tags`` is not None,
    and the result has ``created=True``.

    Raises:
        ValueError: No Asset exists with ``asset_hash``.
    """
    with create_session() as session:
        asset = get_asset_by_hash(session, asset_hash=asset_hash)
        if not asset:
            raise ValueError(f"No asset with hash {asset_hash}")

        info, info_created = get_or_create_asset_info(
            session,
            asset_id=asset.id,
            owner_id=owner_id,
            name=name,
            preview_id=None,
        )

        if not info_created:
            tag_names = get_asset_tags(session, asset_info_id=info.id)
            # Snapshot the ORM objects before commit, which may expire their attributes.
            result = RegisterAssetResult(
                info=extract_info_data(info),
                asset=extract_asset_data(asset),
                tags=tag_names,
                created=False,
            )
            session.commit()
            return result

        new_meta = dict(user_metadata or {})
        computed_filename = _compute_filename_for_asset(session, asset.id)
        if computed_filename:
            new_meta["filename"] = computed_filename

        if new_meta:
            set_asset_info_metadata(
                session,
                asset_info_id=info.id,
                user_metadata=new_meta,
            )

        # NOTE(review): unlike ingest_file_from_path, tags are not passed through
        # normalize_tags here; confirm set_asset_info_tags normalizes them itself.
        if tags is not None:
            set_asset_info_tags(
                session,
                asset_info_id=info.id,
                tags=tags,
                origin=tag_origin,
            )

        tag_names = get_asset_tags(session, asset_info_id=info.id)
        # Reload info so the snapshot reflects the metadata/tag writes above.
        session.refresh(info)

        result = RegisterAssetResult(
            info=extract_info_data(info),
            asset=extract_asset_data(asset),
            tags=tag_names,
            created=True,
        )
        session.commit()

    return result


def _validate_tags_exist(session: Session, tags: list[str]) -> None:
    """Raise ValueError naming every entry of ``tags`` that has no Tag row."""
    if not tags:
        return
    existing_tag_names = set(
        session.execute(select(Tag.name).where(Tag.name.in_(tags))).scalars().all()
    )
    missing = [t for t in tags if t not in existing_tag_names]
    if missing:
        raise ValueError(f"Unknown tags: {missing}")


def _compute_filename_for_asset(session: Session, asset_id: str) -> str | None:
    """Return the relative filename of the asset's best live path, or None if there is none."""
    primary_path = select_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset_id))
    return compute_relative_filename(primary_path) if primary_path else None


def _update_metadata_with_filename(
    session: Session,
    asset_info_id: str,
    asset_id: str,
    info: AssetInfo,
    user_metadata: UserMetadata,
) -> None:
    """Merge ``user_metadata`` and the computed filename into ``info``'s metadata.

    Existing keys are kept unless overridden. The computed ``"filename"`` (when
    available) takes precedence over any ``"filename"`` in ``user_metadata``.
    The metadata is written only if the merged result differs from the current
    value.
    """
    computed_filename = _compute_filename_for_asset(session, asset_id)

    current_meta = info.user_metadata or {}
    new_meta = dict(current_meta)
    if user_metadata:
        new_meta.update(user_metadata)
    if computed_filename:
        new_meta["filename"] = computed_filename

    if new_meta != current_meta:
        set_asset_info_metadata(
            session,
            asset_info_id=asset_info_id,
            user_metadata=new_meta,
        )