mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2026-03-28 20:43:32 +08:00
- Make enqueue_enrich atomic by moving start_enrich call inside self._lock, preventing pending work from being lost when a scan finishes between the start attempt and the queue write. - Call ensure_tags_exist before batch_insert_seed_assets in ingest_existing_file to avoid FK violations on asset_reference_tags. - Fix test_enrich helper to use real file mtime instead of hardcoded value so the optimistic staleness guard in enrich_asset passes correctly. - Add db_engine_fk fixture (SQLite with PRAGMA foreign_keys=ON) and a regression test proving ingest_existing_file seeds Tag rows before inserting reference tags.
283 lines
9.6 KiB
Python
283 lines
9.6 KiB
Python
"""Tests for ingest services."""
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from sqlalchemy.orm import Session as SASession, Session
|
|
|
|
from app.assets.database.models import Asset, AssetReference, AssetReferenceTag, Tag
|
|
from app.assets.database.queries import get_reference_tags
|
|
from app.assets.services.ingest import (
|
|
_ingest_file_from_path,
|
|
_register_existing_asset,
|
|
ingest_existing_file,
|
|
)
|
|
|
|
|
|
class TestIngestFileFromPath:
|
|
def test_creates_asset_and_reference(self, mock_create_session, temp_dir: Path, session: Session):
|
|
file_path = temp_dir / "test_file.bin"
|
|
file_path.write_bytes(b"test content")
|
|
|
|
result = _ingest_file_from_path(
|
|
abs_path=str(file_path),
|
|
asset_hash="blake3:abc123",
|
|
size_bytes=12,
|
|
mtime_ns=1234567890000000000,
|
|
mime_type="application/octet-stream",
|
|
)
|
|
|
|
assert result.asset_created is True
|
|
assert result.ref_created is True
|
|
assert result.reference_id is not None
|
|
|
|
# Verify DB state
|
|
assets = session.query(Asset).all()
|
|
assert len(assets) == 1
|
|
assert assets[0].hash == "blake3:abc123"
|
|
|
|
refs = session.query(AssetReference).all()
|
|
assert len(refs) == 1
|
|
assert refs[0].file_path == str(file_path)
|
|
|
|
def test_creates_reference_when_name_provided(self, mock_create_session, temp_dir: Path, session: Session):
|
|
file_path = temp_dir / "model.safetensors"
|
|
file_path.write_bytes(b"model data")
|
|
|
|
result = _ingest_file_from_path(
|
|
abs_path=str(file_path),
|
|
asset_hash="blake3:def456",
|
|
size_bytes=10,
|
|
mtime_ns=1234567890000000000,
|
|
mime_type="application/octet-stream",
|
|
info_name="My Model",
|
|
owner_id="user1",
|
|
)
|
|
|
|
assert result.asset_created is True
|
|
assert result.reference_id is not None
|
|
|
|
ref = session.query(AssetReference).first()
|
|
assert ref is not None
|
|
assert ref.name == "My Model"
|
|
assert ref.owner_id == "user1"
|
|
|
|
def test_creates_tags_when_provided(self, mock_create_session, temp_dir: Path, session: Session):
|
|
file_path = temp_dir / "tagged.bin"
|
|
file_path.write_bytes(b"data")
|
|
|
|
result = _ingest_file_from_path(
|
|
abs_path=str(file_path),
|
|
asset_hash="blake3:ghi789",
|
|
size_bytes=4,
|
|
mtime_ns=1234567890000000000,
|
|
info_name="Tagged Asset",
|
|
tags=["models", "checkpoints"],
|
|
)
|
|
|
|
assert result.reference_id is not None
|
|
|
|
# Verify tags were created and linked
|
|
tags = session.query(Tag).all()
|
|
tag_names = {t.name for t in tags}
|
|
assert "models" in tag_names
|
|
assert "checkpoints" in tag_names
|
|
|
|
ref_tags = get_reference_tags(session, reference_id=result.reference_id)
|
|
assert set(ref_tags) == {"models", "checkpoints"}
|
|
|
|
def test_idempotent_upsert(self, mock_create_session, temp_dir: Path, session: Session):
|
|
file_path = temp_dir / "dup.bin"
|
|
file_path.write_bytes(b"content")
|
|
|
|
# First ingest
|
|
r1 = _ingest_file_from_path(
|
|
abs_path=str(file_path),
|
|
asset_hash="blake3:repeat",
|
|
size_bytes=7,
|
|
mtime_ns=1234567890000000000,
|
|
)
|
|
assert r1.asset_created is True
|
|
|
|
# Second ingest with same hash - should update, not create
|
|
r2 = _ingest_file_from_path(
|
|
abs_path=str(file_path),
|
|
asset_hash="blake3:repeat",
|
|
size_bytes=7,
|
|
mtime_ns=1234567890000000001, # different mtime
|
|
)
|
|
assert r2.asset_created is False
|
|
assert r2.ref_created is False
|
|
assert r2.ref_updated is True
|
|
|
|
# Still only one asset
|
|
assets = session.query(Asset).all()
|
|
assert len(assets) == 1
|
|
|
|
def test_validates_preview_id(self, mock_create_session, temp_dir: Path, session: Session):
|
|
file_path = temp_dir / "with_preview.bin"
|
|
file_path.write_bytes(b"data")
|
|
|
|
# Create a preview asset and reference
|
|
preview_asset = Asset(hash="blake3:preview", size_bytes=100)
|
|
session.add(preview_asset)
|
|
session.flush()
|
|
from app.assets.helpers import get_utc_now
|
|
now = get_utc_now()
|
|
preview_ref = AssetReference(
|
|
asset_id=preview_asset.id, name="preview.png", owner_id="",
|
|
created_at=now, updated_at=now, last_access_time=now,
|
|
)
|
|
session.add(preview_ref)
|
|
session.commit()
|
|
preview_id = preview_ref.id
|
|
|
|
result = _ingest_file_from_path(
|
|
abs_path=str(file_path),
|
|
asset_hash="blake3:main",
|
|
size_bytes=4,
|
|
mtime_ns=1234567890000000000,
|
|
info_name="With Preview",
|
|
preview_id=preview_id,
|
|
)
|
|
|
|
assert result.reference_id is not None
|
|
ref = session.query(AssetReference).filter_by(id=result.reference_id).first()
|
|
assert ref.preview_id == preview_id
|
|
|
|
def test_invalid_preview_id_is_cleared(self, mock_create_session, temp_dir: Path, session: Session):
|
|
file_path = temp_dir / "bad_preview.bin"
|
|
file_path.write_bytes(b"data")
|
|
|
|
result = _ingest_file_from_path(
|
|
abs_path=str(file_path),
|
|
asset_hash="blake3:badpreview",
|
|
size_bytes=4,
|
|
mtime_ns=1234567890000000000,
|
|
info_name="Bad Preview",
|
|
preview_id="nonexistent-uuid",
|
|
)
|
|
|
|
assert result.reference_id is not None
|
|
ref = session.query(AssetReference).filter_by(id=result.reference_id).first()
|
|
assert ref.preview_id is None
|
|
|
|
|
|
class TestRegisterExistingAsset:
|
|
def test_creates_reference_for_existing_asset(self, mock_create_session, session: Session):
|
|
# Create existing asset
|
|
asset = Asset(hash="blake3:existing", size_bytes=1024, mime_type="image/png")
|
|
session.add(asset)
|
|
session.commit()
|
|
|
|
result = _register_existing_asset(
|
|
asset_hash="blake3:existing",
|
|
name="Registered Asset",
|
|
user_metadata={"key": "value"},
|
|
tags=["models"],
|
|
)
|
|
|
|
assert result.created is True
|
|
assert "models" in result.tags
|
|
|
|
# Verify by re-fetching from DB
|
|
session.expire_all()
|
|
refs = session.query(AssetReference).filter_by(name="Registered Asset").all()
|
|
assert len(refs) == 1
|
|
|
|
def test_creates_new_reference_even_with_same_name(self, mock_create_session, session: Session):
|
|
# Create asset and reference
|
|
asset = Asset(hash="blake3:withref", size_bytes=512)
|
|
session.add(asset)
|
|
session.flush()
|
|
|
|
from app.assets.helpers import get_utc_now
|
|
ref = AssetReference(
|
|
owner_id="",
|
|
name="Existing Ref",
|
|
asset_id=asset.id,
|
|
created_at=get_utc_now(),
|
|
updated_at=get_utc_now(),
|
|
last_access_time=get_utc_now(),
|
|
)
|
|
session.add(ref)
|
|
session.flush()
|
|
ref_id = ref.id
|
|
session.commit()
|
|
|
|
result = _register_existing_asset(
|
|
asset_hash="blake3:withref",
|
|
name="Existing Ref",
|
|
owner_id="",
|
|
)
|
|
|
|
# Multiple files with same name are allowed
|
|
assert result.created is True
|
|
|
|
# Verify two AssetReferences exist for this name
|
|
session.expire_all()
|
|
refs = session.query(AssetReference).filter_by(name="Existing Ref").all()
|
|
assert len(refs) == 2
|
|
assert ref_id in [r.id for r in refs]
|
|
|
|
def test_raises_for_nonexistent_hash(self, mock_create_session):
|
|
with pytest.raises(ValueError, match="No asset with hash"):
|
|
_register_existing_asset(
|
|
asset_hash="blake3:doesnotexist",
|
|
name="Fail",
|
|
)
|
|
|
|
def test_applies_tags_to_new_reference(self, mock_create_session, session: Session):
|
|
asset = Asset(hash="blake3:tagged", size_bytes=256)
|
|
session.add(asset)
|
|
session.commit()
|
|
|
|
result = _register_existing_asset(
|
|
asset_hash="blake3:tagged",
|
|
name="Tagged Ref",
|
|
tags=["alpha", "beta"],
|
|
)
|
|
|
|
assert result.created is True
|
|
assert set(result.tags) == {"alpha", "beta"}
|
|
|
|
|
|
class TestIngestExistingFileTagFK:
|
|
"""Regression: ingest_existing_file must seed Tag rows before inserting
|
|
AssetReferenceTag rows, otherwise FK enforcement raises IntegrityError."""
|
|
|
|
def test_creates_tag_rows_before_reference_tags(self, db_engine_fk, temp_dir: Path):
|
|
"""With PRAGMA foreign_keys=ON, tags must exist in the tags table
|
|
before they can be referenced in asset_reference_tags."""
|
|
|
|
@contextmanager
|
|
def _create_session():
|
|
with SASession(db_engine_fk) as sess:
|
|
yield sess
|
|
|
|
file_path = temp_dir / "output.png"
|
|
file_path.write_bytes(b"image data")
|
|
|
|
with patch("app.assets.services.ingest.create_session", _create_session), \
|
|
patch(
|
|
"app.assets.services.ingest.get_name_and_tags_from_asset_path",
|
|
return_value=("output.png", ["output"]),
|
|
):
|
|
result = ingest_existing_file(
|
|
abs_path=str(file_path),
|
|
extra_tags=["my-job"],
|
|
)
|
|
|
|
assert result is True
|
|
|
|
with SASession(db_engine_fk) as sess:
|
|
tag_names = {t.name for t in sess.query(Tag).all()}
|
|
assert "output" in tag_names
|
|
assert "my-job" in tag_names
|
|
|
|
ref_tags = sess.query(AssetReferenceTag).all()
|
|
ref_tag_names = {rt.tag_name for rt in ref_tags}
|
|
assert "output" in ref_tag_names
|
|
assert "my-job" in ref_tag_names
|