refactor: convert asset tests to table-driven parametrized tests

- test_metadata.py: consolidate 7 filter type classes into parametrized tests
- test_asset.py: parametrize exists, get, and upsert test cases
- test_cache_state.py: parametrize upsert and delete scenarios
- test_crud.py: consolidate error response tests into single parametrized test
- test_list_filter.py: consolidate invalid query tests

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Luke Mino-Altherr 2026-02-03 19:50:16 -08:00
parent 3311b13740
commit 6b1f9f7755
5 changed files with 275 additions and 286 deletions
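
Every file in this commit converges on the same pytest idiom: a table of tuples handed to @pytest.mark.parametrize, with ids= giving each row a readable test name. As a minimal, self-contained sketch of that idiom (the normalize_hash function below is hypothetical, not part of this repository):

import pytest


def normalize_hash(value):
    """Toy helper standing in for the real query functions (hypothetical)."""
    if not value:
        return None
    return value.lower()


@pytest.mark.parametrize(
    "raw,expected",
    [
        ("BLAKE3:ABC", "blake3:abc"),  # mixed case is lowered
        ("", None),                    # empty string treated as missing
        (None, None),                  # None passes through unchanged
    ],
    ids=["lowercases", "empty_string", "none_passthrough"],
)
def test_normalize_hash(raw, expected):
    assert normalize_hash(raw) == expected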

test_asset.py

@@ -1,4 +1,5 @@
import uuid
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset
@@ -11,124 +12,102 @@ from app.assets.database.queries import (
class TestAssetExistsByHash:
def test_returns_false_for_nonexistent(self, session: Session):
assert asset_exists_by_hash(session, asset_hash="nonexistent") is False
@pytest.mark.parametrize(
"setup_hash,query_hash,expected",
[
(None, "nonexistent", False), # No asset exists
("blake3:abc123", "blake3:abc123", True), # Asset exists with matching hash
(None, "", False), # Null hash in DB doesn't match empty string
],
ids=["nonexistent", "existing", "null_hash_no_match"],
)
def test_exists_by_hash(self, session: Session, setup_hash, query_hash, expected):
# Create asset with given hash (including None for the null hash test)
if setup_hash is not None or query_hash == "":
asset = Asset(hash=setup_hash, size_bytes=100)
session.add(asset)
session.commit()
def test_returns_true_for_existing(self, session: Session):
asset = Asset(hash="blake3:abc123", size_bytes=100)
session.add(asset)
session.commit()
assert asset_exists_by_hash(session, asset_hash="blake3:abc123") is True
def test_does_not_match_null_hash(self, session: Session):
asset = Asset(hash=None, size_bytes=100)
session.add(asset)
session.commit()
assert asset_exists_by_hash(session, asset_hash="") is False
assert asset_exists_by_hash(session, asset_hash=query_hash) is expected
class TestGetAssetByHash:
def test_returns_none_for_nonexistent(self, session: Session):
assert get_asset_by_hash(session, asset_hash="nonexistent") is None
@pytest.mark.parametrize(
"setup_hash,query_hash,should_find",
[
(None, "nonexistent", False),
("blake3:def456", "blake3:def456", True),
],
ids=["nonexistent", "existing"],
)
def test_get_by_hash(self, session: Session, setup_hash, query_hash, should_find):
if setup_hash is not None:
asset = Asset(hash=setup_hash, size_bytes=200, mime_type="image/png")
session.add(asset)
session.commit()
def test_returns_asset_for_existing(self, session: Session):
asset = Asset(hash="blake3:def456", size_bytes=200, mime_type="image/png")
session.add(asset)
session.commit()
result = get_asset_by_hash(session, asset_hash="blake3:def456")
assert result is not None
assert result.id == asset.id
assert result.size_bytes == 200
assert result.mime_type == "image/png"
result = get_asset_by_hash(session, asset_hash=query_hash)
if should_find:
assert result is not None
assert result.size_bytes == 200
assert result.mime_type == "image/png"
else:
assert result is None
class TestUpsertAsset:
def test_creates_new_asset(self, session: Session):
@pytest.mark.parametrize(
"first_size,first_mime,second_size,second_mime,expect_created,expect_updated,final_size,final_mime",
[
# New asset creation
(None, None, 1024, "application/octet-stream", True, False, 1024, "application/octet-stream"),
# Existing asset, same values - no update
(500, "text/plain", 500, "text/plain", False, False, 500, "text/plain"),
# Existing asset with size 0, update with new values
(0, None, 2048, "image/png", False, True, 2048, "image/png"),
# Existing asset, second call with size 0 - no update
(1000, None, 0, None, False, False, 1000, None),
],
ids=["new_asset", "existing_no_change", "update_from_zero", "zero_size_no_update"],
)
def test_upsert_scenarios(
self,
session: Session,
first_size,
first_mime,
second_size,
second_mime,
expect_created,
expect_updated,
final_size,
final_mime,
):
asset_hash = f"blake3:test_{first_size}_{second_size}"
# First upsert (if first_size is not None, we're testing the second call)
if first_size is not None:
upsert_asset(
session,
asset_hash=asset_hash,
size_bytes=first_size,
mime_type=first_mime,
)
session.commit()
# The upsert call we're testing
asset, created, updated = upsert_asset(
session,
asset_hash="blake3:newasset",
size_bytes=1024,
mime_type="application/octet-stream",
asset_hash=asset_hash,
size_bytes=second_size,
mime_type=second_mime,
)
session.commit()
assert created is True
assert updated is False
assert asset.hash == "blake3:newasset"
assert asset.size_bytes == 1024
assert asset.mime_type == "application/octet-stream"
def test_returns_existing_asset_without_update(self, session: Session):
# First insert
asset1, created1, _ = upsert_asset(
session,
asset_hash="blake3:existing",
size_bytes=500,
mime_type="text/plain",
)
session.commit()
# Second upsert with same values
asset2, created2, updated2 = upsert_asset(
session,
asset_hash="blake3:existing",
size_bytes=500,
mime_type="text/plain",
)
session.commit()
assert created1 is True
assert created2 is False
assert updated2 is False
assert asset1.id == asset2.id
def test_updates_existing_asset_with_new_values(self, session: Session):
# First insert with size 0
asset1, created1, _ = upsert_asset(
session,
asset_hash="blake3:toupdate",
size_bytes=0,
)
session.commit()
# Second upsert with new size and mime type
asset2, created2, updated2 = upsert_asset(
session,
asset_hash="blake3:toupdate",
size_bytes=2048,
mime_type="image/png",
)
session.commit()
assert created1 is True
assert created2 is False
assert updated2 is True
assert asset2.size_bytes == 2048
assert asset2.mime_type == "image/png"
def test_does_not_update_if_size_zero(self, session: Session):
# First insert
asset1, _, _ = upsert_asset(
session,
asset_hash="blake3:keepsize",
size_bytes=1000,
)
session.commit()
# Second upsert with size 0 should not change size
asset2, created2, updated2 = upsert_asset(
session,
asset_hash="blake3:keepsize",
size_bytes=0,
)
session.commit()
assert created2 is False
assert updated2 is False
assert asset2.size_bytes == 1000
assert created is expect_created
assert updated is expect_updated
assert asset.size_bytes == final_size
assert asset.mime_type == final_mime
class TestBulkInsertAssets:

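The four upsert rows above also double as a specification: create when the hash is unseen, leave an identical record alone, overwrite stored values only when the incoming size is meaningful, and never overwrite with a zero size. A rough sketch of that decision logic, written purely to make the expected (created, updated) flags in the table explicit; this is an illustration of the test table, not the actual upsert_asset implementation:

def expected_upsert_flags(existing_size, new_size):
    # existing_size is None when no row exists yet; a new_size of 0 is
    # treated as "no useful data" and never triggers an update.
    # Illustrative only: the real upsert_asset may weigh mime_type too.
    if existing_size is None:
        return True, False    # "new_asset" row
    if new_size == 0:
        return False, False   # "zero_size_no_update" row
    if new_size == existing_size:
        return False, False   # "existing_no_change" row
    return False, True        # "update_from_zero" row


# The parametrized cases from the diff map onto this helper as follows:
assert expected_upsert_flags(None, 1024) == (True, False)
assert expected_upsert_flags(500, 500) == (False, False)
assert expected_upsert_flags(0, 2048) == (False, True)
assert expected_upsert_flags(1000, 0) == (False, False)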
test_cache_state.py

@@ -1,4 +1,5 @@
"""Tests for cache_state query functions."""
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetCacheState, AssetInfo
@@ -133,46 +134,39 @@ class TestSelectBestLivePathWithMocking:
class TestUpsertCacheState:
def test_creates_new_state(self, session: Session):
@pytest.mark.parametrize(
"initial_mtime,second_mtime,expect_created,expect_updated,final_mtime",
[
# New state creation
(None, 12345, True, False, 12345),
# Existing state, same mtime - no update
(100, 100, False, False, 100),
# Existing state, different mtime - update
(100, 200, False, True, 200),
],
ids=["new_state", "existing_no_change", "existing_update_mtime"],
)
def test_upsert_scenarios(
self, session: Session, initial_mtime, second_mtime, expect_created, expect_updated, final_mtime
):
asset = _make_asset(session, "hash1")
file_path = f"/path_{initial_mtime}_{second_mtime}.bin"
# Create initial state if needed
if initial_mtime is not None:
upsert_cache_state(session, asset_id=asset.id, file_path=file_path, mtime_ns=initial_mtime)
session.commit()
# The upsert call we're testing
created, updated = upsert_cache_state(
session, asset_id=asset.id, file_path="/new/path.bin", mtime_ns=12345
session, asset_id=asset.id, file_path=file_path, mtime_ns=second_mtime
)
session.commit()
assert created is True
assert updated is False
state = session.query(AssetCacheState).filter_by(file_path="/new/path.bin").one()
assert state.asset_id == asset.id
assert state.mtime_ns == 12345
def test_returns_existing_without_update(self, session: Session):
asset = _make_asset(session, "hash1")
upsert_cache_state(session, asset_id=asset.id, file_path="/existing.bin", mtime_ns=100)
session.commit()
created, updated = upsert_cache_state(
session, asset_id=asset.id, file_path="/existing.bin", mtime_ns=100
)
session.commit()
assert created is False
assert updated is False
def test_updates_existing_with_new_mtime(self, session: Session):
asset = _make_asset(session, "hash1")
upsert_cache_state(session, asset_id=asset.id, file_path="/update.bin", mtime_ns=100)
session.commit()
created, updated = upsert_cache_state(
session, asset_id=asset.id, file_path="/update.bin", mtime_ns=200
)
session.commit()
assert created is False
assert updated is True
state = session.query(AssetCacheState).filter_by(file_path="/update.bin").one()
assert state.mtime_ns == 200
assert created is expect_created
assert updated is expect_updated
state = session.query(AssetCacheState).filter_by(file_path=file_path).one()
assert state.mtime_ns == final_mtime
class TestDeleteCacheStatesOutsidePrefixes:
@@ -323,26 +317,35 @@ class TestDeleteCacheStatesByIds:
class TestDeleteOrphanedSeedAsset:
def test_deletes_seed_asset_and_infos(self, session: Session):
asset = _make_asset(session, hash_val=None)
now = get_utc_now()
info = AssetInfo(
owner_id="", name="test", asset_id=asset.id,
created_at=now, updated_at=now, last_access_time=now
)
session.add(info)
session.commit()
@pytest.mark.parametrize(
"create_asset,expected_deleted,expected_count",
[
(True, True, 0), # Existing asset gets deleted
(False, False, 0), # Nonexistent returns False
],
ids=["deletes_existing", "nonexistent_returns_false"],
)
def test_delete_orphaned_seed_asset(
self, session: Session, create_asset, expected_deleted, expected_count
):
asset_id = "nonexistent-id"
if create_asset:
asset = _make_asset(session, hash_val=None)
asset_id = asset.id
now = get_utc_now()
info = AssetInfo(
owner_id="", name="test", asset_id=asset.id,
created_at=now, updated_at=now, last_access_time=now
)
session.add(info)
session.commit()
deleted = delete_orphaned_seed_asset(session, asset.id)
session.commit()
deleted = delete_orphaned_seed_asset(session, asset_id)
if create_asset:
session.commit()
assert deleted is True
assert session.query(Asset).count() == 0
assert session.query(AssetInfo).count() == 0
def test_returns_false_for_nonexistent(self, session: Session):
deleted = delete_orphaned_seed_asset(session, "nonexistent-id")
assert deleted is False
assert deleted is expected_deleted
assert session.query(Asset).count() == expected_count
class TestBulkInsertCacheStatesIgnoreConflicts:

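A side note on the id lists used throughout this commit: pytest also accepts pytest.param(..., id="..."), which keeps each label next to the row it names instead of in a parallel ids= list. A small sketch of that alternative style, reusing the cache-state rows from the table above:

import pytest


@pytest.mark.parametrize(
    "initial_mtime,second_mtime,expect_created,expect_updated",
    [
        pytest.param(None, 12345, True, False, id="new_state"),
        pytest.param(100, 100, False, False, id="existing_no_change"),
        pytest.param(100, 200, False, True, id="existing_update_mtime"),
    ],
)
def test_cache_state_rows_are_consistent(initial_mtime, second_mtime, expect_created, expect_updated):
    # Sanity-check the rows themselves: creation implies no prior state, and
    # an update is only expected when the stored mtime actually changes.
    assert expect_created == (initial_mtime is None)
    assert expect_updated == (initial_mtime is not None and initial_mtime != second_mtime)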
test_metadata.py

@@ -1,4 +1,5 @@
"""Tests for metadata filtering logic in asset_info queries."""
import pytest
from sqlalchemy.orm import Session
from app.assets.database.models import Asset, AssetInfo, AssetInfoMeta
@@ -51,94 +52,94 @@ def _make_asset_info(
return info
class TestMetadataFilterString:
def test_filter_by_string_value(self, session: Session):
class TestMetadataFilterByType:
"""Table-driven tests for metadata filtering by different value types."""
@pytest.mark.parametrize(
"match_meta,nomatch_meta,filter_key,filter_val",
[
# String matching
({"category": "models"}, {"category": "images"}, "category", "models"),
# Integer matching
({"epoch": 5}, {"epoch": 10}, "epoch", 5),
# Float matching
({"score": 0.95}, {"score": 0.5}, "score", 0.95),
# Boolean True matching
({"enabled": True}, {"enabled": False}, "enabled", True),
# Boolean False matching
({"enabled": False}, {"enabled": True}, "enabled", False),
],
ids=["string", "int", "float", "bool_true", "bool_false"],
)
def test_filter_matches_correct_value(
self, session: Session, match_meta, nomatch_meta, filter_key, filter_val
):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "match", {"category": "models"})
_make_asset_info(session, asset, "nomatch", {"category": "images"})
_make_asset_info(session, asset, "match", match_meta)
_make_asset_info(session, asset, "nomatch", nomatch_meta)
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"category": "models"})
infos, _, total = list_asset_infos_page(
session, metadata_filter={filter_key: filter_val}
)
assert total == 1
assert infos[0].name == "match"
def test_filter_by_string_no_match(self, session: Session):
@pytest.mark.parametrize(
"stored_meta,filter_key,filter_val",
[
# String no match
({"category": "models"}, "category", "other"),
# Int no match
({"epoch": 5}, "epoch", 99),
# Float no match
({"score": 0.5}, "score", 0.99),
],
ids=["string_no_match", "int_no_match", "float_no_match"],
)
def test_filter_returns_empty_when_no_match(
self, session: Session, stored_meta, filter_key, filter_val
):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "item", {"category": "models"})
_make_asset_info(session, asset, "item", stored_meta)
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"category": "other"})
infos, _, total = list_asset_infos_page(
session, metadata_filter={filter_key: filter_val}
)
assert total == 0
class TestMetadataFilterNumeric:
def test_filter_by_int_value(self, session: Session):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "epoch5", {"epoch": 5})
_make_asset_info(session, asset, "epoch10", {"epoch": 10})
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"epoch": 5})
assert total == 1
assert infos[0].name == "epoch5"
def test_filter_by_float_value(self, session: Session):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "high", {"score": 0.95})
_make_asset_info(session, asset, "low", {"score": 0.5})
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"score": 0.95})
assert total == 1
assert infos[0].name == "high"
class TestMetadataFilterBoolean:
def test_filter_by_true(self, session: Session):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "active", {"enabled": True})
_make_asset_info(session, asset, "inactive", {"enabled": False})
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"enabled": True})
assert total == 1
assert infos[0].name == "active"
def test_filter_by_false(self, session: Session):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "active", {"enabled": True})
_make_asset_info(session, asset, "inactive", {"enabled": False})
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"enabled": False})
assert total == 1
assert infos[0].name == "inactive"
class TestMetadataFilterNull:
def test_filter_by_null_matches_missing_key(self, session: Session):
"""Tests for null/missing key filtering."""
@pytest.mark.parametrize(
"match_name,match_meta,nomatch_name,nomatch_meta,filter_key",
[
# Null matches missing key
("missing_key", {}, "has_key", {"optional": "value"}, "optional"),
# Null matches explicit null
("explicit_null", {"nullable": None}, "has_value", {"nullable": "present"}, "nullable"),
],
ids=["missing_key", "explicit_null"],
)
def test_null_filter_matches(
self, session: Session, match_name, match_meta, nomatch_name, nomatch_meta, filter_key
):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "has_key", {"optional": "value"})
_make_asset_info(session, asset, "missing_key", {})
_make_asset_info(session, asset, match_name, match_meta)
_make_asset_info(session, asset, nomatch_name, nomatch_meta)
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"optional": None})
infos, _, total = list_asset_infos_page(session, metadata_filter={filter_key: None})
assert total == 1
assert infos[0].name == "missing_key"
def test_filter_by_null_matches_explicit_null(self, session: Session):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "explicit_null", {"nullable": None})
_make_asset_info(session, asset, "has_value", {"nullable": "present"})
session.commit()
infos, _, total = list_asset_infos_page(session, metadata_filter={"nullable": None})
assert total == 1
assert infos[0].name == "explicit_null"
assert infos[0].name == match_name
class TestMetadataFilterList:
def test_filter_by_list_or(self, session: Session):
"""Tests for list-based (OR) filtering."""
def test_filter_by_list_matches_any(self, session: Session):
"""List values should match ANY of the values (OR)."""
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "cat_a", {"category": "a"})
@@ -153,7 +154,9 @@ class TestMetadataFilterList:
class TestMetadataFilterMultipleKeys:
def test_multiple_keys_and(self, session: Session):
"""Tests for multiple filter keys (AND semantics)."""
def test_multiple_keys_must_all_match(self, session: Session):
"""Multiple keys should ALL match (AND)."""
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "match", {"type": "model", "version": 2})
@@ -169,6 +172,8 @@ class TestMetadataFilterMultipleKeys:
class TestMetadataFilterEmptyDict:
"""Tests for empty filter behavior."""
def test_empty_filter_returns_all(self, session: Session):
asset = _make_asset(session, "hash1")
_make_asset_info(session, asset, "a", {"key": "val"})

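Read together, the metadata-filter tests above pin down four rules: a scalar value must match exactly, a list matches if ANY element matches (OR), None matches rows where the key is missing or stored as an explicit null, and multiple keys must ALL match (AND). A hedged sketch of a combined call; the import path is assumed from the neighboring test file and the filter values are invented for illustration:

from app.assets.database.queries import list_asset_infos_page  # assumed import path


def find_recent_models(session):
    # AND across the three keys; OR within the "epoch" list; None on "notes"
    # matches rows where the key is absent or explicitly null.
    infos, _, total = list_asset_infos_page(
        session,
        metadata_filter={
            "type": "model",   # exact string match
            "epoch": [4, 5],   # matches epoch 4 OR epoch 5
            "notes": None,     # missing key or explicit null
        },
    )
    return infos, total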
test_crud.py

@@ -126,42 +126,52 @@ def test_head_asset_bad_hash_returns_400_and_no_body(http: requests.Session, api
assert body == b""
def test_delete_nonexistent_returns_404(http: requests.Session, api_base: str):
bogus = str(uuid.uuid4())
r = http.delete(f"{api_base}/api/assets/{bogus}", timeout=120)
@pytest.mark.parametrize(
"method,endpoint_template,payload,expected_status,error_code",
[
# Delete nonexistent asset
("delete", "/api/assets/{uuid}", None, 404, "ASSET_NOT_FOUND"),
# Bad hash algorithm in from-hash
(
"post",
"/api/assets/from-hash",
{"hash": "sha256:" + "0" * 64, "name": "x.bin", "tags": ["models", "checkpoints", "unit-tests"]},
400,
"INVALID_BODY",
),
# Get with bad UUID format
("get", "/api/assets/not-a-uuid", None, 404, None),
# Get content with bad UUID format
("get", "/api/assets/not-a-uuid/content", None, 404, None),
],
ids=["delete_nonexistent", "bad_hash_algorithm", "get_bad_uuid", "content_bad_uuid"],
)
def test_error_responses(
http: requests.Session, api_base: str, method, endpoint_template, payload, expected_status, error_code
):
# Replace {uuid} placeholder with a random UUID for delete test
endpoint = endpoint_template.replace("{uuid}", str(uuid.uuid4()))
url = f"{api_base}{endpoint}"
if method == "get":
r = http.get(url, timeout=120)
elif method == "post":
r = http.post(url, json=payload, timeout=120)
elif method == "delete":
r = http.delete(url, timeout=120)
assert r.status_code == expected_status
if error_code:
body = r.json()
assert body["error"]["code"] == error_code
def test_create_from_hash_invalid_json(http: requests.Session, api_base: str):
"""Invalid JSON body requires special handling (data= instead of json=)."""
r = http.post(f"{api_base}/api/assets/from-hash", data=b"{not json}", timeout=120)
body = r.json()
assert r.status_code == 404
assert body["error"]["code"] == "ASSET_NOT_FOUND"
def test_create_from_hash_invalids(http: requests.Session, api_base: str):
# Bad hash algorithm
bad = {
"hash": "sha256:" + "0" * 64,
"name": "x.bin",
"tags": ["models", "checkpoints", "unit-tests"],
}
r1 = http.post(f"{api_base}/api/assets/from-hash", json=bad, timeout=120)
b1 = r1.json()
assert r1.status_code == 400
assert b1["error"]["code"] == "INVALID_BODY"
# Invalid JSON body
r2 = http.post(f"{api_base}/api/assets/from-hash", data=b"{not json}", timeout=120)
b2 = r2.json()
assert r2.status_code == 400
assert b2["error"]["code"] == "INVALID_JSON"
def test_get_update_download_bad_ids(http: requests.Session, api_base: str):
# All endpoints should return 404, as we apply a UUID regex directly in the route definition.
bad_id = "not-a-uuid"
r1 = http.get(f"{api_base}/api/assets/{bad_id}", timeout=120)
assert r1.status_code == 404
r3 = http.get(f"{api_base}/api/assets/{bad_id}/content", timeout=120)
assert r3.status_code == 404
assert r.status_code == 400
assert body["error"]["code"] == "INVALID_JSON"
def test_update_requires_at_least_one_field(http: requests.Session, api_base: str, seeded_asset: dict):

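The invalid-JSON case stays outside the parametrized table because it must bypass requests' JSON handling: json= serializes a Python object and sets the Content-Type header, while data= sends the given bytes untouched, which is what lets the test deliver a deliberately malformed body. A minimal illustration of the difference, reusing the endpoint from the tests above:

import requests


def post_well_formed_and_malformed(api_base):
    # json= serializes the dict and sets Content-Type: application/json.
    well_formed = requests.post(
        f"{api_base}/api/assets/from-hash",
        json={"hash": "sha256:" + "0" * 64, "name": "x.bin"},
        timeout=120,
    )
    # data= sends the bytes exactly as given; here, intentionally broken JSON
    # that no json= call could ever produce.
    malformed = requests.post(
        f"{api_base}/api/assets/from-hash",
        data=b"{not json}",
        timeout=120,
    )
    return well_formed.status_code, malformed.status_code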
test_list_filter.py

@@ -1,6 +1,7 @@
import time
import uuid
import pytest
import requests
@@ -283,30 +284,21 @@ def test_list_assets_offset_beyond_total_and_limit_boundary(http, api_base, asse
assert b2["has_more"] is False
def test_list_assets_offset_negative_and_limit_nonint_rejected(http, api_base):
r1 = http.get(api_base + "/api/assets", params={"offset": "-1"}, timeout=120)
b1 = r1.json()
assert r1.status_code == 400
assert b1["error"]["code"] == "INVALID_QUERY"
r2 = http.get(api_base + "/api/assets", params={"limit": "abc"}, timeout=120)
b2 = r2.json()
assert r2.status_code == 400
assert b2["error"]["code"] == "INVALID_QUERY"
def test_list_assets_invalid_query_rejected(http: requests.Session, api_base: str):
# limit too small
r1 = http.get(api_base + "/api/assets", params={"limit": "0"}, timeout=120)
b1 = r1.json()
assert r1.status_code == 400
assert b1["error"]["code"] == "INVALID_QUERY"
# bad metadata JSON
r2 = http.get(api_base + "/api/assets", params={"metadata_filter": "{not json"}, timeout=120)
b2 = r2.json()
assert r2.status_code == 400
assert b2["error"]["code"] == "INVALID_QUERY"
@pytest.mark.parametrize(
"params,error_code",
[
({"offset": "-1"}, "INVALID_QUERY"),
({"limit": "abc"}, "INVALID_QUERY"),
({"limit": "0"}, "INVALID_QUERY"),
({"metadata_filter": "{not json"}, "INVALID_QUERY"),
],
ids=["negative_offset", "non_int_limit", "zero_limit", "invalid_metadata_json"],
)
def test_list_assets_invalid_query_rejected(http: requests.Session, api_base: str, params, error_code):
r = http.get(api_base + "/api/assets", params=params, timeout=120)
body = r.json()
assert r.status_code == 400
assert body["error"]["code"] == error_code
def test_list_assets_name_contains_literal_underscore(