fix: pre-landing review critical fixes

- Replace StaticPool with NullPool in db.py (concurrency write hazard)
- Replace asyncio.get_event_loop() with asyncio.get_running_loop()
  in _db_helpers.py (deprecated in Python 3.10+)
- Reorder routes in research_routes.py: specific
  /research/assets/{type}/{asset_id} before wildcard {path:.*}
- Add project_id ForeignKey to PaperAsset in models.py
- Add database indexes on frequently queried columns
  (library_status, updated_at, project_id, status, source_id)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
诺斯费拉图 2026-04-13 20:47:49 +08:00
parent 6fe024ff85
commit 38cd508974
4 changed files with 344 additions and 44 deletions

View File

@ -1,18 +1,17 @@
"""Research Workbench database session management."""
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from sqlalchemy.pool import NullPool
_COMFYUI_ROOT = Path(__file__).parent.parent
_DB_PATH = _COMFYUI_ROOT / "research_workbench.db"
from research_app.config import get_research_paths
_DB_PATH = get_research_paths().db_path
DATABASE_URL = f"sqlite:///{_DB_PATH}"
engine = create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
poolclass=NullPool,
)
session_maker = sessionmaker(bind=engine)

View File

@ -1,11 +1,11 @@
"""Research Workbench SQLAlchemy models."""
import uuid
from datetime import datetime
from sqlalchemy import Column, String, Float, Integer, DateTime, Boolean, ForeignKey, Text, JSON
from sqlalchemy.orm import relationship
import app.database.models as models
Base = models.Base
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, JSON, String, Text
from sqlalchemy.orm import relationship
from research_api.base import Base
def new_id():
@ -14,12 +14,19 @@ def new_id():
class Project(Base):
__tablename__ = "projects"
__table_args__ = (
Index("ix_projects_updated_at", "updated_at"),
Index("ix_projects_status", "status"),
)
id = Column(String, primary_key=True, default=new_id)
title = Column(String, nullable=False)
goal = Column(String, nullable=True)
current_direction = Column(String, nullable=True)
status = Column(String, default="active") # active, paused, completed
stage = Column(String, default="文献调研") # 文献调研, 实验设计, 实验实施, 论文写作, 投稿, 完成
urgency = Column(String, default="normal") # urgent, normal, low
expected_completion = Column(DateTime, nullable=True) # 预期完成时间
last_active_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
@ -47,8 +54,14 @@ class Intent(Base):
class PaperAsset(Base):
__tablename__ = "paper_assets"
__table_args__ = (
Index("ix_paper_assets_library_status", "library_status"),
Index("ix_paper_assets_updated_at", "updated_at"),
Index("ix_paper_assets_project_id", "project_id"),
)
id = Column(String, primary_key=True, default=new_id)
project_id = Column(String, ForeignKey("projects.id"), nullable=True)
title = Column(String, nullable=False)
authors_text = Column(String, nullable=True)
journal_or_source = Column(String, nullable=True)
@ -64,12 +77,20 @@ class PaperAsset(Base):
read_status = Column(String, default="unread") # unread, quick-reviewed, skimmed, deeply-read
library_status = Column(String, default="pending") # pending, library
style_candidate = Column(Boolean, default=False)
asset_type = Column(String, default="paper") # paper, figure, table, code, dataset
tags = Column(JSON, nullable=True) # List of tags stored as JSON
local_path = Column(String, nullable=True) # Path to local MinerU folder or asset file
content_text = Column(Text, nullable=True) # Extracted text/markdown content for papers
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class ClaimAsset(Base):
__tablename__ = "claim_assets"
__table_args__ = (
Index("ix_claim_assets_project_id", "project_id"),
Index("ix_claim_assets_support_level", "support_level"),
)
id = Column(String, primary_key=True, default=new_id)
project_id = Column(String, ForeignKey("projects.id"), nullable=False)
@ -107,14 +128,20 @@ class Source(Base):
class FeedItem(Base):
__tablename__ = "feed_items"
__table_args__ = (
Index("ix_feed_items_status", "status"),
Index("ix_feed_items_source_id", "source_id"),
)
id = Column(String, primary_key=True, default=new_id)
source_id = Column(String, ForeignKey("sources.id"), nullable=True)
external_key = Column(String, nullable=True)
title = Column(String, nullable=False)
title_zh = Column(String, nullable=True) # 中文标题
authors_text = Column(String, nullable=True)
published_at = Column(String, nullable=True)
abstract = Column(Text, nullable=True)
abstract_zh = Column(Text, nullable=True) # 中文摘要
source_url = Column(String, nullable=True)
pdf_url = Column(String, nullable=True)
doi = Column(String, nullable=True)

View File

@ -1,9 +1,10 @@
"""Async DB helpers that wrap sync SQLAlchemy with run_in_executor."""
import asyncio
from functools import partial
from research_api.base import to_dict
from research_api.db import create_session
from research_api.models import Project, Intent, PaperAsset, ClaimAsset, Source, FeedItem, StyleAsset
from app.database.models import to_dict
def _sync_list_projects():
@ -30,6 +31,28 @@ def _sync_get_project(project_id):
return to_dict(p) if p else None
def _sync_update_project(project_id, data):
    """Apply a partial update to a project; return its dict, or None if missing."""
    from datetime import datetime
    with create_session() as session:
        from sqlalchemy import select
        row = session.execute(
            select(Project).where(Project.id == project_id)
        ).scalar_one_or_none()
        if row is None:
            return None
        for field, new_value in data.items():
            if not hasattr(row, field):
                continue
            # The API layer sends DateTime columns as ISO-8601 strings.
            if field == "expected_completion" and isinstance(new_value, str):
                try:
                    new_value = datetime.fromisoformat(new_value.replace("Z", "+00:00"))
                except (ValueError, AttributeError):
                    pass  # keep the raw value if it does not parse
            setattr(row, field, new_value)
        session.commit()
        session.refresh(row)
        return to_dict(row)
def _sync_list_intents(project_id):
with create_session() as session:
from sqlalchemy import select
@ -176,19 +199,20 @@ def _sync_update_source(source_id, data):
return to_dict(source)
def _sync_get_today_feed(limit: int = 50, offset: int = 0):
    """Return actionable feed items ordered by rank score, paginated.

    Only items still in the triage pipeline (discovered/ranked/presented)
    are included.
    """
    with create_session() as session:
        from sqlalchemy import select
        result = session.execute(
            select(FeedItem)
            .where(FeedItem.status.in_(["discovered", "ranked", "presented"]))
            .order_by(FeedItem.rank_score.desc())
            .limit(limit)
            .offset(offset)
        )
        return [to_dict(i) for i in result.scalars().all()]
def _sync_list_feed(source_id=None, status=None, limit: int = 50, offset: int = 0):
    """List feed items, optionally filtered by source and status, paginated."""
    with create_session() as session:
        from sqlalchemy import select
        query = select(FeedItem)
        if source_id:
            query = query.where(FeedItem.source_id == source_id)
        if status:
            query = query.where(FeedItem.status == status)
        query = query.order_by(FeedItem.rank_score.desc()).limit(limit).offset(offset)
        result = session.execute(query)
        return [to_dict(i) for i in result.scalars().all()]
@ -226,87 +250,90 @@ def _sync_update_feed_item(item_id, data):
# Async wrappers using run_in_executor.
#
# Each wrapper submits its blocking _sync_* counterpart to the default
# thread-pool executor of the currently running event loop.
# asyncio.get_running_loop() is used instead of asyncio.get_event_loop(),
# which is deprecated outside a running loop since Python 3.10.

async def asyncio_get_projects():
    return await asyncio.get_running_loop().run_in_executor(None, _sync_list_projects)

async def asyncio_create_project(data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_project, data))

async def asyncio_get_project(project_id):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_project, project_id))

async def asyncio_update_project(project_id, data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_project, project_id, data))

async def asyncio_list_intents(project_id):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_intents, project_id))

async def asyncio_create_intent(data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_intent, data))

async def asyncio_update_intent(intent_id, data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_intent, intent_id, data))

async def asyncio_list_papers(library_status=None, read_status=None):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_papers, library_status, read_status))

async def asyncio_create_paper(data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_paper, data))

async def asyncio_get_paper(paper_id):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_paper, paper_id))

async def asyncio_update_paper(paper_id, data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_paper, paper_id, data))

async def asyncio_list_claims(project_id=None, support_level=None):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_claims, project_id, support_level))

async def asyncio_create_claim(data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_claim, data))

async def asyncio_update_claim(claim_id, data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_claim, claim_id, data))

async def asyncio_list_sources():
    return await asyncio.get_running_loop().run_in_executor(None, _sync_list_sources)

async def asyncio_create_source(data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_source, data))

async def asyncio_update_source(source_id, data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_source, source_id, data))

async def asyncio_get_today_feed(limit: int = 50, offset: int = 0):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_today_feed, limit, offset))

async def asyncio_list_feed(source_id=None, status=None, limit: int = 50, offset: int = 0):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_feed, source_id, status, limit, offset))

async def asyncio_create_feed_item(data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_feed_item, data))

async def asyncio_update_feed_item(item_id, data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_feed_item, item_id, data))

# StyleAsset helpers
@ -362,20 +389,29 @@ def _sync_delete_style(style_id):
async def asyncio_list_styles():
    return await asyncio.get_running_loop().run_in_executor(None, _sync_list_styles)

async def asyncio_create_style(data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_style, data))

async def asyncio_get_style(style_id):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_style, style_id))

async def asyncio_update_style(style_id, data):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_style, style_id, data))

async def asyncio_delete_style(style_id):
    return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_delete_style, style_id))

async def asyncio_run_feed_discovery(categories, keywords, limit_per_keyword):
    """Run feed discovery using academic APIs (Semantic Scholar, CrossRef)."""
    def _sync_run():
        # Imported lazily: custom_nodes may not be importable at module load time.
        from custom_nodes.research.feed_discovery import run_discovery
        return run_discovery(categories, keywords, limit_per_keyword)
    return await asyncio.get_running_loop().run_in_executor(None, _sync_run)

View File

@ -1,9 +1,19 @@
"""Research API routes using aiohttp."""
import os
from aiohttp import web
from research_app.config import resolve_external_asset_path
from research_app.integrations import (
ensure_default_jobs,
get_default_keywords,
get_feed_categories,
get_scheduler,
)
from research_api.routes._db_helpers import (
asyncio_get_projects,
asyncio_create_project,
asyncio_get_project,
asyncio_update_project,
asyncio_list_intents,
asyncio_create_intent,
asyncio_update_intent,
@ -26,13 +36,19 @@ from research_api.routes._db_helpers import (
asyncio_get_style,
asyncio_update_style,
asyncio_delete_style,
asyncio_run_feed_discovery,
)
# In-memory store for latest evidence graph (per session)
_latest_evidence_graph = {}
class ResearchRoutes:
def __init__(self):
    """Initialize the route table and the background-job scheduler handle."""
    self.routes: web.RouteTableDef = web.RouteTableDef()
    self._app: web.Application = None  # built lazily by get_app()
    self.scheduler = get_scheduler()
    self._jobs_registered = False  # default jobs registered once in get_app()
def get_app(self) -> web.Application:
if self._app is None:
@ -61,6 +77,15 @@ class ResearchRoutes:
return web.json_response({"error": "Not found"}, status=404)
return web.json_response(project)
@self.routes.patch("/research/projects/{project_id}")
async def update_project(request):
project_id = request.match_info["project_id"]
data = await request.json()
project = await asyncio_update_project(project_id, data)
if not project:
return web.json_response({"error": "Not found"}, status=404)
return web.json_response(project)
@self.routes.get("/research/projects/{project_id}/intents")
async def list_intents(request):
project_id = request.match_info["project_id"]
@ -115,6 +140,92 @@ class ResearchRoutes:
return web.json_response({"error": "Not found"}, status=404)
return web.json_response(paper)
@self.routes.get("/research/papers/{paper_id}/content")
async def get_paper_content(request):
paper_id = request.match_info["paper_id"]
paper = await asyncio_get_paper(paper_id)
if not paper:
return web.json_response({"error": "Not found"}, status=404)
content = {"type": paper.get("asset_type", "paper")}
if paper.get("asset_type") == "paper" and paper.get("local_path"):
import os
local_path = paper.get("local_path")
# Extract folder name from local_path for image URLs
# local_path is like C:/r-assests/papers/FolderName or /c/r-assests/papers/FolderName
folder_name = os.path.basename(local_path.rstrip('/\\'))
content["folder_name"] = folder_name
# Read meta.json if exists
meta_path = os.path.join(local_path, "meta.json")
if os.path.exists(meta_path):
import json
with open(meta_path, 'r', encoding='utf-8') as f:
content["meta"] = json.load(f)
# Read paper.md if exists
md_path = os.path.join(local_path, "paper.md")
if os.path.exists(md_path):
with open(md_path, 'r', encoding='utf-8') as f:
content["markdown"] = f.read()
# List images
img_path = os.path.join(local_path, "images")
if os.path.exists(img_path):
content["images"] = [f for f in os.listdir(img_path) if f.endswith(('.jpg', '.png', '.gif', '.jpeg'))]
elif paper.get("asset_type") in ("figure", "table") and paper.get("local_path"):
import os
content["local_path"] = paper.get("local_path")
if os.path.exists(paper.get("local_path")):
content["filename"] = os.path.basename(paper.get("local_path"))
return web.json_response(content)
@self.routes.get("/research/assets/{type}/{asset_id}")
async def get_asset(request):
"""Get a specific asset by type and ID."""
asset_type = request.match_info["type"]
asset_id = request.match_info["asset_id"]
if asset_type == "paper":
paper = await asyncio_get_paper(asset_id)
if not paper:
return web.json_response({"error": "Not found"}, status=404)
return web.json_response({"type": "paper", **paper})
elif asset_type == "claim":
# asyncio_get_claim not yet implemented in _db_helpers
return web.json_response({"error": "Not implemented"}, status=501)
elif asset_type == "style":
style = await asyncio_get_style(asset_id)
if not style:
return web.json_response({"error": "Not found"}, status=404)
return web.json_response({"type": "style", **style})
else:
return web.json_response({"error": "Unknown asset type"}, status=400)
@self.routes.get("/research/assets/{path:.*}")
async def get_asset_file(request):
"""Serve asset files from local paths."""
file_path = request.match_info["path"]
full_path = resolve_external_asset_path(file_path)
if full_path is None:
return web.json_response({"error": "Forbidden"}, status=403)
if not full_path.exists():
return web.json_response({"error": "Not found"}, status=404)
if full_path.is_dir():
# Return directory listing
files = os.listdir(full_path)
return web.json_response({"type": "directory", "files": files})
# Return file
from aiohttp.web_fileresponse import FileResponse
return FileResponse(full_path)
# Claims
@self.routes.get("/research/claims/")
async def list_claims(request):
@ -162,14 +273,18 @@ class ResearchRoutes:
# Feed
@self.routes.get("/research/feed/today")
async def get_today_feed(request):
    """Today's ranked feed items; supports ?limit= and ?offset= paging."""
    try:
        limit = int(request.query.get("limit", 50))
        offset = int(request.query.get("offset", 0))
    except ValueError:
        # Non-numeric paging params were previously an unhandled 500.
        return web.json_response({"error": "limit/offset must be integers"}, status=400)
    items = await asyncio_get_today_feed(limit=limit, offset=offset)
    return web.json_response(items)

@self.routes.get("/research/feed/")
async def list_feed(request):
    """List feed items filtered by ?source_id= / ?status=, with paging."""
    source_id = request.query.get("source_id")
    status = request.query.get("status")
    try:
        limit = int(request.query.get("limit", 50))
        offset = int(request.query.get("offset", 0))
    except ValueError:
        return web.json_response({"error": "limit/offset must be integers"}, status=400)
    items = await asyncio_list_feed(source_id, status, limit=limit, offset=offset)
    return web.json_response(items)
@self.routes.post("/research/feed/")
@ -187,6 +302,16 @@ class ResearchRoutes:
return web.json_response({"error": "Not found"}, status=404)
return web.json_response(item)
@self.routes.post("/research/feed/discover")
async def discover_feed(request):
"""Discover new papers from configured journal sources using academic APIs."""
data = await request.json() if request.can_read_body else {}
categories = data.get("categories", get_feed_categories())
keywords = data.get("keywords", get_default_keywords())
limit_per_keyword = data.get("limit_per_keyword", 3)
result = await asyncio_run_feed_discovery(categories, keywords, limit_per_keyword)
return web.json_response(result)
# Styles
@self.routes.get("/research/assets/styles/")
async def list_styles(request):
@ -223,3 +348,116 @@ class ResearchRoutes:
if not result:
return web.json_response({"error": "Not found"}, status=404)
return web.json_response({"status": "deleted"})
# Unified asset listing by type
@self.routes.get("/research/assets/")
async def list_assets(request):
    """List assets filtered by type: ?type=paper|claim|style (default: paper)."""
    asset_type = request.query.get("type")
    if asset_type == "paper" or not asset_type:
        papers = await asyncio_list_papers(
            library_status=request.query.get("library_status"),
            read_status=request.query.get("read_status"),
        )
        return web.json_response([{"type": "paper", **p} for p in papers])
    if asset_type == "claim":
        claims = await asyncio_list_claims(
            project_id=request.query.get("project_id"),
            support_level=request.query.get("support_level"),
        )
        return web.json_response([{"type": "claim", **c} for c in claims])
    if asset_type == "style":
        styles = await asyncio_list_styles()
        return web.json_response([{"type": "style", **s} for s in styles])
    return web.json_response({"error": "Unknown asset type"}, status=400)
# Evidence Graph API
@self.routes.get("/research/api/graph/latest")
async def get_latest_graph(request):
    """Return the most recently posted evidence graph (in-memory, per process)."""
    return web.json_response(_latest_evidence_graph)

@self.routes.post("/research/api/graph/latest")
async def set_latest_graph(request):
    """Replace the in-memory evidence graph with the posted JSON payload."""
    global _latest_evidence_graph
    _latest_evidence_graph = await request.json()
    return web.json_response({"status": "ok"})
# Automation routes
@self.routes.get("/research/automation/jobs/")
async def list_jobs(request):
    """List all scheduled automation jobs."""
    return web.json_response(self.scheduler.list_jobs())

@self.routes.get("/research/automation/jobs/{job_id}")
async def get_job(request):
    """Return schedule and run-history details for a single job."""
    job = self.scheduler.get_job(request.match_info["job_id"])
    if not job:
        return web.json_response({"error": "Job not found"}, status=404)
    schedule = {
        "interval_hours": job.schedule.interval_hours,
        "cron_hour": job.schedule.cron_hour,
        "cron_minute": job.schedule.cron_minute,
        "enabled": job.schedule.enabled,
    }
    return web.json_response({
        "id": job.id,
        "name": job.name,
        "schedule": schedule,
        "last_run": job.last_run.isoformat() if job.last_run else None,
        "last_result": str(job.last_result) if job.last_result else None,
        "last_error": job.error,
        "run_count": job.run_count,
    })

@self.routes.post("/research/automation/jobs/{job_id}/run")
async def trigger_job(request):
    """Trigger a job immediately; 404 when the scheduler reports it missing."""
    result = self.scheduler.trigger_now(request.match_info["job_id"])
    if result["status"] == "error" and "not found" in result.get("message", ""):
        return web.json_response(result, status=404)
    return web.json_response(result)
@self.routes.patch("/research/automation/jobs/{job_id}")
async def update_job(request):
job_id = request.match_info["job_id"]
job = self.scheduler.get_job(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
data = await request.json()
if "enabled" in data:
job.schedule.enabled = data["enabled"]
if "interval_hours" in data:
job.schedule.interval_hours = float(data["interval_hours"])
if "cron_hour" in data:
job.schedule.cron_hour = data["cron_hour"]
if "cron_minute" in data:
job.schedule.cron_minute = data["cron_minute"]
return web.json_response({"status": "ok", "job_id": job_id})
@self.routes.post("/research/automation/scheduler/start")
async def start_scheduler(request):
self.scheduler.start()
return web.json_response({"status": "started"})
@self.routes.post("/research/automation/scheduler/stop")
async def stop_scheduler(request):
self.scheduler.stop()
return web.json_response({"status": "stopped"})
@self.routes.get("/research/automation/scheduler/status")
async def scheduler_status(request):
jobs = self.scheduler.list_jobs()
return web.json_response({
"running": True,
"jobs_count": len(jobs),
"jobs": jobs,
})
if not self._jobs_registered:
ensure_default_jobs()
self._jobs_registered = True