Added tests, rewritten from the ones present in the asset-management branch

This commit is contained in:
Jedrzej Kosinski 2026-01-29 16:56:39 -08:00
parent 2f0db0e680
commit 6840ad0bbe
8 changed files with 2338 additions and 0 deletions

271 tests-assets/conftest.py Normal file

@@ -0,0 +1,271 @@
import contextlib
import json
import os
import socket
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Callable, Iterator, Optional
import pytest
import requests
def pytest_addoption(parser: pytest.Parser) -> None:
"""
Allow overriding the database URL used by the spawned ComfyUI process.
Priority:
1) --db-url command line option
2) ASSETS_TEST_DB_URL environment variable (used by CI)
3) default: None (will use file-backed sqlite in temp dir)
"""
parser.addoption(
"--db-url",
action="store",
default=os.environ.get("ASSETS_TEST_DB_URL"),
help="SQLAlchemy DB URL (e.g. sqlite:///path/to/db.sqlite3)",
)
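# Example invocations (illustrative; any SQLAlchemy URL the server supports should work):
#   pytest tests-assets --db-url=sqlite:////tmp/assets-test.sqlite3
#   ASSETS_TEST_DB_URL=sqlite:////tmp/assets-test.sqlite3 pytest tests-assets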
def _free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def _make_base_dirs(root: Path) -> None:
for sub in ("models", "custom_nodes", "input", "output", "temp", "user"):
(root / sub).mkdir(parents=True, exist_ok=True)
def _wait_http_ready(base: str, session: requests.Session, timeout: float = 90.0) -> None:
start = time.time()
last_err = None
while time.time() - start < timeout:
try:
r = session.get(base + "/api/assets", timeout=5)
if r.status_code in (200, 400):  # any well-formed response means the HTTP server is up and routing
return
except Exception as e:
last_err = e
time.sleep(0.25)
raise RuntimeError(f"ComfyUI HTTP did not become ready: {last_err}")
@pytest.fixture(scope="session")
def comfy_tmp_base_dir() -> Iterator[Path]:
env_base = os.environ.get("ASSETS_TEST_BASE_DIR")
created_by_fixture = False
if env_base:
tmp = Path(env_base)
tmp.mkdir(parents=True, exist_ok=True)
else:
tmp = Path(tempfile.mkdtemp(prefix="comfyui-assets-tests-"))
created_by_fixture = True
_make_base_dirs(tmp)
yield tmp
if created_by_fixture:
with contextlib.suppress(Exception):
for p in sorted(tmp.rglob("*"), reverse=True):
if p.is_file() or p.is_symlink():
p.unlink(missing_ok=True)
for p in sorted(tmp.glob("**/*"), reverse=True):
with contextlib.suppress(Exception):
p.rmdir()
tmp.rmdir()
@pytest.fixture(scope="session")
def comfy_url_and_proc(comfy_tmp_base_dir: Path, request: pytest.FixtureRequest):
"""
Boot ComfyUI subprocess with:
- sandbox base dir
- file-backed sqlite DB in temp dir
- autoscan disabled
Returns (base_url, process, port)
"""
port = _free_port()
db_url = request.config.getoption("--db-url")
if not db_url:
# Use a file-backed sqlite database in the temp directory
db_path = comfy_tmp_base_dir / "assets-test.sqlite3"
db_url = f"sqlite:///{db_path}"
logs_dir = comfy_tmp_base_dir / "logs"
logs_dir.mkdir(exist_ok=True)
out_log = open(logs_dir / "stdout.log", "w", buffering=1)
err_log = open(logs_dir / "stderr.log", "w", buffering=1)
comfy_root = Path(__file__).resolve().parent.parent
if not (comfy_root / "main.py").is_file():
raise FileNotFoundError(f"main.py not found under {comfy_root}")
proc = subprocess.Popen(
args=[
sys.executable,
"main.py",
f"--base-directory={str(comfy_tmp_base_dir)}",
f"--database-url={db_url}",
"--disable-assets-autoscan",
"--listen",
"127.0.0.1",
"--port",
str(port),
"--cpu",
],
stdout=out_log,
stderr=err_log,
cwd=str(comfy_root),
env={**os.environ},
)
for _ in range(50):
if proc.poll() is not None:
out_log.flush()
err_log.flush()
raise RuntimeError(f"ComfyUI exited early with code {proc.returncode}")
time.sleep(0.1)
base_url = f"http://127.0.0.1:{port}"
try:
with requests.Session() as s:
_wait_http_ready(base_url, s, timeout=90.0)
yield base_url, proc, port
except Exception as e:
with contextlib.suppress(Exception):
proc.terminate()
proc.wait(timeout=10)
with contextlib.suppress(Exception):
out_log.flush()
err_log.flush()
raise RuntimeError(f"ComfyUI did not become ready: {e}")
if proc and proc.poll() is None:
with contextlib.suppress(Exception):
proc.terminate()
proc.wait(timeout=15)
out_log.close()
err_log.close()
@pytest.fixture
def http() -> Iterator[requests.Session]:
with requests.Session() as s:
s.timeout = 120  # note: requests does not honor a Session-level timeout attribute; calls below pass explicit timeouts
yield s
@pytest.fixture
def api_base(comfy_url_and_proc) -> str:
base_url, _proc, _port = comfy_url_and_proc
return base_url
def _post_multipart_asset(
session: requests.Session,
base: str,
*,
name: str,
tags: list[str],
meta: dict,
data: bytes,
extra_fields: Optional[dict] = None,
) -> tuple[int, dict]:
files = {"file": (name, data, "application/octet-stream")}
form_data = {
"tags": json.dumps(tags),
"name": name,
"user_metadata": json.dumps(meta),
}
if extra_fields:
for k, v in extra_fields.items():
form_data[k] = v
r = session.post(base + "/api/assets", files=files, data=form_data, timeout=120)
return r.status_code, r.json()
@pytest.fixture
def make_asset_bytes() -> Callable[[str, int], bytes]:
def _make(name: str, size: int = 8192) -> bytes:
seed = sum(ord(c) for c in name) % 251
return bytes((i * 31 + seed) % 256 for i in range(size))
return _make
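# The payload is deterministic per (name, size), e.g. (illustrative):
#   make_asset_bytes("x.bin", 16) == make_asset_bytes("x.bin", 16)  # same inputs -> identical bytes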
@pytest.fixture
def asset_factory(http: requests.Session, api_base: str):
"""
Returns create(name, tags, meta, data) -> response dict
Tracks created ids and deletes them after the test.
"""
created: list[str] = []
def create(name: str, tags: list[str], meta: dict, data: bytes) -> dict:
status, body = _post_multipart_asset(http, api_base, name=name, tags=tags, meta=meta, data=data)
assert status in (200, 201), body
created.append(body["id"])
return body
yield create
for aid in created:
with contextlib.suppress(Exception):
http.delete(f"{api_base}/api/assets/{aid}", timeout=30)
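# Typical usage inside a test (sketch; names are illustrative):
#   def test_example(asset_factory, make_asset_bytes):
#       a = asset_factory("x.bin", ["input", "unit-tests", "scope"], {"k": 1}, make_asset_bytes("x.bin"))
#       assert a["id"]  # the created AssetInfo is deleted automatically after the test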
@pytest.fixture
def seeded_asset(request: pytest.FixtureRequest, http: requests.Session, api_base: str) -> dict:
"""
Upload one asset with ".safetensors" extension into models/checkpoints/unit-tests/<name>.
Returns response dict with id, asset_hash, tags, etc.
"""
name = "unit_1_example.safetensors"
p = getattr(request, "param", {}) or {}
tags: Optional[list[str]] = p.get("tags")
if tags is None:
tags = ["models", "checkpoints", "unit-tests", "alpha"]
meta = {"purpose": "test", "epoch": 1, "flags": ["x", "y"], "nullable": None}
files = {"file": (name, b"A" * 4096, "application/octet-stream")}
form_data = {
"tags": json.dumps(tags),
"name": name,
"user_metadata": json.dumps(meta),
}
r = http.post(api_base + "/api/assets", files=files, data=form_data, timeout=120)
body = r.json()
assert r.status_code == 201, body
return body
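# Tags can be overridden via indirect parametrization (sketch, mirroring its use in test_download.py):
#   @pytest.mark.parametrize("seeded_asset", [{"tags": ["models", "checkpoints"]}], indirect=True)
#   def test_something(seeded_asset): ...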
@pytest.fixture(autouse=True)
def autoclean_unit_test_assets(http: requests.Session, api_base: str):
"""Ensure isolation by removing all AssetInfo rows tagged with 'unit-tests' after each test."""
yield
while True:
r = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests", "limit": "500", "sort": "name"},
timeout=30,
)
if r.status_code != 200:
break
body = r.json()
ids = [a["id"] for a in body.get("assets", [])]
if not ids:
break
for aid in ids:
with contextlib.suppress(Exception):
http.delete(f"{api_base}/api/assets/{aid}", timeout=30)
def trigger_sync_seed_assets(session: requests.Session, base_url: str) -> None:
"""Force a fast sync/seed pass by calling the seed endpoint."""
session.post(base_url + "/api/assets/seed", json={"roots": ["models", "input", "output"]}, timeout=30)
time.sleep(0.2)  # brief pause so the seed/sync pass can settle before the caller asserts
def get_asset_filename(asset_hash: str, extension: str) -> str:
return asset_hash.removeprefix("blake3:") + extension
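# For example (illustrative hash): get_asset_filename("blake3:ab12cd", ".png") -> "ab12cd.png";
# a hash without the "blake3:" prefix is left unchanged apart from the appended extension.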


@@ -0,0 +1,348 @@
import os
import uuid
from pathlib import Path
import pytest
import requests
from conftest import get_asset_filename, trigger_sync_seed_assets
@pytest.mark.parametrize("root", ["input", "output"])
def test_seed_asset_removed_when_file_is_deleted(
root: str,
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
):
"""Asset without hash (seed) whose file disappears:
after triggering sync_seed_assets, Asset + AssetInfo disappear.
"""
# Create a file directly under {root}/unit-tests/<case> so tags include "unit-tests"
case_dir = comfy_tmp_base_dir / root / "unit-tests" / "syncseed"
case_dir.mkdir(parents=True, exist_ok=True)
name = f"seed_{uuid.uuid4().hex[:8]}.bin"
fp = case_dir / name
fp.write_bytes(b"Z" * 2048)
# Trigger a seed sync so DB sees this path (seed asset => hash is NULL)
trigger_sync_seed_assets(http, api_base)
# Verify it is visible via API and carries no hash (seed)
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,syncseed", "name_contains": name},
timeout=120,
)
body1 = r1.json()
assert r1.status_code == 200
# there should be exactly one with that name
matches = [a for a in body1.get("assets", []) if a.get("name") == name]
assert matches
assert matches[0].get("asset_hash") is None
asset_info_id = matches[0]["id"]
# Remove the underlying file and sync again
if fp.exists():
fp.unlink()
trigger_sync_seed_assets(http, api_base)
# It should disappear (AssetInfo and seed Asset gone)
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,syncseed", "name_contains": name},
timeout=120,
)
body2 = r2.json()
assert r2.status_code == 200
matches2 = [a for a in body2.get("assets", []) if a.get("name") == name]
assert not matches2, f"Seed asset {asset_info_id} should be gone after sync"
@pytest.mark.skip(reason="Requires computing hashes of files in directories to verify and clear missing tags")
def test_hashed_asset_missing_tag_added_then_removed_after_scan(
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
make_asset_bytes,
):
"""Hashed asset with a single cache_state:
1. delete its file -> sync adds 'missing'
2. restore file -> sync removes 'missing'
"""
name = "missing_tag_test.png"
tags = ["input", "unit-tests", "msync2"]
data = make_asset_bytes(name, 4096)
a = asset_factory(name, tags, {}, data)
# Compute its on-disk path and remove it
dest = comfy_tmp_base_dir / "input" / "unit-tests" / "msync2" / get_asset_filename(a["asset_hash"], ".png")
assert dest.exists(), f"Expected asset file at {dest}"
dest.unlink()
# Fast sync should add 'missing' to the AssetInfo
trigger_sync_seed_assets(http, api_base)
g1 = http.get(f"{api_base}/api/assets/{a['id']}", timeout=120)
d1 = g1.json()
assert g1.status_code == 200, d1
assert "missing" in set(d1.get("tags", [])), "Expected 'missing' tag after deletion"
# Restore the file with the exact same content and sync again
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(data)
trigger_sync_seed_assets(http, api_base)
g2 = http.get(f"{api_base}/api/assets/{a['id']}", timeout=120)
d2 = g2.json()
assert g2.status_code == 200, d2
assert "missing" not in set(d2.get("tags", [])), "Missing tag should be cleared after verify"
def test_hashed_asset_two_asset_infos_both_get_missing(
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
):
"""Hashed asset with a single cache_state, but two AssetInfo rows:
deleting the single file then syncing should add 'missing' to both infos.
"""
# Upload one hashed asset
name = "two_infos_one_path.png"
base_tags = ["input", "unit-tests", "multiinfo"]
created = asset_factory(name, base_tags, {}, b"A" * 2048)
# Create second AssetInfo for the same Asset via from-hash
payload = {
"hash": created["asset_hash"],
"name": "two_infos_one_path_copy.png",
"tags": base_tags, # keep it in our unit-tests scope for cleanup
"user_metadata": {"k": "v"},
}
r2 = http.post(api_base + "/api/assets/from-hash", json=payload, timeout=120)
b2 = r2.json()
assert r2.status_code == 201, b2
second_id = b2["id"]
# Remove the single underlying file
p = comfy_tmp_base_dir / "input" / "unit-tests" / "multiinfo" / get_asset_filename(b2["asset_hash"], ".png")
assert p.exists()
p.unlink()
r0 = http.get(api_base + "/api/tags", params={"limit": "1000", "include_zero": "false"}, timeout=120)
tags0 = r0.json()
assert r0.status_code == 200, tags0
byname0 = {t["name"]: t for t in tags0.get("tags", [])}
old_missing = int(byname0.get("missing", {}).get("count", 0))
# Sync -> both AssetInfos for this asset must receive 'missing'
trigger_sync_seed_assets(http, api_base)
ga = http.get(f"{api_base}/api/assets/{created['id']}", timeout=120)
da = ga.json()
assert ga.status_code == 200, da
assert "missing" in set(da.get("tags", []))
gb = http.get(f"{api_base}/api/assets/{second_id}", timeout=120)
db = gb.json()
assert gb.status_code == 200, db
assert "missing" in set(db.get("tags", []))
# Tag usage for 'missing' increased by exactly 2 (two AssetInfos)
r1 = http.get(api_base + "/api/tags", params={"limit": "1000", "include_zero": "false"}, timeout=120)
tags1 = r1.json()
assert r1.status_code == 200, tags1
byname1 = {t["name"]: t for t in tags1.get("tags", [])}
new_missing = int(byname1.get("missing", {}).get("count", 0))
assert new_missing == old_missing + 2
@pytest.mark.skip(reason="Requires computing hashes of files in directories to deduplicate into multiple cache states")
def test_hashed_asset_two_cache_states_partial_delete_then_full_delete(
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
make_asset_bytes,
run_scan_and_wait,
):
"""Hashed asset with two cache_state rows:
1. delete one file -> sync should NOT add 'missing'
2. delete second file -> sync should add 'missing'
"""
name = "two_cache_states_partial_delete.png"
tags = ["input", "unit-tests", "dual"]
data = make_asset_bytes(name, 3072)
created = asset_factory(name, tags, {}, data)
path1 = comfy_tmp_base_dir / "input" / "unit-tests" / "dual" / get_asset_filename(created["asset_hash"], ".png")
assert path1.exists()
# Create a second on-disk copy under the same root but different subfolder
path2 = comfy_tmp_base_dir / "input" / "unit-tests" / "dual_copy" / name
path2.parent.mkdir(parents=True, exist_ok=True)
path2.write_bytes(data)
# Fast seed so the second path appears (as a seed initially)
trigger_sync_seed_assets(http, api_base)
# Deduplication of AssetInfos will not happen, as the first AssetInfo has owner='default' and the second has an empty owner.
run_scan_and_wait("input")
# Remove only one file and sync -> asset should still be healthy (no 'missing')
path1.unlink()
trigger_sync_seed_assets(http, api_base)
g1 = http.get(f"{api_base}/api/assets/{created['id']}", timeout=120)
d1 = g1.json()
assert g1.status_code == 200, d1
assert "missing" not in set(d1.get("tags", [])), "Should not be missing while one valid path remains"
# Baseline 'missing' usage count just before last file removal
r0 = http.get(api_base + "/api/tags", params={"limit": "1000", "include_zero": "false"}, timeout=120)
tags0 = r0.json()
assert r0.status_code == 200, tags0
old_missing = int({t["name"]: t for t in tags0.get("tags", [])}.get("missing", {}).get("count", 0))
# Remove the second (last) file and sync -> now we expect 'missing' on this AssetInfo
path2.unlink()
trigger_sync_seed_assets(http, api_base)
g2 = http.get(f"{api_base}/api/assets/{created['id']}", timeout=120)
d2 = g2.json()
assert g2.status_code == 200, d2
assert "missing" in set(d2.get("tags", [])), "Missing must be set once no valid paths remain"
# Tag usage for 'missing' increased by exactly 2 (two AssetInfos for one Asset)
r1 = http.get(api_base + "/api/tags", params={"limit": "1000", "include_zero": "false"}, timeout=120)
tags1 = r1.json()
assert r1.status_code == 200, tags1
new_missing = int({t["name"]: t for t in tags1.get("tags", [])}.get("missing", {}).get("count", 0))
assert new_missing == old_missing + 2
@pytest.mark.parametrize("root", ["input", "output"])
def test_missing_tag_clears_on_fastpass_when_mtime_and_size_match(
root: str,
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
make_asset_bytes,
):
"""
Fast pass alone clears 'missing' when size and mtime match exactly:
1) upload (hashed), record original mtime_ns
2) delete -> fast pass adds 'missing'
3) restore same bytes and set mtime back to the original value
4) run fast pass again -> 'missing' is removed (no slow scan)
"""
scope = f"fastclear-{uuid.uuid4().hex[:6]}"
name = "fastpass_clear.bin"
data = make_asset_bytes(name, 3072)
a = asset_factory(name, [root, "unit-tests", scope], {}, data)
aid = a["id"]
base = comfy_tmp_base_dir / root / "unit-tests" / scope
p = base / get_asset_filename(a["asset_hash"], ".bin")
st0 = p.stat()
orig_mtime_ns = getattr(st0, "st_mtime_ns", int(st0.st_mtime * 1_000_000_000))
# Delete -> fast pass adds 'missing'
p.unlink()
trigger_sync_seed_assets(http, api_base)
g1 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
d1 = g1.json()
assert g1.status_code == 200, d1
assert "missing" in set(d1.get("tags", []))
# Restore same bytes and revert mtime to the original value
p.parent.mkdir(parents=True, exist_ok=True)
p.write_bytes(data)
# set both atime and mtime in ns to ensure exact match
os.utime(p, ns=(orig_mtime_ns, orig_mtime_ns))
# Fast pass should clear 'missing' without a scan
trigger_sync_seed_assets(http, api_base)
g2 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
d2 = g2.json()
assert g2.status_code == 200, d2
assert "missing" not in set(d2.get("tags", [])), "Fast pass should clear 'missing' when size+mtime match"
@pytest.mark.skip(reason="Requires computing hashes of files in directories to deduplicate into multiple cache states")
@pytest.mark.parametrize("root", ["input", "output"])
def test_fastpass_removes_stale_state_row_no_missing(
root: str,
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
make_asset_bytes,
run_scan_and_wait,
):
"""
Hashed asset with two states:
- delete one file
- run fast pass only
Expect:
- asset stays healthy (no 'missing')
- stale AssetCacheState row for the deleted path is removed.
We verify this behaviorally by recreating the deleted path and running fast pass again:
a new *seed* AssetInfo is created, which proves the old state row was not reused.
"""
scope = f"stale-{uuid.uuid4().hex[:6]}"
name = "two_states.bin"
data = make_asset_bytes(name, 2048)
# Upload hashed asset at path1
a = asset_factory(name, [root, "unit-tests", scope], {}, data)
base = comfy_tmp_base_dir / root / "unit-tests" / scope
a1_filename = get_asset_filename(a["asset_hash"], ".bin")
p1 = base / a1_filename
assert p1.exists()
aid = a["id"]
h = a["asset_hash"]
# Create second state path2, seed+scan to dedupe into the same Asset
p2 = base / "copy" / name
p2.parent.mkdir(parents=True, exist_ok=True)
p2.write_bytes(data)
trigger_sync_seed_assets(http, api_base)
run_scan_and_wait(root)
# Delete path1 and run fast pass -> no 'missing' and stale state row should be removed
p1.unlink()
trigger_sync_seed_assets(http, api_base)
g1 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
d1 = g1.json()
assert g1.status_code == 200, d1
assert "missing" not in set(d1.get("tags", []))
# Recreate path1 and run fast pass again.
# If the stale state row was removed, a NEW seed AssetInfo will appear for this path.
p1.write_bytes(data)
trigger_sync_seed_assets(http, api_base)
rl = http.get(
api_base + "/api/assets",
params={"include_tags": f"unit-tests,{scope}"},
timeout=120,
)
bl = rl.json()
assert rl.status_code == 200, bl
items = bl.get("assets", [])
# one hashed AssetInfo (asset_hash == h) + one seed AssetInfo (asset_hash == null)
hashes = [it.get("asset_hash") for it in items if it.get("name") in (name, a1_filename)]
assert h in hashes
assert any(x is None for x in hashes), "Expected a new seed AssetInfo for the recreated path"
# Asset identity still healthy
rh = http.head(f"{api_base}/api/assets/hash/{h}", timeout=120)
assert rh.status_code == 200

306 tests-assets/test_crud.py Normal file

@@ -0,0 +1,306 @@
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import pytest
import requests
from conftest import get_asset_filename, trigger_sync_seed_assets
def test_create_from_hash_success(
http: requests.Session, api_base: str, seeded_asset: dict
):
h = seeded_asset["asset_hash"]
payload = {
"hash": h,
"name": "from_hash_ok.safetensors",
"tags": ["models", "checkpoints", "unit-tests", "from-hash"],
"user_metadata": {"k": "v"},
}
r1 = http.post(f"{api_base}/api/assets/from-hash", json=payload, timeout=120)
b1 = r1.json()
assert r1.status_code == 201, b1
assert b1["asset_hash"] == h
assert b1["created_new"] is False
aid = b1["id"]
# Calling again with the same name should return the same AssetInfo id
r2 = http.post(f"{api_base}/api/assets/from-hash", json=payload, timeout=120)
b2 = r2.json()
assert r2.status_code == 201, b2
assert b2["id"] == aid
def test_get_and_delete_asset(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]
# GET detail
rg = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
detail = rg.json()
assert rg.status_code == 200, detail
assert detail["id"] == aid
assert "user_metadata" in detail
assert "filename" in detail["user_metadata"]
# DELETE
rd = http.delete(f"{api_base}/api/assets/{aid}", timeout=120)
assert rd.status_code == 204
# GET again -> 404
rg2 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
body = rg2.json()
assert rg2.status_code == 404
assert body["error"]["code"] == "ASSET_NOT_FOUND"
def test_delete_upon_reference_count(
http: requests.Session, api_base: str, seeded_asset: dict
):
# Create a second reference to the same asset via from-hash
src_hash = seeded_asset["asset_hash"]
payload = {
"hash": src_hash,
"name": "unit_ref_copy.safetensors",
"tags": ["models", "checkpoints", "unit-tests", "del-flow"],
"user_metadata": {"note": "copy"},
}
r2 = http.post(f"{api_base}/api/assets/from-hash", json=payload, timeout=120)
copy = r2.json()
assert r2.status_code == 201, copy
assert copy["asset_hash"] == src_hash
assert copy["created_new"] is False
# Delete original reference -> asset identity must remain
aid1 = seeded_asset["id"]
rd1 = http.delete(f"{api_base}/api/assets/{aid1}", timeout=120)
assert rd1.status_code == 204
rh1 = http.head(f"{api_base}/api/assets/hash/{src_hash}", timeout=120)
assert rh1.status_code == 200 # identity still present
# Delete the last reference with default semantics -> identity and cached files removed
aid2 = copy["id"]
rd2 = http.delete(f"{api_base}/api/assets/{aid2}", timeout=120)
assert rd2.status_code == 204
rh2 = http.head(f"{api_base}/api/assets/hash/{src_hash}", timeout=120)
assert rh2.status_code == 404 # orphan content removed
def test_update_asset_fields(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]
original_tags = seeded_asset["tags"]
payload = {
"name": "unit_1_renamed.safetensors",
"user_metadata": {"purpose": "updated", "epoch": 2},
}
ru = http.put(f"{api_base}/api/assets/{aid}", json=payload, timeout=120)
body = ru.json()
assert ru.status_code == 200, body
assert body["name"] == payload["name"]
assert body["tags"] == original_tags # tags unchanged
assert body["user_metadata"]["purpose"] == "updated"
# filename should still be present and normalized by server
assert "filename" in body["user_metadata"]
def test_head_asset_by_hash(http: requests.Session, api_base: str, seeded_asset: dict):
h = seeded_asset["asset_hash"]
# Existing
rh1 = http.head(f"{api_base}/api/assets/hash/{h}", timeout=120)
assert rh1.status_code == 200
# Non-existent
rh2 = http.head(f"{api_base}/api/assets/hash/blake3:{'0'*64}", timeout=120)
assert rh2.status_code == 404
def test_head_asset_bad_hash_returns_400_and_no_body(http: requests.Session, api_base: str):
# Invalid format; handler returns a JSON error, but HEAD responses must not carry a payload.
# requests exposes an empty body for HEAD, so validate status and that there is no payload.
rh = http.head(f"{api_base}/api/assets/hash/not_a_hash", timeout=120)
assert rh.status_code == 400
body = rh.content
assert body == b""
def test_delete_nonexistent_returns_404(http: requests.Session, api_base: str):
bogus = str(uuid.uuid4())
r = http.delete(f"{api_base}/api/assets/{bogus}", timeout=120)
body = r.json()
assert r.status_code == 404
assert body["error"]["code"] == "ASSET_NOT_FOUND"
def test_create_from_hash_invalids(http: requests.Session, api_base: str):
# Bad hash algorithm
bad = {
"hash": "sha256:" + "0" * 64,
"name": "x.bin",
"tags": ["models", "checkpoints", "unit-tests"],
}
r1 = http.post(f"{api_base}/api/assets/from-hash", json=bad, timeout=120)
b1 = r1.json()
assert r1.status_code == 400
assert b1["error"]["code"] == "INVALID_BODY"
# Invalid JSON body
r2 = http.post(f"{api_base}/api/assets/from-hash", data=b"{not json}", timeout=120)
b2 = r2.json()
assert r2.status_code == 400
assert b2["error"]["code"] == "INVALID_JSON"
def test_get_update_download_bad_ids(http: requests.Session, api_base: str):
# All endpoints should return 404, since the route definition applies a UUID regex directly.
bad_id = "not-a-uuid"
r1 = http.get(f"{api_base}/api/assets/{bad_id}", timeout=120)
assert r1.status_code == 404
r3 = http.get(f"{api_base}/api/assets/{bad_id}/content", timeout=120)
assert r3.status_code == 404
def test_update_requires_at_least_one_field(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]
r = http.put(f"{api_base}/api/assets/{aid}", json={}, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == "INVALID_BODY"
@pytest.mark.parametrize("root", ["input", "output"])
def test_concurrent_delete_same_asset_info_single_204(
root: str,
http: requests.Session,
api_base: str,
asset_factory,
make_asset_bytes,
):
"""
Many concurrent DELETE for the same AssetInfo should result in:
- exactly one 204 No Content (the one that actually deleted)
- all others 404 Not Found (row already gone)
"""
scope = f"conc-del-{uuid.uuid4().hex[:6]}"
name = "to_delete.bin"
data = make_asset_bytes(name, 1536)
created = asset_factory(name, [root, "unit-tests", scope], {}, data)
aid = created["id"]
# Hit the same endpoint N times in parallel.
n_tests = 4
url = f"{api_base}/api/assets/{aid}?delete_content=false"
def _do_delete(delete_url):
with requests.Session() as s:
return s.delete(delete_url, timeout=120).status_code
with ThreadPoolExecutor(max_workers=n_tests) as ex:
statuses = list(ex.map(_do_delete, [url] * n_tests))
# Exactly one actual delete, the rest must be 404
assert statuses.count(204) == 1, f"Expected exactly one 204; got: {statuses}"
assert statuses.count(404) == n_tests - 1, f"Expected {n_tests-1} 404; got: {statuses}"
# The resource must be gone.
rg = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
assert rg.status_code == 404
@pytest.mark.parametrize("root", ["input", "output"])
def test_metadata_filename_is_set_for_seed_asset_without_hash(
root: str,
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
):
"""Seed ingest (no hash yet) must compute user_metadata['filename'] immediately."""
scope = f"seedmeta-{uuid.uuid4().hex[:6]}"
name = "seed_filename.bin"
base = comfy_tmp_base_dir / root / "unit-tests" / scope / "a" / "b"
base.mkdir(parents=True, exist_ok=True)
fp = base / name
fp.write_bytes(b"Z" * 2048)
trigger_sync_seed_assets(http, api_base)
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": f"unit-tests,{scope}", "name_contains": name},
timeout=120,
)
body = r1.json()
assert r1.status_code == 200, body
matches = [a for a in body.get("assets", []) if a.get("name") == name]
assert matches, "Seed asset should be visible after sync"
assert matches[0].get("asset_hash") is None # still a seed
aid = matches[0]["id"]
r2 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
detail = r2.json()
assert r2.status_code == 200, detail
filename = (detail.get("user_metadata") or {}).get("filename")
expected = str(fp.relative_to(comfy_tmp_base_dir / root)).replace("\\", "/")
assert filename == expected, f"expected filename={expected}, got {filename!r}"
@pytest.mark.skip(reason="Requires computing hashes of files in directories to retarget cache states")
@pytest.mark.parametrize("root", ["input", "output"])
def test_metadata_filename_computed_and_updated_on_retarget(
root: str,
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
make_asset_bytes,
run_scan_and_wait,
):
"""
1) Ingest under {root}/unit-tests/<scope>/a/b/<name> -> filename reflects relative path.
2) Retarget by copying to {root}/unit-tests/<scope>/x/<new_name>, remove old file,
run fast pass + scan -> filename updates to new relative path.
"""
scope = f"meta-fn-{uuid.uuid4().hex[:6]}"
name1 = "compute_metadata_filename.png"
name2 = "compute_changed_metadata_filename.png"
data = make_asset_bytes(name1, 2100)
# Upload into nested path a/b
a = asset_factory(name1, [root, "unit-tests", scope, "a", "b"], {}, data)
aid = a["id"]
root_base = comfy_tmp_base_dir / root
p1 = (root_base / "unit-tests" / scope / "a" / "b" / get_asset_filename(a["asset_hash"], ".png"))
assert p1.exists()
# filename at ingest should be the path relative to root
rel1 = str(p1.relative_to(root_base)).replace("\\", "/")
g1 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
d1 = g1.json()
assert g1.status_code == 200, d1
fn1 = d1["user_metadata"].get("filename")
assert fn1 == rel1
# Retarget: copy to x/<name2>, remove old, then sync+scan
p2 = root_base / "unit-tests" / scope / "x" / name2
p2.parent.mkdir(parents=True, exist_ok=True)
p2.write_bytes(data)
if p1.exists():
p1.unlink()
trigger_sync_seed_assets(http, api_base) # seed the new path
run_scan_and_wait(root) # verify/hash and reconcile
# filename should now point at x/<name2>
rel2 = str(p2.relative_to(root_base)).replace("\\", "/")
g2 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
d2 = g2.json()
assert g2.status_code == 200, d2
fn2 = d2["user_metadata"].get("filename")
assert fn2 == rel2


@@ -0,0 +1,166 @@
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
import pytest
import requests
from conftest import get_asset_filename, trigger_sync_seed_assets
def test_download_attachment_and_inline(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]
# default attachment
r1 = http.get(f"{api_base}/api/assets/{aid}/content", timeout=120)
data = r1.content
assert r1.status_code == 200
cd = r1.headers.get("Content-Disposition", "")
assert "attachment" in cd
assert data and len(data) == 4096
# inline requested
r2 = http.get(f"{api_base}/api/assets/{aid}/content?disposition=inline", timeout=120)
r2.content
assert r2.status_code == 200
cd2 = r2.headers.get("Content-Disposition", "")
assert "inline" in cd2
@pytest.mark.skip(reason="Requires computing hashes of files in directories to deduplicate into multiple cache states")
@pytest.mark.parametrize("root", ["input", "output"])
def test_download_chooses_existing_state_and_updates_access_time(
root: str,
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
make_asset_bytes,
run_scan_and_wait,
):
"""
Hashed asset with two state paths: if the first one disappears,
GET /content still serves from the remaining path and bumps last_access_time.
"""
scope = f"dl-first-{uuid.uuid4().hex[:6]}"
name = "first_existing_state.bin"
data = make_asset_bytes(name, 3072)
# Upload -> path1
a = asset_factory(name, [root, "unit-tests", scope], {}, data)
aid = a["id"]
base = comfy_tmp_base_dir / root / "unit-tests" / scope
path1 = base / get_asset_filename(a["asset_hash"], ".bin")
assert path1.exists()
# Seed path2 by copying, then scan to dedupe into a second state
path2 = base / "alt" / name
path2.parent.mkdir(parents=True, exist_ok=True)
path2.write_bytes(data)
trigger_sync_seed_assets(http, api_base)
run_scan_and_wait(root)
# Remove path1 so server must fall back to path2
path1.unlink()
# last_access_time before
rg0 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
d0 = rg0.json()
assert rg0.status_code == 200, d0
ts0 = d0.get("last_access_time")
time.sleep(0.05)
r = http.get(f"{api_base}/api/assets/{aid}/content", timeout=120)
blob = r.content
assert r.status_code == 200
assert blob == data # must serve from the surviving state (same bytes)
rg1 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
d1 = rg1.json()
assert rg1.status_code == 200, d1
ts1 = d1.get("last_access_time")
def _parse_iso8601(s: Optional[str]) -> Optional[float]:
if not s:
return None
s = s[:-1] if s.endswith("Z") else s
return datetime.fromisoformat(s).timestamp()
t0 = _parse_iso8601(ts0)
t1 = _parse_iso8601(ts1)
assert t1 is not None
if t0 is not None:
assert t1 > t0
@pytest.mark.parametrize("seeded_asset", [{"tags": ["models", "checkpoints"]}], indirect=True)
def test_download_missing_file_returns_404(
http: requests.Session, api_base: str, comfy_tmp_base_dir: Path, seeded_asset: dict
):
# Remove the underlying file, then attempt the download.
# The fixture is initialized without additional tags so the asset file path is known exactly.
try:
aid = seeded_asset["id"]
rg = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
detail = rg.json()
assert rg.status_code == 200
asset_filename = get_asset_filename(detail["asset_hash"], ".safetensors")
abs_path = comfy_tmp_base_dir / "models" / "checkpoints" / asset_filename
assert abs_path.exists()
abs_path.unlink()
r2 = http.get(f"{api_base}/api/assets/{aid}/content", timeout=120)
assert r2.status_code == 404
body = r2.json()
assert body["error"]["code"] == "FILE_NOT_FOUND"
finally:
# The asset was created without the "unit-tests" tag (see `autoclean_unit_test_assets`), so clean it up manually.
dr = http.delete(f"{api_base}/api/assets/{aid}", timeout=120)
dr.content
@pytest.mark.skip(reason="Requires computing hashes of files in directories to deduplicate into multiple cache states")
@pytest.mark.parametrize("root", ["input", "output"])
def test_download_404_if_all_states_missing(
root: str,
http: requests.Session,
api_base: str,
comfy_tmp_base_dir: Path,
asset_factory,
make_asset_bytes,
run_scan_and_wait,
):
"""Multi-state asset: after the last remaining on-disk file is removed, download must return 404."""
scope = f"dl-404-{uuid.uuid4().hex[:6]}"
name = "missing_all_states.bin"
data = make_asset_bytes(name, 2048)
# Upload -> path1
a = asset_factory(name, [root, "unit-tests", scope], {}, data)
aid = a["id"]
base = comfy_tmp_base_dir / root / "unit-tests" / scope
p1 = base / get_asset_filename(a["asset_hash"], ".bin")
assert p1.exists()
# Seed a second state and dedupe
p2 = base / "copy" / name
p2.parent.mkdir(parents=True, exist_ok=True)
p2.write_bytes(data)
trigger_sync_seed_assets(http, api_base)
run_scan_and_wait(root)
# Remove first file -> download should still work via the second state
p1.unlink()
ok1 = http.get(f"{api_base}/api/assets/{aid}/content", timeout=120)
b1 = ok1.content
assert ok1.status_code == 200 and b1 == data
# Remove the last file -> download must 404
p2.unlink()
r2 = http.get(f"{api_base}/api/assets/{aid}/content", timeout=120)
body = r2.json()
assert r2.status_code == 404
assert body["error"]["code"] == "FILE_NOT_FOUND"


@@ -0,0 +1,343 @@
import time
import uuid
import requests
import pytest
def test_list_assets_paging_and_sort(http: requests.Session, api_base: str, asset_factory, make_asset_bytes):
names = ["a1_u.safetensors", "a2_u.safetensors", "a3_u.safetensors"]
for n in names:
asset_factory(
n,
["models", "checkpoints", "unit-tests", "paging"],
{"epoch": 1},
make_asset_bytes(n, size=2048),
)
# name ascending for stable order
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,paging", "sort": "name", "order": "asc", "limit": "2", "offset": "0"},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
got1 = [a["name"] for a in b1["assets"]]
assert got1 == sorted(names)[:2]
assert b1["has_more"] is True
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,paging", "sort": "name", "order": "asc", "limit": "2", "offset": "2"},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200
got2 = [a["name"] for a in b2["assets"]]
assert got2 == sorted(names)[2:]
assert b2["has_more"] is False
def test_list_assets_include_exclude_and_name_contains(http: requests.Session, api_base: str, asset_factory):
a = asset_factory("inc_a.safetensors", ["models", "checkpoints", "unit-tests", "alpha"], {}, b"X" * 1024)
b = asset_factory("inc_b.safetensors", ["models", "checkpoints", "unit-tests", "beta"], {}, b"Y" * 1024)
r = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,alpha", "exclude_tags": "beta", "limit": "50"},
timeout=120,
)
body = r.json()
assert r.status_code == 200
names = [x["name"] for x in body["assets"]]
assert a["name"] in names
assert b["name"] not in names
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests", "name_contains": "inc_"},
timeout=120,
)
body2 = r2.json()
assert r2.status_code == 200
names2 = [x["name"] for x in body2["assets"]]
assert a["name"] in names2
assert b["name"] in names2
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "non-existing-tag"},
timeout=120,
)
body3 = r2.json()
assert r2.status_code == 200
assert not body3["assets"]
def test_list_assets_sort_by_size_both_orders(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "lf-size"]
n1, n2, n3 = "sz1.safetensors", "sz2.safetensors", "sz3.safetensors"
asset_factory(n1, t, {}, make_asset_bytes(n1, 1024))
asset_factory(n2, t, {}, make_asset_bytes(n2, 2048))
asset_factory(n3, t, {}, make_asset_bytes(n3, 3072))
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-size", "sort": "size", "order": "asc"},
timeout=120,
)
b1 = r1.json()
names = [a["name"] for a in b1["assets"]]
assert names[:3] == [n1, n2, n3]
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-size", "sort": "size", "order": "desc"},
timeout=120,
)
b2 = r2.json()
names2 = [a["name"] for a in b2["assets"]]
assert names2[:3] == [n3, n2, n1]
def test_list_assets_sort_by_updated_at_desc(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "lf-upd"]
a1 = asset_factory("upd_a.safetensors", t, {}, make_asset_bytes("upd_a", 1200))
a2 = asset_factory("upd_b.safetensors", t, {}, make_asset_bytes("upd_b", 1200))
# Rename the second asset to bump updated_at
rp = http.put(f"{api_base}/api/assets/{a2['id']}", json={"name": "upd_b_renamed.safetensors"}, timeout=120)
upd = rp.json()
assert rp.status_code == 200, upd
r = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-upd", "sort": "updated_at", "order": "desc"},
timeout=120,
)
body = r.json()
assert r.status_code == 200
names = [x["name"] for x in body["assets"]]
assert names[0] == "upd_b_renamed.safetensors"
assert a1["name"] in names
def test_list_assets_sort_by_last_access_time_desc(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "lf-access"]
asset_factory("acc_a.safetensors", t, {}, make_asset_bytes("acc_a", 1100))
time.sleep(0.02)
a2 = asset_factory("acc_b.safetensors", t, {}, make_asset_bytes("acc_b", 1100))
# Touch last_access_time of b by downloading its content
time.sleep(0.02)
dl = http.get(f"{api_base}/api/assets/{a2['id']}/content", timeout=120)
assert dl.status_code == 200
dl.content
r = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-access", "sort": "last_access_time", "order": "desc"},
timeout=120,
)
body = r.json()
assert r.status_code == 200
names = [x["name"] for x in body["assets"]]
assert names[0] == a2["name"]
def test_list_assets_include_tags_variants_and_case(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "lf-include"]
a = asset_factory("incvar_alpha.safetensors", [*t, "alpha"], {}, make_asset_bytes("iva"))
asset_factory("incvar_beta.safetensors", [*t, "beta"], {}, make_asset_bytes("ivb"))
# CSV + case-insensitive
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "UNIT-TESTS,LF-INCLUDE,alpha"},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
names1 = [x["name"] for x in b1["assets"]]
assert a["name"] in names1
assert not any("beta" in x for x in names1)
# Repeated query params for include_tags
params_multi = [
("include_tags", "unit-tests"),
("include_tags", "lf-include"),
("include_tags", "alpha"),
]
r2 = http.get(api_base + "/api/assets", params=params_multi, timeout=120)
b2 = r2.json()
assert r2.status_code == 200
names2 = [x["name"] for x in b2["assets"]]
assert a["name"] in names2
assert not any("beta" in x for x in names2)
# Duplicates and spaces in CSV
r3 = http.get(
api_base + "/api/assets",
params={"include_tags": " unit-tests , lf-include , alpha , alpha "},
timeout=120,
)
b3 = r3.json()
assert r3.status_code == 200
names3 = [x["name"] for x in b3["assets"]]
assert a["name"] in names3
def test_list_assets_exclude_tags_dedup_and_case(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "lf-exclude"]
a = asset_factory("ex_a_alpha.safetensors", [*t, "alpha"], {}, make_asset_bytes("exa", 900))
asset_factory("ex_b_beta.safetensors", [*t, "beta"], {}, make_asset_bytes("exb", 900))
# Exclude uppercase should work
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-exclude", "exclude_tags": "BETA"},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
names1 = [x["name"] for x in b1["assets"]]
assert a["name"] in names1
# Repeated excludes with duplicates
params_multi = [
("include_tags", "unit-tests"),
("include_tags", "lf-exclude"),
("exclude_tags", "beta"),
("exclude_tags", "beta"),
]
r2 = http.get(api_base + "/api/assets", params=params_multi, timeout=120)
b2 = r2.json()
assert r2.status_code == 200
names2 = [x["name"] for x in b2["assets"]]
assert all("beta" not in x for x in names2)
def test_list_assets_name_contains_case_and_specials(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "lf-name"]
a1 = asset_factory("CaseMix.SAFE", t, {}, make_asset_bytes("cm", 800))
a2 = asset_factory("case-other.safetensors", t, {}, make_asset_bytes("co", 800))
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-name", "name_contains": "casemix"},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
names1 = [x["name"] for x in b1["assets"]]
assert a1["name"] in names1
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-name", "name_contains": ".SAFE"},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200
names2 = [x["name"] for x in b2["assets"]]
assert a1["name"] in names2
r3 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-name", "name_contains": "case-"},
timeout=120,
)
b3 = r3.json()
assert r3.status_code == 200
names3 = [x["name"] for x in b3["assets"]]
assert a2["name"] in names3
def test_list_assets_offset_beyond_total_and_limit_boundary(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "lf-pagelimits"]
asset_factory("pl1.safetensors", t, {}, make_asset_bytes("pl1", 600))
asset_factory("pl2.safetensors", t, {}, make_asset_bytes("pl2", 600))
asset_factory("pl3.safetensors", t, {}, make_asset_bytes("pl3", 600))
# Offset far beyond total
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-pagelimits", "limit": "2", "offset": "10"},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
assert not b1["assets"]
assert b1["has_more"] is False
# Boundary large limit (<=500 is valid)
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,lf-pagelimits", "limit": "500"},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200
assert len(b2["assets"]) == 3
assert b2["has_more"] is False
def test_list_assets_offset_negative_and_limit_nonint_rejected(http, api_base):
r1 = http.get(api_base + "/api/assets", params={"offset": "-1"}, timeout=120)
b1 = r1.json()
assert r1.status_code == 400
assert b1["error"]["code"] == "INVALID_QUERY"
r2 = http.get(api_base + "/api/assets", params={"limit": "abc"}, timeout=120)
b2 = r2.json()
assert r2.status_code == 400
assert b2["error"]["code"] == "INVALID_QUERY"
def test_list_assets_invalid_query_rejected(http: requests.Session, api_base: str):
# limit too small
r1 = http.get(api_base + "/api/assets", params={"limit": "0"}, timeout=120)
b1 = r1.json()
assert r1.status_code == 400
assert b1["error"]["code"] == "INVALID_QUERY"
# bad metadata JSON
r2 = http.get(api_base + "/api/assets", params={"metadata_filter": "{not json"}, timeout=120)
b2 = r2.json()
assert r2.status_code == 400
assert b2["error"]["code"] == "INVALID_QUERY"
def test_list_assets_name_contains_literal_underscore(
http,
api_base,
asset_factory,
make_asset_bytes,
):
"""'name_contains' must treat '_' literally, not as a SQL wildcard.
We create:
- foo_bar.safetensors (should match)
- fooxbar.safetensors (must NOT match if '_' is escaped)
- foobar.safetensors (must NOT match)
"""
scope = f"lf-underscore-{uuid.uuid4().hex[:6]}"
tags = ["models", "checkpoints", "unit-tests", scope]
a = asset_factory("foo_bar.safetensors", tags, {}, make_asset_bytes("a", 700))
b = asset_factory("fooxbar.safetensors", tags, {}, make_asset_bytes("b", 700))
c = asset_factory("foobar.safetensors", tags, {}, make_asset_bytes("c", 700))
r = http.get(
api_base + "/api/assets",
params={"include_tags": f"unit-tests,{scope}", "name_contains": "foo_bar"},
timeout=120,
)
body = r.json()
assert r.status_code == 200, body
names = [x["name"] for x in body["assets"]]
assert a["name"] in names, f"Expected literal underscore match to include {a['name']}"
assert b["name"] not in names, "Underscore must be escaped — should not match 'fooxbar'"
assert c["name"] not in names, "Underscore must be escaped — should not match 'foobar'"
assert body["total"] == 1
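# The assertions above rely on the server escaping SQL LIKE wildcards in name_contains,
# roughly like this sketch (not the actual server code):
#   escaped = term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
#   ... WHERE name LIKE '%' || :escaped || '%' ESCAPE '\'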


@@ -0,0 +1,397 @@
import json
import pytest
def test_meta_and_across_keys_and_types(
http, api_base: str, asset_factory, make_asset_bytes
):
name = "mf_and_mix.safetensors"
tags = ["models", "checkpoints", "unit-tests", "mf-and"]
meta = {"purpose": "mix", "epoch": 1, "active": True, "score": 1.23}
asset_factory(name, tags, meta, make_asset_bytes(name, 4096))
# All keys must match (AND semantics)
f_ok = {"purpose": "mix", "epoch": 1, "active": True, "score": 1.23}
r1 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-and",
"metadata_filter": json.dumps(f_ok),
},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
names = [a["name"] for a in b1["assets"]]
assert name in names
# One key mismatched -> no result
f_bad = {"purpose": "mix", "epoch": 2, "active": True}
r2 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-and",
"metadata_filter": json.dumps(f_bad),
},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200
assert not b2["assets"]
def test_meta_type_strictness_int_vs_str_and_bool(http, api_base, asset_factory, make_asset_bytes):
name = "mf_types.safetensors"
tags = ["models", "checkpoints", "unit-tests", "mf-types"]
meta = {"epoch": 1, "active": True}
asset_factory(name, tags, meta, make_asset_bytes(name))
# int filter matches numeric
r1 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-types",
"metadata_filter": json.dumps({"epoch": 1}),
},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200 and any(a["name"] == name for a in b1["assets"])
# string "1" must NOT match numeric 1
r2 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-types",
"metadata_filter": json.dumps({"epoch": "1"}),
},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and not b2["assets"]
# bool True matches, string "true" must NOT match
r3 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-types",
"metadata_filter": json.dumps({"active": True}),
},
timeout=120,
)
b3 = r3.json()
assert r3.status_code == 200 and any(a["name"] == name for a in b3["assets"])
r4 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-types",
"metadata_filter": json.dumps({"active": "true"}),
},
timeout=120,
)
b4 = r4.json()
assert r4.status_code == 200 and not b4["assets"]
def test_meta_any_of_list_of_scalars(http, api_base, asset_factory, make_asset_bytes):
name = "mf_list_scalars.safetensors"
tags = ["models", "checkpoints", "unit-tests", "mf-list"]
meta = {"flags": ["red", "green"]}
asset_factory(name, tags, meta, make_asset_bytes(name, 3000))
# Any-of should match because "green" is present
filt_ok = {"flags": ["blue", "green"]}
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-list", "metadata_filter": json.dumps(filt_ok)},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200 and any(a["name"] == name for a in b1["assets"])
# None of provided flags present -> no match
filt_miss = {"flags": ["blue", "yellow"]}
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-list", "metadata_filter": json.dumps(filt_miss)},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and not b2["assets"]
# Duplicates in list should not break matching
filt_dup = {"flags": ["green", "green", "green"]}
r3 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-list", "metadata_filter": json.dumps(filt_dup)},
timeout=120,
)
b3 = r3.json()
assert r3.status_code == 200 and any(a["name"] == name for a in b3["assets"])
def test_meta_none_semantics_missing_or_null_and_any_of_with_none(
http, api_base, asset_factory, make_asset_bytes
):
# a1: key missing; a2: explicit null; a3: concrete value
t = ["models", "checkpoints", "unit-tests", "mf-none"]
a1 = asset_factory("mf_none_missing.safetensors", t, {"x": 1}, make_asset_bytes("a1"))
a2 = asset_factory("mf_none_null.safetensors", t, {"maybe": None}, make_asset_bytes("a2"))
a3 = asset_factory("mf_none_value.safetensors", t, {"maybe": "x"}, make_asset_bytes("a3"))
# Filter {maybe: None} must match a1 and a2, not a3
filt = {"maybe": None}
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-none", "metadata_filter": json.dumps(filt), "sort": "name"},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
got = [a["name"] for a in b1["assets"]]
assert a1["name"] in got and a2["name"] in got and a3["name"] not in got
# Any-of with None should include missing/null plus value matches
filt_any = {"maybe": [None, "x"]}
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-none", "metadata_filter": json.dumps(filt_any), "sort": "name"},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200
got2 = [a["name"] for a in b2["assets"]]
assert a1["name"] in got2 and a2["name"] in got2 and a3["name"] in got2
def test_meta_nested_json_object_equality(http, api_base, asset_factory, make_asset_bytes):
name = "mf_nested_json.safetensors"
tags = ["models", "checkpoints", "unit-tests", "mf-nested"]
cfg = {"optimizer": "adam", "lr": 0.001, "schedule": {"type": "cosine", "warmup": 100}}
asset_factory(name, tags, {"config": cfg}, make_asset_bytes(name, 2200))
# Exact JSON object equality (same structure)
r1 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-nested",
"metadata_filter": json.dumps({"config": cfg}),
},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200 and any(a["name"] == name for a in b1["assets"])
# Different JSON object should not match
r2 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-nested",
"metadata_filter": json.dumps({"config": {"optimizer": "sgd"}}),
},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and not b2["assets"]
def test_meta_list_of_objects_any_of(http, api_base, asset_factory, make_asset_bytes):
name = "mf_list_objects.safetensors"
tags = ["models", "checkpoints", "unit-tests", "mf-objlist"]
transforms = [{"type": "crop", "size": 128}, {"type": "flip", "p": 0.5}]
asset_factory(name, tags, {"transforms": transforms}, make_asset_bytes(name, 2048))
# Any-of for list of objects should match when one element equals the filter object
r1 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-objlist",
"metadata_filter": json.dumps({"transforms": {"type": "flip", "p": 0.5}}),
},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200 and any(a["name"] == name for a in b1["assets"])
# Non-matching object -> no match
r2 = http.get(
api_base + "/api/assets",
params={
"include_tags": "unit-tests,mf-objlist",
"metadata_filter": json.dumps({"transforms": {"type": "rotate", "deg": 90}}),
},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and not b2["assets"]
def test_meta_with_special_and_unicode_keys(http, api_base, asset_factory, make_asset_bytes):
name = "mf_keys_unicode.safetensors"
tags = ["models", "checkpoints", "unit-tests", "mf-keys"]
meta = {
"weird.key": "v1",
"path/like": 7,
"with:colon": True,
"ключ": "значение",
"emoji": "🐍",
}
asset_factory(name, tags, meta, make_asset_bytes(name, 1500))
# Match all the special keys
filt = {"weird.key": "v1", "path/like": 7, "with:colon": True, "emoji": "🐍"}
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-keys", "metadata_filter": json.dumps(filt)},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200 and any(a["name"] == name for a in b1["assets"])
# Unicode key match
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-keys", "metadata_filter": json.dumps({"ключ": "значение"})},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and any(a["name"] == name for a in b2["assets"])
def test_meta_with_zero_and_boolean_lists(http, api_base, asset_factory, make_asset_bytes):
t = ["models", "checkpoints", "unit-tests", "mf-zero-bool"]
a0 = asset_factory("mf_zero_count.safetensors", t, {"count": 0}, make_asset_bytes("z", 1025))
a1 = asset_factory("mf_bool_list.safetensors", t, {"choices": [True, False]}, make_asset_bytes("b", 1026))
# count == 0 must match only a0
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-zero-bool", "metadata_filter": json.dumps({"count": 0})},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
names1 = [a["name"] for a in b1["assets"]]
assert a0["name"] in names1 and a1["name"] not in names1
# Any-of list of booleans: True matches second asset
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-zero-bool", "metadata_filter": json.dumps({"choices": True})},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and any(a["name"] == a1["name"] for a in b2["assets"])
def test_meta_mixed_list_types_and_strictness(http, api_base, asset_factory, make_asset_bytes):
name = "mf_mixed_list.safetensors"
tags = ["models", "checkpoints", "unit-tests", "mf-mixed"]
meta = {"mix": ["1", 1, True, None]}
asset_factory(name, tags, meta, make_asset_bytes(name, 1999))
# Should match because 1 is present
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-mixed", "metadata_filter": json.dumps({"mix": [2, 1]})},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200 and any(a["name"] == name for a in b1["assets"])
# Should NOT match for False
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-mixed", "metadata_filter": json.dumps({"mix": False})},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and not b2["assets"]
def test_meta_unknown_key_and_none_behavior_with_scope_tags(http, api_base, asset_factory, make_asset_bytes):
# Use a unique scope tag to avoid interference
t = ["models", "checkpoints", "unit-tests", "mf-unknown-scope"]
x = asset_factory("mf_unknown_a.safetensors", t, {"k1": 1}, make_asset_bytes("ua"))
y = asset_factory("mf_unknown_b.safetensors", t, {"k2": 2}, make_asset_bytes("ub"))
# Filtering by unknown key with None should return both (missing key OR null)
r1 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-unknown-scope", "metadata_filter": json.dumps({"unknown": None})},
timeout=120,
)
b1 = r1.json()
assert r1.status_code == 200
names = {a["name"] for a in b1["assets"]}
assert x["name"] in names and y["name"] in names
# Filtering by unknown key with concrete value should return none
r2 = http.get(
api_base + "/api/assets",
params={"include_tags": "unit-tests,mf-unknown-scope", "metadata_filter": json.dumps({"unknown": "x"})},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200 and not b2["assets"]
def test_meta_with_tags_include_exclude_and_name_contains(http, api_base, asset_factory, make_asset_bytes):
# alpha matches epoch=1; beta has epoch=2
a = asset_factory(
"mf_tag_alpha.safetensors",
["models", "checkpoints", "unit-tests", "mf-tag", "alpha"],
{"epoch": 1},
make_asset_bytes("alpha"),
)
b = asset_factory(
"mf_tag_beta.safetensors",
["models", "checkpoints", "unit-tests", "mf-tag", "beta"],
{"epoch": 2},
make_asset_bytes("beta"),
)
params = {
"include_tags": "unit-tests,mf-tag,alpha",
"exclude_tags": "beta",
"name_contains": "mf_tag_",
"metadata_filter": json.dumps({"epoch": 1}),
}
r = http.get(api_base + "/api/assets", params=params, timeout=120)
body = r.json()
assert r.status_code == 200
names = [x["name"] for x in body["assets"]]
assert a["name"] in names
assert b["name"] not in names
def test_meta_sort_and_paging_under_filter(http, api_base, asset_factory, make_asset_bytes):
# Three assets in same scope with different sizes and a common filter key
t = ["models", "checkpoints", "unit-tests", "mf-sort"]
n1, n2, n3 = "mf_sort_1.safetensors", "mf_sort_2.safetensors", "mf_sort_3.safetensors"
asset_factory(n1, t, {"group": "g"}, make_asset_bytes(n1, 1024))
asset_factory(n2, t, {"group": "g"}, make_asset_bytes(n2, 2048))
asset_factory(n3, t, {"group": "g"}, make_asset_bytes(n3, 3072))
# Sort by size ascending with paging
q = {
"include_tags": "unit-tests,mf-sort",
"metadata_filter": json.dumps({"group": "g"}),
"sort": "size", "order": "asc", "limit": "2",
}
r1 = http.get(api_base + "/api/assets", params=q, timeout=120)
b1 = r1.json()
assert r1.status_code == 200
got1 = [a["name"] for a in b1["assets"]]
assert got1 == [n1, n2]
assert b1["has_more"] is True
q2 = {**q, "offset": "2"}
r2 = http.get(api_base + "/api/assets", params=q2, timeout=120)
b2 = r2.json()
assert r2.status_code == 200
got2 = [a["name"] for a in b2["assets"]]
assert got2 == [n3]
assert b2["has_more"] is False

226
tests-assets/test_tags.py Normal file
View File

@ -0,0 +1,226 @@
import json
import uuid
import requests
def test_tags_present(http: requests.Session, api_base: str, seeded_asset: dict):
# Include zero-usage tags by default
r1 = http.get(api_base + "/api/tags", params={"limit": "50"}, timeout=120)
body1 = r1.json()
assert r1.status_code == 200
names = [t["name"] for t in body1["tags"]]
# A few system tags from migration should exist:
assert "models" in names
assert "checkpoints" in names
    # With include_zero=false, only tags that are currently in use should be returned
r2 = http.get(api_base + "/api/tags", params={"include_zero": "false"}, timeout=120)
body2 = r2.json()
assert r2.status_code == 200
# We already seeded one asset via fixture, so used tags must be non-empty
used_names = [t["name"] for t in body2["tags"]]
assert "models" in used_names
assert "checkpoints" in used_names
# Prefix filter should refine the list
r3 = http.get(api_base + "/api/tags", params={"include_zero": "false", "prefix": "uni"}, timeout=120)
b3 = r3.json()
assert r3.status_code == 200
names3 = [t["name"] for t in b3["tags"]]
assert "unit-tests" in names3
assert "models" not in names3 # filtered out by prefix
# Order by name ascending should be stable
r4 = http.get(api_base + "/api/tags", params={"include_zero": "false", "order": "name_asc"}, timeout=120)
b4 = r4.json()
assert r4.status_code == 200
names4 = [t["name"] for t in b4["tags"]]
assert names4 == sorted(names4)
def test_tags_empty_usage(http: requests.Session, api_base: str, asset_factory, make_asset_bytes):
# Baseline: system tags exist when include_zero (default) is true
r1 = http.get(api_base + "/api/tags", params={"limit": "500"}, timeout=120)
body1 = r1.json()
assert r1.status_code == 200
names = [t["name"] for t in body1["tags"]]
assert "models" in names and "checkpoints" in names
# Create a short-lived asset under input with a unique custom tag
scope = f"tags-empty-usage-{uuid.uuid4().hex[:6]}"
custom_tag = f"temp-{uuid.uuid4().hex[:8]}"
name = "tag_seed.bin"
_asset = asset_factory(
name,
["input", "unit-tests", scope, custom_tag],
{},
make_asset_bytes(name, 512),
)
# While the asset exists, the custom tag must appear when include_zero=false
r2 = http.get(
api_base + "/api/tags",
params={"include_zero": "false", "prefix": custom_tag, "limit": "50"},
timeout=120,
)
body2 = r2.json()
assert r2.status_code == 200
used_names = [t["name"] for t in body2["tags"]]
assert custom_tag in used_names
# Delete the asset so the tag usage drops to zero
rd = http.delete(f"{api_base}/api/assets/{_asset['id']}", timeout=120)
assert rd.status_code == 204
# Now the custom tag must not be returned when include_zero=false
r3 = http.get(
api_base + "/api/tags",
params={"include_zero": "false", "prefix": custom_tag, "limit": "50"},
timeout=120,
)
body3 = r3.json()
assert r3.status_code == 200
names_after = [t["name"] for t in body3["tags"]]
assert custom_tag not in names_after
assert not names_after # filtered view should be empty now
def test_add_and_remove_tags(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]
# Add tags with duplicates and mixed case
payload_add = {"tags": ["NewTag", "unit-tests", "newtag", "BETA"]}
r1 = http.post(f"{api_base}/api/assets/{aid}/tags", json=payload_add, timeout=120)
b1 = r1.json()
assert r1.status_code == 200, b1
# normalized, deduplicated; 'unit-tests' was already present from the seed
assert set(b1["added"]) == {"newtag", "beta"}
assert set(b1["already_present"]) == {"unit-tests"}
assert "newtag" in b1["total_tags"] and "beta" in b1["total_tags"]
rg = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
g = rg.json()
assert rg.status_code == 200
tags_now = set(g["tags"])
assert {"newtag", "beta"}.issubset(tags_now)
# Remove a tag and a non-existent tag
payload_del = {"tags": ["newtag", "does-not-exist"]}
r2 = http.delete(f"{api_base}/api/assets/{aid}/tags", json=payload_del, timeout=120)
b2 = r2.json()
assert r2.status_code == 200
assert set(b2["removed"]) == {"newtag"}
assert set(b2["not_present"]) == {"does-not-exist"}
# Verify remaining tags after deletion
rg2 = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
g2 = rg2.json()
assert rg2.status_code == 200
tags_later = set(g2["tags"])
assert "newtag" not in tags_later
assert "beta" in tags_later # still present
def test_tags_list_order_and_prefix(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]
h = seeded_asset["asset_hash"]
# Add both tags to the seeded asset (usage: orderaaa=1, orderbbb=1)
r_add = http.post(f"{api_base}/api/assets/{aid}/tags", json={"tags": ["orderaaa", "orderbbb"]}, timeout=120)
add_body = r_add.json()
assert r_add.status_code == 200, add_body
# Create another AssetInfo from the same content but tagged ONLY with 'orderbbb'.
payload = {
"hash": h,
"name": "order_only_bbb.safetensors",
"tags": ["input", "unit-tests", "orderbbb"],
"user_metadata": {},
}
r_copy = http.post(f"{api_base}/api/assets/from-hash", json=payload, timeout=120)
copy_body = r_copy.json()
assert r_copy.status_code == 201, copy_body
# 1) Default order (count_desc): 'orderbbb' should come before 'orderaaa'
# because it has higher usage (2 vs 1).
r1 = http.get(api_base + "/api/tags", params={"prefix": "order", "include_zero": "false"}, timeout=120)
b1 = r1.json()
assert r1.status_code == 200, b1
names1 = [t["name"] for t in b1["tags"]]
counts1 = {t["name"]: t["count"] for t in b1["tags"]}
# Both must be present within the prefix subset
assert "orderaaa" in names1 and "orderbbb" in names1
# Usage of 'orderbbb' must be >= 'orderaaa'; in our setup it's 2 vs 1
assert counts1["orderbbb"] >= counts1["orderaaa"]
# And with count_desc, 'orderbbb' appears earlier than 'orderaaa'
assert names1.index("orderbbb") < names1.index("orderaaa")
# 2) name_asc: lexical order should flip the relative order
r2 = http.get(
api_base + "/api/tags",
params={"prefix": "order", "include_zero": "false", "order": "name_asc"},
timeout=120,
)
b2 = r2.json()
assert r2.status_code == 200, b2
names2 = [t["name"] for t in b2["tags"]]
assert "orderaaa" in names2 and "orderbbb" in names2
assert names2.index("orderaaa") < names2.index("orderbbb")
    # 3) An out-of-range limit (here 1001) should be rejected
r3 = http.get(api_base + "/api/tags", params={"limit": "1001"}, timeout=120)
b3 = r3.json()
assert r3.status_code == 400
assert b3["error"]["code"] == "INVALID_QUERY"
def test_tags_endpoints_invalid_bodies(http: requests.Session, api_base: str, seeded_asset: dict):
aid = seeded_asset["id"]
# Add with empty list
r1 = http.post(f"{api_base}/api/assets/{aid}/tags", json={"tags": []}, timeout=120)
b1 = r1.json()
assert r1.status_code == 400
assert b1["error"]["code"] == "INVALID_BODY"
# Remove with wrong type
r2 = http.delete(f"{api_base}/api/assets/{aid}/tags", json={"tags": [123]}, timeout=120)
b2 = r2.json()
assert r2.status_code == 400
assert b2["error"]["code"] == "INVALID_BODY"
# metadata_filter provided as JSON array should be rejected (must be object)
r3 = http.get(
api_base + "/api/assets",
params={"metadata_filter": json.dumps([{"x": 1}])},
timeout=120,
)
b3 = r3.json()
assert r3.status_code == 400
assert b3["error"]["code"] == "INVALID_QUERY"
def test_tags_prefix_treats_underscore_literal(
http,
api_base,
asset_factory,
make_asset_bytes,
):
"""'prefix' for /api/tags must treat '_' literally, not as a wildcard."""
base = f"pref_{uuid.uuid4().hex[:6]}"
tag_ok = f"{base}_ok" # should match prefix=f"{base}_"
tag_bad = f"{base}xok" # must NOT match if '_' is escaped
scope = f"tags-underscore-{uuid.uuid4().hex[:6]}"
asset_factory("t1.bin", ["input", "unit-tests", scope, tag_ok], {}, make_asset_bytes("t1", 512))
asset_factory("t2.bin", ["input", "unit-tests", scope, tag_bad], {}, make_asset_bytes("t2", 512))
r = http.get(api_base + "/api/tags", params={"include_zero": "false", "prefix": f"{base}_"}, timeout=120)
body = r.json()
assert r.status_code == 200, body
names = [t["name"] for t in body["tags"]]
assert tag_ok in names, f"Expected {tag_ok} to be returned for prefix '{base}_'"
assert tag_bad not in names, f"'{tag_bad}' must not match — '_' is not a wildcard"
assert body["total"] == 1

View File

@ -0,0 +1,281 @@
import json
import uuid
from concurrent.futures import ThreadPoolExecutor
import requests
import pytest
def test_upload_ok_duplicate_reference(http: requests.Session, api_base: str, make_asset_bytes):
name = "dup_a.safetensors"
tags = ["models", "checkpoints", "unit-tests", "alpha"]
meta = {"purpose": "dup"}
data = make_asset_bytes(name)
files = {"file": (name, data, "application/octet-stream")}
form = {"tags": json.dumps(tags), "name": name, "user_metadata": json.dumps(meta)}
r1 = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
a1 = r1.json()
assert r1.status_code == 201, a1
assert a1["created_new"] is True
# Second upload with the same data and name should return created_new == False and the same asset
files = {"file": (name, data, "application/octet-stream")}
form = {"tags": json.dumps(tags), "name": name, "user_metadata": json.dumps(meta)}
r2 = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
a2 = r2.json()
assert r2.status_code == 200, a2
assert a2["created_new"] is False
assert a2["asset_hash"] == a1["asset_hash"]
assert a2["id"] == a1["id"] # old reference
    # Third upload with the same data but a new name should also return created_new == False,
    # while creating a new AssetInfo reference for the existing content
    files = {"file": (name, data, "application/octet-stream")}
    form = {"tags": json.dumps(tags), "name": name + "_d", "user_metadata": json.dumps(meta)}
    r3 = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
    a3 = r3.json()
    assert r3.status_code == 200, a3
    assert a3["created_new"] is False
    assert a3["asset_hash"] == a1["asset_hash"]
    assert a3["id"] != a1["id"]  # new reference to the same content
def test_upload_fastpath_from_existing_hash_no_file(http: requests.Session, api_base: str):
# Seed a small file first
name = "fastpath_seed.safetensors"
tags = ["models", "checkpoints", "unit-tests"]
meta = {}
files = {"file": (name, b"B" * 1024, "application/octet-stream")}
form = {"tags": json.dumps(tags), "name": name, "user_metadata": json.dumps(meta)}
r1 = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
b1 = r1.json()
assert r1.status_code == 201, b1
h = b1["asset_hash"]
# Now POST /api/assets with only hash and no file
files = [
("hash", (None, h)),
("tags", (None, json.dumps(tags))),
("name", (None, "fastpath_copy.safetensors")),
("user_metadata", (None, json.dumps({"purpose": "copy"}))),
]
r2 = http.post(api_base + "/api/assets", files=files, timeout=120)
b2 = r2.json()
assert r2.status_code == 200, b2 # fast path returns 200 with created_new == False
assert b2["created_new"] is False
assert b2["asset_hash"] == h
def test_upload_fastpath_with_known_hash_and_file(
http: requests.Session, api_base: str
):
# Seed
files = {"file": ("seed.safetensors", b"C" * 128, "application/octet-stream")}
form = {"tags": json.dumps(["models", "checkpoints", "unit-tests", "fp"]), "name": "seed.safetensors", "user_metadata": json.dumps({})}
r1 = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
b1 = r1.json()
assert r1.status_code == 201, b1
h = b1["asset_hash"]
# Send both file and hash of existing content -> server must drain file and create from hash (200)
files = {"file": ("ignored.bin", b"ignored" * 10, "application/octet-stream")}
form = {"hash": h, "tags": json.dumps(["models", "checkpoints", "unit-tests", "fp"]), "name": "copy_from_hash.safetensors", "user_metadata": json.dumps({})}
r2 = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
b2 = r2.json()
assert r2.status_code == 200, b2
assert b2["created_new"] is False
assert b2["asset_hash"] == h
def test_upload_multiple_tags_fields_are_merged(http: requests.Session, api_base: str):
data = [
("tags", "models,checkpoints"),
("tags", json.dumps(["unit-tests", "alpha"])),
("name", "merge.safetensors"),
("user_metadata", json.dumps({"u": 1})),
]
files = {"file": ("merge.safetensors", b"B" * 256, "application/octet-stream")}
r1 = http.post(api_base + "/api/assets", data=data, files=files, timeout=120)
created = r1.json()
assert r1.status_code in (200, 201), created
aid = created["id"]
# Verify all tags are present on the resource
rg = http.get(f"{api_base}/api/assets/{aid}", timeout=120)
detail = rg.json()
assert rg.status_code == 200, detail
tags = set(detail["tags"])
assert {"models", "checkpoints", "unit-tests", "alpha"}.issubset(tags)
@pytest.mark.parametrize("root", ["input", "output"])
def test_concurrent_upload_identical_bytes_different_names(
root: str,
http: requests.Session,
api_base: str,
make_asset_bytes,
):
"""
Two concurrent uploads of identical bytes but different names.
Expect a single Asset (same hash), two AssetInfo rows, and exactly one created_new=True.
"""
scope = f"concupload-{uuid.uuid4().hex[:6]}"
name1, name2 = "cu_a.bin", "cu_b.bin"
data = make_asset_bytes("concurrent", 4096)
tags = [root, "unit-tests", scope]
def _do_upload(args):
url, form_data, files_data = args
with requests.Session() as s:
return s.post(url, data=form_data, files=files_data, timeout=120)
url = api_base + "/api/assets"
form1 = {"tags": json.dumps(tags), "name": name1, "user_metadata": json.dumps({})}
files1 = {"file": (name1, data, "application/octet-stream")}
form2 = {"tags": json.dumps(tags), "name": name2, "user_metadata": json.dumps({})}
files2 = {"file": (name2, data, "application/octet-stream")}
    with ThreadPoolExecutor(max_workers=2) as executor:
        # executor.map returns results (responses), not futures
        responses = list(executor.map(_do_upload, [(url, form1, files1), (url, form2, files2)]))
    r1, r2 = responses
b1, b2 = r1.json(), r2.json()
assert r1.status_code in (200, 201), b1
assert r2.status_code in (200, 201), b2
assert b1["asset_hash"] == b2["asset_hash"]
assert b1["id"] != b2["id"]
created_flags = sorted([bool(b1.get("created_new")), bool(b2.get("created_new"))])
assert created_flags == [False, True]
rl = http.get(
api_base + "/api/assets",
params={"include_tags": f"unit-tests,{scope}", "sort": "name"},
timeout=120,
)
bl = rl.json()
assert rl.status_code == 200, bl
names = [a["name"] for a in bl.get("assets", [])]
    assert {name1, name2}.issubset(names)
def test_create_from_hash_endpoint_404(http: requests.Session, api_base: str):
payload = {
"hash": "blake3:" + "0" * 64,
"name": "nonexistent.bin",
"tags": ["models", "checkpoints", "unit-tests"],
}
r = http.post(api_base + "/api/assets/from-hash", json=payload, timeout=120)
body = r.json()
assert r.status_code == 404
assert body["error"]["code"] == "ASSET_NOT_FOUND"
def test_upload_zero_byte_rejected(http: requests.Session, api_base: str):
files = {"file": ("empty.safetensors", b"", "application/octet-stream")}
form = {"tags": json.dumps(["models", "checkpoints", "unit-tests", "edge"]), "name": "empty.safetensors", "user_metadata": json.dumps({})}
r = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == "EMPTY_UPLOAD"
def test_upload_invalid_root_tag_rejected(http: requests.Session, api_base: str):
files = {"file": ("badroot.bin", b"A" * 64, "application/octet-stream")}
form = {"tags": json.dumps(["not-a-root", "whatever"]), "name": "badroot.bin", "user_metadata": json.dumps({})}
r = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == "INVALID_BODY"
def test_upload_user_metadata_must_be_json(http: requests.Session, api_base: str):
files = {"file": ("badmeta.bin", b"A" * 128, "application/octet-stream")}
form = {"tags": json.dumps(["models", "checkpoints", "unit-tests", "edge"]), "name": "badmeta.bin", "user_metadata": "{not json}"}
r = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == "INVALID_BODY"
def test_upload_requires_multipart(http: requests.Session, api_base: str):
r = http.post(api_base + "/api/assets", json={"foo": "bar"}, timeout=120)
body = r.json()
assert r.status_code == 415
assert body["error"]["code"] == "UNSUPPORTED_MEDIA_TYPE"
def test_upload_missing_file_and_hash(http: requests.Session, api_base: str):
files = [
("tags", (None, json.dumps(["models", "checkpoints", "unit-tests"]))),
("name", (None, "x.safetensors")),
]
r = http.post(api_base + "/api/assets", files=files, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == "MISSING_FILE"
def test_upload_models_unknown_category(http: requests.Session, api_base: str):
files = {"file": ("m.safetensors", b"A" * 128, "application/octet-stream")}
form = {"tags": json.dumps(["models", "no_such_category", "unit-tests"]), "name": "m.safetensors"}
r = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == "INVALID_BODY"
assert body["error"]["message"].startswith("unknown models category")
def test_upload_models_requires_category(http: requests.Session, api_base: str):
files = {"file": ("nocat.safetensors", b"A" * 64, "application/octet-stream")}
form = {"tags": json.dumps(["models"]), "name": "nocat.safetensors", "user_metadata": json.dumps({})}
r = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == "INVALID_BODY"
def test_upload_tags_traversal_guard(http: requests.Session, api_base: str):
files = {"file": ("evil.safetensors", b"A" * 256, "application/octet-stream")}
form = {"tags": json.dumps(["models", "checkpoints", "unit-tests", "..", "zzz"]), "name": "evil.safetensors"}
r = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] in ("BAD_REQUEST", "INVALID_BODY")
@pytest.mark.parametrize("root", ["input", "output"])
def test_duplicate_upload_same_display_name_does_not_clobber(
root: str,
http: requests.Session,
api_base: str,
asset_factory,
make_asset_bytes,
):
"""
Two uploads use the same tags and the same display name but different bytes.
With hash-based filenames, they must NOT overwrite each other. Both assets
remain accessible and serve their original content.
"""
scope = f"dup-path-{uuid.uuid4().hex[:6]}"
display_name = "same_display.bin"
d1 = make_asset_bytes(scope + "-v1", 1536)
d2 = make_asset_bytes(scope + "-v2", 2048)
tags = [root, "unit-tests", scope]
first = asset_factory(display_name, tags, {}, d1)
second = asset_factory(display_name, tags, {}, d2)
assert first["id"] != second["id"]
assert first["asset_hash"] != second["asset_hash"] # different content
assert first["name"] == second["name"] == display_name
# Both must be independently retrievable
r1 = http.get(f"{api_base}/api/assets/{first['id']}/content", timeout=120)
b1 = r1.content
assert r1.status_code == 200
assert b1 == d1
r2 = http.get(f"{api_base}/api/assets/{second['id']}/content", timeout=120)
b2 = r2.content
assert r2.status_code == 200
assert b2 == d2
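# A minimal helper sketch summarizing the multipart upload shape used throughout
# this file. The field names (file, tags, name, user_metadata), the JSON
# encoding of tags/user_metadata, and the 201-on-new vs 200-on-duplicate
# semantics are taken from the tests above; the helper itself is illustrative
# and not part of the server API.
def _upload_asset(http, api_base, name, tags, data, user_metadata=None):
    files = {"file": (name, data, "application/octet-stream")}
    form = {
        "tags": json.dumps(tags),
        "name": name,
        "user_metadata": json.dumps(user_metadata or {}),
    }
    r = http.post(api_base + "/api/assets", data=form, files=files, timeout=120)
    assert r.status_code in (200, 201), r.text
    body = r.json()
    # created_new is True only when the bytes were not previously stored
    return body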