From 0379eff0b56a5a22c145815d861e08f109497411 Mon Sep 17 00:00:00 2001 From: bigcat88 Date: Wed, 27 Aug 2025 21:18:26 +0300 Subject: [PATCH] allow Upload Asset endpoint to accept hash (as documentation requires) --- app/api/assets_routes.py | 117 ++++++++++++++++++++++++++++++++------- app/api/schemas_in.py | 19 +++++++ app/assets_manager.py | 49 ++++++++++++++-- 3 files changed, 162 insertions(+), 23 deletions(-) diff --git a/app/api/assets_routes.py b/app/api/assets_routes.py index b5c25dcec..bdbb36167 100644 --- a/app/api/assets_routes.py +++ b/app/api/assets_routes.py @@ -1,3 +1,4 @@ +import contextlib import os import uuid import urllib.parse @@ -115,29 +116,63 @@ async def upload_asset(request: web.Request) -> web.Response: reader = await request.multipart() - file_field = None + file_present = False file_client_name: Optional[str] = None tags_raw: list[str] = [] provided_name: Optional[str] = None user_metadata_raw: Optional[str] = None - file_written = 0 + provided_hash: Optional[str] = None + provided_hash_exists: Optional[bool] = None + file_written = 0 tmp_path: Optional[str] = None while True: field = await reader.next() if field is None: break - fname = getattr(field, "name", None) or "" - if fname == "file": - # Save to temp + fname = getattr(field, "name", "") or "" + + if fname == "hash": + try: + s = ((await field.text()) or "").strip().lower() + except Exception: + return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + + if s: + if ":" not in s: + return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + algo, digest = s.split(":", 1) + if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"): + return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + provided_hash = f"{algo}:{digest}" + try: + provided_hash_exists = await assets_manager.asset_exists(asset_hash=provided_hash) + except Exception: + provided_hash_exists = None # do not fail the whole request here + + elif fname == "file": + file_present = True + file_client_name = (field.filename or "").strip() + + if provided_hash and provided_hash_exists is True: + # If client supplied a hash that we know exists, drain but do not write to disk + try: + while True: + chunk = await field.read_chunk(8 * 1024 * 1024) + if not chunk: + break + file_written += len(chunk) + except Exception: + return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive uploaded file.") + continue # Do not create temp file; we will create AssetInfo from the existing content + + # Otherwise, store to temp for hashing/ingest uploads_root = os.path.join(folder_paths.get_temp_directory(), "uploads") unique_dir = os.path.join(uploads_root, uuid.uuid4().hex) os.makedirs(unique_dir, exist_ok=True) tmp_path = os.path.join(unique_dir, ".upload.part") - file_field = field - file_client_name = (field.filename or "").strip() try: with open(tmp_path, "wb") as f: while True: @@ -148,7 +183,7 @@ async def upload_asset(request: web.Request) -> web.Response: file_written += len(chunk) except Exception: try: - if os.path.exists(tmp_path): + if os.path.exists(tmp_path or ""): os.remove(tmp_path) finally: return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive and store uploaded file.") @@ -159,12 +194,15 @@ async def upload_asset(request: web.Request) -> web.Response: elif fname == "user_metadata": user_metadata_raw = (await field.text()) or None - if file_field is None: - return _error_response(400, "MISSING_FILE", "Form must include a 'file' part.") + # If client did not send file, and we are not doing a from-hash fast path -> error + if not file_present and not (provided_hash and provided_hash_exists): + return _error_response(400, "MISSING_FILE", "Form must include a 'file' part or a known 'hash'.") - if file_written == 0: + if file_present and file_written == 0 and not (provided_hash and provided_hash_exists): + # Empty upload is only acceptable if we are fast-pathing from existing hash try: - os.remove(tmp_path) + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) finally: return _error_response(400, "EMPTY_UPLOAD", "Uploaded file is empty.") @@ -173,29 +211,70 @@ async def upload_asset(request: web.Request) -> web.Response: "tags": tags_raw, "name": provided_name, "user_metadata": user_metadata_raw, + "hash": provided_hash, }) except ValidationError as ve: try: - os.remove(tmp_path) + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) finally: return _validation_error_response("INVALID_BODY", ve) - if spec.tags[0] == "models" and spec.tags[1] not in folder_paths.folder_names_and_paths: + # Validate models category against configured folders (consistent with previous behavior) + if spec.tags and spec.tags[0] == "models": + if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths: + if tmp_path and os.path.exists(tmp_path): + os.remove(tmp_path) + return _error_response( + 400, "INVALID_BODY", f"unknown models category '{spec.tags[1] if len(spec.tags) >= 2 else ''}'" + ) + + owner_id = UserManager.get_request_user_id(request) + + # Fast path: if a valid provided hash exists, create AssetInfo without writing anything + if spec.hash and provided_hash_exists is True: + try: + result = await assets_manager.create_asset_from_hash( + hash_str=spec.hash, + name=spec.name or (spec.hash.split(":", 1)[1]), + tags=spec.tags, + user_metadata=spec.user_metadata or {}, + owner_id=owner_id, + ) + except Exception: + return _error_response(500, "INTERNAL", "Unexpected server error.") + + if result is None: + return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {spec.hash} does not exist") + + # Drain temp if we accidentally saved (e.g., hash field came after file) if tmp_path and os.path.exists(tmp_path): - os.remove(tmp_path) - return _error_response(400, "INVALID_BODY", f"unknown models category '{spec.tags[1]}'") + with contextlib.suppress(Exception): + os.remove(tmp_path) + + status = 200 if (not result.created_new) else 201 + return web.json_response(result.model_dump(mode="json"), status=status) + + # Otherwise, we must have a temp file path to ingest + if not tmp_path or not os.path.exists(tmp_path): + # The only case we reach here without a temp file is: client sent a hash that does not exist and no file + return _error_response(404, "ASSET_NOT_FOUND", "Provided hash not found and no file uploaded.") try: created = await assets_manager.upload_asset_from_temp_path( spec, temp_path=tmp_path, client_filename=file_client_name, - owner_id=UserManager.get_request_user_id(request), + owner_id=owner_id, ) - return web.json_response(created.model_dump(mode="json"), status=201) - except ValueError: + status = 201 if created.created_new else 200 + return web.json_response(created.model_dump(mode="json"), status=status) + except ValueError as e: if tmp_path and os.path.exists(tmp_path): os.remove(tmp_path) + msg = str(e) + if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH": + return _error_response(400, "HASH_MISMATCH", "Uploaded file hash does not match provided hash.") return _error_response(400, "BAD_REQUEST", "Invalid inputs.") except Exception: if tmp_path and os.path.exists(tmp_path): diff --git a/app/api/schemas_in.py b/app/api/schemas_in.py index 9694a67a6..412b72e3a 100644 --- a/app/api/schemas_in.py +++ b/app/api/schemas_in.py @@ -180,12 +180,31 @@ class UploadAssetSpec(BaseModel): if root == 'models', second must be a valid category from folder_paths.folder_names_and_paths - name: desired filename (optional); fallback will be the file hash - user_metadata: arbitrary JSON object (optional) + - hash: optional canonical 'blake3:' provided by the client for validation / fast-path """ model_config = ConfigDict(extra="ignore", str_strip_whitespace=True) tags: list[str] = Field(..., min_length=1) name: Optional[str] = Field(default=None, max_length=512) user_metadata: dict[str, Any] = Field(default_factory=dict) + hash: Optional[str] = Field(default=None) + + @field_validator("hash", mode="before") + @classmethod + def _parse_hash(cls, v): + if v is None: + return None + s = str(v).strip().lower() + if not s: + return None + if ":" not in s: + raise ValueError("hash must be 'blake3:'") + algo, digest = s.split(":", 1) + if algo != "blake3": + raise ValueError("only canonical 'blake3:' is accepted here") + if not digest or any(c for c in digest if c not in "0123456789abcdef"): + raise ValueError("hash digest must be lowercase hex") + return f"{algo}:{digest}" @field_validator("tags", mode="before") @classmethod diff --git a/app/assets_manager.py b/app/assets_manager.py index 8cdf1fffc..bb2762497 100644 --- a/app/assets_manager.py +++ b/app/assets_manager.py @@ -1,3 +1,4 @@ +import contextlib import logging import mimetypes import os @@ -208,13 +209,14 @@ async def upload_asset_from_temp_path( temp_path: str, client_filename: Optional[str] = None, owner_id: str = "", + expected_asset_hash: Optional[str] = None, ) -> schemas_out.AssetCreated: """ Finalize an uploaded temp file: - compute blake3 hash - - resolve destination from tags - - decide filename (spec.name or client filename or hash) - - move file atomically + - if expected_asset_hash provided, verify equality (400 on mismatch at caller) + - if an Asset with the same hash exists: discard temp, create AssetInfo only (no write) + - else resolve destination from tags and atomically move into place - ingest into DB (assets, locator state, asset_info + tags) Returns a populated AssetCreated payload. """ @@ -225,7 +227,46 @@ async def upload_asset_from_temp_path( raise RuntimeError(f"failed to hash uploaded file: {e}") asset_hash = "blake3:" + digest - # Resolve destination + if expected_asset_hash and asset_hash != expected_asset_hash.strip().lower(): + raise ValueError("HASH_MISMATCH") + + # Fast path: content already known --> no writes, just create a reference + async with await create_session() as session: + existing = await get_asset_by_hash(session, asset_hash=asset_hash) + if existing is not None: + with contextlib.suppress(Exception): + if temp_path and os.path.exists(temp_path): + os.remove(temp_path) + + desired_name = _safe_filename(spec.name or (client_filename or ""), fallback=digest) + info = await create_asset_info_for_existing_asset( + session, + asset_hash=asset_hash, + name=desired_name, + user_metadata=spec.user_metadata or {}, + tags=spec.tags or [], + tag_origin="manual", + added_by=None, + owner_id=owner_id, + ) + tag_names = await get_asset_tags(session, asset_info_id=info.id) + await session.commit() + + return schemas_out.AssetCreated( + id=info.id, + name=info.name, + asset_hash=info.asset_hash, + size=int(existing.size_bytes) if existing.size_bytes is not None else None, + mime_type=existing.mime_type, + tags=tag_names, + user_metadata=info.user_metadata or {}, + preview_hash=info.preview_hash, + created_at=info.created_at, + last_access_time=info.last_access_time, + created_new=False, + ) + + # Resolve destination (only for truly new content) base_dir, subdirs = resolve_destination_from_tags(spec.tags) dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir os.makedirs(dest_dir, exist_ok=True)