diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index c2fc2a68b..4517c3ef5 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -7,18 +7,30 @@ from typing import Any from aiohttp import web from pydantic import ValidationError -import app.assets.manager as manager +import folder_paths from app import user_manager -from app.assets.api import schemas_in +from app.assets.api import schemas_in, schemas_out from app.assets.api.schemas_in import ( - AssetNotFoundError, AssetValidationError, - DependencyMissingError, - HashMismatchError, UploadError, ) from app.assets.api.upload import parse_multipart_upload from app.assets.scanner import seed_assets as scanner_seed_assets +from app.assets.services import ( + DependencyMissingError, + HashMismatchError, + apply_tags, + asset_exists, + create_from_hash, + delete_asset_reference, + get_asset_detail, + list_assets_page, + list_tags, + remove_tags, + resolve_asset_for_download, + update_asset_metadata, + upload_from_temp_path, +) ROUTES = web.RouteTableDef() USER_MANAGER: user_manager.UserManager | None = None @@ -54,6 +66,15 @@ def _build_validation_error_response(code: str, ve: ValidationError) -> web.Resp return _build_error_response(400, code, "Validation failed.", {"errors": ve.json()}) +def _validate_sort_field(requested: str | None) -> str: + if not requested: + return "created_at" + v = requested.lower() + if v in {"name", "created_at", "updated_at", "size", "last_access_time"}: + return v + return "created_at" + + @ROUTES.head("/api/assets/hash/{hash}") async def head_asset_by_hash(request: web.Request) -> web.Response: hash_str = request.match_info.get("hash", "").strip().lower() @@ -62,12 +83,12 @@ async def head_asset_by_hash(request: web.Request) -> web.Response: algo, digest = hash_str.split(":", 1) if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"): return _build_error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") - 
exists = manager.asset_exists(asset_hash=hash_str) + exists = asset_exists(hash_str) return web.Response(status=200 if exists else 404) @ROUTES.get("/api/assets") -async def list_assets(request: web.Request) -> web.Response: +async def list_assets_route(request: web.Request) -> web.Response: """ GET request to list assets. """ @@ -77,31 +98,70 @@ async def list_assets(request: web.Request) -> web.Response: except ValidationError as ve: return _build_validation_error_response("INVALID_QUERY", ve) - payload = manager.list_assets( + sort = _validate_sort_field(q.sort) + order = "desc" if (q.order or "desc").lower() not in {"asc", "desc"} else q.order.lower() + + result = list_assets_page( + owner_id=USER_MANAGER.get_request_user_id(request), include_tags=q.include_tags, exclude_tags=q.exclude_tags, name_contains=q.name_contains, metadata_filter=q.metadata_filter, limit=q.limit, offset=q.offset, - sort=q.sort, - order=q.order, - owner_id=USER_MANAGER.get_request_user_id(request), + sort=sort, + order=order, + ) + + summaries = [ + schemas_out.AssetSummary( + id=item.info.id, + name=item.info.name, + asset_hash=item.asset.hash if item.asset else None, + size=int(item.asset.size_bytes) if item.asset and item.asset.size_bytes else None, + mime_type=item.asset.mime_type if item.asset else None, + tags=item.tags, + created_at=item.info.created_at, + updated_at=item.info.updated_at, + last_access_time=item.info.last_access_time, + ) + for item in result.items + ] + + payload = schemas_out.AssetsList( + assets=summaries, + total=result.total, + has_more=(q.offset + len(summaries)) < result.total, ) return web.json_response(payload.model_dump(mode="json", exclude_none=True)) @ROUTES.get(f"/api/assets/{{id:{UUID_RE}}}") -async def get_asset(request: web.Request) -> web.Response: +async def get_asset_route(request: web.Request) -> web.Response: """ GET request to get an asset's info as JSON. 
""" asset_info_id = str(uuid.UUID(request.match_info["id"])) try: - result = manager.get_asset( + result = get_asset_detail( asset_info_id=asset_info_id, owner_id=USER_MANAGER.get_request_user_id(request), ) + if not result: + return _build_error_response(404, "ASSET_NOT_FOUND", f"AssetInfo {asset_info_id} not found", {"id": asset_info_id}) + + payload = schemas_out.AssetDetail( + id=result.info.id, + name=result.info.name, + asset_hash=result.asset.hash if result.asset else None, + size=int(result.asset.size_bytes) if result.asset and result.asset.size_bytes is not None else None, + mime_type=result.asset.mime_type if result.asset else None, + tags=result.tags, + user_metadata=result.info.user_metadata or {}, + preview_id=result.info.preview_id, + created_at=result.info.created_at, + last_access_time=result.info.last_access_time, + ) except ValueError as e: return _build_error_response(404, "ASSET_NOT_FOUND", str(e), {"id": asset_info_id}) except Exception: @@ -111,21 +171,23 @@ async def get_asset(request: web.Request) -> web.Response: USER_MANAGER.get_request_user_id(request), ) return _build_error_response(500, "INTERNAL", "Unexpected server error.") - return web.json_response(result.model_dump(mode="json"), status=200) + return web.json_response(payload.model_dump(mode="json"), status=200) @ROUTES.get(f"/api/assets/{{id:{UUID_RE}}}/content") async def download_asset_content(request: web.Request) -> web.Response: - # question: do we need disposition? could we just stick with one of these? 
disposition = request.query.get("disposition", "attachment").lower().strip() if disposition not in {"inline", "attachment"}: disposition = "attachment" try: - abs_path, content_type, filename = manager.resolve_asset_content_for_download( + result = resolve_asset_for_download( asset_info_id=str(uuid.UUID(request.match_info["id"])), owner_id=USER_MANAGER.get_request_user_id(request), ) + abs_path = result.abs_path + content_type = result.content_type + filename = result.download_name except ValueError as ve: return _build_error_response(404, "ASSET_NOT_FOUND", str(ve)) except NotImplementedError as nie: @@ -166,7 +228,7 @@ async def download_asset_content(request: web.Request) -> web.Response: @ROUTES.post("/api/assets/from-hash") -async def create_asset_from_hash(request: web.Request) -> web.Response: +async def create_asset_from_hash_route(request: web.Request) -> web.Response: try: payload = await request.json() body = schemas_in.CreateFromHashBody.model_validate(payload) @@ -175,7 +237,7 @@ async def create_asset_from_hash(request: web.Request) -> web.Response: except Exception: return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") - result = manager.create_asset_from_hash( + result = create_from_hash( hash_str=body.hash, name=body.name, tags=body.tags, @@ -184,39 +246,119 @@ async def create_asset_from_hash(request: web.Request) -> web.Response: ) if result is None: return _build_error_response(404, "ASSET_NOT_FOUND", f"Asset content {body.hash} does not exist") - return web.json_response(result.model_dump(mode="json"), status=201) + + payload_out = schemas_out.AssetCreated( + id=result.info.id, + name=result.info.name, + asset_hash=result.asset.hash, + size=int(result.asset.size_bytes) if result.asset.size_bytes else None, + mime_type=result.asset.mime_type, + tags=result.tags, + user_metadata=result.info.user_metadata or {}, + preview_id=result.info.preview_id, + created_at=result.info.created_at, + 
last_access_time=result.info.last_access_time, + created_new=result.created_new, + ) + return web.json_response(payload_out.model_dump(mode="json"), status=201) + + +def _delete_temp_file_if_exists(path: str | None) -> None: + if path and os.path.exists(path): + try: + os.remove(path) + except Exception: + pass @ROUTES.post("/api/assets") async def upload_asset(request: web.Request) -> web.Response: """Multipart/form-data endpoint for Asset uploads.""" try: - parsed = await parse_multipart_upload(request, check_hash_exists=manager.asset_exists) + parsed = await parse_multipart_upload(request, check_hash_exists=asset_exists) except UploadError as e: return _build_error_response(e.status, e.code, e.message) owner_id = USER_MANAGER.get_request_user_id(request) try: - result = manager.process_upload(parsed=parsed, owner_id=owner_id) + spec = schemas_in.UploadAssetSpec.model_validate({ + "tags": parsed.tags_raw, + "name": parsed.provided_name, + "user_metadata": parsed.user_metadata_raw, + "hash": parsed.provided_hash, + }) + except ValidationError as ve: + _delete_temp_file_if_exists(parsed.tmp_path) + return _build_error_response(400, "INVALID_BODY", f"Validation failed: {ve.json()}") + + if spec.tags and spec.tags[0] == "models": + if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths: + _delete_temp_file_if_exists(parsed.tmp_path) + category = spec.tags[1] if len(spec.tags) >= 2 else "" + return _build_error_response(400, "INVALID_BODY", f"unknown models category '{category}'") + + try: + # Fast path: if a valid provided hash exists, create AssetInfo without writing anything + if spec.hash and parsed.provided_hash_exists is True: + result = create_from_hash( + hash_str=spec.hash, + name=spec.name or (spec.hash.split(":", 1)[1]), + tags=spec.tags, + user_metadata=spec.user_metadata or {}, + owner_id=owner_id, + ) + if result is None: + _delete_temp_file_if_exists(parsed.tmp_path) + return _build_error_response(404, "ASSET_NOT_FOUND", 
f"Asset content {spec.hash} does not exist") + _delete_temp_file_if_exists(parsed.tmp_path) + else: + # Otherwise, we must have a temp file path to ingest + if not parsed.tmp_path or not os.path.exists(parsed.tmp_path): + return _build_error_response(404, "ASSET_NOT_FOUND", "Provided hash not found and no file uploaded.") + + result = upload_from_temp_path( + temp_path=parsed.tmp_path, + name=spec.name, + tags=spec.tags, + user_metadata=spec.user_metadata or {}, + client_filename=parsed.file_client_name, + owner_id=owner_id, + expected_hash=spec.hash, + ) except AssetValidationError as e: + _delete_temp_file_if_exists(parsed.tmp_path) return _build_error_response(400, e.code, str(e)) - except AssetNotFoundError as e: - return _build_error_response(404, "ASSET_NOT_FOUND", str(e)) except HashMismatchError as e: + _delete_temp_file_if_exists(parsed.tmp_path) return _build_error_response(400, "HASH_MISMATCH", str(e)) except DependencyMissingError as e: + _delete_temp_file_if_exists(parsed.tmp_path) return _build_error_response(503, "DEPENDENCY_MISSING", e.message) except Exception: - logging.exception("process_upload failed for owner_id=%s", owner_id) + _delete_temp_file_if_exists(parsed.tmp_path) + logging.exception("upload_asset failed for owner_id=%s", owner_id) return _build_error_response(500, "INTERNAL", "Unexpected server error.") + payload = schemas_out.AssetCreated( + id=result.info.id, + name=result.info.name, + asset_hash=result.asset.hash, + size=int(result.asset.size_bytes) if result.asset.size_bytes else None, + mime_type=result.asset.mime_type, + tags=result.tags, + user_metadata=result.info.user_metadata or {}, + preview_id=result.info.preview_id, + created_at=result.info.created_at, + last_access_time=result.info.last_access_time, + created_new=result.created_new, + ) status = 201 if result.created_new else 200 - return web.json_response(result.model_dump(mode="json"), status=status) + return web.json_response(payload.model_dump(mode="json"), 
status=status) @ROUTES.put(f"/api/assets/{{id:{UUID_RE}}}") -async def update_asset(request: web.Request) -> web.Response: +async def update_asset_route(request: web.Request) -> web.Response: asset_info_id = str(uuid.UUID(request.match_info["id"])) try: body = schemas_in.UpdateAssetBody.model_validate(await request.json()) @@ -226,12 +368,20 @@ async def update_asset(request: web.Request) -> web.Response: return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") try: - result = manager.update_asset( + result = update_asset_metadata( asset_info_id=asset_info_id, name=body.name, user_metadata=body.user_metadata, owner_id=USER_MANAGER.get_request_user_id(request), ) + payload = schemas_out.AssetUpdated( + id=result.info.id, + name=result.info.name, + asset_hash=result.asset.hash if result.asset else None, + tags=result.tags, + user_metadata=result.info.user_metadata or {}, + updated_at=result.info.updated_at, + ) except (ValueError, PermissionError) as ve: return _build_error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) except Exception: @@ -241,17 +391,17 @@ async def update_asset(request: web.Request) -> web.Response: USER_MANAGER.get_request_user_id(request), ) return _build_error_response(500, "INTERNAL", "Unexpected server error.") - return web.json_response(result.model_dump(mode="json"), status=200) + return web.json_response(payload.model_dump(mode="json"), status=200) @ROUTES.delete(f"/api/assets/{{id:{UUID_RE}}}") -async def delete_asset(request: web.Request) -> web.Response: +async def delete_asset_route(request: web.Request) -> web.Response: asset_info_id = str(uuid.UUID(request.match_info["id"])) - delete_content = request.query.get("delete_content") - delete_content = True if delete_content is None else delete_content.lower() not in {"0", "false", "no"} + delete_content_param = request.query.get("delete_content") + delete_content = True if delete_content_param is None else delete_content_param.lower() not 
in {"0", "false", "no"} try: - deleted = manager.delete_asset_reference( + deleted = delete_asset_reference( asset_info_id=asset_info_id, owner_id=USER_MANAGER.get_request_user_id(request), delete_content_if_orphan=delete_content, @@ -284,7 +434,7 @@ async def get_tags(request: web.Request) -> web.Response: status=400, ) - result = manager.list_tags( + rows, total = list_tags( prefix=query.prefix, limit=query.limit, offset=query.offset, @@ -292,27 +442,35 @@ async def get_tags(request: web.Request) -> web.Response: include_zero=query.include_zero, owner_id=USER_MANAGER.get_request_user_id(request), ) - return web.json_response(result.model_dump(mode="json")) + + tags = [schemas_out.TagUsage(name=name, count=count, type=tag_type) for (name, tag_type, count) in rows] + payload = schemas_out.TagsList(tags=tags, total=total, has_more=(query.offset + len(tags)) < total) + return web.json_response(payload.model_dump(mode="json")) @ROUTES.post(f"/api/assets/{{id:{UUID_RE}}}/tags") async def add_asset_tags(request: web.Request) -> web.Response: asset_info_id = str(uuid.UUID(request.match_info["id"])) try: - payload = await request.json() - data = schemas_in.TagsAdd.model_validate(payload) + json_payload = await request.json() + data = schemas_in.TagsAdd.model_validate(json_payload) except ValidationError as ve: return _build_error_response(400, "INVALID_BODY", "Invalid JSON body for tags add.", {"errors": ve.errors()}) except Exception: return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") try: - result = manager.add_tags_to_asset( + result = apply_tags( asset_info_id=asset_info_id, tags=data.tags, origin="manual", owner_id=USER_MANAGER.get_request_user_id(request), ) + payload = schemas_out.TagsAdd( + added=result.added, + already_present=result.already_present, + total_tags=result.total_tags, + ) except (ValueError, PermissionError) as ve: return _build_error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) except 
Exception: @@ -323,26 +481,31 @@ async def add_asset_tags(request: web.Request) -> web.Response: ) return _build_error_response(500, "INTERNAL", "Unexpected server error.") - return web.json_response(result.model_dump(mode="json"), status=200) + return web.json_response(payload.model_dump(mode="json"), status=200) @ROUTES.delete(f"/api/assets/{{id:{UUID_RE}}}/tags") async def delete_asset_tags(request: web.Request) -> web.Response: asset_info_id = str(uuid.UUID(request.match_info["id"])) try: - payload = await request.json() - data = schemas_in.TagsRemove.model_validate(payload) + json_payload = await request.json() + data = schemas_in.TagsRemove.model_validate(json_payload) except ValidationError as ve: return _build_error_response(400, "INVALID_BODY", "Invalid JSON body for tags remove.", {"errors": ve.errors()}) except Exception: return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") try: - result = manager.remove_tags_from_asset( + result = remove_tags( asset_info_id=asset_info_id, tags=data.tags, owner_id=USER_MANAGER.get_request_user_id(request), ) + payload = schemas_out.TagsRemove( + removed=result.removed, + not_present=result.not_present, + total_tags=result.total_tags, + ) except ValueError as ve: return _build_error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) except Exception: @@ -353,7 +516,7 @@ async def delete_asset_tags(request: web.Request) -> web.Response: ) return _build_error_response(500, "INTERNAL", "Unexpected server error.") - return web.json_response(result.model_dump(mode="json"), status=200) + return web.json_response(payload.model_dump(mode="json"), status=200) @ROUTES.post("/api/assets/seed") diff --git a/app/assets/manager.py b/app/assets/manager.py deleted file mode 100644 index 3d9416310..000000000 --- a/app/assets/manager.py +++ /dev/null @@ -1,526 +0,0 @@ -import contextlib -import mimetypes -import os -from typing import Sequence - -from pydantic import ValidationError - -import 
app.assets.services.hashing as hashing -import folder_paths -from app.assets.api import schemas_in, schemas_out -from app.assets.api.schemas_in import ( - AssetNotFoundError, - AssetValidationError, - DependencyMissingError, - HashMismatchError, - ParsedUpload, -) -from app.assets.api.upload import _delete_temp_file_if_exists -from app.assets.database.queries import ( - asset_exists_by_hash, - fetch_asset_info_and_asset, - get_asset_by_hash, - get_asset_tags, - list_asset_infos_page, - list_cache_states_by_asset_id, - update_asset_info_access_time, -) -from app.assets.helpers import select_best_live_path -from app.assets.services import ( - apply_tags, - delete_asset_reference as svc_delete_asset_reference, - get_asset_detail, - ingest_file_from_path, - list_tags as svc_list_tags, - register_existing_asset, - remove_tags, - set_asset_preview as svc_set_asset_preview, - update_asset_metadata, -) -from app.assets.services.path_utils import ( - resolve_destination_from_tags, - validate_path_within_base, -) -from app.database.db import create_session - - -def _validate_sort_field(requested: str | None) -> str: - if not requested: - return "created_at" - v = requested.lower() - if v in {"name", "created_at", "updated_at", "size", "last_access_time"}: - return v - return "created_at" - - -def _get_size_mtime_ns(path: str) -> tuple[int, int]: - st = os.stat(path, follow_symlinks=True) - return st.st_size, getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)) - - -def _sanitize_filename(name: str | None, fallback: str) -> str: - n = os.path.basename((name or "").strip() or fallback) - if n: - return n - return fallback - - -def asset_exists(asset_hash: str) -> bool: - with create_session() as session: - return asset_exists_by_hash(session, asset_hash=asset_hash) - - -def list_assets( - include_tags: Sequence[str] | None = None, - exclude_tags: Sequence[str] | None = None, - name_contains: str | None = None, - metadata_filter: dict | None = None, - limit: int = 20, - 
offset: int = 0, - sort: str = "created_at", - order: str = "desc", - owner_id: str = "", -) -> schemas_out.AssetsList: - sort = _validate_sort_field(sort) - order = "desc" if (order or "desc").lower() not in {"asc", "desc"} else order.lower() - - with create_session() as session: - infos, tag_map, total = list_asset_infos_page( - session, - owner_id=owner_id, - include_tags=include_tags, - exclude_tags=exclude_tags, - name_contains=name_contains, - metadata_filter=metadata_filter, - limit=limit, - offset=offset, - sort=sort, - order=order, - ) - - summaries: list[schemas_out.AssetSummary] = [] - for info in infos: - asset = info.asset - tags = tag_map.get(info.id, []) - summaries.append( - schemas_out.AssetSummary( - id=info.id, - name=info.name, - asset_hash=asset.hash if asset else None, - size=int(asset.size_bytes) if asset else None, - mime_type=asset.mime_type if asset else None, - tags=tags, - created_at=info.created_at, - updated_at=info.updated_at, - last_access_time=info.last_access_time, - ) - ) - - return schemas_out.AssetsList( - assets=summaries, - total=total, - has_more=(offset + len(summaries)) < total, - ) - - -def get_asset( - asset_info_id: str, - owner_id: str = "", -) -> schemas_out.AssetDetail: - result = get_asset_detail(asset_info_id=asset_info_id, owner_id=owner_id) - if not result: - raise ValueError(f"AssetInfo {asset_info_id} not found") - - info = result.info - asset = result.asset - - return schemas_out.AssetDetail( - id=info.id, - name=info.name, - asset_hash=asset.hash if asset else None, - size=int(asset.size_bytes) if asset and asset.size_bytes is not None else None, - mime_type=asset.mime_type if asset else None, - tags=result.tags, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - ) - - -def resolve_asset_content_for_download( - asset_info_id: str, - owner_id: str = "", -) -> tuple[str, str, str]: - with create_session() as session: - 
pair = fetch_asset_info_and_asset(session, asset_info_id=asset_info_id, owner_id=owner_id) - if not pair: - raise ValueError(f"AssetInfo {asset_info_id} not found") - - info, asset = pair - states = list_cache_states_by_asset_id(session, asset_id=asset.id) - abs_path = select_best_live_path(states) - if not abs_path: - raise FileNotFoundError - - update_asset_info_access_time(session, asset_info_id=asset_info_id) - session.commit() - - ctype = asset.mime_type or mimetypes.guess_type(info.name or abs_path)[0] or "application/octet-stream" - download_name = info.name or os.path.basename(abs_path) - return abs_path, ctype, download_name - - -def upload_asset_from_temp_path( - spec: schemas_in.UploadAssetSpec, - temp_path: str, - client_filename: str | None = None, - owner_id: str = "", - expected_asset_hash: str | None = None, -) -> schemas_out.AssetCreated: - try: - digest = hashing.compute_blake3_hash(temp_path) - except ImportError as e: - raise DependencyMissingError(str(e)) - except Exception as e: - raise RuntimeError(f"failed to hash uploaded file: {e}") - asset_hash = "blake3:" + digest - - if expected_asset_hash and asset_hash != expected_asset_hash.strip().lower(): - raise ValueError("HASH_MISMATCH") - - with create_session() as session: - existing = get_asset_by_hash(session, asset_hash=asset_hash) - - if existing is not None: - with contextlib.suppress(Exception): - if temp_path and os.path.exists(temp_path): - os.remove(temp_path) - - display_name = _sanitize_filename(spec.name or (client_filename or ""), fallback=digest) - result = register_existing_asset( - asset_hash=asset_hash, - name=display_name, - user_metadata=spec.user_metadata or {}, - tags=spec.tags or [], - tag_origin="manual", - owner_id=owner_id, - ) - info = result.info - asset = result.asset - - return schemas_out.AssetCreated( - id=info.id, - name=info.name, - asset_hash=asset.hash, - size=int(asset.size_bytes) if asset.size_bytes is not None else None, - mime_type=asset.mime_type, - 
tags=result.tags, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - created_new=False, - ) - - base_dir, subdirs = resolve_destination_from_tags(spec.tags) - dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir - os.makedirs(dest_dir, exist_ok=True) - - src_for_ext = (client_filename or spec.name or "").strip() - _ext = os.path.splitext(os.path.basename(src_for_ext))[1] if src_for_ext else "" - ext = _ext if 0 < len(_ext) <= 16 else "" - hashed_basename = f"{digest}{ext}" - dest_abs = os.path.abspath(os.path.join(dest_dir, hashed_basename)) - validate_path_within_base(dest_abs, base_dir) - - content_type = ( - mimetypes.guess_type(os.path.basename(src_for_ext), strict=False)[0] - or mimetypes.guess_type(hashed_basename, strict=False)[0] - or "application/octet-stream" - ) - - try: - os.replace(temp_path, dest_abs) - except Exception as e: - raise RuntimeError(f"failed to move uploaded file into place: {e}") - - try: - size_bytes, mtime_ns = _get_size_mtime_ns(dest_abs) - except OSError as e: - raise RuntimeError(f"failed to stat destination file: {e}") - - result = ingest_file_from_path( - asset_hash=asset_hash, - abs_path=dest_abs, - size_bytes=size_bytes, - mtime_ns=mtime_ns, - mime_type=content_type, - info_name=_sanitize_filename(spec.name or (client_filename or ""), fallback=digest), - owner_id=owner_id, - preview_id=None, - user_metadata=spec.user_metadata or {}, - tags=spec.tags, - tag_origin="manual", - require_existing_tags=False, - ) - info_id = result.asset_info_id - if not info_id: - raise RuntimeError("failed to create asset metadata") - - with create_session() as session: - pair = fetch_asset_info_and_asset(session, asset_info_id=info_id, owner_id=owner_id) - if not pair: - raise RuntimeError("inconsistent DB state after ingest") - info, asset = pair - tag_names = get_asset_tags(session, asset_info_id=info.id) - - return 
schemas_out.AssetCreated( - id=info.id, - name=info.name, - asset_hash=asset.hash, - size=int(asset.size_bytes), - mime_type=asset.mime_type, - tags=tag_names, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - created_new=result.asset_created, - ) - - -def process_upload( - parsed: ParsedUpload, - owner_id: str = "", -) -> schemas_out.AssetCreated: - """ - Process a parsed multipart upload. - - Args: - parsed: The parsed upload data from parse_multipart_upload - owner_id: The owner ID for the asset - - Returns: - AssetCreated response (check created_new to determine if new asset was created) - - Raises: - UploadError: On validation or processing errors - """ - try: - spec = schemas_in.UploadAssetSpec.model_validate({ - "tags": parsed.tags_raw, - "name": parsed.provided_name, - "user_metadata": parsed.user_metadata_raw, - "hash": parsed.provided_hash, - }) - except ValidationError as ve: - _delete_temp_file_if_exists(parsed.tmp_path) - raise AssetValidationError("INVALID_BODY", f"Validation failed: {ve.json()}") - - if spec.tags and spec.tags[0] == "models": - if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths: - _delete_temp_file_if_exists(parsed.tmp_path) - category = spec.tags[1] if len(spec.tags) >= 2 else "" - raise AssetValidationError("INVALID_BODY", f"unknown models category '{category}'") - - # Fast path: if a valid provided hash exists, create AssetInfo without writing anything - if spec.hash and parsed.provided_hash_exists is True: - result = create_asset_from_hash( - hash_str=spec.hash, - name=spec.name or (spec.hash.split(":", 1)[1]), - tags=spec.tags, - user_metadata=spec.user_metadata or {}, - owner_id=owner_id, - ) - - if result is None: - raise AssetNotFoundError(f"Asset content {spec.hash} does not exist") - - # Drain temp if we accidentally saved (e.g., hash field came after file) - 
_delete_temp_file_if_exists(parsed.tmp_path) - return result - - # Otherwise, we must have a temp file path to ingest - if not parsed.tmp_path or not os.path.exists(parsed.tmp_path): - raise AssetNotFoundError("Provided hash not found and no file uploaded.") - - try: - return upload_asset_from_temp_path( - spec, - temp_path=parsed.tmp_path, - client_filename=parsed.file_client_name, - owner_id=owner_id, - expected_asset_hash=spec.hash, - ) - except ValueError as e: - _delete_temp_file_if_exists(parsed.tmp_path) - msg = str(e) - if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH": - raise HashMismatchError("Uploaded file hash does not match provided hash.") - raise AssetValidationError("BAD_REQUEST", "Invalid inputs.") - except Exception: - _delete_temp_file_if_exists(parsed.tmp_path) - raise - - -def update_asset( - asset_info_id: str, - name: str | None = None, - tags: list[str] | None = None, - user_metadata: dict | None = None, - owner_id: str = "", -) -> schemas_out.AssetUpdated: - result = update_asset_metadata( - asset_info_id=asset_info_id, - name=name, - tags=tags, - user_metadata=user_metadata, - tag_origin="manual", - owner_id=owner_id, - ) - info = result.info - asset = result.asset - - return schemas_out.AssetUpdated( - id=info.id, - name=info.name, - asset_hash=asset.hash if asset else None, - tags=result.tags, - user_metadata=info.user_metadata or {}, - updated_at=info.updated_at, - ) - - -def set_asset_preview( - asset_info_id: str, - preview_asset_id: str | None = None, - owner_id: str = "", -) -> schemas_out.AssetDetail: - result = svc_set_asset_preview( - asset_info_id=asset_info_id, - preview_asset_id=preview_asset_id, - owner_id=owner_id, - ) - info = result.info - asset = result.asset - - return schemas_out.AssetDetail( - id=info.id, - name=info.name, - asset_hash=asset.hash if asset else None, - size=int(asset.size_bytes) if asset and asset.size_bytes is not None else None, - mime_type=asset.mime_type if asset else None, - 
tags=result.tags, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - ) - - -def delete_asset_reference(asset_info_id: str, owner_id: str, delete_content_if_orphan: bool = True) -> bool: - return svc_delete_asset_reference( - asset_info_id=asset_info_id, - owner_id=owner_id, - delete_content_if_orphan=delete_content_if_orphan, - ) - - -def create_asset_from_hash( - hash_str: str, - name: str, - tags: list[str] | None = None, - user_metadata: dict | None = None, - owner_id: str = "", -) -> schemas_out.AssetCreated | None: - canonical = hash_str.strip().lower() - - with create_session() as session: - asset = get_asset_by_hash(session, asset_hash=canonical) - if not asset: - return None - - result = register_existing_asset( - asset_hash=canonical, - name=_sanitize_filename(name, fallback=canonical.split(":", 1)[1] if ":" in canonical else canonical), - user_metadata=user_metadata or {}, - tags=tags or [], - tag_origin="manual", - owner_id=owner_id, - ) - info = result.info - asset = result.asset - - # created_new indicates whether new CONTENT was created, not whether a new AssetInfo was created. - # Since we're referencing an existing hash, the content already exists. 
- return schemas_out.AssetCreated( - id=info.id, - name=info.name, - asset_hash=asset.hash, - size=int(asset.size_bytes), - mime_type=asset.mime_type, - tags=result.tags, - user_metadata=info.user_metadata or {}, - preview_id=info.preview_id, - created_at=info.created_at, - last_access_time=info.last_access_time, - created_new=False, - ) - - -def add_tags_to_asset( - asset_info_id: str, - tags: list[str], - origin: str = "manual", - owner_id: str = "", -) -> schemas_out.TagsAdd: - result = apply_tags( - asset_info_id=asset_info_id, - tags=tags, - origin=origin, - owner_id=owner_id, - ) - return schemas_out.TagsAdd( - added=result.added, - already_present=result.already_present, - total_tags=result.total_tags, - ) - - -def remove_tags_from_asset( - asset_info_id: str, - tags: list[str], - owner_id: str = "", -) -> schemas_out.TagsRemove: - result = remove_tags( - asset_info_id=asset_info_id, - tags=tags, - owner_id=owner_id, - ) - return schemas_out.TagsRemove( - removed=result.removed, - not_present=result.not_present, - total_tags=result.total_tags, - ) - - -def list_tags( - prefix: str | None = None, - limit: int = 100, - offset: int = 0, - order: str = "count_desc", - include_zero: bool = True, - owner_id: str = "", -) -> schemas_out.TagsList: - rows, total = svc_list_tags( - prefix=prefix, - limit=limit, - offset=offset, - order=order, - include_zero=include_zero, - owner_id=owner_id, - ) - - tags = [schemas_out.TagUsage(name=name, count=count, type=tag_type) for (name, tag_type, count) in rows] - return schemas_out.TagsList(tags=tags, total=total, has_more=(offset + len(tags)) < total) diff --git a/app/assets/services/__init__.py b/app/assets/services/__init__.py index 7e4758a5f..5b1f8f1ab 100644 --- a/app/assets/services/__init__.py +++ b/app/assets/services/__init__.py @@ -1,23 +1,36 @@ from app.assets.services.asset_management import ( + asset_exists, delete_asset_reference, + get_asset_by_hash, get_asset_detail, + get_asset_info_with_tags, + 
list_assets_page, + resolve_asset_for_download, set_asset_preview, update_asset_metadata, ) from app.assets.services.ingest import ( + DependencyMissingError, + HashMismatchError, + create_from_hash, ingest_file_from_path, register_existing_asset, + upload_from_temp_path, ) from app.assets.services.schemas import ( AddTagsResult, AssetData, AssetDetailResult, AssetInfoData, + AssetSummaryData, + DownloadResolutionResult, IngestResult, + ListAssetsResult, RegisterAssetResult, RemoveTagsResult, SetTagsResult, TagUsage, + UploadResult, UserMetadata, ) from app.assets.services.tagging import ( @@ -29,7 +42,16 @@ from app.assets.services.tagging import ( __all__ = [ "ingest_file_from_path", "register_existing_asset", + "upload_from_temp_path", + "create_from_hash", + "HashMismatchError", + "DependencyMissingError", + "asset_exists", + "get_asset_by_hash", "get_asset_detail", + "get_asset_info_with_tags", + "list_assets_page", + "resolve_asset_for_download", "update_asset_metadata", "delete_asset_reference", "set_asset_preview", @@ -40,10 +62,14 @@ __all__ = [ "AssetData", "AssetDetailResult", "AssetInfoData", + "AssetSummaryData", + "DownloadResolutionResult", "IngestResult", + "ListAssetsResult", "RegisterAssetResult", "RemoveTagsResult", "SetTagsResult", "TagUsage", + "UploadResult", "UserMetadata", ] diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py index e42cc728f..97bea19b4 100644 --- a/app/assets/services/asset_management.py +++ b/app/assets/services/asset_management.py @@ -1,4 +1,5 @@ import contextlib +import mimetypes import os from typing import Sequence @@ -6,21 +7,31 @@ from sqlalchemy.orm import Session from app.assets.database.models import Asset from app.assets.database.queries import ( + asset_exists_by_hash, asset_info_exists_for_asset_id, delete_asset_info_by_id, + fetch_asset_info_and_asset, fetch_asset_info_asset_and_tags, + get_asset_by_hash as queries_get_asset_by_hash, get_asset_info_by_id, + 
def asset_exists(asset_hash: str) -> bool:
    """Return whether an asset with the given content hash is recorded.

    Args:
        asset_hash: Hash string, forwarded unchanged to ``asset_exists_by_hash``.
    """
    with create_session() as session:
        return asset_exists_by_hash(session, asset_hash=asset_hash)


def get_asset_by_hash(asset_hash: str) -> AssetData | None:
    """Look up an asset by content hash and return it as plain data.

    Args:
        asset_hash: Hash string, forwarded unchanged to the query layer.

    Returns:
        The extracted ``AssetData``, or ``None`` when no asset matches.
    """
    with create_session() as session:
        asset = queries_get_asset_by_hash(session, asset_hash=asset_hash)
        # Make the "not found" contract explicit instead of relying on the
        # extractor to pass ``None`` through.
        if asset is None:
            return None
        return extract_asset_data(asset)


def list_assets_page(
    owner_id: str = "",
    include_tags: Sequence[str] | None = None,
    exclude_tags: Sequence[str] | None = None,
    name_contains: str | None = None,
    metadata_filter: dict | None = None,
    limit: int = 20,
    offset: int = 0,
    sort: str = "created_at",
    order: str = "desc",
) -> ListAssetsResult:
    """Return one page of asset summaries together with the total match count.

    All filter, paging and ordering arguments are forwarded unchanged to
    ``list_asset_infos_page``.

    Returns:
        ``ListAssetsResult`` whose ``items`` hold one ``AssetSummaryData`` per
        returned row and whose ``total`` is the count reported by the query.
    """
    with create_session() as session:
        infos, tag_map, total = list_asset_infos_page(
            session,
            owner_id=owner_id,
            include_tags=include_tags,
            exclude_tags=exclude_tags,
            name_contains=name_contains,
            metadata_filter=metadata_filter,
            limit=limit,
            offset=offset,
            sort=sort,
            order=order,
        )

        # Convert ORM rows to plain data before the session closes.
        items = [
            AssetSummaryData(
                info=extract_info_data(info),
                asset=extract_asset_data(info.asset),
                tags=tag_map.get(info.id, []),
            )
            for info in infos
        ]

    return ListAssetsResult(items=items, total=total)


def resolve_asset_for_download(
    asset_info_id: str,
    owner_id: str = "",
) -> DownloadResolutionResult:
    """Resolve the on-disk file, content type and download name for an asset.

    The asset's access time is updated and committed as part of resolution.

    Args:
        asset_info_id: Id of the AssetInfo to download.
        owner_id: Owner passed to the lookup query.

    Returns:
        ``DownloadResolutionResult`` with the absolute path, content type and
        the filename to present to the client.

    Raises:
        ValueError: If the lookup for ``asset_info_id`` / ``owner_id`` returns nothing.
        FileNotFoundError: If none of the asset's cache states yields a live path.
    """
    with create_session() as session:
        pair = fetch_asset_info_and_asset(session, asset_info_id=asset_info_id, owner_id=owner_id)
        if not pair:
            raise ValueError(f"AssetInfo {asset_info_id} not found")

        info, asset = pair
        states = list_cache_states_by_asset_id(session, asset_id=asset.id)
        abs_path = select_best_live_path(states)
        if not abs_path:
            # Name the asset in the error, matching the ValueError above.
            raise FileNotFoundError(f"No live file found for AssetInfo {asset_info_id}")

        update_asset_info_access_time(session, asset_info_id=asset_info_id)
        session.commit()

        # Prefer the stored MIME type, then guess from the display name (or
        # the path when there is no name), then fall back to a generic type.
        ctype = (
            asset.mime_type
            or mimetypes.guess_type(info.name or abs_path)[0]
            or "application/octet-stream"
        )
        download_name = info.name or os.path.basename(abs_path)
        return DownloadResolutionResult(
            abs_path=abs_path,
            content_type=ctype,
            download_name=download_name,
        )


def get_asset_info_with_tags(
    asset_info_id: str,
    owner_id: str = "",
) -> AssetDetailResult | None:
    """Fetch an AssetInfo, its asset and its tag names as plain data.

    Args:
        asset_info_id: Id of the AssetInfo to load.
        owner_id: Owner passed to the lookup query.

    Returns:
        ``AssetDetailResult``, or ``None`` when the lookup returns nothing.
    """
    with create_session() as session:
        pair = fetch_asset_info_and_asset(session, asset_info_id=asset_info_id, owner_id=owner_id)
        if not pair:
            return None
        info, asset = pair
        tags = get_asset_tags(session, asset_info_id=asset_info_id)
        return AssetDetailResult(
            info=extract_info_data(info),
            asset=extract_asset_data(asset),
            tags=tags,
        )
def _get_size_mtime_ns(path: str) -> tuple[int, int]:
    """Return ``(size_bytes, mtime_ns)`` for *path*, following symlinks.

    Falls back to ``st_mtime`` scaled to nanoseconds when the stat result has
    no ``st_mtime_ns`` attribute.
    """
    st = os.stat(path, follow_symlinks=True)
    return st.st_size, getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))


def _sanitize_filename(name: str | None, fallback: str) -> str:
    """Reduce *name* to a bare filename, using *fallback* when nothing remains.

    Surrounding whitespace is stripped and any directory components are
    dropped; an empty, blank or directory-only name yields *fallback*.
    """
    n = os.path.basename((name or "").strip() or fallback)
    return n if n else fallback


class HashMismatchError(Exception):
    """Raised when an uploaded file's hash differs from the expected hash."""


class DependencyMissingError(Exception):
    """Raised when hashing fails because a required module cannot be imported."""

    def __init__(self, message: str):
        self.message = message
        super().__init__(message)


def upload_from_temp_path(
    temp_path: str,
    name: str | None = None,
    tags: list[str] | None = None,
    user_metadata: dict | None = None,
    client_filename: str | None = None,
    owner_id: str = "",
    expected_hash: str | None = None,
) -> UploadResult:
    """Register an uploaded temp file as an asset, deduplicating by content hash.

    If an asset with the same hash already exists, the temp file is removed
    (best effort) and a new reference to the existing asset is registered.
    Otherwise the file is moved into the destination resolved from *tags*,
    named ``<digest><ext>``, and ingested.

    Args:
        temp_path: Path of the uploaded file on disk.
        name: Preferred display name; falls back to *client_filename*, then
            to the digest.
        tags: Tags used to resolve the destination and attached to the asset.
        user_metadata: Metadata stored with the asset reference.
        client_filename: Filename reported by the client; preferred source of
            the file extension.
        owner_id: Owner of the created reference.
        expected_hash: Optional hash the upload must match; compared after
            stripping and lower-casing.

    Returns:
        ``UploadResult`` describing the reference, its asset and tags.
        ``created_new`` is ``False`` for deduplicated uploads.

    Raises:
        DependencyMissingError: If hashing raises ``ImportError``.
        HashMismatchError: If *expected_hash* is given and does not match.
        RuntimeError: If hashing, moving, stat-ing or ingesting the file fails.
    """
    try:
        digest = hashing.compute_blake3_hash(temp_path)
    except ImportError as e:
        raise DependencyMissingError(str(e)) from e
    except Exception as e:
        raise RuntimeError(f"failed to hash uploaded file: {e}") from e
    asset_hash = "blake3:" + digest

    if expected_hash and asset_hash != expected_hash.strip().lower():
        raise HashMismatchError("Uploaded file hash does not match provided hash.")

    with create_session() as session:
        existing = get_asset_by_hash(session, asset_hash=asset_hash)

    display_name = _sanitize_filename(name or client_filename, fallback=digest)

    if existing is not None:
        # Content is already stored: discard the upload (best effort) and only
        # add a new reference to the existing asset.
        with contextlib.suppress(Exception):
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)

        result = register_existing_asset(
            asset_hash=asset_hash,
            name=display_name,
            user_metadata=user_metadata or {},
            tags=tags or [],
            tag_origin="manual",
            owner_id=owner_id,
        )
        return UploadResult(
            info=result.info,
            asset=result.asset,
            tags=result.tags,
            created_new=False,
        )

    base_dir, subdirs = resolve_destination_from_tags(tags)
    dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir
    os.makedirs(dest_dir, exist_ok=True)

    # Keep only a short extension from the client-supplied name; the stored
    # basename itself is always the content digest.
    src_for_ext = (client_filename or name or "").strip()
    _ext = os.path.splitext(os.path.basename(src_for_ext))[1] if src_for_ext else ""
    ext = _ext if 0 < len(_ext) <= 16 else ""
    hashed_basename = f"{digest}{ext}"
    dest_abs = os.path.abspath(os.path.join(dest_dir, hashed_basename))
    validate_path_within_base(dest_abs, base_dir)

    content_type = (
        mimetypes.guess_type(os.path.basename(src_for_ext), strict=False)[0]
        or mimetypes.guess_type(hashed_basename, strict=False)[0]
        or "application/octet-stream"
    )

    try:
        os.replace(temp_path, dest_abs)
    except Exception as e:
        raise RuntimeError(f"failed to move uploaded file into place: {e}") from e

    try:
        size_bytes, mtime_ns = _get_size_mtime_ns(dest_abs)
    except OSError as e:
        raise RuntimeError(f"failed to stat destination file: {e}") from e

    ingest_result = ingest_file_from_path(
        asset_hash=asset_hash,
        abs_path=dest_abs,
        size_bytes=size_bytes,
        mtime_ns=mtime_ns,
        mime_type=content_type,
        info_name=display_name,
        owner_id=owner_id,
        preview_id=None,
        user_metadata=user_metadata or {},
        tags=tags,
        tag_origin="manual",
        require_existing_tags=False,
    )
    info_id = ingest_result.asset_info_id
    if not info_id:
        raise RuntimeError("failed to create asset metadata")

    with create_session() as session:
        pair = fetch_asset_info_and_asset(session, asset_info_id=info_id, owner_id=owner_id)
        if not pair:
            raise RuntimeError("inconsistent DB state after ingest")
        info, asset = pair
        tag_names = get_asset_tags(session, asset_info_id=info.id)

        # Convert ORM rows to plain data before the session closes.
        return UploadResult(
            info=extract_info_data(info),
            asset=extract_asset_data(asset),
            tags=tag_names,
            created_new=ingest_result.asset_created,
        )


def create_from_hash(
    hash_str: str,
    name: str,
    tags: list[str] | None = None,
    user_metadata: dict | None = None,
    owner_id: str = "",
) -> UploadResult | None:
    """Create a new reference to already-stored content identified by hash.

    Args:
        hash_str: Content hash; stripped and lower-cased before lookup.
        name: Display name; falls back to the part of the hash after the
            first ``:`` (or the whole hash when there is no ``:``).
        tags: Tags to attach to the new reference.
        user_metadata: Metadata stored with the new reference.
        owner_id: Owner of the new reference.

    Returns:
        ``UploadResult`` with ``created_new=False``, or ``None`` when no asset
        with that hash exists.
    """
    canonical = hash_str.strip().lower()

    with create_session() as session:
        asset = get_asset_by_hash(session, asset_hash=canonical)
        if not asset:
            return None

    _, sep, digest_part = canonical.partition(":")
    fallback_name = digest_part if sep else canonical

    result = register_existing_asset(
        asset_hash=canonical,
        name=_sanitize_filename(name, fallback=fallback_name),
        user_metadata=user_metadata or {},
        tags=tags or [],
        tag_origin="manual",
        owner_id=owner_id,
    )

    # The content already existed, so no new content was created.
    return UploadResult(
        info=result.info,
        asset=result.asset,
        tags=result.tags,
        created_new=False,
    )


@dataclass(frozen=True)
class AssetSummaryData:
    """One row of an asset listing: reference info, optional asset, tag names."""

    info: AssetInfoData
    asset: AssetData | None
    tags: list[str]


@dataclass(frozen=True)
class ListAssetsResult:
    """A page of asset summaries plus the total number of matches."""

    items: list[AssetSummaryData]
    total: int


@dataclass(frozen=True)
class DownloadResolutionResult:
    """Resolved file path, content type and client-facing name for a download."""

    abs_path: str
    content_type: str
    download_name: str


@dataclass(frozen=True)
class UploadResult:
    """Outcome of an upload or create-from-hash operation."""

    info: AssetInfoData
    asset: AssetData
    tags: list[str]
    created_new: bool