refactor: extract multipart upload parsing from routes

- Add app/assets/api/upload.py with parse_multipart_upload() for HTTP parsing
- Add ParsedUpload dataclass to schemas_in.py
- Add domain exceptions (AssetValidationError, AssetNotFoundError, HashMismatchError)
- Add manager.process_upload() with domain exceptions (no HTTP status codes)
- Routes map domain exceptions to HTTP responses
- Slim down upload_asset route to ~20 lines (was ~150)

Amp-Thread-ID: https://ampcode.com/threads/T-019c2519-abe1-738a-ad2e-29ece17c0e42
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr 2026-02-03 12:12:25 -08:00
parent e987bd268f
commit 9f9db2c2c2
4 changed files with 309 additions and 170 deletions

View File

@ -10,6 +10,13 @@ from pydantic import ValidationError
import app.assets.manager as manager import app.assets.manager as manager
from app import user_manager from app import user_manager
from app.assets.api import schemas_in from app.assets.api import schemas_in
from app.assets.api.schemas_in import (
AssetNotFoundError,
AssetValidationError,
HashMismatchError,
UploadError,
)
from app.assets.api.upload import parse_multipart_upload
from app.assets.services.scanner import seed_assets from app.assets.services.scanner import seed_assets
from typing import Any from typing import Any
@ -185,183 +192,28 @@ async def create_asset_from_hash(request: web.Request) -> web.Response:
@ROUTES.post("/api/assets") @ROUTES.post("/api/assets")
async def upload_asset(request: web.Request) -> web.Response: async def upload_asset(request: web.Request) -> web.Response:
"""Multipart/form-data endpoint for Asset uploads.""" """Multipart/form-data endpoint for Asset uploads."""
if not (request.content_type or "").lower().startswith("multipart/"):
return _error_response(415, "UNSUPPORTED_MEDIA_TYPE", "Use multipart/form-data for uploads.")
reader = await request.multipart()
file_present = False
file_client_name: str | None = None
tags_raw: list[str] = []
provided_name: str | None = None
user_metadata_raw: str | None = None
provided_hash: str | None = None
provided_hash_exists: bool | None = None
file_written = 0
tmp_path: str | None = None
while True:
field = await reader.next()
if field is None:
break
fname = getattr(field, "name", "") or ""
if fname == "hash":
try:
s = ((await field.text()) or "").strip().lower()
except Exception:
return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
if s:
if ":" not in s:
return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
algo, digest = s.split(":", 1)
if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"):
return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
provided_hash = f"{algo}:{digest}"
try:
provided_hash_exists = manager.asset_exists(asset_hash=provided_hash)
except Exception:
provided_hash_exists = None # do not fail the whole request here
elif fname == "file":
file_present = True
file_client_name = (field.filename or "").strip()
if provided_hash and provided_hash_exists is True:
# If client supplied a hash that we know exists, drain but do not write to disk
try:
while True:
chunk = await field.read_chunk(8 * 1024 * 1024)
if not chunk:
break
file_written += len(chunk)
except Exception:
return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive uploaded file.")
continue # Do not create temp file; we will create AssetInfo from the existing content
# Otherwise, store to temp for hashing/ingest
uploads_root = os.path.join(folder_paths.get_temp_directory(), "uploads")
unique_dir = os.path.join(uploads_root, uuid.uuid4().hex)
os.makedirs(unique_dir, exist_ok=True)
tmp_path = os.path.join(unique_dir, ".upload.part")
try:
with open(tmp_path, "wb") as f:
while True:
chunk = await field.read_chunk(8 * 1024 * 1024)
if not chunk:
break
f.write(chunk)
file_written += len(chunk)
except Exception:
try:
if os.path.exists(tmp_path or ""):
os.remove(tmp_path)
finally:
return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive and store uploaded file.")
elif fname == "tags":
tags_raw.append((await field.text()) or "")
elif fname == "name":
provided_name = (await field.text()) or None
elif fname == "user_metadata":
user_metadata_raw = (await field.text()) or None
# If client did not send file, and we are not doing a from-hash fast path -> error
if not file_present and not (provided_hash and provided_hash_exists):
return _error_response(400, "MISSING_FILE", "Form must include a 'file' part or a known 'hash'.")
if file_present and file_written == 0 and not (provided_hash and provided_hash_exists):
# Empty upload is only acceptable if we are fast-pathing from existing hash
try:
if tmp_path and os.path.exists(tmp_path):
os.remove(tmp_path)
finally:
return _error_response(400, "EMPTY_UPLOAD", "Uploaded file is empty.")
try: try:
spec = schemas_in.UploadAssetSpec.model_validate({ parsed = await parse_multipart_upload(request, check_hash_exists=manager.asset_exists)
"tags": tags_raw, except UploadError as e:
"name": provided_name, return _error_response(e.status, e.code, e.message)
"user_metadata": user_metadata_raw,
"hash": provided_hash,
})
except ValidationError as ve:
try:
if tmp_path and os.path.exists(tmp_path):
os.remove(tmp_path)
finally:
return _validation_error_response("INVALID_BODY", ve)
# Validate models category against configured folders (consistent with previous behavior)
if spec.tags and spec.tags[0] == "models":
if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths:
if tmp_path and os.path.exists(tmp_path):
os.remove(tmp_path)
return _error_response(
400, "INVALID_BODY", f"unknown models category '{spec.tags[1] if len(spec.tags) >= 2 else ''}'"
)
owner_id = USER_MANAGER.get_request_user_id(request) owner_id = USER_MANAGER.get_request_user_id(request)
# Fast path: if a valid provided hash exists, create AssetInfo without writing anything
if spec.hash and provided_hash_exists is True:
try:
result = manager.create_asset_from_hash(
hash_str=spec.hash,
name=spec.name or (spec.hash.split(":", 1)[1]),
tags=spec.tags,
user_metadata=spec.user_metadata or {},
owner_id=owner_id,
)
except Exception:
logging.exception("create_asset_from_hash failed for hash=%s, owner_id=%s", spec.hash, owner_id)
return _error_response(500, "INTERNAL", "Unexpected server error.")
if result is None:
return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {spec.hash} does not exist")
# Drain temp if we accidentally saved (e.g., hash field came after file)
if tmp_path and os.path.exists(tmp_path):
with contextlib.suppress(Exception):
os.remove(tmp_path)
status = 200 if (not result.created_new) else 201
return web.json_response(result.model_dump(mode="json"), status=status)
# Otherwise, we must have a temp file path to ingest
if not tmp_path or not os.path.exists(tmp_path):
# The only case we reach here without a temp file is: client sent a hash that does not exist and no file
return _error_response(404, "ASSET_NOT_FOUND", "Provided hash not found and no file uploaded.")
try: try:
created = manager.upload_asset_from_temp_path( result = manager.process_upload(parsed=parsed, owner_id=owner_id)
spec, except AssetValidationError as e:
temp_path=tmp_path, return _error_response(400, e.code, str(e))
client_filename=file_client_name, except AssetNotFoundError as e:
owner_id=owner_id, return _error_response(404, "ASSET_NOT_FOUND", str(e))
expected_asset_hash=spec.hash, except HashMismatchError as e:
) return _error_response(400, "HASH_MISMATCH", str(e))
status = 201 if created.created_new else 200
return web.json_response(created.model_dump(mode="json"), status=status)
except ValueError as e:
if tmp_path and os.path.exists(tmp_path):
os.remove(tmp_path)
msg = str(e)
if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH":
return _error_response(
400,
"HASH_MISMATCH",
"Uploaded file hash does not match provided hash.",
)
return _error_response(400, "BAD_REQUEST", "Invalid inputs.")
except Exception: except Exception:
if tmp_path and os.path.exists(tmp_path): logging.exception("process_upload failed for owner_id=%s", owner_id)
os.remove(tmp_path)
logging.exception("upload_asset_from_temp_path failed for tmp_path=%s, owner_id=%s", tmp_path, owner_id)
return _error_response(500, "INTERNAL", "Unexpected server error.") return _error_response(500, "INTERNAL", "Unexpected server error.")
status = 201 if result.created_new else 200
return web.json_response(result.model_dump(mode="json"), status=status)
@ROUTES.put(f"/api/assets/{{id:{UUID_RE}}}") @ROUTES.put(f"/api/assets/{{id:{UUID_RE}}}")
async def update_asset(request: web.Request) -> web.Response: async def update_asset(request: web.Request) -> web.Response:

View File

@ -1,4 +1,5 @@
import json import json
from dataclasses import dataclass
from typing import Any, Literal from typing import Any, Literal
from pydantic import ( from pydantic import (
@ -10,6 +11,52 @@ from pydantic import (
model_validator, model_validator,
) )
class UploadError(Exception):
    """Raised by the HTTP parsing layer when an upload request is malformed.

    Carries the HTTP status and a machine-readable error code so the route
    handler can map it directly to an error response. Used only in the HTTP
    layer, never in domain logic.
    """

    def __init__(self, status: int, code: str, message: str) -> None:
        super().__init__(message)
        self.status = status
        self.code = code
        self.message = message
class AssetValidationError(Exception):
    """Domain-level validation failure for asset inputs (tags, metadata, ...).

    Carries a machine-readable ``code`` alongside the human-readable message;
    the HTTP layer translates it into a 400 response.
    """

    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code
class AssetNotFoundError(Exception):
    """Asset or asset content not found.

    No custom constructor is needed: ``Exception.__init__`` already stores a
    single positional message in ``args`` and renders it via ``str()``, so the
    previous forwarding ``__init__`` was pure boilerplate. Callers still raise
    it as ``AssetNotFoundError(message)``.
    """
class HashMismatchError(Exception):
    """Uploaded file hash does not match the client-provided hash."""
    # NOTE: the docstring alone is a valid class body; the previous `pass`
    # statement was redundant.
@dataclass
class ParsedUpload:
    """Result of parsing a multipart upload request."""

    # True when a 'file' form part was present in the request.
    file_present: bool
    # Total number of bytes received for the file part (0 if none received).
    file_written: int
    # Client-supplied filename from the 'file' part (stripped); None if absent.
    file_client_name: str | None
    # Path the upload was streamed to, or None when no file was stored
    # (e.g. the known-hash fast path drains the body without writing to disk).
    tmp_path: str | None
    # Raw values of every 'tags' form field, unparsed.
    tags_raw: list[str]
    # Value of the 'name' form field, or None.
    provided_name: str | None
    # Raw string from the 'user_metadata' form field, or None.
    user_metadata_raw: str | None
    # Canonical 'blake3:<hex>' hash supplied by the client, or None.
    provided_hash: str | None
    # Whether the provided hash is already known to storage; None means the
    # existence check failed and the answer is unknown (not treated as error).
    provided_hash_exists: bool | None
class ListAssetsQuery(BaseModel): class ListAssetsQuery(BaseModel):
include_tags: list[str] = Field(default_factory=list) include_tags: list[str] = Field(default_factory=list)
exclude_tags: list[str] = Field(default_factory=list) exclude_tags: list[str] = Field(default_factory=list)

155
app/assets/api/upload.py Normal file
View File

@ -0,0 +1,155 @@
"""
Multipart upload parsing for asset uploads.
This module handles the HTTP-specific concerns of parsing multipart form data,
streaming file uploads to temp storage, and validating hash fields.
"""
import os
import uuid
from aiohttp import web
import folder_paths
from app.assets.api.schemas_in import ParsedUpload, UploadError
def validate_hash_format(s: str) -> str:
    """Validate and normalize a client-supplied hash string.

    Strips and lowercases the input, then requires the canonical form
    ``blake3:<hex>`` with a non-empty lowercase-hex digest.

    Returns:
        The canonical ``'blake3:<hex>'`` string.

    Raises:
        UploadError: 400 INVALID_HASH when the input does not match.
    """
    normalized = s.strip().lower()
    algo, sep, digest = normalized.partition(":")
    well_formed = (
        sep == ":"
        and algo == "blake3"
        and bool(digest)
        and set(digest) <= set("0123456789abcdef")
    )
    if not well_formed:
        raise UploadError(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
    return f"{algo}:{digest}"
async def parse_multipart_upload(
    request: web.Request,
    check_hash_exists: callable,  # NOTE(review): builtin `callable` as annotation; Callable[[str], bool] would be stricter
) -> ParsedUpload:
    """
    Parse a multipart/form-data upload request.

    Streams the 'file' part to a unique temp path, unless the client supplied
    a hash that is already known, in which case the body is drained without
    touching disk. Also collects the 'hash', 'tags', 'name' and
    'user_metadata' fields.

    Args:
        request: The aiohttp request
        check_hash_exists: Callable(hash_str) -> bool to check if a hash exists

    Returns:
        ParsedUpload with parsed fields and temp file path

    Raises:
        UploadError: On validation or I/O errors. Any temp file already
            written is removed before the error propagates (previously a bad
            field arriving *after* the file part leaked the temp file).
    """
    if not (request.content_type or "").lower().startswith("multipart/"):
        raise UploadError(415, "UNSUPPORTED_MEDIA_TYPE", "Use multipart/form-data for uploads.")
    reader = await request.multipart()
    file_present = False
    file_client_name: str | None = None
    tags_raw: list[str] = []
    provided_name: str | None = None
    user_metadata_raw: str | None = None
    provided_hash: str | None = None
    provided_hash_exists: bool | None = None
    file_written = 0
    tmp_path: str | None = None
    try:
        while True:
            field = await reader.next()
            if field is None:
                break
            fname = getattr(field, "name", "") or ""
            if fname == "hash":
                try:
                    s = ((await field.text()) or "").strip().lower()
                except Exception:
                    raise UploadError(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
                if s:
                    provided_hash = validate_hash_format(s)
                    try:
                        provided_hash_exists = check_hash_exists(provided_hash)
                    except Exception:
                        provided_hash_exists = None  # do not fail the whole request here
            elif fname == "file":
                file_present = True
                file_client_name = (field.filename or "").strip()
                if provided_hash and provided_hash_exists is True:
                    # If client supplied a hash that we know exists, drain but do not write to disk
                    try:
                        while True:
                            chunk = await field.read_chunk(8 * 1024 * 1024)
                            if not chunk:
                                break
                            file_written += len(chunk)
                    except Exception:
                        raise UploadError(500, "UPLOAD_IO_ERROR", "Failed to receive uploaded file.")
                    continue  # Do not create temp file; we will create AssetInfo from the existing content
                # Otherwise, store to temp for hashing/ingest
                uploads_root = os.path.join(folder_paths.get_temp_directory(), "uploads")
                unique_dir = os.path.join(uploads_root, uuid.uuid4().hex)
                os.makedirs(unique_dir, exist_ok=True)
                tmp_path = os.path.join(unique_dir, ".upload.part")
                try:
                    with open(tmp_path, "wb") as f:
                        while True:
                            chunk = await field.read_chunk(8 * 1024 * 1024)
                            if not chunk:
                                break
                            f.write(chunk)
                            file_written += len(chunk)
                except Exception:
                    raise UploadError(500, "UPLOAD_IO_ERROR", "Failed to receive and store uploaded file.")
            elif fname == "tags":
                tags_raw.append((await field.text()) or "")
            elif fname == "name":
                provided_name = (await field.text()) or None
            elif fname == "user_metadata":
                user_metadata_raw = (await field.text()) or None
        # Validate we have either a file or a known hash
        if not file_present and not (provided_hash and provided_hash_exists):
            raise UploadError(400, "MISSING_FILE", "Form must include a 'file' part or a known 'hash'.")
        if file_present and file_written == 0 and not (provided_hash and provided_hash_exists):
            raise UploadError(400, "EMPTY_UPLOAD", "Uploaded file is empty.")
    except BaseException:
        # Never leak a partially-written temp file, no matter which field or
        # check failed (includes task cancellation). Removal is idempotent.
        _cleanup_temp(tmp_path)
        raise
    return ParsedUpload(
        file_present=file_present,
        file_written=file_written,
        file_client_name=file_client_name,
        tmp_path=tmp_path,
        tags_raw=tags_raw,
        provided_name=provided_name,
        user_metadata_raw=user_metadata_raw,
        provided_hash=provided_hash,
        provided_hash_exists=provided_hash_exists,
    )
def _cleanup_temp(tmp_path: str | None) -> None:
"""Safely remove a temp file if it exists."""
if tmp_path:
try:
if os.path.exists(tmp_path):
os.remove(tmp_path)
except Exception:
pass

View File

@ -12,9 +12,19 @@ import mimetypes
import contextlib import contextlib
from typing import Sequence from typing import Sequence
from pydantic import ValidationError
import folder_paths
import app.assets.services.hashing as hashing import app.assets.services.hashing as hashing
from app.database.db import create_session from app.database.db import create_session
from app.assets.api import schemas_out, schemas_in from app.assets.api import schemas_out, schemas_in
from app.assets.api.schemas_in import (
AssetNotFoundError,
AssetValidationError,
HashMismatchError,
ParsedUpload,
)
from app.assets.api.upload import _cleanup_temp
from app.assets.database.queries import ( from app.assets.database.queries import (
asset_exists_by_hash, asset_exists_by_hash,
fetch_asset_info_and_asset, fetch_asset_info_and_asset,
@ -292,6 +302,81 @@ def upload_asset_from_temp_path(
) )
def process_upload(
    parsed: ParsedUpload,
    owner_id: str = "",
) -> schemas_out.AssetCreated:
    """
    Process a parsed multipart upload.

    Validates the parsed fields, then either fast-paths creation from an
    already-known hash or ingests the uploaded temp file. Any temp file is
    removed on every error path (previously the fast path leaked it when
    create_asset_from_hash raised or returned None).

    Args:
        parsed: The parsed upload data from parse_multipart_upload
        owner_id: The owner ID for the asset

    Returns:
        AssetCreated response (check created_new to determine if new asset was created)

    Raises:
        AssetValidationError: Invalid spec fields or unknown models category.
        AssetNotFoundError: Provided hash unknown and no file uploaded.
        HashMismatchError: Uploaded bytes do not match the provided hash.
    """
    try:
        spec = schemas_in.UploadAssetSpec.model_validate({
            "tags": parsed.tags_raw,
            "name": parsed.provided_name,
            "user_metadata": parsed.user_metadata_raw,
            "hash": parsed.provided_hash,
        })
    except ValidationError as ve:
        _cleanup_temp(parsed.tmp_path)
        # Chain the pydantic error so the original field details survive.
        raise AssetValidationError("INVALID_BODY", f"Validation failed: {ve.json()}") from ve
    # Validate models category against configured folders
    if spec.tags and spec.tags[0] == "models":
        if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths:
            _cleanup_temp(parsed.tmp_path)
            category = spec.tags[1] if len(spec.tags) >= 2 else ""
            raise AssetValidationError("INVALID_BODY", f"unknown models category '{category}'")
    # Fast path: if a valid provided hash exists, create AssetInfo without writing anything.
    # Any temp file (e.g. when the 'hash' field arrived after 'file') is
    # discarded whether creation succeeds or fails.
    if spec.hash and parsed.provided_hash_exists is True:
        try:
            result = create_asset_from_hash(
                hash_str=spec.hash,
                name=spec.name or (spec.hash.split(":", 1)[1]),
                tags=spec.tags,
                user_metadata=spec.user_metadata or {},
                owner_id=owner_id,
            )
        finally:
            _cleanup_temp(parsed.tmp_path)
        if result is None:
            raise AssetNotFoundError(f"Asset content {spec.hash} does not exist")
        return result
    # Otherwise, we must have a temp file path to ingest
    if not parsed.tmp_path or not os.path.exists(parsed.tmp_path):
        raise AssetNotFoundError("Provided hash not found and no file uploaded.")
    try:
        # On success the ingest consumes the temp file, so no cleanup here.
        return upload_asset_from_temp_path(
            spec,
            temp_path=parsed.tmp_path,
            client_filename=parsed.file_client_name,
            owner_id=owner_id,
            expected_asset_hash=spec.hash,
        )
    except ValueError as e:
        _cleanup_temp(parsed.tmp_path)
        msg = str(e)
        if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH":
            raise HashMismatchError("Uploaded file hash does not match provided hash.") from e
        raise AssetValidationError("BAD_REQUEST", "Invalid inputs.") from e
    except Exception:
        _cleanup_temp(parsed.tmp_path)
        raise
def update_asset( def update_asset(
asset_info_id: str, asset_info_id: str,
name: str | None = None, name: str | None = None,
@ -332,7 +417,7 @@ def set_asset_preview(
owner_id=owner_id, owner_id=owner_id,
) )
info = result["info"] info = result["info"]
asset = result["asset"] asset = result["asset"]
tags = result["tags"] tags = result["tags"]
return schemas_out.AssetDetail( return schemas_out.AssetDetail(