ComfyUI/research_api/routes/_db_helpers.py

310 lines
9.9 KiB
Python

"""Async DB helpers that wrap sync SQLAlchemy with run_in_executor."""
import asyncio
from functools import partial
from research_api.db import create_session
from research_api.models import Project, Intent, PaperAsset, ClaimAsset, Source, FeedItem
from app.database.models import to_dict
def _sync_list_projects():
    """Return every Project as a ``to_dict`` payload, most recently updated first."""
    from sqlalchemy import select

    stmt = select(Project).order_by(Project.updated_at.desc())
    with create_session() as db:
        rows = db.execute(stmt).scalars().all()
        return list(map(to_dict, rows))
def _sync_create_project(data):
    """Insert a new Project and return it as a ``to_dict`` payload.

    ``data`` is unpacked as keyword arguments to the ``Project`` constructor.
    The row is committed and refreshed before it is serialised.
    """
    with create_session() as db:
        new_project = Project(**data)
        db.add(new_project)
        db.commit()
        # Reload the committed row so the payload reflects what the DB stored.
        db.refresh(new_project)
        return to_dict(new_project)
def _sync_get_project(project_id):
    """Look up one Project by id.

    Returns its ``to_dict`` payload, or ``None`` when no row matches.
    """
    from sqlalchemy import select

    stmt = select(Project).where(Project.id == project_id)
    with create_session() as db:
        found = db.execute(stmt).scalar_one_or_none()
        if not found:
            return None
        return to_dict(found)
def _sync_list_intents(project_id):
    """Return the Intents of one project as ``to_dict`` payloads, highest priority first."""
    from sqlalchemy import select

    belongs_to_project = Intent.project_id == project_id
    stmt = select(Intent).where(belongs_to_project).order_by(Intent.priority.desc())
    with create_session() as db:
        rows = db.execute(stmt).scalars().all()
        return [to_dict(row) for row in rows]
def _sync_create_intent(data):
    """Insert a new Intent and return it as a ``to_dict`` payload.

    ``data`` is unpacked as keyword arguments to the ``Intent`` constructor.
    The row is committed and refreshed before it is serialised.
    """
    with create_session() as db:
        new_intent = Intent(**data)
        db.add(new_intent)
        db.commit()
        # Reload the committed row so the payload reflects what the DB stored.
        db.refresh(new_intent)
        return to_dict(new_intent)
def _sync_update_intent(intent_id, data):
    """Apply ``data`` to the Intent with the given id and return the updated payload.

    Returns ``None`` when no Intent matches. Keys of ``data`` that are not
    attributes of the loaded Intent are skipped.
    """
    from sqlalchemy import select

    stmt = select(Intent).where(Intent.id == intent_id)
    with create_session() as db:
        target = db.execute(stmt).scalar_one_or_none()
        if not target:
            return None
        # NOTE(review): hasattr() accepts any attribute name, not only mapped
        # columns -- confirm callers restrict the keys they pass in.
        for field, new_value in data.items():
            if not hasattr(target, field):
                continue
            setattr(target, field, new_value)
        db.commit()
        db.refresh(target)
        return to_dict(target)
def _sync_list_papers(library_status=None, read_status=None):
    """Return PaperAssets as ``to_dict`` payloads, most recently updated first.

    Each of ``library_status`` and ``read_status`` adds an equality filter
    when it is truthy; falsy values (``None``, ``""``) leave that column
    unfiltered.
    """
    from sqlalchemy import select

    optional_filters = (
        (PaperAsset.library_status, library_status),
        (PaperAsset.read_status, read_status),
    )
    stmt = select(PaperAsset)
    for column, wanted in optional_filters:
        if wanted:
            stmt = stmt.where(column == wanted)
    stmt = stmt.order_by(PaperAsset.updated_at.desc())

    with create_session() as db:
        rows = db.execute(stmt).scalars().all()
        return [to_dict(row) for row in rows]
def _sync_create_paper(data):
    """Insert a new PaperAsset and return it as a ``to_dict`` payload.

    ``data`` is unpacked as keyword arguments to the ``PaperAsset``
    constructor. The row is committed and refreshed before it is serialised.
    """
    with create_session() as db:
        new_paper = PaperAsset(**data)
        db.add(new_paper)
        db.commit()
        # Reload the committed row so the payload reflects what the DB stored.
        db.refresh(new_paper)
        return to_dict(new_paper)
def _sync_get_paper(paper_id):
    """Look up one PaperAsset by id.

    Returns its ``to_dict`` payload, or ``None`` when no row matches.
    """
    from sqlalchemy import select

    stmt = select(PaperAsset).where(PaperAsset.id == paper_id)
    with create_session() as db:
        found = db.execute(stmt).scalar_one_or_none()
        if not found:
            return None
        return to_dict(found)
def _sync_update_paper(paper_id, data):
    """Apply ``data`` to the PaperAsset with the given id and return the updated payload.

    Returns ``None`` when no PaperAsset matches. Keys of ``data`` that are
    not attributes of the loaded PaperAsset are skipped.
    """
    from sqlalchemy import select

    stmt = select(PaperAsset).where(PaperAsset.id == paper_id)
    with create_session() as db:
        target = db.execute(stmt).scalar_one_or_none()
        if not target:
            return None
        # NOTE(review): hasattr() accepts any attribute name, not only mapped
        # columns -- confirm callers restrict the keys they pass in.
        for field, new_value in data.items():
            if not hasattr(target, field):
                continue
            setattr(target, field, new_value)
        db.commit()
        db.refresh(target)
        return to_dict(target)
def _sync_list_claims(project_id=None, support_level=None):
    """Return ClaimAssets as ``to_dict`` payloads, most recently updated first.

    Each of ``project_id`` and ``support_level`` adds an equality filter when
    it is truthy; falsy values leave that column unfiltered.
    """
    from sqlalchemy import select

    optional_filters = (
        (ClaimAsset.project_id, project_id),
        (ClaimAsset.support_level, support_level),
    )
    stmt = select(ClaimAsset)
    for column, wanted in optional_filters:
        if wanted:
            stmt = stmt.where(column == wanted)
    stmt = stmt.order_by(ClaimAsset.updated_at.desc())

    with create_session() as db:
        rows = db.execute(stmt).scalars().all()
        return [to_dict(row) for row in rows]
def _sync_create_claim(data):
    """Insert a new ClaimAsset and return it as a ``to_dict`` payload.

    ``data`` is unpacked as keyword arguments to the ``ClaimAsset``
    constructor. The row is committed and refreshed before it is serialised.
    """
    with create_session() as db:
        new_claim = ClaimAsset(**data)
        db.add(new_claim)
        db.commit()
        # Reload the committed row so the payload reflects what the DB stored.
        db.refresh(new_claim)
        return to_dict(new_claim)
def _sync_update_claim(claim_id, data):
    """Apply ``data`` to the ClaimAsset with the given id and return the updated payload.

    Returns ``None`` when no ClaimAsset matches. Keys of ``data`` that are
    not attributes of the loaded ClaimAsset are skipped.
    """
    from sqlalchemy import select

    stmt = select(ClaimAsset).where(ClaimAsset.id == claim_id)
    with create_session() as db:
        target = db.execute(stmt).scalar_one_or_none()
        if not target:
            return None
        # NOTE(review): hasattr() accepts any attribute name, not only mapped
        # columns -- confirm callers restrict the keys they pass in.
        for field, new_value in data.items():
            if not hasattr(target, field):
                continue
            setattr(target, field, new_value)
        db.commit()
        db.refresh(target)
        return to_dict(target)
def _sync_list_sources():
    """Return every Source as a ``to_dict`` payload, highest priority first."""
    from sqlalchemy import select

    stmt = select(Source).order_by(Source.priority.desc())
    with create_session() as db:
        rows = db.execute(stmt).scalars().all()
        return list(map(to_dict, rows))
def _sync_create_source(data):
    """Insert a new Source and return it as a ``to_dict`` payload.

    ``data`` is unpacked as keyword arguments to the ``Source`` constructor.
    The row is committed and refreshed before it is serialised.
    """
    with create_session() as db:
        new_source = Source(**data)
        db.add(new_source)
        db.commit()
        # Reload the committed row so the payload reflects what the DB stored.
        db.refresh(new_source)
        return to_dict(new_source)
def _sync_update_source(source_id, data):
    """Apply ``data`` to the Source with the given id and return the updated payload.

    Returns ``None`` when no Source matches. Keys of ``data`` that are not
    attributes of the loaded Source are skipped.
    """
    from sqlalchemy import select

    stmt = select(Source).where(Source.id == source_id)
    with create_session() as db:
        target = db.execute(stmt).scalar_one_or_none()
        if not target:
            return None
        # NOTE(review): hasattr() accepts any attribute name, not only mapped
        # columns -- confirm callers restrict the keys they pass in.
        for field, new_value in data.items():
            if not hasattr(target, field):
                continue
            setattr(target, field, new_value)
        db.commit()
        db.refresh(target)
        return to_dict(target)
def _sync_get_today_feed():
    """Return up to 20 FeedItems as ``to_dict`` payloads, highest rank_score first.

    Only items whose status is ``discovered``, ``ranked`` or ``presented``
    are included. Despite the name, no date filter is applied here.
    """
    from sqlalchemy import select

    wanted_statuses = ["discovered", "ranked", "presented"]
    stmt = select(FeedItem).where(FeedItem.status.in_(wanted_statuses))
    stmt = stmt.order_by(FeedItem.rank_score.desc()).limit(20)
    with create_session() as db:
        rows = db.execute(stmt).scalars().all()
        return [to_dict(row) for row in rows]
def _sync_list_feed(source_id=None, status=None):
    """Return FeedItems as ``to_dict`` payloads, highest rank_score first.

    Each of ``source_id`` and ``status`` adds an equality filter when it is
    truthy; falsy values leave that column unfiltered.
    """
    from sqlalchemy import select

    optional_filters = (
        (FeedItem.source_id, source_id),
        (FeedItem.status, status),
    )
    stmt = select(FeedItem)
    for column, wanted in optional_filters:
        if wanted:
            stmt = stmt.where(column == wanted)
    stmt = stmt.order_by(FeedItem.rank_score.desc())

    with create_session() as db:
        rows = db.execute(stmt).scalars().all()
        return [to_dict(row) for row in rows]
def _sync_create_feed_item(data):
    """Insert a new FeedItem and return it as a ``to_dict`` payload.

    ``data`` is unpacked as keyword arguments to the ``FeedItem``
    constructor. The row is committed and refreshed before it is serialised.
    """
    with create_session() as db:
        new_item = FeedItem(**data)
        db.add(new_item)
        db.commit()
        # Reload the committed row so the payload reflects what the DB stored.
        db.refresh(new_item)
        return to_dict(new_item)
def _sync_update_feed_item(item_id, data):
    """Apply ``data`` to the FeedItem with the given id and return the updated payload.

    Returns ``None`` when no FeedItem matches. Keys of ``data`` that are not
    attributes of the loaded FeedItem are skipped.
    """
    from sqlalchemy import select

    stmt = select(FeedItem).where(FeedItem.id == item_id)
    with create_session() as db:
        target = db.execute(stmt).scalar_one_or_none()
        if not target:
            return None
        # NOTE(review): hasattr() accepts any attribute name, not only mapped
        # columns -- confirm callers restrict the keys they pass in.
        for field, new_value in data.items():
            if not hasattr(target, field):
                continue
            setattr(target, field, new_value)
        db.commit()
        db.refresh(target)
        return to_dict(target)
# Async wrappers using run_in_executor
# NOTE: `loop` is bound to the asyncio.get_event_loop function itself (not to
# a loop instance), so `loop()` looks the event loop up at call time.
loop = asyncio.get_event_loop
async def asyncio_get_projects():
    """Run ``_sync_list_projects`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_list_projects)
async def asyncio_create_project(data):
    """Run ``_sync_create_project(data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_create_project, data)
async def asyncio_get_project(project_id):
    """Run ``_sync_get_project(project_id)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_get_project, project_id)
async def asyncio_list_intents(project_id):
    """Run ``_sync_list_intents(project_id)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_list_intents, project_id)
async def asyncio_create_intent(data):
    """Run ``_sync_create_intent(data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_create_intent, data)
async def asyncio_update_intent(intent_id, data):
    """Run ``_sync_update_intent(intent_id, data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_update_intent, intent_id, data)
async def asyncio_list_papers(library_status=None, read_status=None):
    """Run ``_sync_list_papers(library_status, read_status)`` in the default executor.

    Both filters are forwarded positionally and unchanged; returns whatever
    the sync helper returns.
    """
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(
        None, _sync_list_papers, library_status, read_status
    )
async def asyncio_create_paper(data):
    """Run ``_sync_create_paper(data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_create_paper, data)
async def asyncio_get_paper(paper_id):
    """Run ``_sync_get_paper(paper_id)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_get_paper, paper_id)
async def asyncio_update_paper(paper_id, data):
    """Run ``_sync_update_paper(paper_id, data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_update_paper, paper_id, data)
async def asyncio_list_claims(project_id=None, support_level=None):
    """Run ``_sync_list_claims(project_id, support_level)`` in the default executor.

    Both filters are forwarded positionally and unchanged; returns whatever
    the sync helper returns.
    """
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(
        None, _sync_list_claims, project_id, support_level
    )
async def asyncio_create_claim(data):
    """Run ``_sync_create_claim(data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_create_claim, data)
async def asyncio_update_claim(claim_id, data):
    """Run ``_sync_update_claim(claim_id, data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_update_claim, claim_id, data)
async def asyncio_list_sources():
    """Run ``_sync_list_sources`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_list_sources)
async def asyncio_create_source(data):
    """Run ``_sync_create_source(data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_create_source, data)
async def asyncio_update_source(source_id, data):
    """Run ``_sync_update_source(source_id, data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_update_source, source_id, data)
async def asyncio_get_today_feed():
    """Run ``_sync_get_today_feed`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_get_today_feed)
async def asyncio_list_feed(source_id=None, status=None):
    """Run ``_sync_list_feed(source_id, status)`` in the default executor.

    Both filters are forwarded positionally and unchanged; returns whatever
    the sync helper returns.
    """
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_list_feed, source_id, status)
async def asyncio_create_feed_item(data):
    """Run ``_sync_create_feed_item(data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_create_feed_item, data)
async def asyncio_update_feed_item(item_id, data):
    """Run ``_sync_update_feed_item(item_id, data)`` in the default executor and return its result."""
    # get_running_loop() is the documented way to obtain the loop in a coroutine;
    # run_in_executor forwards positional arguments, so no partial() is needed.
    running = asyncio.get_running_loop()
    return await running.run_in_executor(None, _sync_update_feed_item, item_id, data)