diff --git a/app/assets/api/routes.py b/app/assets/api/routes.py index fdddc653d..a0b7984e7 100644 --- a/app/assets/api/routes.py +++ b/app/assets/api/routes.py @@ -16,7 +16,7 @@ from app.assets.api.schemas_in import ( UploadError, ) from app.assets.api.upload import parse_multipart_upload -from app.assets.services.scanner import seed_assets +from app.assets.services.scanner import seed_assets as scanner_seed_assets from typing import Any @@ -46,22 +46,22 @@ def register_assets_system(app: web.Application, user_manager_instance: user_man USER_MANAGER = user_manager_instance app.add_routes(ROUTES) -def _error_response(status: int, code: str, message: str, details: dict | None = None) -> web.Response: +def _build_error_response(status: int, code: str, message: str, details: dict | None = None) -> web.Response: return web.json_response({"error": {"code": code, "message": message, "details": details or {}}}, status=status) -def _validation_error_response(code: str, ve: ValidationError) -> web.Response: - return _error_response(400, code, "Validation failed.", {"errors": ve.json()}) +def _build_validation_error_response(code: str, ve: ValidationError) -> web.Response: + return _build_error_response(400, code, "Validation failed.", {"errors": ve.json()}) @ROUTES.head("/api/assets/hash/{hash}") async def head_asset_by_hash(request: web.Request) -> web.Response: hash_str = request.match_info.get("hash", "").strip().lower() if not hash_str or ":" not in hash_str: - return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + return _build_error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") algo, digest = hash_str.split(":", 1) if algo != "blake3" or not digest or any(c for c in digest if c not in "0123456789abcdef"): - return _error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") + return _build_error_response(400, "INVALID_HASH", "hash must be like 'blake3:'") exists = manager.asset_exists(asset_hash=hash_str) return 
web.Response(status=200 if exists else 404) @@ -75,7 +75,7 @@ async def list_assets(request: web.Request) -> web.Response: try: q = schemas_in.ListAssetsQuery.model_validate(query_dict) except ValidationError as ve: - return _validation_error_response("INVALID_QUERY", ve) + return _build_validation_error_response("INVALID_QUERY", ve) payload = manager.list_assets( include_tags=q.include_tags, @@ -103,14 +103,14 @@ async def get_asset(request: web.Request) -> web.Response: owner_id=USER_MANAGER.get_request_user_id(request), ) except ValueError as e: - return _error_response(404, "ASSET_NOT_FOUND", str(e), {"id": asset_info_id}) + return _build_error_response(404, "ASSET_NOT_FOUND", str(e), {"id": asset_info_id}) except Exception: logging.exception( "get_asset failed for asset_info_id=%s, owner_id=%s", asset_info_id, USER_MANAGER.get_request_user_id(request), ) - return _error_response(500, "INTERNAL", "Unexpected server error.") + return _build_error_response(500, "INTERNAL", "Unexpected server error.") return web.json_response(result.model_dump(mode="json"), status=200) @@ -127,11 +127,11 @@ async def download_asset_content(request: web.Request) -> web.Response: owner_id=USER_MANAGER.get_request_user_id(request), ) except ValueError as ve: - return _error_response(404, "ASSET_NOT_FOUND", str(ve)) + return _build_error_response(404, "ASSET_NOT_FOUND", str(ve)) except NotImplementedError as nie: - return _error_response(501, "BACKEND_UNSUPPORTED", str(nie)) + return _build_error_response(501, "BACKEND_UNSUPPORTED", str(nie)) except FileNotFoundError: - return _error_response(404, "FILE_NOT_FOUND", "Underlying file not found on disk.") + return _build_error_response(404, "FILE_NOT_FOUND", "Underlying file not found on disk.") quoted = (filename or "").replace("\r", "").replace("\n", "").replace('"', "'") cd = f'{disposition}; filename="{quoted}"; filename*=UTF-8\'\'{urllib.parse.quote(filename)}' @@ -146,7 +146,7 @@ async def download_asset_content(request: 
web.Request) -> web.Response: filename, ) - async def file_sender(): + async def stream_file_chunks(): chunk_size = 64 * 1024 with open(abs_path, "rb") as f: while True: @@ -156,7 +156,7 @@ async def download_asset_content(request: web.Request) -> web.Response: yield chunk return web.Response( - body=file_sender(), + body=stream_file_chunks(), content_type=content_type, headers={ "Content-Disposition": cd, @@ -171,9 +171,9 @@ async def create_asset_from_hash(request: web.Request) -> web.Response: payload = await request.json() body = schemas_in.CreateFromHashBody.model_validate(payload) except ValidationError as ve: - return _validation_error_response("INVALID_BODY", ve) + return _build_validation_error_response("INVALID_BODY", ve) except Exception: - return _error_response(400, "INVALID_JSON", "Request body must be valid JSON.") + return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") result = manager.create_asset_from_hash( hash_str=body.hash, @@ -183,7 +183,7 @@ async def create_asset_from_hash(request: web.Request) -> web.Response: owner_id=USER_MANAGER.get_request_user_id(request), ) if result is None: - return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {body.hash} does not exist") + return _build_error_response(404, "ASSET_NOT_FOUND", f"Asset content {body.hash} does not exist") return web.json_response(result.model_dump(mode="json"), status=201) @@ -193,21 +193,21 @@ async def upload_asset(request: web.Request) -> web.Response: try: parsed = await parse_multipart_upload(request, check_hash_exists=manager.asset_exists) except UploadError as e: - return _error_response(e.status, e.code, e.message) + return _build_error_response(e.status, e.code, e.message) owner_id = USER_MANAGER.get_request_user_id(request) try: result = manager.process_upload(parsed=parsed, owner_id=owner_id) except AssetValidationError as e: - return _error_response(400, e.code, str(e)) + return _build_error_response(400, e.code, str(e)) except 
AssetNotFoundError as e: - return _error_response(404, "ASSET_NOT_FOUND", str(e)) + return _build_error_response(404, "ASSET_NOT_FOUND", str(e)) except HashMismatchError as e: - return _error_response(400, "HASH_MISMATCH", str(e)) + return _build_error_response(400, "HASH_MISMATCH", str(e)) except Exception: logging.exception("process_upload failed for owner_id=%s", owner_id) - return _error_response(500, "INTERNAL", "Unexpected server error.") + return _build_error_response(500, "INTERNAL", "Unexpected server error.") status = 201 if result.created_new else 200 return web.json_response(result.model_dump(mode="json"), status=status) @@ -219,9 +219,9 @@ async def update_asset(request: web.Request) -> web.Response: try: body = schemas_in.UpdateAssetBody.model_validate(await request.json()) except ValidationError as ve: - return _validation_error_response("INVALID_BODY", ve) + return _build_validation_error_response("INVALID_BODY", ve) except Exception: - return _error_response(400, "INVALID_JSON", "Request body must be valid JSON.") + return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") try: result = manager.update_asset( @@ -231,14 +231,14 @@ async def update_asset(request: web.Request) -> web.Response: owner_id=USER_MANAGER.get_request_user_id(request), ) except (ValueError, PermissionError) as ve: - return _error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) + return _build_error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) except Exception: logging.exception( "update_asset failed for asset_info_id=%s, owner_id=%s", asset_info_id, USER_MANAGER.get_request_user_id(request), ) - return _error_response(500, "INTERNAL", "Unexpected server error.") + return _build_error_response(500, "INTERNAL", "Unexpected server error.") return web.json_response(result.model_dump(mode="json"), status=200) @@ -260,10 +260,10 @@ async def delete_asset(request: web.Request) -> web.Response: asset_info_id, 
USER_MANAGER.get_request_user_id(request), ) - return _error_response(500, "INTERNAL", "Unexpected server error.") + return _build_error_response(500, "INTERNAL", "Unexpected server error.") if not deleted: - return _error_response(404, "ASSET_NOT_FOUND", f"AssetInfo {asset_info_id} not found.") + return _build_error_response(404, "ASSET_NOT_FOUND", f"AssetInfo {asset_info_id} not found.") return web.Response(status=204) @@ -300,9 +300,9 @@ async def add_asset_tags(request: web.Request) -> web.Response: payload = await request.json() data = schemas_in.TagsAdd.model_validate(payload) except ValidationError as ve: - return _error_response(400, "INVALID_BODY", "Invalid JSON body for tags add.", {"errors": ve.errors()}) + return _build_error_response(400, "INVALID_BODY", "Invalid JSON body for tags add.", {"errors": ve.errors()}) except Exception: - return _error_response(400, "INVALID_JSON", "Request body must be valid JSON.") + return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") try: result = manager.add_tags_to_asset( @@ -312,14 +312,14 @@ async def add_asset_tags(request: web.Request) -> web.Response: owner_id=USER_MANAGER.get_request_user_id(request), ) except (ValueError, PermissionError) as ve: - return _error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) + return _build_error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) except Exception: logging.exception( "add_tags_to_asset failed for asset_info_id=%s, owner_id=%s", asset_info_id, USER_MANAGER.get_request_user_id(request), ) - return _error_response(500, "INTERNAL", "Unexpected server error.") + return _build_error_response(500, "INTERNAL", "Unexpected server error.") return web.json_response(result.model_dump(mode="json"), status=200) @@ -331,9 +331,9 @@ async def delete_asset_tags(request: web.Request) -> web.Response: payload = await request.json() data = schemas_in.TagsRemove.model_validate(payload) except ValidationError as ve: - 
return _error_response(400, "INVALID_BODY", "Invalid JSON body for tags remove.", {"errors": ve.errors()}) + return _build_error_response(400, "INVALID_BODY", "Invalid JSON body for tags remove.", {"errors": ve.errors()}) except Exception: - return _error_response(400, "INVALID_JSON", "Request body must be valid JSON.") + return _build_error_response(400, "INVALID_JSON", "Request body must be valid JSON.") try: result = manager.remove_tags_from_asset( @@ -342,20 +342,20 @@ async def delete_asset_tags(request: web.Request) -> web.Response: owner_id=USER_MANAGER.get_request_user_id(request), ) except ValueError as ve: - return _error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) + return _build_error_response(404, "ASSET_NOT_FOUND", str(ve), {"id": asset_info_id}) except Exception: logging.exception( "remove_tags_from_asset failed for asset_info_id=%s, owner_id=%s", asset_info_id, USER_MANAGER.get_request_user_id(request), ) - return _error_response(500, "INTERNAL", "Unexpected server error.") + return _build_error_response(500, "INTERNAL", "Unexpected server error.") return web.json_response(result.model_dump(mode="json"), status=200) @ROUTES.post("/api/assets/seed") -async def seed_assets_endpoint(request: web.Request) -> web.Response: +async def seed_assets(request: web.Request) -> web.Response: """Trigger asset seeding for specified roots (models, input, output).""" try: payload = await request.json() @@ -365,12 +365,12 @@ async def seed_assets_endpoint(request: web.Request) -> web.Response: valid_roots = [r for r in roots if r in ("models", "input", "output")] if not valid_roots: - return _error_response(400, "INVALID_BODY", "No valid roots specified") + return _build_error_response(400, "INVALID_BODY", "No valid roots specified") try: - seed_assets(tuple(valid_roots)) + scanner_seed_assets(tuple(valid_roots)) except Exception: - logging.exception("seed_assets failed for roots=%s", valid_roots) - return _error_response(500, "INTERNAL", "Seed 
operation failed") + logging.exception("scanner_seed_assets failed for roots=%s", valid_roots) + return _build_error_response(500, "INTERNAL", "Seed operation failed") return web.json_response({"seeded": valid_roots}, status=200) diff --git a/app/assets/api/schemas_in.py b/app/assets/api/schemas_in.py index b141e957c..38d335845 100644 --- a/app/assets/api/schemas_in.py +++ b/app/assets/api/schemas_in.py @@ -108,7 +108,7 @@ class UpdateAssetBody(BaseModel): user_metadata: dict[str, Any] | None = None @model_validator(mode="after") - def _at_least_one(self): + def _validate_at_least_one_field(self): if self.name is None and self.user_metadata is None: raise ValueError("Provide at least one of: name, user_metadata.") return self @@ -137,7 +137,7 @@ class CreateFromHashBody(BaseModel): @field_validator("tags", mode="before") @classmethod - def _tags_norm(cls, v): + def _normalize_tags_field(cls, v): if v is None: return [] if isinstance(v, list): diff --git a/app/assets/api/schemas_out.py b/app/assets/api/schemas_out.py index b6fb3da0c..f36447856 100644 --- a/app/assets/api/schemas_out.py +++ b/app/assets/api/schemas_out.py @@ -19,7 +19,7 @@ class AssetSummary(BaseModel): model_config = ConfigDict(from_attributes=True) @field_serializer("created_at", "updated_at", "last_access_time") - def _ser_dt(self, v: datetime | None, _info): + def _serialize_datetime(self, v: datetime | None, _info): return v.isoformat() if v else None @@ -40,7 +40,7 @@ class AssetUpdated(BaseModel): model_config = ConfigDict(from_attributes=True) @field_serializer("updated_at") - def _ser_updated(self, v: datetime | None, _info): + def _serialize_updated_at(self, v: datetime | None, _info): return v.isoformat() if v else None @@ -59,7 +59,7 @@ class AssetDetail(BaseModel): model_config = ConfigDict(from_attributes=True) @field_serializer("created_at", "last_access_time") - def _ser_dt(self, v: datetime | None, _info): + def _serialize_datetime(self, v: datetime | None, _info): return 
v.isoformat() if v else None diff --git a/app/assets/database/models.py b/app/assets/database/models.py index 3cd28f68b..ea9fc198e 100644 --- a/app/assets/database/models.py +++ b/app/assets/database/models.py @@ -20,7 +20,7 @@ from sqlalchemy import ( ) from sqlalchemy.orm import Mapped, foreign, mapped_column, relationship -from app.assets.helpers import utcnow +from app.assets.helpers import get_utc_now from app.database.models import to_dict, Base @@ -32,7 +32,7 @@ class Asset(Base): size_bytes: Mapped[int] = mapped_column(BigInteger, nullable=False, default=0) mime_type: Mapped[str | None] = mapped_column(String(255)) created_at: Mapped[datetime] = mapped_column( - DateTime(timezone=False), nullable=False, default=utcnow + DateTime(timezone=False), nullable=False, default=get_utc_now ) infos: Mapped[list[AssetInfo]] = relationship( @@ -105,9 +105,9 @@ class AssetInfo(Base): asset_id: Mapped[str] = mapped_column(String(36), ForeignKey("assets.id", ondelete="RESTRICT"), nullable=False) preview_id: Mapped[str | None] = mapped_column(String(36), ForeignKey("assets.id", ondelete="SET NULL")) user_metadata: Mapped[dict[str, Any] | None] = mapped_column(JSON(none_as_null=True)) - created_at: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=utcnow) - updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=utcnow) - last_access_time: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=utcnow) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=get_utc_now) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=get_utc_now) + last_access_time: Mapped[datetime] = mapped_column(DateTime(timezone=False), nullable=False, default=get_utc_now) asset: Mapped[Asset] = relationship( "Asset", @@ -196,7 +196,7 @@ class AssetInfoTag(Base): ) origin: Mapped[str] = mapped_column(String(32), 
nullable=False, default="manual") added_at: Mapped[datetime] = mapped_column( - DateTime(timezone=False), nullable=False, default=utcnow + DateTime(timezone=False), nullable=False, default=get_utc_now ) asset_info: Mapped[AssetInfo] = relationship(back_populates="tag_links") diff --git a/app/assets/database/queries/asset_info.py b/app/assets/database/queries/asset_info.py index c523059c3..952bac569 100644 --- a/app/assets/database/queries/asset_info.py +++ b/app/assets/database/queries/asset_info.py @@ -18,10 +18,10 @@ from sqlalchemy.orm import Session, contains_eager, noload from app.assets.database.models import ( Asset, AssetInfo, AssetInfoMeta, AssetInfoTag, Tag ) -from app.assets.helpers import escape_like_prefix, normalize_tags, utcnow +from app.assets.helpers import escape_like_prefix, normalize_tags, get_utc_now -def is_scalar(v): +def check_is_scalar(v): if v is None: return True if isinstance(v, bool): @@ -31,7 +31,7 @@ def is_scalar(v): return False -def project_kv(key: str, value): +def expand_metadata_to_rows(key: str, value): """ Turn a metadata key/value into typed projection rows. Returns list[dict] with keys: @@ -49,7 +49,7 @@ def project_kv(key: str, value): rows.append(_null_row(0)) return rows - if is_scalar(value): + if check_is_scalar(value): if isinstance(value, bool): rows.append({"key": key, "ordinal": 0, "val_bool": bool(value)}) elif isinstance(value, (int, float, Decimal)): @@ -62,7 +62,7 @@ def project_kv(key: str, value): return rows if isinstance(value, list): - if all(is_scalar(x) for x in value): + if all(check_is_scalar(x) for x in value): for i, x in enumerate(value): if x is None: rows.append(_null_row(i)) @@ -95,7 +95,7 @@ def _iter_chunks(seq, n: int): yield seq[i : i + n] -def _visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement: +def _build_visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement: """Build owner visibility predicate for reads. 
Owner-less rows are visible to everyone.""" owner_id = (owner_id or "").strip() if owner_id == "": @@ -210,7 +210,7 @@ def insert_asset_info( preview_id: str | None = None, ) -> AssetInfo | None: """Insert a new AssetInfo. Returns None if unique constraint violated.""" - now = utcnow() + now = get_utc_now() try: with session.begin_nested(): info = AssetInfo( @@ -267,7 +267,7 @@ def update_asset_info_timestamps( preview_id: str | None = None, ) -> None: """Update timestamps and optionally preview_id on existing AssetInfo.""" - now = utcnow() + now = get_utc_now() if preview_id and asset_info.preview_id != preview_id: asset_info.preview_id = preview_id asset_info.updated_at = now @@ -292,7 +292,7 @@ def list_asset_infos_page( select(AssetInfo) .join(Asset, Asset.id == AssetInfo.asset_id) .options(contains_eager(AssetInfo.asset), noload(AssetInfo.tags)) - .where(_visible_owner_clause(owner_id)) + .where(_build_visible_owner_clause(owner_id)) ) if name_contains: @@ -320,7 +320,7 @@ def list_asset_infos_page( select(sa.func.count()) .select_from(AssetInfo) .join(Asset, Asset.id == AssetInfo.asset_id) - .where(_visible_owner_clause(owner_id)) + .where(_build_visible_owner_clause(owner_id)) ) if name_contains: escaped, esc = escape_like_prefix(name_contains) @@ -359,7 +359,7 @@ def fetch_asset_info_asset_and_tags( .join(Tag, Tag.name == AssetInfoTag.tag_name, isouter=True) .where( AssetInfo.id == asset_info_id, - _visible_owner_clause(owner_id), + _build_visible_owner_clause(owner_id), ) .options(noload(AssetInfo.tags)) .order_by(Tag.name.asc()) @@ -389,7 +389,7 @@ def fetch_asset_info_and_asset( .join(Asset, Asset.id == AssetInfo.asset_id) .where( AssetInfo.id == asset_info_id, - _visible_owner_clause(owner_id), + _build_visible_owner_clause(owner_id), ) .limit(1) .options(noload(AssetInfo.tags)) @@ -407,7 +407,7 @@ def touch_asset_info_by_id( ts: datetime | None = None, only_if_newer: bool = True, ) -> None: - ts = ts or utcnow() + ts = ts or get_utc_now() stmt = 
sa.update(AssetInfo).where(AssetInfo.id == asset_info_id) if only_if_newer: stmt = stmt.where( @@ -426,7 +426,7 @@ def replace_asset_info_metadata_projection( raise ValueError(f"AssetInfo {asset_info_id} not found") info.user_metadata = user_metadata or {} - info.updated_at = utcnow() + info.updated_at = get_utc_now() session.flush() session.execute(delete(AssetInfoMeta).where(AssetInfoMeta.asset_info_id == asset_info_id)) @@ -437,7 +437,7 @@ def replace_asset_info_metadata_projection( rows: list[AssetInfoMeta] = [] for k, v in user_metadata.items(): - for r in project_kv(k, v): + for r in expand_metadata_to_rows(k, v): rows.append( AssetInfoMeta( asset_info_id=asset_info_id, @@ -461,7 +461,7 @@ def delete_asset_info_by_id( ) -> bool: stmt = sa.delete(AssetInfo).where( AssetInfo.id == asset_info_id, - _visible_owner_clause(owner_id), + _build_visible_owner_clause(owner_id), ) return int((session.execute(stmt)).rowcount or 0) > 0 @@ -483,7 +483,7 @@ def set_asset_info_preview( raise ValueError(f"Preview Asset {preview_asset_id} not found") info.preview_id = preview_asset_id - info.updated_at = utcnow() + info.updated_at = get_utc_now() session.flush() diff --git a/app/assets/database/queries/tags.py b/app/assets/database/queries/tags.py index 5e6fbc066..63dec9022 100644 --- a/app/assets/database/queries/tags.py +++ b/app/assets/database/queries/tags.py @@ -7,7 +7,7 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from app.assets.database.models import AssetInfo, AssetInfoMeta, AssetInfoTag, Tag -from app.assets.helpers import escape_like_prefix, normalize_tags, utcnow +from app.assets.helpers import escape_like_prefix, normalize_tags, get_utc_now MAX_BIND_PARAMS = 800 @@ -16,7 +16,7 @@ def _rows_per_stmt(cols: int) -> int: return max(1, MAX_BIND_PARAMS // max(1, cols)) -def _chunk_rows(rows: list[dict], cols_per_row: int) -> Iterable[list[dict]]: +def _iter_row_chunks(rows: list[dict], cols_per_row: int) -> Iterable[list[dict]]: if not 
rows: return [] rows_per_stmt = max(1, MAX_BIND_PARAMS // max(1, cols_per_row)) @@ -24,7 +24,7 @@ def _chunk_rows(rows: list[dict], cols_per_row: int) -> Iterable[list[dict]]: yield rows[i : i + rows_per_stmt] -def _visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement: +def _build_visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement: """Build owner visibility predicate for reads. Owner-less rows are visible to everyone.""" owner_id = (owner_id or "").strip() if owner_id == "": @@ -75,7 +75,7 @@ def set_asset_info_tags( if to_add: ensure_tags_exist(session, to_add, tag_type="user") session.add_all([ - AssetInfoTag(asset_info_id=asset_info_id, tag_name=t, origin=origin, added_at=utcnow()) + AssetInfoTag(asset_info_id=asset_info_id, tag_name=t, origin=origin, added_at=get_utc_now()) for t in to_add ]) session.flush() @@ -132,7 +132,7 @@ def add_tags_to_asset_info( asset_info_id=asset_info_id, tag_name=t, origin=origin, - added_at=utcnow(), + added_at=get_utc_now(), ) for t in to_add ] @@ -199,7 +199,7 @@ def add_missing_tag_for_asset_id( AssetInfo.id.label("asset_info_id"), sa.literal("missing").label("tag_name"), sa.literal(origin).label("origin"), - sa.literal(utcnow()).label("added_at"), + sa.literal(get_utc_now()).label("added_at"), ) .where(AssetInfo.asset_id == asset_id) .where( @@ -246,7 +246,7 @@ def list_tags_with_usage( ) .select_from(AssetInfoTag) .join(AssetInfo, AssetInfo.id == AssetInfoTag.asset_info_id) - .where(_visible_owner_clause(owner_id)) + .where(_build_visible_owner_clause(owner_id)) .group_by(AssetInfoTag.tag_name) .subquery() ) @@ -305,12 +305,12 @@ def bulk_insert_tags_and_meta( ins_tags = sqlite.insert(AssetInfoTag).on_conflict_do_nothing( index_elements=[AssetInfoTag.asset_info_id, AssetInfoTag.tag_name] ) - for chunk in _chunk_rows(tag_rows, cols_per_row=4): + for chunk in _iter_row_chunks(tag_rows, cols_per_row=4): session.execute(ins_tags, chunk) if meta_rows: ins_meta = sqlite.insert(AssetInfoMeta).on_conflict_do_nothing( 
index_elements=[AssetInfoMeta.asset_info_id, AssetInfoMeta.key, AssetInfoMeta.ordinal] ) - for chunk in _chunk_rows(meta_rows, cols_per_row=7): + for chunk in _iter_row_chunks(meta_rows, cols_per_row=7): session.execute(ins_meta, chunk) diff --git a/app/assets/helpers.py b/app/assets/helpers.py index a0002b37f..f37965702 100644 --- a/app/assets/helpers.py +++ b/app/assets/helpers.py @@ -31,7 +31,7 @@ def escape_like_prefix(s: str, escape: str = "!") -> tuple[str, str]: return s, escape -def utcnow() -> datetime: +def get_utc_now() -> datetime: """Naive UTC timestamp (no tzinfo). We always treat DB datetimes as UTC.""" return datetime.now(timezone.utc).replace(tzinfo=None) diff --git a/app/assets/manager.py b/app/assets/manager.py index 63fe4acde..dd18551b8 100644 --- a/app/assets/manager.py +++ b/app/assets/manager.py @@ -52,7 +52,7 @@ from app.assets.services import ( from app.assets.services.tagging import list_tags as svc_list_tags -def _safe_sort_field(requested: str | None) -> str: +def _validate_sort_field(requested: str | None) -> str: if not requested: return "created_at" v = requested.lower() @@ -66,7 +66,7 @@ def _get_size_mtime_ns(path: str) -> tuple[int, int]: return st.st_size, getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)) -def _safe_filename(name: str | None, fallback: str) -> str: +def _sanitize_filename(name: str | None, fallback: str) -> str: n = os.path.basename((name or "").strip() or fallback) if n: return n @@ -89,7 +89,7 @@ def list_assets( order: str = "desc", owner_id: str = "", ) -> schemas_out.AssetsList: - sort = _safe_sort_field(sort) + sort = _validate_sort_field(sort) order = "desc" if (order or "desc").lower() not in {"asc", "desc"} else order.lower() with create_session() as session: @@ -188,7 +188,7 @@ def upload_asset_from_temp_path( expected_asset_hash: str | None = None, ) -> schemas_out.AssetCreated: try: - digest = hashing.blake3_hash(temp_path) + digest = hashing.compute_blake3_hash(temp_path) except Exception 
as e: raise RuntimeError(f"failed to hash uploaded file: {e}") asset_hash = "blake3:" + digest @@ -205,7 +205,7 @@ def upload_asset_from_temp_path( if temp_path and os.path.exists(temp_path): os.remove(temp_path) - display_name = _safe_filename(spec.name or (client_filename or ""), fallback=digest) + display_name = _sanitize_filename(spec.name or (client_filename or ""), fallback=digest) result = register_existing_asset( asset_hash=asset_hash, name=display_name, @@ -266,7 +266,7 @@ def upload_asset_from_temp_path( size_bytes=size_bytes, mtime_ns=mtime_ns, mime_type=content_type, - info_name=_safe_filename(spec.name or (client_filename or ""), fallback=digest), + info_name=_sanitize_filename(spec.name or (client_filename or ""), fallback=digest), owner_id=owner_id, preview_id=None, user_metadata=spec.user_metadata or {}, @@ -456,7 +456,7 @@ def create_asset_from_hash( result = register_existing_asset( asset_hash=canonical, - name=_safe_filename(name, fallback=canonical.split(":", 1)[1] if ":" in canonical else canonical), + name=_sanitize_filename(name, fallback=canonical.split(":", 1)[1] if ":" in canonical else canonical), user_metadata=user_metadata or {}, tags=tags or [], tag_origin="manual", diff --git a/app/assets/services/asset_management.py b/app/assets/services/asset_management.py index 2a4ce55aa..acbd57874 100644 --- a/app/assets/services/asset_management.py +++ b/app/assets/services/asset_management.py @@ -13,7 +13,7 @@ from typing import Sequence from app.assets.database.models import Asset from app.database.db import create_session -from app.assets.helpers import pick_best_live_path, utcnow +from app.assets.helpers import pick_best_live_path, get_utc_now from app.assets.services.path_utils import compute_relative_filename from app.assets.database.queries import ( asset_info_exists_for_asset_id, @@ -108,7 +108,7 @@ def update_asset_metadata( touched = True if touched and user_metadata is None: - info.updated_at = utcnow() + info.updated_at = 
get_utc_now() session.flush() # Fetch updated info with tags diff --git a/app/assets/services/hashing.py b/app/assets/services/hashing.py index 4b72084b9..4b8db4e9b 100644 --- a/app/assets/services/hashing.py +++ b/app/assets/services/hashing.py @@ -7,7 +7,7 @@ import asyncio DEFAULT_CHUNK = 8 * 1024 *1024 # 8MB # NOTE: this allows hashing different representations of a file-like object -def blake3_hash( +def compute_blake3_hash( fp: str | IO[bytes], chunk_size: int = DEFAULT_CHUNK, ) -> str: @@ -27,16 +27,16 @@ def blake3_hash( return _hash_file_obj(f, chunk_size) -async def blake3_hash_async( +async def compute_blake3_hash_async( fp: str | IO[bytes], chunk_size: int = DEFAULT_CHUNK, ) -> str: - """Async wrapper for ``blake3_hash_sync``. + """Async wrapper for ``compute_blake3_hash``. Uses a worker thread so the event loop remains responsive. """ # If it is a path, open inside the worker thread to keep I/O off the loop. if hasattr(fp, "read"): - return await asyncio.to_thread(blake3_hash, fp, chunk_size) + return await asyncio.to_thread(compute_blake3_hash, fp, chunk_size) def _worker() -> str: with open(os.fspath(fp), "rb") as f: diff --git a/app/assets/services/path_utils.py b/app/assets/services/path_utils.py index 8fa2454b8..f4d8897b2 100644 --- a/app/assets/services/path_utils.py +++ b/app/assets/services/path_utils.py @@ -101,33 +101,33 @@ def get_relative_to_root_category_path_of_asset(file_path: str) -> tuple[Literal """ fp_abs = os.path.abspath(file_path) - def _is_within(child: str, parent: str) -> bool: + def _check_is_within(child: str, parent: str) -> bool: try: return os.path.commonpath([child, parent]) == parent except Exception: return False - def _rel(child: str, parent: str) -> str: + def _compute_relative(child: str, parent: str) -> str: return os.path.relpath(os.path.join(os.sep, os.path.relpath(child, parent)), os.sep) # 1) input input_base = os.path.abspath(folder_paths.get_input_directory()) - if _is_within(fp_abs, input_base): 
- return "input", _rel(fp_abs, input_base) + if _check_is_within(fp_abs, input_base): + return "input", _compute_relative(fp_abs, input_base) # 2) output output_base = os.path.abspath(folder_paths.get_output_directory()) - if _is_within(fp_abs, output_base): - return "output", _rel(fp_abs, output_base) + if _check_is_within(fp_abs, output_base): + return "output", _compute_relative(fp_abs, output_base) # 3) models (check deepest matching base to avoid ambiguity) best: tuple[int, str, str] | None = None # (base_len, bucket, rel_inside_bucket) for bucket, bases in get_comfy_models_folders(): for b in bases: base_abs = os.path.abspath(b) - if not _is_within(fp_abs, base_abs): + if not _check_is_within(fp_abs, base_abs): continue - cand = (len(base_abs), bucket, _rel(fp_abs, base_abs)) + cand = (len(base_abs), bucket, _compute_relative(fp_abs, base_abs)) if best is None or cand[0] > best[0]: best = cand diff --git a/app/assets/services/scanner.py b/app/assets/services/scanner.py index 4b3d0c95c..f2866bb50 100644 --- a/app/assets/services/scanner.py +++ b/app/assets/services/scanner.py @@ -26,7 +26,7 @@ from app.assets.database.queries import ( get_asset_info_ids_by_ids, bulk_insert_tags_and_meta, ) -from app.assets.helpers import utcnow +from app.assets.helpers import get_utc_now from app.assets.services.path_utils import ( compute_relative_filename, get_comfy_models_folders, @@ -38,7 +38,7 @@ from app.database.db import create_session, dependencies_available RootType = Literal["models", "input", "output"] -def fast_asset_file_check( +def check_asset_file_fast( mtime_db: int | None, size_db: int | None, stat_result: os.stat_result, @@ -65,7 +65,7 @@ def list_tree(base_dir: str) -> list[str]: return out -def prefixes_for_root(root: RootType) -> list[str]: +def get_prefixes_for_root(root: RootType) -> list[str]: if root == "models": bases: list[str] = [] for _bucket, paths in get_comfy_models_folders(): @@ -128,7 +128,7 @@ def _seed_from_paths_batch( if not specs: return 
{"inserted_infos": 0, "won_states": 0, "lost_states": 0} - now = utcnow() + now = get_utc_now() asset_rows: list[dict] = [] state_rows: list[dict] = [] path_to_asset: dict[str, str] = {} @@ -283,7 +283,7 @@ def reconcile_cache_states_for_root( Returns: Set of surviving absolute paths if collect_existing_paths=True, else None """ - prefixes = prefixes_for_root(root) + prefixes = get_prefixes_for_root(root) if not prefixes: return set() if collect_existing_paths else None @@ -299,7 +299,7 @@ def reconcile_cache_states_for_root( fast_ok = False try: exists = True - fast_ok = fast_asset_file_check( + fast_ok = check_asset_file_fast( mtime_db=row.mtime_ns, size_db=acc["size_db"], stat_result=os.stat(row.file_path, follow_symlinks=True), @@ -400,7 +400,7 @@ def seed_assets(roots: tuple[RootType, ...], enable_logging: bool = False) -> No try: with create_session() as sess: all_prefixes = [ - os.path.abspath(p) for r in roots for p in prefixes_for_root(r) + os.path.abspath(p) for r in roots for p in get_prefixes_for_root(r) ] orphans_pruned = prune_orphaned_assets(sess, all_prefixes) sess.commit() diff --git a/tests-unit/assets_test/queries/test_asset_info.py b/tests-unit/assets_test/queries/test_asset_info.py index 981a0b297..354ad4e11 100644 --- a/tests-unit/assets_test/queries/test_asset_info.py +++ b/tests-unit/assets_test/queries/test_asset_info.py @@ -22,7 +22,7 @@ from app.assets.database.queries import ( ensure_tags_exist, add_tags_to_asset_info, ) -from app.assets.helpers import utcnow +from app.assets.helpers import get_utc_now def _make_asset(session: Session, hash_val: str | None = None, size: int = 1024) -> Asset: @@ -38,7 +38,7 @@ def _make_asset_info( name: str = "test", owner_id: str = "", ) -> AssetInfo: - now = utcnow() + now = get_utc_now() info = AssetInfo( owner_id=owner_id, name=name, @@ -423,7 +423,7 @@ class TestReplaceAssetInfoMetadataProjection: class TestBulkInsertAssetInfosIgnoreConflicts: def test_inserts_multiple_infos(self, session: Session): 
asset = _make_asset(session, "hash1") - now = utcnow() + now = get_utc_now() rows = [ { "id": str(uuid.uuid4()), @@ -459,7 +459,7 @@ class TestBulkInsertAssetInfosIgnoreConflicts: _make_asset_info(session, asset, name="existing.bin", owner_id="") session.commit() - now = utcnow() + now = get_utc_now() rows = [ { "id": str(uuid.uuid4()), diff --git a/tests-unit/assets_test/queries/test_cache_state.py b/tests-unit/assets_test/queries/test_cache_state.py index 3068db1ec..5cde91f5f 100644 --- a/tests-unit/assets_test/queries/test_cache_state.py +++ b/tests-unit/assets_test/queries/test_cache_state.py @@ -15,7 +15,7 @@ from app.assets.database.queries import ( bulk_insert_cache_states_ignore_conflicts, get_cache_states_by_paths_and_asset_ids, ) -from app.assets.helpers import pick_best_live_path, utcnow +from app.assets.helpers import pick_best_live_path, get_utc_now def _make_asset(session: Session, hash_val: str | None = None, size: int = 1024) -> Asset: @@ -228,7 +228,7 @@ class TestGetOrphanedSeedAssetIds: class TestDeleteAssetsByIds: def test_deletes_assets_and_infos(self, session: Session): asset = _make_asset(session, "hash1") - now = utcnow() + now = get_utc_now() info = AssetInfo( owner_id="", name="test", asset_id=asset.id, created_at=now, updated_at=now, last_access_time=now @@ -325,7 +325,7 @@ class TestDeleteCacheStatesByIds: class TestDeleteOrphanedSeedAsset: def test_deletes_seed_asset_and_infos(self, session: Session): asset = _make_asset(session, hash_val=None) - now = utcnow() + now = get_utc_now() info = AssetInfo( owner_id="", name="test", asset_id=asset.id, created_at=now, updated_at=now, last_access_time=now diff --git a/tests-unit/assets_test/queries/test_metadata.py b/tests-unit/assets_test/queries/test_metadata.py index 233b3c012..368c7c282 100644 --- a/tests-unit/assets_test/queries/test_metadata.py +++ b/tests-unit/assets_test/queries/test_metadata.py @@ -3,8 +3,8 @@ from sqlalchemy.orm import Session from app.assets.database.models import 
Asset, AssetInfo, AssetInfoMeta from app.assets.database.queries import list_asset_infos_page -from app.assets.database.queries.asset_info import project_kv -from app.assets.helpers import utcnow +from app.assets.database.queries.asset_info import expand_metadata_to_rows +from app.assets.helpers import get_utc_now def _make_asset(session: Session, hash_val: str) -> Asset: @@ -20,7 +20,7 @@ def _make_asset_info( name: str, metadata: dict | None = None, ) -> AssetInfo: - now = utcnow() + now = get_utc_now() info = AssetInfo( owner_id="", name=name, @@ -35,7 +35,7 @@ def _make_asset_info( if metadata: for key, val in metadata.items(): - for row in project_kv(key, val): + for row in expand_metadata_to_rows(key, val): meta_row = AssetInfoMeta( asset_info_id=info.id, key=row["key"], diff --git a/tests-unit/assets_test/queries/test_tags.py b/tests-unit/assets_test/queries/test_tags.py index aaf4d3099..83073a87c 100644 --- a/tests-unit/assets_test/queries/test_tags.py +++ b/tests-unit/assets_test/queries/test_tags.py @@ -13,7 +13,7 @@ from app.assets.database.queries import ( list_tags_with_usage, bulk_insert_tags_and_meta, ) -from app.assets.helpers import utcnow +from app.assets.helpers import get_utc_now def _make_asset(session: Session, hash_val: str | None = None) -> Asset: @@ -24,7 +24,7 @@ def _make_asset(session: Session, hash_val: str | None = None) -> Asset: def _make_asset_info(session: Session, asset: Asset, name: str = "test", owner_id: str = "") -> AssetInfo: - now = utcnow() + now = get_utc_now() info = AssetInfo( owner_id=owner_id, name=name, @@ -87,8 +87,8 @@ class TestGetAssetTags: ensure_tags_exist(session, ["tag1", "tag2"]) session.add_all([ - AssetInfoTag(asset_info_id=info.id, tag_name="tag1", origin="manual", added_at=utcnow()), - AssetInfoTag(asset_info_id=info.id, tag_name="tag2", origin="manual", added_at=utcnow()), + AssetInfoTag(asset_info_id=info.id, tag_name="tag1", origin="manual", added_at=get_utc_now()), + 
AssetInfoTag(asset_info_id=info.id, tag_name="tag2", origin="manual", added_at=get_utc_now()), ]) session.flush() @@ -305,7 +305,7 @@ class TestBulkInsertTagsAndMeta: ensure_tags_exist(session, ["bulk-tag1", "bulk-tag2"]) session.commit() - now = utcnow() + now = get_utc_now() tag_rows = [ {"asset_info_id": info.id, "tag_name": "bulk-tag1", "origin": "manual", "added_at": now}, {"asset_info_id": info.id, "tag_name": "bulk-tag2", "origin": "manual", "added_at": now}, @@ -347,7 +347,7 @@ class TestBulkInsertTagsAndMeta: add_tags_to_asset_info(session, asset_info_id=info.id, tags=["existing-tag"]) session.commit() - now = utcnow() + now = get_utc_now() tag_rows = [ {"asset_info_id": info.id, "tag_name": "existing-tag", "origin": "duplicate", "added_at": now}, ] diff --git a/tests-unit/assets_test/services/test_asset_management.py b/tests-unit/assets_test/services/test_asset_management.py index 1bff4344c..9c1112b77 100644 --- a/tests-unit/assets_test/services/test_asset_management.py +++ b/tests-unit/assets_test/services/test_asset_management.py @@ -4,7 +4,7 @@ from sqlalchemy.orm import Session from app.assets.database.models import Asset, AssetInfo from app.assets.database.queries import ensure_tags_exist, add_tags_to_asset_info -from app.assets.helpers import utcnow +from app.assets.helpers import get_utc_now from app.assets.services import ( get_asset_detail, update_asset_metadata, @@ -26,7 +26,7 @@ def _make_asset_info( name: str = "test", owner_id: str = "", ) -> AssetInfo: - now = utcnow() + now = get_utc_now() info = AssetInfo( owner_id=owner_id, name=name, diff --git a/tests-unit/assets_test/services/test_ingest.py b/tests-unit/assets_test/services/test_ingest.py index 9270a3ce0..faf3319bf 100644 --- a/tests-unit/assets_test/services/test_ingest.py +++ b/tests-unit/assets_test/services/test_ingest.py @@ -177,14 +177,14 @@ class TestRegisterExistingAsset: session.add(asset) session.flush() - from app.assets.helpers import utcnow + from app.assets.helpers import 
get_utc_now info = AssetInfo( owner_id="", name="Existing Info", asset_id=asset.id, - created_at=utcnow(), - updated_at=utcnow(), - last_access_time=utcnow(), + created_at=get_utc_now(), + updated_at=get_utc_now(), + last_access_time=get_utc_now(), ) session.add(info) session.flush() # Flush to get the ID diff --git a/tests-unit/assets_test/services/test_tagging.py b/tests-unit/assets_test/services/test_tagging.py index 396ae407f..d9e7b2a5b 100644 --- a/tests-unit/assets_test/services/test_tagging.py +++ b/tests-unit/assets_test/services/test_tagging.py @@ -4,7 +4,7 @@ from sqlalchemy.orm import Session from app.assets.database.models import Asset, AssetInfo from app.assets.database.queries import ensure_tags_exist, add_tags_to_asset_info -from app.assets.helpers import utcnow +from app.assets.helpers import get_utc_now from app.assets.services import apply_tags, remove_tags, list_tags @@ -21,7 +21,7 @@ def _make_asset_info( name: str = "test", owner_id: str = "", ) -> AssetInfo: - now = utcnow() + now = get_utc_now() info = AssetInfo( owner_id=owner_id, name=name,