diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index 3e537404a..7c27cdfea 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -10,6 +10,13 @@ from pydantic import ValidationError import app.assets.manager as manager from app import user_manager from app.assets.api import schemas_in +from app.assets.api.schemas_in import ( + AssetNotFoundError, + AssetValidationError, + HashMismatchError, + UploadError, +) +from app.assets.api.upload import parse_multipart_upload from app.assets.services.scanner import seed_assets from typing import Any @@ -185,183 +192,28 @@ async def create_asset_from_hash(request: web.Request) -> web.Response: @ROUTES.post("/api/assets") async def upload_asset(request: web.Request) -> web.Response: """Multipart/form-data endpoint for Asset uploads.""" - if not (request.content_type or "").lower().startswith("multipart/"): - return _error_response(415, "UNSUPPORTED_MEDIA_TYPE", "Use multipart/form-data for uploads.") - - reader = await request.multipart() - - file_present = False - file_client_name: str | None = None - tags_raw: list[str] = [] - provided_name: str | None = None - user_metadata_raw: str | None = None - provided_hash: str | None = None - provided_hash_exists: bool | None = None - - file_written = 0 - tmp_path: str | None = None - while True: - field = await reader.next() - if field is None: - break - - fname = getattr(field, "name", "") or "" - - if fname == "hash": - try: - s = ((await field.text()) or "").strip().lower() - except Exception: - return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") - - if s: - if ":" not in s: - return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") - algo, digest = s.split(":", 1) - if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"): - return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") - provided_hash = f"{algo}:{digest}" - try: - provided_hash_exists = 
manager.asset_exists(asset_hash=provided_hash) - except Exception: - provided_hash_exists = None # do not fail the whole request here - - elif fname == "file": - file_present = True - file_client_name = (field.filename or "").strip() - - if provided_hash and provided_hash_exists is True: - # If client supplied a hash that we know exists, drain but do not write to disk - try: - while True: - chunk = await field.read_chunk(8 * 1024 * 1024) - if not chunk: - break - file_written += len(chunk) - except Exception: - return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive uploaded file.") - continue # Do not create temp file; we will create AssetInfo from the existing content - - # Otherwise, store to temp for hashing/ingest - uploads_root = os.path.join(folder_paths.get_temp_directory(), "uploads") - unique_dir = os.path.join(uploads_root, uuid.uuid4().hex) - os.makedirs(unique_dir, exist_ok=True) - tmp_path = os.path.join(unique_dir, ".upload.part") - - try: - with open(tmp_path, "wb") as f: - while True: - chunk = await field.read_chunk(8 * 1024 * 1024) - if not chunk: - break - f.write(chunk) - file_written += len(chunk) - except Exception: - try: - if os.path.exists(tmp_path or ""): - os.remove(tmp_path) - finally: - return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive and store uploaded file.") - elif fname == "tags": - tags_raw.append((await field.text()) or "") - elif fname == "name": - provided_name = (await field.text()) or None - elif fname == "user_metadata": - user_metadata_raw = (await field.text()) or None - - # If client did not send file, and we are not doing a from-hash fast path -> error - if not file_present and not (provided_hash and provided_hash_exists): - return _error_response(400, "MISSING_FILE", "Form must include a 'file' part or a known 'hash'.") - - if file_present and file_written == 0 and not (provided_hash and provided_hash_exists): - # Empty upload is only acceptable if we are fast-pathing from existing hash - try: 
- if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) - finally: - return _error_response(400, "EMPTY_UPLOAD", "Uploaded file is empty.") - try: - spec = schemas_in.UploadAssetSpec.model_validate({ - "tags": tags_raw, - "name": provided_name, - "user_metadata": user_metadata_raw, - "hash": provided_hash, - }) - except ValidationError as ve: - try: - if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) - finally: - return _validation_error_response("INVALID_BODY", ve) - - # Validate models category against configured folders (consistent with previous behavior) - if spec.tags and spec.tags[0] == "models": - if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths: - if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) - return _error_response( - 400, "INVALID_BODY", f"unknown models category '{spec.tags[1] if len(spec.tags) >= 2 else ''}'" - ) + parsed = await parse_multipart_upload(request, check_hash_exists=manager.asset_exists) + except UploadError as e: + return _error_response(e.status, e.code, e.message) owner_id = USER_MANAGER.get_request_user_id(request) - # Fast path: if a valid provided hash exists, create AssetInfo without writing anything - if spec.hash and provided_hash_exists is True: - try: - result = manager.create_asset_from_hash( - hash_str=spec.hash, - name=spec.name or (spec.hash.split(":", 1)[1]), - tags=spec.tags, - user_metadata=spec.user_metadata or {}, - owner_id=owner_id, - ) - except Exception: - logging.exception("create_asset_from_hash failed for hash=%s, owner_id=%s", spec.hash, owner_id) - return _error_response(500, "INTERNAL", "Unexpected server error.") - - if result is None: - return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {spec.hash} does not exist") - - # Drain temp if we accidentally saved (e.g., hash field came after file) - if tmp_path and os.path.exists(tmp_path): - with contextlib.suppress(Exception): - os.remove(tmp_path) - - status = 200 if (not 
result.created_new) else 201 - return web.json_response(result.model_dump(mode="json"), status=status) - - # Otherwise, we must have a temp file path to ingest - if not tmp_path or not os.path.exists(tmp_path): - # The only case we reach here without a temp file is: client sent a hash that does not exist and no file - return _error_response(404, "ASSET_NOT_FOUND", "Provided hash not found and no file uploaded.") - try: - created = manager.upload_asset_from_temp_path( - spec, - temp_path=tmp_path, - client_filename=file_client_name, - owner_id=owner_id, - expected_asset_hash=spec.hash, - ) - status = 201 if created.created_new else 200 - return web.json_response(created.model_dump(mode="json"), status=status) - except ValueError as e: - if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) - msg = str(e) - if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH": - return _error_response( - 400, - "HASH_MISMATCH", - "Uploaded file hash does not match provided hash.", - ) - return _error_response(400, "BAD_REQUEST", "Invalid inputs.") + result = manager.process_upload(parsed=parsed, owner_id=owner_id) + except AssetValidationError as e: + return _error_response(400, e.code, str(e)) + except AssetNotFoundError as e: + return _error_response(404, "ASSET_NOT_FOUND", str(e)) + except HashMismatchError as e: + return _error_response(400, "HASH_MISMATCH", str(e)) except Exception: - if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) - logging.exception("upload_asset_from_temp_path failed for tmp_path=%s, owner_id=%s", tmp_path, owner_id) + logging.exception("process_upload failed for owner_id=%s", owner_id) return _error_response(500, "INTERNAL", "Unexpected server error.") + status = 201 if result.created_new else 200 + return web.json_response(result.model_dump(mode="json"), status=status) + @ROUTES.put(f"/api/assets/{{id:{UUID_RE}}}") async def update_asset(request: web.Request) -> web.Response: diff --git a/app/assets/api/schemas_in.py 
b/app/assets/api/schemas_in.py index 6707ffb0c..b141e957c 100644 --- a/app/assets/api/schemas_in.py +++ b/app/assets/api/schemas_in.py @@ -1,4 +1,5 @@ import json +from dataclasses import dataclass from typing import Any, Literal from pydantic import ( @@ -10,6 +11,52 @@ from pydantic import ( model_validator, ) + +class UploadError(Exception): + """Error during upload parsing with HTTP status and code (used in HTTP layer only).""" + + def __init__(self, status: int, code: str, message: str): + super().__init__(message) + self.status = status + self.code = code + self.message = message + + +class AssetValidationError(Exception): + """Validation error in asset processing (invalid tags, metadata, etc.).""" + + def __init__(self, code: str, message: str): + super().__init__(message) + self.code = code + + +class AssetNotFoundError(Exception): + """Asset or asset content not found.""" + + def __init__(self, message: str): + super().__init__(message) + + +class HashMismatchError(Exception): + """Uploaded file hash does not match provided hash.""" + + pass + + +@dataclass +class ParsedUpload: + """Result of parsing a multipart upload request.""" + + file_present: bool + file_written: int + file_client_name: str | None + tmp_path: str | None + tags_raw: list[str] + provided_name: str | None + user_metadata_raw: str | None + provided_hash: str | None + provided_hash_exists: bool | None + class ListAssetsQuery(BaseModel): include_tags: list[str] = Field(default_factory=list) exclude_tags: list[str] = Field(default_factory=list) diff --git a/app/assets/api/upload.py b/app/assets/api/upload.py new file mode 100644 index 000000000..3be3e9cec --- /dev/null +++ b/app/assets/api/upload.py @@ -0,0 +1,155 @@ +""" +Multipart upload parsing for asset uploads. + +This module handles the HTTP-specific concerns of parsing multipart form data, +streaming file uploads to temp storage, and validating hash fields. 
+""" +import os +import uuid + +from aiohttp import web + +import folder_paths +from app.assets.api.schemas_in import ParsedUpload, UploadError + + +def validate_hash_format(s: str) -> str: + """ + Validate and normalize a hash string. + + Returns canonical 'blake3:' or raises UploadError. + """ + s = s.strip().lower() + if not s: + raise UploadError(400, "INVALID_HASH", "hash must be like 'blake3:'") + if ":" not in s: + raise UploadError(400, "INVALID_HASH", "hash must be like 'blake3:'") + algo, digest = s.split(":", 1) + if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"): + raise UploadError(400, "INVALID_HASH", "hash must be like 'blake3:'") + return f"{algo}:{digest}" + + +async def parse_multipart_upload( + request: web.Request, + check_hash_exists: callable, +) -> ParsedUpload: + """ + Parse a multipart/form-data upload request. + + Args: + request: The aiohttp request + check_hash_exists: Callable(hash_str) -> bool to check if a hash exists + + Returns: + ParsedUpload with parsed fields and temp file path + + Raises: + UploadError: On validation or I/O errors + """ + if not (request.content_type or "").lower().startswith("multipart/"): + raise UploadError(415, "UNSUPPORTED_MEDIA_TYPE", "Use multipart/form-data for uploads.") + + reader = await request.multipart() + + file_present = False + file_client_name: str | None = None + tags_raw: list[str] = [] + provided_name: str | None = None + user_metadata_raw: str | None = None + provided_hash: str | None = None + provided_hash_exists: bool | None = None + + file_written = 0 + tmp_path: str | None = None + + while True: + field = await reader.next() + if field is None: + break + + fname = getattr(field, "name", "") or "" + + if fname == "hash": + try: + s = ((await field.text()) or "").strip().lower() + except Exception: + raise UploadError(400, "INVALID_HASH", "hash must be like 'blake3:'") + + if s: + provided_hash = validate_hash_format(s) + try: + 
provided_hash_exists = check_hash_exists(provided_hash) + except Exception: + provided_hash_exists = None # do not fail the whole request here + + elif fname == "file": + file_present = True + file_client_name = (field.filename or "").strip() + + if provided_hash and provided_hash_exists is True: + # If client supplied a hash that we know exists, drain but do not write to disk + try: + while True: + chunk = await field.read_chunk(8 * 1024 * 1024) + if not chunk: + break + file_written += len(chunk) + except Exception: + raise UploadError(500, "UPLOAD_IO_ERROR", "Failed to receive uploaded file.") + continue # Do not create temp file; we will create AssetInfo from the existing content + + # Otherwise, store to temp for hashing/ingest + uploads_root = os.path.join(folder_paths.get_temp_directory(), "uploads") + unique_dir = os.path.join(uploads_root, uuid.uuid4().hex) + os.makedirs(unique_dir, exist_ok=True) + tmp_path = os.path.join(unique_dir, ".upload.part") + + try: + with open(tmp_path, "wb") as f: + while True: + chunk = await field.read_chunk(8 * 1024 * 1024) + if not chunk: + break + f.write(chunk) + file_written += len(chunk) + except Exception: + _cleanup_temp(tmp_path) + raise UploadError(500, "UPLOAD_IO_ERROR", "Failed to receive and store uploaded file.") + + elif fname == "tags": + tags_raw.append((await field.text()) or "") + elif fname == "name": + provided_name = (await field.text()) or None + elif fname == "user_metadata": + user_metadata_raw = (await field.text()) or None + + # Validate we have either a file or a known hash + if not file_present and not (provided_hash and provided_hash_exists): + raise UploadError(400, "MISSING_FILE", "Form must include a 'file' part or a known 'hash'.") + + if file_present and file_written == 0 and not (provided_hash and provided_hash_exists): + _cleanup_temp(tmp_path) + raise UploadError(400, "EMPTY_UPLOAD", "Uploaded file is empty.") + + return ParsedUpload( + file_present=file_present, + 
file_written=file_written, + file_client_name=file_client_name, + tmp_path=tmp_path, + tags_raw=tags_raw, + provided_name=provided_name, + user_metadata_raw=user_metadata_raw, + provided_hash=provided_hash, + provided_hash_exists=provided_hash_exists, + ) + + +def _cleanup_temp(tmp_path: str | None) -> None: + """Safely remove a temp file if it exists.""" + if tmp_path: + try: + if os.path.exists(tmp_path): + os.remove(tmp_path) + except Exception: + pass diff --git a/app/assets/manager.py b/app/assets/manager.py index b972204ad..efb94af71 100644 --- a/app/assets/manager.py +++ b/app/assets/manager.py @@ -12,9 +12,19 @@ import mimetypes import contextlib from typing import Sequence +from pydantic import ValidationError + +import folder_paths import app.assets.services.hashing as hashing from app.database.db import create_session from app.assets.api import schemas_out, schemas_in +from app.assets.api.schemas_in import ( + AssetNotFoundError, + AssetValidationError, + HashMismatchError, + ParsedUpload, +) +from app.assets.api.upload import _cleanup_temp from app.assets.database.queries import ( asset_exists_by_hash, fetch_asset_info_and_asset, @@ -292,6 +302,81 @@ def upload_asset_from_temp_path( ) +def process_upload( + parsed: ParsedUpload, + owner_id: str = "", +) -> schemas_out.AssetCreated: + """ + Process a parsed multipart upload. 
+ + Args: + parsed: The parsed upload data from parse_multipart_upload + owner_id: The owner ID for the asset + + Returns: + AssetCreated response (check created_new to determine if new asset was created) + + Raises: + UploadError: On validation or processing errors + """ + try: + spec = schemas_in.UploadAssetSpec.model_validate({ + "tags": parsed.tags_raw, + "name": parsed.provided_name, + "user_metadata": parsed.user_metadata_raw, + "hash": parsed.provided_hash, + }) + except ValidationError as ve: + _cleanup_temp(parsed.tmp_path) + raise AssetValidationError("INVALID_BODY", f"Validation failed: {ve.json()}") + + # Validate models category against configured folders + if spec.tags and spec.tags[0] == "models": + if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths: + _cleanup_temp(parsed.tmp_path) + category = spec.tags[1] if len(spec.tags) >= 2 else "" + raise AssetValidationError("INVALID_BODY", f"unknown models category '{category}'") + + # Fast path: if a valid provided hash exists, create AssetInfo without writing anything + if spec.hash and parsed.provided_hash_exists is True: + result = create_asset_from_hash( + hash_str=spec.hash, + name=spec.name or (spec.hash.split(":", 1)[1]), + tags=spec.tags, + user_metadata=spec.user_metadata or {}, + owner_id=owner_id, + ) + + if result is None: + raise AssetNotFoundError(f"Asset content {spec.hash} does not exist") + + # Drain temp if we accidentally saved (e.g., hash field came after file) + _cleanup_temp(parsed.tmp_path) + return result + + # Otherwise, we must have a temp file path to ingest + if not parsed.tmp_path or not os.path.exists(parsed.tmp_path): + raise AssetNotFoundError("Provided hash not found and no file uploaded.") + + try: + return upload_asset_from_temp_path( + spec, + temp_path=parsed.tmp_path, + client_filename=parsed.file_client_name, + owner_id=owner_id, + expected_asset_hash=spec.hash, + ) + except ValueError as e: + _cleanup_temp(parsed.tmp_path) + msg = str(e) 
+ if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH": + raise HashMismatchError("Uploaded file hash does not match provided hash.") + raise AssetValidationError("BAD_REQUEST", "Invalid inputs.") + except Exception: + _cleanup_temp(parsed.tmp_path) + raise + + def update_asset( asset_info_id: str, name: str | None = None, @@ -332,7 +417,7 @@ def set_asset_preview( owner_id=owner_id, ) info = result["info"] - asset = result["asset"] + asset = result["asset"] tags = result["tags"] return schemas_out.AssetDetail(