Finished @ROUTES.post("/api/assets")

This commit is contained in:
parent 41d364030b
commit 6db4f4e3f1
@@ -1,6 +1,8 @@
 import logging
 import uuid
 import urllib.parse
+import os
+import contextlib
 from aiohttp import web
 
 from pydantic import ValidationError
@@ -10,6 +12,8 @@ from app import user_manager
 from app.assets.api import schemas_in
 from app.assets.helpers import get_query_dict
 
+import folder_paths
+
 ROUTES = web.RouteTableDef()
 USER_MANAGER: user_manager.UserManager | None = None
 
@@ -164,3 +168,185 @@ async def create_asset_from_hash(request: web.Request) -> web.Response:
         return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {body.hash} does not exist")
     return web.json_response(result.model_dump(mode="json"), status=201)
 
+
+@ROUTES.post("/api/assets")
+async def upload_asset(request: web.Request) -> web.Response:
+    """Multipart/form-data endpoint for Asset uploads."""
+    if not (request.content_type or "").lower().startswith("multipart/"):
+        return _error_response(415, "UNSUPPORTED_MEDIA_TYPE", "Use multipart/form-data for uploads.")
+
+    reader = await request.multipart()
+
+    file_present = False
+    file_client_name: str | None = None
+    tags_raw: list[str] = []
+    provided_name: str | None = None
+    user_metadata_raw: str | None = None
+    provided_hash: str | None = None
+    provided_hash_exists: bool | None = None
+
+    file_written = 0
+    tmp_path: str | None = None
+    while True:
+        field = await reader.next()
+        if field is None:
+            break
+
+        fname = getattr(field, "name", "") or ""
+
+        if fname == "hash":
+            try:
+                s = ((await field.text()) or "").strip().lower()
+            except Exception:
+                return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
+
+            if s:
+                if ":" not in s:
+                    return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
+                algo, digest = s.split(":", 1)
+                if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"):
+                    return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:<hex>'")
+                provided_hash = f"{algo}:{digest}"
+                try:
+                    provided_hash_exists = manager.asset_exists(asset_hash=provided_hash)
+                except Exception:
+                    provided_hash_exists = None  # do not fail the whole request here
+
+        elif fname == "file":
+            file_present = True
+            file_client_name = (field.filename or "").strip()
+
+            if provided_hash and provided_hash_exists is True:
+                # If the client supplied a hash that we know exists, drain the part but do not write to disk
+                try:
+                    while True:
+                        chunk = await field.read_chunk(8 * 1024 * 1024)
+                        if not chunk:
+                            break
+                        file_written += len(chunk)
+                except Exception:
+                    return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive uploaded file.")
+                continue  # Do not create a temp file; we will create the AssetInfo from the existing content
+
+            # Otherwise, store to temp for hashing/ingest
+            uploads_root = os.path.join(folder_paths.get_temp_directory(), "uploads")
+            unique_dir = os.path.join(uploads_root, uuid.uuid4().hex)
+            os.makedirs(unique_dir, exist_ok=True)
+            tmp_path = os.path.join(unique_dir, ".upload.part")
+
+            try:
+                with open(tmp_path, "wb") as f:
+                    while True:
+                        chunk = await field.read_chunk(8 * 1024 * 1024)
+                        if not chunk:
+                            break
+                        f.write(chunk)
+                        file_written += len(chunk)
+            except Exception:
+                try:
+                    if os.path.exists(tmp_path or ""):
+                        os.remove(tmp_path)
+                finally:
+                    return _error_response(500, "UPLOAD_IO_ERROR", "Failed to receive and store uploaded file.")
+        elif fname == "tags":
+            tags_raw.append((await field.text()) or "")
+        elif fname == "name":
+            provided_name = (await field.text()) or None
+        elif fname == "user_metadata":
+            user_metadata_raw = (await field.text()) or None
+
+    # If the client did not send a file and we are not on the from-hash fast path -> error
+    if not file_present and not (provided_hash and provided_hash_exists):
+        return _error_response(400, "MISSING_FILE", "Form must include a 'file' part or a known 'hash'.")
+
+    if file_present and file_written == 0 and not (provided_hash and provided_hash_exists):
+        # An empty upload is only acceptable when fast-pathing from an existing hash
+        try:
+            if tmp_path and os.path.exists(tmp_path):
+                os.remove(tmp_path)
+        finally:
+            return _error_response(400, "EMPTY_UPLOAD", "Uploaded file is empty.")
+
+    try:
+        spec = schemas_in.UploadAssetSpec.model_validate({
+            "tags": tags_raw,
+            "name": provided_name,
+            "user_metadata": user_metadata_raw,
+            "hash": provided_hash,
+        })
+    except ValidationError as ve:
+        try:
+            if tmp_path and os.path.exists(tmp_path):
+                os.remove(tmp_path)
+        finally:
+            return _validation_error_response("INVALID_BODY", ve)
+
+    # Validate the models category against configured folders (consistent with previous behavior)
+    if spec.tags and spec.tags[0] == "models":
+        if len(spec.tags) < 2 or spec.tags[1] not in folder_paths.folder_names_and_paths:
+            if tmp_path and os.path.exists(tmp_path):
+                os.remove(tmp_path)
+            return _error_response(
+                400, "INVALID_BODY", f"unknown models category '{spec.tags[1] if len(spec.tags) >= 2 else ''}'"
+            )
+
+    owner_id = USER_MANAGER.get_request_user_id(request)
+
+    # Fast path: if a valid provided hash exists, create the AssetInfo without writing anything
+    if spec.hash and provided_hash_exists is True:
+        try:
+            result = manager.create_asset_from_hash(
+                hash_str=spec.hash,
+                name=spec.name or (spec.hash.split(":", 1)[1]),
+                tags=spec.tags,
+                user_metadata=spec.user_metadata or {},
+                owner_id=owner_id,
+            )
+        except Exception:
+            logging.exception("create_asset_from_hash failed for hash=%s, owner_id=%s", spec.hash, owner_id)
+            return _error_response(500, "INTERNAL", "Unexpected server error.")
+
+        if result is None:
+            return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {spec.hash} does not exist")
+
+        # Remove the temp file if we accidentally saved one (e.g., the hash field came after the file)
+        if tmp_path and os.path.exists(tmp_path):
+            with contextlib.suppress(Exception):
+                os.remove(tmp_path)
+
+        status = 200 if (not result.created_new) else 201
+        return web.json_response(result.model_dump(mode="json"), status=status)
+
+    # Otherwise, we must have a temp file path to ingest
+    if not tmp_path or not os.path.exists(tmp_path):
+        # The only case where we reach here without a temp file: the client sent a hash that does not exist, and no file
+        return _error_response(404, "ASSET_NOT_FOUND", "Provided hash not found and no file uploaded.")
+
+    try:
+        created = manager.upload_asset_from_temp_path(
+            spec,
+            temp_path=tmp_path,
+            client_filename=file_client_name,
+            owner_id=owner_id,
+            expected_asset_hash=spec.hash,
+        )
+        status = 201 if created.created_new else 200
+        return web.json_response(created.model_dump(mode="json"), status=status)
+    except ValueError as e:
+        if tmp_path and os.path.exists(tmp_path):
+            os.remove(tmp_path)
+        msg = str(e)
+        if "HASH_MISMATCH" in msg or msg.strip().upper() == "HASH_MISMATCH":
+            return _error_response(
+                400,
+                "HASH_MISMATCH",
+                "Uploaded file hash does not match provided hash.",
+            )
+        return _error_response(400, "BAD_REQUEST", "Invalid inputs.")
+    except Exception:
+        if tmp_path and os.path.exists(tmp_path):
+            os.remove(tmp_path)
+        logging.exception("upload_asset_from_temp_path failed for tmp_path=%s, owner_id=%s", tmp_path, owner_id)
+        return _error_response(500, "INTERNAL", "Unexpected server error.")
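Note: for reference, a minimal client-side sketch against the new endpoint. The form field names (`file`, `tags`, `name`, `user_metadata`, `hash`) are exactly what the handler above reads; the server address and the use of `requests` are illustrative assumptions only.

```python
# Hypothetical client for POST /api/assets (field names match the handler above).
import json
import requests  # assumption: any multipart-capable HTTP client works

with open("model.safetensors", "rb") as fh:
    resp = requests.post(
        "http://127.0.0.1:8188/api/assets",  # assumed local server address
        files={"file": ("model.safetensors", fh)},
        data={
            "tags": json.dumps(["models", "loras"]),            # JSON array form
            "name": "my-lora.safetensors",
            "user_metadata": json.dumps({"source": "upload"}),
            # "hash": "blake3:<hex>",  # optional; enables the no-write fast path
        },
    )
print(resp.status_code, resp.json())  # 201 for new content, 200 if it already existed
```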
@@ -8,6 +8,7 @@ from pydantic import (
     Field,
     conint,
     field_validator,
+    model_validator,
 )
 
 
@@ -97,7 +98,6 @@ class CreateFromHashBody(BaseModel):
         return []
 
 
-
 class TagsListQuery(BaseModel):
     model_config = ConfigDict(extra="ignore", str_strip_whitespace=True)
 
@@ -116,6 +116,118 @@ class TagsListQuery(BaseModel):
         return v.lower() or None
 
 
+class UploadAssetSpec(BaseModel):
+    """Upload Asset operation.
+    - tags: ordered; first is root ('models'|'input'|'output');
+      if root == 'models', second must be a valid category from folder_paths.folder_names_and_paths
+    - name: display name
+    - user_metadata: arbitrary JSON object (optional)
+    - hash: optional canonical 'blake3:<hex>' provided by the client for validation / fast-path
+
+    Files created via this endpoint are stored on disk using the **content hash** as the filename stem
+    and the original extension is preserved when available.
+    """
+    model_config = ConfigDict(extra="ignore", str_strip_whitespace=True)
+
+    tags: list[str] = Field(..., min_length=1)
+    name: str | None = Field(default=None, max_length=512, description="Display Name")
+    user_metadata: dict[str, Any] = Field(default_factory=dict)
+    hash: str | None = Field(default=None)
+
+    @field_validator("hash", mode="before")
+    @classmethod
+    def _parse_hash(cls, v):
+        if v is None:
+            return None
+        s = str(v).strip().lower()
+        if not s:
+            return None
+        if ":" not in s:
+            raise ValueError("hash must be 'blake3:<hex>'")
+        algo, digest = s.split(":", 1)
+        if algo != "blake3":
+            raise ValueError("only canonical 'blake3:<hex>' is accepted here")
+        if not digest or any(c for c in digest if c not in "0123456789abcdef"):
+            raise ValueError("hash digest must be lowercase hex")
+        return f"{algo}:{digest}"
+
+    @field_validator("tags", mode="before")
+    @classmethod
+    def _parse_tags(cls, v):
+        """
+        Accepts a list of strings (possibly multiple form fields),
+        where each string can be:
+          - JSON array (e.g., '["models","loras","foo"]')
+          - comma-separated ('models, loras, foo')
+          - single token ('models')
+        Returns a normalized, deduplicated, ordered list.
+        """
+        items: list[str] = []
+        if v is None:
+            return []
+        if isinstance(v, str):
+            v = [v]
+
+        if isinstance(v, list):
+            for item in v:
+                if item is None:
+                    continue
+                s = str(item).strip()
+                if not s:
+                    continue
+                if s.startswith("["):
+                    try:
+                        arr = json.loads(s)
+                        if isinstance(arr, list):
+                            items.extend(str(x) for x in arr)
+                            continue
+                    except Exception:
+                        pass  # fall back to CSV parse below
+                items.extend([p for p in s.split(",") if p.strip()])
+        else:
+            return []
+
+        # normalize + dedupe
+        norm = []
+        seen = set()
+        for t in items:
+            tnorm = str(t).strip().lower()
+            if tnorm and tnorm not in seen:
+                seen.add(tnorm)
+                norm.append(tnorm)
+        return norm
+
+    @field_validator("user_metadata", mode="before")
+    @classmethod
+    def _parse_metadata_json(cls, v):
+        if v is None or isinstance(v, dict):
+            return v or {}
+        if isinstance(v, str):
+            s = v.strip()
+            if not s:
+                return {}
+            try:
+                parsed = json.loads(s)
+            except Exception as e:
+                raise ValueError(f"user_metadata must be JSON: {e}") from e
+            if not isinstance(parsed, dict):
+                raise ValueError("user_metadata must be a JSON object")
+            return parsed
+        return {}
+
+    @model_validator(mode="after")
+    def _validate_order(self):
+        if not self.tags:
+            raise ValueError("tags must be provided and non-empty")
+        root = self.tags[0]
+        if root not in {"models", "input", "output"}:
+            raise ValueError("first tag must be one of: models, input, output")
+        if root == "models":
+            if len(self.tags) < 2:
+                raise ValueError("models uploads require a category tag as the second tag")
+        return self
+
+
 class SetPreviewBody(BaseModel):
     """Set or clear the preview for an AssetInfo. Provide an Asset.id or null."""
     preview_id: str | None = None
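Note: a small sketch of what the `tags` normalization above accepts; all three encodings below should validate to the same ordered, lowercased list (assuming this schema module is importable as `schemas_in`).

```python
# Sketch: the three tag encodings the before-validator accepts.
from app.assets.api import schemas_in

for raw in (['["models","loras","Foo"]'],    # JSON array in one form field
            ["models, loras, foo"],          # comma-separated
            ["models", "loras", "foo"]):     # one token per form field
    spec = schemas_in.UploadAssetSpec.model_validate({"tags": raw})
    assert spec.tags == ["models", "loras", "foo"]
```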
@@ -1,8 +1,9 @@
 import os
+import logging
 import sqlalchemy as sa
 from collections import defaultdict
 from datetime import datetime
-from typing import Iterable
+from typing import Iterable, Any
 from sqlalchemy import select, delete, exists, func
 from sqlalchemy.dialects import sqlite
 from sqlalchemy.exc import IntegrityError
@@ -441,6 +442,216 @@ def replace_asset_info_metadata_projection(
     session.flush()
 
 
+def ingest_fs_asset(
+    session: Session,
+    *,
+    asset_hash: str,
+    abs_path: str,
+    size_bytes: int,
+    mtime_ns: int,
+    mime_type: str | None = None,
+    info_name: str | None = None,
+    owner_id: str = "",
+    preview_id: str | None = None,
+    user_metadata: dict | None = None,
+    tags: Sequence[str] = (),
+    tag_origin: str = "manual",
+    require_existing_tags: bool = False,
+) -> dict:
+    """
+    Idempotently upsert:
+      - Asset by content hash (create if missing)
+      - AssetCacheState(file_path) pointing to asset_id
+      - Optionally AssetInfo + tag links and metadata projection
+    Returns flags and ids.
+    """
+    locator = os.path.abspath(abs_path)
+    now = utcnow()
+
+    if preview_id:
+        if not session.get(Asset, preview_id):
+            preview_id = None
+
+    out: dict[str, Any] = {
+        "asset_created": False,
+        "asset_updated": False,
+        "state_created": False,
+        "state_updated": False,
+        "asset_info_id": None,
+    }
+
+    # 1) Asset by hash
+    asset = (
+        session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1))
+    ).scalars().first()
+    if not asset:
+        vals = {
+            "hash": asset_hash,
+            "size_bytes": int(size_bytes),
+            "mime_type": mime_type,
+            "created_at": now,
+        }
+        res = session.execute(
+            sqlite.insert(Asset)
+            .values(**vals)
+            .on_conflict_do_nothing(index_elements=[Asset.hash])
+        )
+        if int(res.rowcount or 0) > 0:
+            out["asset_created"] = True
+        asset = (
+            session.execute(
+                select(Asset).where(Asset.hash == asset_hash).limit(1)
+            )
+        ).scalars().first()
+        if not asset:
+            raise RuntimeError("Asset row not found after upsert.")
+    else:
+        changed = False
+        if asset.size_bytes != int(size_bytes) and int(size_bytes) > 0:
+            asset.size_bytes = int(size_bytes)
+            changed = True
+        if mime_type and asset.mime_type != mime_type:
+            asset.mime_type = mime_type
+            changed = True
+        if changed:
+            out["asset_updated"] = True
+
+    # 2) AssetCacheState upsert by file_path (unique)
+    vals = {
+        "asset_id": asset.id,
+        "file_path": locator,
+        "mtime_ns": int(mtime_ns),
+    }
+    ins = (
+        sqlite.insert(AssetCacheState)
+        .values(**vals)
+        .on_conflict_do_nothing(index_elements=[AssetCacheState.file_path])
+    )
+
+    res = session.execute(ins)
+    if int(res.rowcount or 0) > 0:
+        out["state_created"] = True
+    else:
+        upd = (
+            sa.update(AssetCacheState)
+            .where(AssetCacheState.file_path == locator)
+            .where(
+                sa.or_(
+                    AssetCacheState.asset_id != asset.id,
+                    AssetCacheState.mtime_ns.is_(None),
+                    AssetCacheState.mtime_ns != int(mtime_ns),
+                )
+            )
+            .values(asset_id=asset.id, mtime_ns=int(mtime_ns))
+        )
+        res2 = session.execute(upd)
+        if int(res2.rowcount or 0) > 0:
+            out["state_updated"] = True
+
+    # 3) Optional AssetInfo + tags + metadata
+    if info_name:
+        try:
+            with session.begin_nested():
+                info = AssetInfo(
+                    owner_id=owner_id,
+                    name=info_name,
+                    asset_id=asset.id,
+                    preview_id=preview_id,
+                    created_at=now,
+                    updated_at=now,
+                    last_access_time=now,
+                )
+                session.add(info)
+                session.flush()
+                out["asset_info_id"] = info.id
+        except IntegrityError:
+            pass
+
+        existing_info = (
+            session.execute(
+                select(AssetInfo)
+                .where(
+                    AssetInfo.asset_id == asset.id,
+                    AssetInfo.name == info_name,
+                    (AssetInfo.owner_id == owner_id),
+                )
+                .limit(1)
+            )
+        ).unique().scalar_one_or_none()
+        if not existing_info:
+            raise RuntimeError("Failed to update or insert AssetInfo.")
+
+        if preview_id and existing_info.preview_id != preview_id:
+            existing_info.preview_id = preview_id
+
+        existing_info.updated_at = now
+        if existing_info.last_access_time < now:
+            existing_info.last_access_time = now
+        session.flush()
+        out["asset_info_id"] = existing_info.id
+
+        norm = [t.strip().lower() for t in (tags or []) if (t or "").strip()]
+        if norm and out["asset_info_id"] is not None:
+            if not require_existing_tags:
+                ensure_tags_exist(session, norm, tag_type="user")
+
+            existing_tag_names = set(
+                name for (name,) in (session.execute(select(Tag.name).where(Tag.name.in_(norm)))).all()
+            )
+            missing = [t for t in norm if t not in existing_tag_names]
+            if missing and require_existing_tags:
+                raise ValueError(f"Unknown tags: {missing}")
+
+            existing_links = set(
+                tag_name
+                for (tag_name,) in (
+                    session.execute(
+                        select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == out["asset_info_id"])
+                    )
+                ).all()
+            )
+            to_add = [t for t in norm if t in existing_tag_names and t not in existing_links]
+            if to_add:
+                session.add_all(
+                    [
+                        AssetInfoTag(
+                            asset_info_id=out["asset_info_id"],
+                            tag_name=t,
+                            origin=tag_origin,
+                            added_at=now,
+                        )
+                        for t in to_add
+                    ]
+                )
+                session.flush()
+
+        # metadata["filename"] hack
+        if out["asset_info_id"] is not None:
+            primary_path = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset.id))
+            computed_filename = compute_relative_filename(primary_path) if primary_path else None
+
+            current_meta = existing_info.user_metadata or {}
+            new_meta = dict(current_meta)
+            if user_metadata is not None:
+                for k, v in user_metadata.items():
+                    new_meta[k] = v
+            if computed_filename:
+                new_meta["filename"] = computed_filename
+
+            if new_meta != current_meta:
+                replace_asset_info_metadata_projection(
+                    session,
+                    asset_info_id=out["asset_info_id"],
+                    user_metadata=new_meta,
+                )
+
+    try:
+        remove_missing_tag_for_asset_id(session, asset_id=asset.id)
+    except Exception:
+        logging.exception("Failed to clear 'missing' tag for asset %s", asset.id)
+    return out
+
+
 def list_tags_with_usage(
     session: Session,
     prefix: str | None = None,
@@ -521,3 +732,16 @@ def get_asset_tags(session: Session, *, asset_info_id: str) -> list[str]:
             )
         ).all()
     ]
+
+
+def remove_missing_tag_for_asset_id(
+    session: Session,
+    *,
+    asset_id: str,
+) -> None:
+    session.execute(
+        sa.delete(AssetInfoTag).where(
+            AssetInfoTag.asset_info_id.in_(sa.select(AssetInfo.id).where(AssetInfo.asset_id == asset_id)),
+            AssetInfoTag.tag_name == "missing",
+        )
+    )
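Note: `ingest_fs_asset` is the single upsert entry point the upload path relies on. A hedged usage sketch follows; the session factory comes from elsewhere in this commit, and all literal values are placeholders.

```python
# Sketch: registering a file already on disk. Values are illustrative only.
from app.database.db import create_session  # used by the manager in this commit

with create_session() as session:
    out = ingest_fs_asset(
        session,
        asset_hash="blake3:" + "ab" * 32,             # placeholder digest
        abs_path="/data/models/loras/x.safetensors",  # placeholder path
        size_bytes=1234,
        mtime_ns=0,
        info_name="x.safetensors",  # omit/None to skip AssetInfo creation
        tags=["models", "loras"],
    )
    session.commit()

# out looks like: {"asset_created": ..., "state_created": ..., "asset_info_id": ...}
```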
@@ -88,6 +88,40 @@ def get_comfy_models_folders() -> list[tuple[str, list[str]]]:
             targets.append((name, paths))
     return targets
 
+def resolve_destination_from_tags(tags: list[str]) -> tuple[str, list[str]]:
+    """Validates and maps tags -> (base_dir, subdirs_for_fs)"""
+    root = tags[0]
+    if root == "models":
+        if len(tags) < 2:
+            raise ValueError("at least two tags required for model asset")
+        try:
+            bases = folder_paths.folder_names_and_paths[tags[1]][0]
+        except KeyError:
+            raise ValueError(f"unknown model category '{tags[1]}'")
+        if not bases:
+            raise ValueError(f"no base path configured for category '{tags[1]}'")
+        base_dir = os.path.abspath(bases[0])
+        raw_subdirs = tags[2:]
+    else:
+        base_dir = os.path.abspath(
+            folder_paths.get_input_directory() if root == "input" else folder_paths.get_output_directory()
+        )
+        raw_subdirs = tags[1:]
+    for i in raw_subdirs:
+        if i in (".", ".."):
+            raise ValueError("invalid path component in tags")
+
+    return base_dir, raw_subdirs if raw_subdirs else []
+
+def ensure_within_base(candidate: str, base: str) -> None:
+    cand_abs = os.path.abspath(candidate)
+    base_abs = os.path.abspath(base)
+    try:
+        if os.path.commonpath([cand_abs, base_abs]) != base_abs:
+            raise ValueError("destination escapes base directory")
+    except Exception:
+        raise ValueError("invalid destination path")
+
 def compute_relative_filename(file_path: str) -> str | None:
     """
     Return the model's path relative to the last well-known folder (the model category),
@@ -114,7 +148,6 @@ def compute_relative_filename(file_path: str) -> str | None:
         return "/".join(inside)
     return "/".join(parts)  # input/output: keep all parts
 
-
 def get_relative_to_root_category_path_of_asset(file_path: str) -> tuple[Literal["input", "output", "models"], str]:
     """Given an absolute or relative file path, determine which root category the path belongs to:
     - 'input' if the file resides under `folder_paths.get_input_directory()`
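Note: the two new helpers split destination policy from the path-traversal guard. A short sketch (assumes `loras` is a configured category in `folder_paths.folder_names_and_paths`):

```python
import os

# Sketch: tags -> destination directory, then guard the final path.
base_dir, subdirs = resolve_destination_from_tags(["models", "loras", "style"])
dest = os.path.join(base_dir, *subdirs, "abcd1234.safetensors")  # placeholder name
ensure_within_base(dest, base_dir)  # raises ValueError if dest escapes base_dir
```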
@@ -1,9 +1,10 @@
 import os
 import mimetypes
+import contextlib
 from typing import Sequence
 
 from app.database.db import create_session
-from app.assets.api import schemas_out
+from app.assets.api import schemas_out, schemas_in
 from app.assets.database.queries import (
     asset_exists_by_hash,
     get_asset_by_hash,
@@ -16,7 +17,11 @@ from app.assets.database.queries import (
     list_tags_with_usage,
     get_asset_tags,
     pick_best_live_path,
+    ingest_fs_asset,
 )
+from app.assets.helpers import resolve_destination_from_tags, ensure_within_base
+
+import app.assets.hashing as hashing
 
 
 def _safe_sort_field(requested: str | None) -> str:
@@ -28,6 +33,11 @@ def _safe_sort_field(requested: str | None) -> str:
     return "created_at"
 
 
+def _get_size_mtime_ns(path: str) -> tuple[int, int]:
+    st = os.stat(path, follow_symlinks=True)
+    return st.st_size, getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))
+
+
 def _safe_filename(name: str | None, fallback: str) -> str:
     n = os.path.basename((name or "").strip() or fallback)
     if n:
@@ -42,6 +52,7 @@ def asset_exists(*, asset_hash: str) -> bool:
     with create_session() as session:
         return asset_exists_by_hash(session, asset_hash=asset_hash)
 
+
 def list_assets(
     *,
     include_tags: Sequence[str] | None = None,
@@ -147,6 +158,126 @@ def resolve_asset_content_for_download(
     return abs_path, ctype, download_name
 
 
+def upload_asset_from_temp_path(
+    spec: schemas_in.UploadAssetSpec,
+    *,
+    temp_path: str,
+    client_filename: str | None = None,
+    owner_id: str = "",
+    expected_asset_hash: str | None = None,
+) -> schemas_out.AssetCreated:
+    try:
+        digest = hashing.blake3_hash(temp_path)
+    except Exception as e:
+        raise RuntimeError(f"failed to hash uploaded file: {e}")
+    asset_hash = "blake3:" + digest
+
+    if expected_asset_hash and asset_hash != expected_asset_hash.strip().lower():
+        raise ValueError("HASH_MISMATCH")
+
+    with create_session() as session:
+        existing = get_asset_by_hash(session, asset_hash=asset_hash)
+        if existing is not None:
+            with contextlib.suppress(Exception):
+                if temp_path and os.path.exists(temp_path):
+                    os.remove(temp_path)
+
+            display_name = _safe_filename(spec.name or (client_filename or ""), fallback=digest)
+            info = create_asset_info_for_existing_asset(
+                session,
+                asset_hash=asset_hash,
+                name=display_name,
+                user_metadata=spec.user_metadata or {},
+                tags=spec.tags or [],
+                tag_origin="manual",
+                owner_id=owner_id,
+            )
+            tag_names = get_asset_tags(session, asset_info_id=info.id)
+            session.commit()
+
+            return schemas_out.AssetCreated(
+                id=info.id,
+                name=info.name,
+                asset_hash=existing.hash,
+                size=int(existing.size_bytes) if existing.size_bytes is not None else None,
+                mime_type=existing.mime_type,
+                tags=tag_names,
+                user_metadata=info.user_metadata or {},
+                preview_id=info.preview_id,
+                created_at=info.created_at,
+                last_access_time=info.last_access_time,
+                created_new=False,
+            )
+
+    base_dir, subdirs = resolve_destination_from_tags(spec.tags)
+    dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir
+    os.makedirs(dest_dir, exist_ok=True)
+
+    src_for_ext = (client_filename or spec.name or "").strip()
+    _ext = os.path.splitext(os.path.basename(src_for_ext))[1] if src_for_ext else ""
+    ext = _ext if 0 < len(_ext) <= 16 else ""
+    hashed_basename = f"{digest}{ext}"
+    dest_abs = os.path.abspath(os.path.join(dest_dir, hashed_basename))
+    ensure_within_base(dest_abs, base_dir)
+
+    content_type = (
+        mimetypes.guess_type(os.path.basename(src_for_ext), strict=False)[0]
+        or mimetypes.guess_type(hashed_basename, strict=False)[0]
+        or "application/octet-stream"
+    )
+
+    try:
+        os.replace(temp_path, dest_abs)
+    except Exception as e:
+        raise RuntimeError(f"failed to move uploaded file into place: {e}")
+
+    try:
+        size_bytes, mtime_ns = _get_size_mtime_ns(dest_abs)
+    except OSError as e:
+        raise RuntimeError(f"failed to stat destination file: {e}")
+
+    with create_session() as session:
+        result = ingest_fs_asset(
+            session,
+            asset_hash=asset_hash,
+            abs_path=dest_abs,
+            size_bytes=size_bytes,
+            mtime_ns=mtime_ns,
+            mime_type=content_type,
+            info_name=_safe_filename(spec.name or (client_filename or ""), fallback=digest),
+            owner_id=owner_id,
+            preview_id=None,
+            user_metadata=spec.user_metadata or {},
+            tags=spec.tags,
+            tag_origin="manual",
+            require_existing_tags=False,
+        )
+        info_id = result["asset_info_id"]
+        if not info_id:
+            raise RuntimeError("failed to create asset metadata")
+
+        pair = fetch_asset_info_and_asset(session, asset_info_id=info_id, owner_id=owner_id)
+        if not pair:
+            raise RuntimeError("inconsistent DB state after ingest")
+        info, asset = pair
+        tag_names = get_asset_tags(session, asset_info_id=info.id)
+        session.commit()
+
+        return schemas_out.AssetCreated(
+            id=info.id,
+            name=info.name,
+            asset_hash=asset.hash,
+            size=int(asset.size_bytes),
+            mime_type=asset.mime_type,
+            tags=tag_names,
+            user_metadata=info.user_metadata or {},
+            preview_id=info.preview_id,
+            created_at=info.created_at,
+            last_access_time=info.last_access_time,
+            created_new=result["asset_created"],
+        )
+
+
 def create_asset_from_hash(
     *,
     hash_str: str,
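Note: `upload_asset_from_temp_path` can also be exercised directly (e.g., in tests) without going through the route. A hedged sketch; the temp path and spec values are placeholders:

```python
# Sketch: calling the manager function directly. Values are illustrative only.
spec = schemas_in.UploadAssetSpec.model_validate({"tags": ["input"], "name": "photo.png"})
created = upload_asset_from_temp_path(
    spec,
    temp_path="/tmp/comfy/uploads/xyz/.upload.part",  # placeholder temp file
    client_filename="photo.png",
    owner_id="",
)
print(created.created_new, created.asset_hash)  # asset_hash is "blake3:<hex>"
```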
@@ -22,6 +22,7 @@ alembic
SQLAlchemy
av>=14.2.0
comfy-kitchen>=0.2.6
+blake3
 
#non essential dependencies:
kornia>=0.7.1
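Note: the diff adds the `blake3` dependency but does not show the `app.assets.hashing` module it backs. For orientation, a streaming digest over the file content could plausibly look like the following (hypothetical sketch, not the repository's actual implementation):

```python
import blake3  # the dependency added above

def blake3_hash(path: str, chunk_size: int = 8 * 1024 * 1024) -> str:
    """Hash a file in chunks so large models never load fully into memory."""
    h = blake3.blake3()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()
```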