Finished @ROUTES.post("/api/assets/from-hash")

Jedrzej Kosinski 2026-01-15 23:09:54 -08:00
parent fab9b71f5d
commit 41d364030b
6 changed files with 382 additions and 16 deletions

View File

@@ -142,3 +142,25 @@ async def download_asset_content(request: web.Request) -> web.Response:
    resp.headers["Content-Disposition"] = cd
    return resp


@ROUTES.post("/api/assets/from-hash")
async def create_asset_from_hash(request: web.Request) -> web.Response:
    try:
        payload = await request.json()
        body = schemas_in.CreateFromHashBody.model_validate(payload)
    except ValidationError as ve:
        return _validation_error_response("INVALID_BODY", ve)
    except Exception:
        return _error_response(400, "INVALID_JSON", "Request body must be valid JSON.")

    result = manager.create_asset_from_hash(
        hash_str=body.hash,
        name=body.name,
        tags=body.tags,
        user_metadata=body.user_metadata,
        owner_id=USER_MANAGER.get_request_user_id(request),
    )
    if result is None:
        return _error_response(404, "ASSET_NOT_FOUND", f"Asset content {body.hash} does not exist")
    return web.json_response(result.model_dump(mode="json"), status=201)
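
For reference, a minimal client-side sketch of exercising the new route. The base URL, digest, and payload values below are illustrative placeholders, not values from this commit; the response comments summarize the handler above.

import asyncio
import aiohttp

async def main() -> None:
    payload = {
        "hash": "blake3:" + "0" * 64,               # placeholder digest of already-ingested content
        "name": "sdxl_base.safetensors",
        "tags": ["checkpoints", "sdxl"],
        "user_metadata": {"source": "manual-register"},
    }
    async with aiohttp.ClientSession() as session:
        async with session.post("http://127.0.0.1:8188/api/assets/from-hash", json=payload) as resp:
            # Expected: 201 with the AssetCreated JSON on success,
            # 404 ASSET_NOT_FOUND for an unknown hash,
            # 400 INVALID_BODY / INVALID_JSON for malformed input.
            print(resp.status, await resp.json())

asyncio.run(main())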

View File

@@ -57,6 +57,47 @@ class ListAssetsQuery(BaseModel):
        return None


class CreateFromHashBody(BaseModel):
    model_config = ConfigDict(extra="ignore", str_strip_whitespace=True)

    hash: str
    name: str
    tags: list[str] = Field(default_factory=list)
    user_metadata: dict[str, Any] = Field(default_factory=dict)

    @field_validator("hash")
    @classmethod
    def _require_blake3(cls, v):
        s = (v or "").strip().lower()
        if ":" not in s:
            raise ValueError("hash must be 'blake3:<hex>'")
        algo, digest = s.split(":", 1)
        if algo != "blake3":
            raise ValueError("only canonical 'blake3:<hex>' is accepted here")
        if not digest or any(c for c in digest if c not in "0123456789abcdef"):
            raise ValueError("hash digest must be lowercase hex")
        return s

    @field_validator("tags", mode="before")
    @classmethod
    def _tags_norm(cls, v):
        if v is None:
            return []
        if isinstance(v, list):
            out = [str(t).strip().lower() for t in v if str(t).strip()]
            seen = set()
            dedup = []
            for t in out:
                if t not in seen:
                    seen.add(t)
                    dedup.append(t)
            return dedup
        if isinstance(v, str):
            return [t.strip().lower() for t in v.split(",") if t.strip()]
        return []


class TagsListQuery(BaseModel):
    model_config = ConfigDict(extra="ignore", str_strip_whitespace=True)
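
A quick sketch of how these validators normalize input, assuming the module is importable as app.assets.api.schemas_in (module path assumed; the sample values are illustrative):

from app.assets.api import schemas_in

body = schemas_in.CreateFromHashBody.model_validate({
    "hash": "BLAKE3:ABCDEF0123",                       # lowercased by _require_blake3
    "name": "model.safetensors",
    "tags": [" Checkpoints ", "sdxl", "checkpoints"],  # stripped, lowercased, deduplicated
    "unknown_field": "dropped",                        # ignored via extra="ignore"
})
print(body.hash)  # blake3:abcdef0123
print(body.tags)  # ['checkpoints', 'sdxl']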

View File

@@ -48,6 +48,10 @@ class AssetDetail(BaseModel):
        return v.isoformat() if v else None


class AssetCreated(AssetDetail):
    created_new: bool


class TagUsage(BaseModel):
    name: str
    count: int

View File

@@ -2,10 +2,15 @@ import os
import sqlalchemy as sa
from collections import defaultdict
from datetime import datetime
from typing import Iterable
from sqlalchemy import select, delete, exists, func
from sqlalchemy.dialects import sqlite
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, contains_eager, noload
from app.assets.database.models import Asset, AssetInfo, AssetCacheState, AssetInfoMeta, AssetInfoTag, Tag
from app.assets.helpers import (
    compute_relative_filename, escape_like_prefix, normalize_tags, project_kv, utcnow
)
from typing import Sequence
@@ -17,6 +22,22 @@ def visible_owner_clause(owner_id: str) -> sa.sql.ClauseElement:
    return AssetInfo.owner_id.in_(["", owner_id])


def pick_best_live_path(states: Sequence[AssetCacheState]) -> str:
    """
    Return the best on-disk path among cache states:
      1) Prefer a path that exists with needs_verify == False (already verified).
      2) Otherwise, pick the first path that exists.
      3) Otherwise return empty string.
    """
    alive = [s for s in states if getattr(s, "file_path", None) and os.path.isfile(s.file_path)]
    if not alive:
        return ""
    for s in alive:
        if not getattr(s, "needs_verify", False):
            return s.file_path
    return alive[0].file_path


def apply_tag_filters(
    stmt: sa.sql.Select,
    include_tags: Sequence[str] | None = None,
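
A small illustrative sketch of pick_best_live_path above, assuming it is importable from app.assets.database.queries (as the manager in this commit does); the dataclass stands in for AssetCacheState rows and the temp file stands in for a cached asset on disk:

import os
import tempfile
from dataclasses import dataclass

from app.assets.database.queries import pick_best_live_path

@dataclass
class FakeCacheState:
    file_path: str
    needs_verify: bool

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    cached_path = tmp.name

states = [
    FakeCacheState("/nowhere/model.safetensors", needs_verify=False),  # verified but missing on disk
    FakeCacheState(cached_path, needs_verify=True),                    # present but still unverified
]
# The missing path is filtered out, so the present-but-unverified path is returned.
print(pick_best_live_path(states) == cached_path)  # True
os.unlink(cached_path)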
@@ -109,6 +130,12 @@ def asset_exists_by_hash(session: Session, asset_hash: str) -> bool:
    return row is not None


def get_asset_by_hash(session: Session, *, asset_hash: str) -> Asset | None:
    return (
        session.execute(select(Asset).where(Asset.hash == asset_hash).limit(1))
    ).scalars().first()


def get_asset_info_by_id(session: Session, asset_info_id: str) -> AssetInfo | None:
    return session.get(AssetInfo, asset_info_id)
@@ -265,6 +292,155 @@ def touch_asset_info_by_id(
    session.execute(stmt.values(last_access_time=ts))


def create_asset_info_for_existing_asset(
    session: Session,
    *,
    asset_hash: str,
    name: str,
    user_metadata: dict | None = None,
    tags: Sequence[str] | None = None,
    tag_origin: str = "manual",
    owner_id: str = "",
) -> AssetInfo:
    """Create or return an existing AssetInfo for an Asset identified by asset_hash."""
    now = utcnow()

    asset = get_asset_by_hash(session, asset_hash=asset_hash)
    if not asset:
        raise ValueError(f"Unknown asset hash {asset_hash}")

    info = AssetInfo(
        owner_id=owner_id,
        name=name,
        asset_id=asset.id,
        preview_id=None,
        created_at=now,
        updated_at=now,
        last_access_time=now,
    )
    try:
        with session.begin_nested():
            session.add(info)
            session.flush()
    except IntegrityError:
        existing = (
            session.execute(
                select(AssetInfo)
                .options(noload(AssetInfo.tags))
                .where(
                    AssetInfo.asset_id == asset.id,
                    AssetInfo.name == name,
                    AssetInfo.owner_id == owner_id,
                )
                .limit(1)
            )
        ).unique().scalars().first()
        if not existing:
            raise RuntimeError("AssetInfo upsert failed to find existing row after conflict.")
        return existing

    # metadata["filename"] hack
    new_meta = dict(user_metadata or {})
    computed_filename = None
    try:
        p = pick_best_live_path(list_cache_states_by_asset_id(session, asset_id=asset.id))
        if p:
            computed_filename = compute_relative_filename(p)
    except Exception:
        computed_filename = None
    if computed_filename:
        new_meta["filename"] = computed_filename

    if new_meta:
        replace_asset_info_metadata_projection(
            session,
            asset_info_id=info.id,
            user_metadata=new_meta,
        )

    if tags is not None:
        set_asset_info_tags(
            session,
            asset_info_id=info.id,
            tags=tags,
            origin=tag_origin,
        )
    return info


def set_asset_info_tags(
    session: Session,
    *,
    asset_info_id: str,
    tags: Sequence[str],
    origin: str = "manual",
) -> dict:
    desired = normalize_tags(tags)
    current = set(
        tag_name for (tag_name,) in (
            session.execute(select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == asset_info_id))
        ).all()
    )
    to_add = [t for t in desired if t not in current]
    to_remove = [t for t in current if t not in desired]

    if to_add:
        ensure_tags_exist(session, to_add, tag_type="user")
        session.add_all([
            AssetInfoTag(asset_info_id=asset_info_id, tag_name=t, origin=origin, added_at=utcnow())
            for t in to_add
        ])
        session.flush()

    if to_remove:
        session.execute(
            delete(AssetInfoTag)
            .where(AssetInfoTag.asset_info_id == asset_info_id, AssetInfoTag.tag_name.in_(to_remove))
        )
        session.flush()

    return {"added": to_add, "removed": to_remove, "total": desired}


def replace_asset_info_metadata_projection(
    session: Session,
    *,
    asset_info_id: str,
    user_metadata: dict | None = None,
) -> None:
    info = session.get(AssetInfo, asset_info_id)
    if not info:
        raise ValueError(f"AssetInfo {asset_info_id} not found")

    info.user_metadata = user_metadata or {}
    info.updated_at = utcnow()
    session.flush()

    session.execute(delete(AssetInfoMeta).where(AssetInfoMeta.asset_info_id == asset_info_id))
    session.flush()

    if not user_metadata:
        return

    rows: list[AssetInfoMeta] = []
    for k, v in user_metadata.items():
        for r in project_kv(k, v):
            rows.append(
                AssetInfoMeta(
                    asset_info_id=asset_info_id,
                    key=r["key"],
                    ordinal=int(r["ordinal"]),
                    val_str=r.get("val_str"),
                    val_num=r.get("val_num"),
                    val_bool=r.get("val_bool"),
                    val_json=r.get("val_json"),
                )
            )
    if rows:
        session.add_all(rows)
    session.flush()


def list_tags_with_usage(
    session: Session,
    prefix: str | None = None,
@@ -324,17 +500,24 @@ def list_tags_with_usage(
    return rows_norm, int(total or 0)


def ensure_tags_exist(session: Session, names: Iterable[str], tag_type: str = "user") -> None:
    wanted = normalize_tags(list(names))
    if not wanted:
        return
    rows = [{"name": n, "tag_type": tag_type} for n in list(dict.fromkeys(wanted))]
    ins = (
        sqlite.insert(Tag)
        .values(rows)
        .on_conflict_do_nothing(index_elements=[Tag.name])
    )
    session.execute(ins)


def get_asset_tags(session: Session, *, asset_info_id: str) -> list[str]:
    return [
        tag_name for (tag_name,) in (
            session.execute(
                select(AssetInfoTag.tag_name).where(AssetInfoTag.asset_info_id == asset_info_id)
            )
        ).all()
    ]
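
To make the tag upsert concrete, a sketch of the SQL that ensure_tags_exist emits, using a stand-in tags table (the table layout here is assumed; the real Tag model lives in app.assets.database.models):

import sqlalchemy as sa
from sqlalchemy.dialects import sqlite

metadata = sa.MetaData()
tags = sa.Table(
    "tags", metadata,
    sa.Column("name", sa.String, primary_key=True),
    sa.Column("tag_type", sa.String, nullable=False),
)

stmt = (
    sqlite.insert(tags)
    .values([{"name": "checkpoints", "tag_type": "user"}])
    .on_conflict_do_nothing(index_elements=["name"])
)
# Compiles to roughly: INSERT INTO tags (name, tag_type) VALUES (?, ?) ON CONFLICT (name) DO NOTHING
print(stmt.compile(dialect=sqlite.dialect()))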

View File

@@ -1,5 +1,6 @@
import contextlib
import os
from decimal import Decimal
from aiohttp import web
from datetime import datetime, timezone
from pathlib import Path
@@ -215,3 +216,64 @@ def collect_models_files() -> list[str]:
        if allowed:
            out.append(abs_path)
    return out
def is_scalar(v):
    if v is None:
        return True
    if isinstance(v, bool):
        return True
    if isinstance(v, (int, float, Decimal, str)):
        return True
    return False


def project_kv(key: str, value):
    """
    Turn a metadata key/value into typed projection rows.
    Returns list[dict] with keys:
      key, ordinal, and one of val_str / val_num / val_bool / val_json (others None)
    """
    rows: list[dict] = []

    def _null_row(ordinal: int) -> dict:
        return {
            "key": key, "ordinal": ordinal,
            "val_str": None, "val_num": None, "val_bool": None, "val_json": None
        }

    if value is None:
        rows.append(_null_row(0))
        return rows

    if is_scalar(value):
        if isinstance(value, bool):
            rows.append({"key": key, "ordinal": 0, "val_bool": bool(value)})
        elif isinstance(value, (int, float, Decimal)):
            num = value if isinstance(value, Decimal) else Decimal(str(value))
            rows.append({"key": key, "ordinal": 0, "val_num": num})
        elif isinstance(value, str):
            rows.append({"key": key, "ordinal": 0, "val_str": value})
        else:
            rows.append({"key": key, "ordinal": 0, "val_json": value})
        return rows

    if isinstance(value, list):
        if all(is_scalar(x) for x in value):
            for i, x in enumerate(value):
                if x is None:
                    rows.append(_null_row(i))
                elif isinstance(x, bool):
                    rows.append({"key": key, "ordinal": i, "val_bool": bool(x)})
                elif isinstance(x, (int, float, Decimal)):
                    num = x if isinstance(x, Decimal) else Decimal(str(x))
                    rows.append({"key": key, "ordinal": i, "val_num": num})
                elif isinstance(x, str):
                    rows.append({"key": key, "ordinal": i, "val_str": x})
                else:
                    rows.append({"key": key, "ordinal": i, "val_json": x})
            return rows
        for i, x in enumerate(value):
            rows.append({"key": key, "ordinal": i, "val_json": x})
        return rows

    rows.append({"key": key, "ordinal": 0, "val_json": value})
    return rows
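
A few example projections, assuming project_kv is importable from app.assets.helpers as the queries module in this commit does; the sample keys and values are illustrative:

from app.assets.helpers import project_kv

print(project_kv("trigger_words", ["masterpiece", "best quality"]))
# [{'key': 'trigger_words', 'ordinal': 0, 'val_str': 'masterpiece'},
#  {'key': 'trigger_words', 'ordinal': 1, 'val_str': 'best quality'}]

print(project_kv("strength", 0.75))
# [{'key': 'strength', 'ordinal': 0, 'val_num': Decimal('0.75')}]

print(project_kv("config", {"steps": 30}))
# [{'key': 'config', 'ordinal': 0, 'val_json': {'steps': 30}}]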

View File

@@ -6,12 +6,15 @@ from app.database.db import create_session
from app.assets.api import schemas_out
from app.assets.database.queries import (
    asset_exists_by_hash,
    get_asset_by_hash,
    fetch_asset_info_asset_and_tags,
    fetch_asset_info_and_asset,
    create_asset_info_for_existing_asset,
    touch_asset_info_by_id,
    list_cache_states_by_asset_id,
    list_asset_infos_page,
    list_tags_with_usage,
    get_asset_tags,
    pick_best_live_path,
)
@@ -25,6 +28,13 @@ def _safe_sort_field(requested: str | None) -> str:
    return "created_at"


def _safe_filename(name: str | None, fallback: str) -> str:
    n = os.path.basename((name or "").strip() or fallback)
    if n:
        return n
    return fallback


def asset_exists(*, asset_hash: str) -> bool:
    """
    Check if an asset with a given hash exists in database.
@@ -86,6 +96,7 @@ def list_assets(
        has_more=(offset + len(summaries)) < total,
    )


def get_asset(
    *,
    asset_info_id: str,
@@ -111,6 +122,7 @@ def get_asset(
        last_access_time=info.last_access_time,
    )


def resolve_asset_content_for_download(
    *,
    asset_info_id: str,
@@ -134,6 +146,48 @@ def resolve_asset_content_for_download(
    download_name = info.name or os.path.basename(abs_path)
    return abs_path, ctype, download_name
def create_asset_from_hash(
    *,
    hash_str: str,
    name: str,
    tags: list[str] | None = None,
    user_metadata: dict | None = None,
    owner_id: str = "",
) -> schemas_out.AssetCreated | None:
    canonical = hash_str.strip().lower()
    with create_session() as session:
        asset = get_asset_by_hash(session, asset_hash=canonical)
        if not asset:
            return None

        info = create_asset_info_for_existing_asset(
            session,
            asset_hash=canonical,
            name=_safe_filename(name, fallback=canonical.split(":", 1)[1]),
            user_metadata=user_metadata or {},
            tags=tags or [],
            tag_origin="manual",
            owner_id=owner_id,
        )
        tag_names = get_asset_tags(session, asset_info_id=info.id)
        session.commit()

        return schemas_out.AssetCreated(
            id=info.id,
            name=info.name,
            asset_hash=asset.hash,
            size=int(asset.size_bytes),
            mime_type=asset.mime_type,
            tags=tag_names,
            user_metadata=info.user_metadata or {},
            preview_id=info.preview_id,
            created_at=info.created_at,
            last_access_time=info.last_access_time,
            created_new=False,
        )


def list_tags(
    prefix: str | None = None,
    limit: int = 100,