diff --git a/research_api/db.py b/research_api/db.py index f9a04362e..a72ac0014 100644 --- a/research_api/db.py +++ b/research_api/db.py @@ -1,18 +1,17 @@ """Research Workbench database session management.""" -from pathlib import Path from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from sqlalchemy.pool import StaticPool +from sqlalchemy.pool import NullPool -_COMFYUI_ROOT = Path(__file__).parent.parent -_DB_PATH = _COMFYUI_ROOT / "research_workbench.db" +from research_app.config import get_research_paths +_DB_PATH = get_research_paths().db_path DATABASE_URL = f"sqlite:///{_DB_PATH}" engine = create_engine( DATABASE_URL, connect_args={"check_same_thread": False}, - poolclass=StaticPool, + poolclass=NullPool, ) session_maker = sessionmaker(bind=engine) diff --git a/research_api/models.py b/research_api/models.py index 33f56c03d..c9d84534d 100644 --- a/research_api/models.py +++ b/research_api/models.py @@ -1,11 +1,11 @@ """Research Workbench SQLAlchemy models.""" import uuid from datetime import datetime -from sqlalchemy import Column, String, Float, Integer, DateTime, Boolean, ForeignKey, Text, JSON -from sqlalchemy.orm import relationship -import app.database.models as models -Base = models.Base +from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, JSON, String, Text +from sqlalchemy.orm import relationship + +from research_api.base import Base def new_id(): @@ -14,12 +14,19 @@ def new_id(): class Project(Base): __tablename__ = "projects" + __table_args__ = ( + Index("ix_projects_updated_at", "updated_at"), + Index("ix_projects_status", "status"), + ) id = Column(String, primary_key=True, default=new_id) title = Column(String, nullable=False) goal = Column(String, nullable=True) current_direction = Column(String, nullable=True) status = Column(String, default="active") # active, paused, completed + stage = Column(String, default="文献调研") # 文献调研, 实验设计, 实验实施, 论文写作, 投稿, 完成 + urgency = Column(String, default="normal") # urgent, normal, low + expected_completion = Column(DateTime, nullable=True) # 预期完成时间 last_active_at = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) @@ -47,8 +54,14 @@ class Intent(Base): class PaperAsset(Base): __tablename__ = "paper_assets" + __table_args__ = ( + Index("ix_paper_assets_library_status", "library_status"), + Index("ix_paper_assets_updated_at", "updated_at"), + Index("ix_paper_assets_project_id", "project_id"), + ) id = Column(String, primary_key=True, default=new_id) + project_id = Column(String, ForeignKey("projects.id"), nullable=True) title = Column(String, nullable=False) authors_text = Column(String, nullable=True) journal_or_source = Column(String, nullable=True) @@ -64,12 +77,20 @@ class PaperAsset(Base): read_status = Column(String, default="unread") # unread, quick-reviewed, skimmed, deeply-read library_status = Column(String, default="pending") # pending, library style_candidate = Column(Boolean, default=False) + asset_type = Column(String, default="paper") # paper, figure, table, code, dataset + tags = Column(JSON, nullable=True) # List of tags stored as JSON + local_path = Column(String, nullable=True) # Path to local MinerU folder or asset file + content_text = Column(Text, nullable=True) # Extracted text/markdown content for papers created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) class ClaimAsset(Base): __tablename__ = "claim_assets" + __table_args__ = ( + Index("ix_claim_assets_project_id", "project_id"), + Index("ix_claim_assets_support_level", "support_level"), + ) id = Column(String, primary_key=True, default=new_id) project_id = Column(String, ForeignKey("projects.id"), nullable=False) @@ -107,14 +128,20 @@ class Source(Base): class FeedItem(Base): __tablename__ = "feed_items" + __table_args__ = ( + Index("ix_feed_items_status", "status"), + Index("ix_feed_items_source_id", "source_id"), + ) id = Column(String, primary_key=True, default=new_id) source_id = Column(String, ForeignKey("sources.id"), nullable=True) external_key = Column(String, nullable=True) title = Column(String, nullable=False) + title_zh = Column(String, nullable=True) # 中文标题 authors_text = Column(String, nullable=True) published_at = Column(String, nullable=True) abstract = Column(Text, nullable=True) + abstract_zh = Column(Text, nullable=True) # 中文摘要 source_url = Column(String, nullable=True) pdf_url = Column(String, nullable=True) doi = Column(String, nullable=True) diff --git a/research_api/routes/_db_helpers.py b/research_api/routes/_db_helpers.py index 051c93c19..f9d1ef717 100644 --- a/research_api/routes/_db_helpers.py +++ b/research_api/routes/_db_helpers.py @@ -1,9 +1,10 @@ """Async DB helpers that wrap sync SQLAlchemy with run_in_executor.""" import asyncio from functools import partial + +from research_api.base import to_dict from research_api.db import create_session from research_api.models import Project, Intent, PaperAsset, ClaimAsset, Source, FeedItem, StyleAsset -from app.database.models import to_dict def _sync_list_projects(): @@ -30,6 +31,28 @@ def _sync_get_project(project_id): return to_dict(p) if p else None +def _sync_update_project(project_id, data): + from datetime import datetime + with create_session() as session: + from sqlalchemy import select + result = session.execute(select(Project).where(Project.id == project_id)) + project = result.scalar_one_or_none() + if not project: + return None + for key, value in data.items(): + if hasattr(project, key): + # Handle date conversion for DateTime columns + if key == "expected_completion" and isinstance(value, str): + try: + value = datetime.fromisoformat(value.replace("Z", "+00:00")) + except (ValueError, AttributeError): + pass # Keep original value if parsing fails + setattr(project, key, value) + session.commit() + session.refresh(project) + return to_dict(project) + + def _sync_list_intents(project_id): with create_session() as session: from sqlalchemy import select @@ -176,19 +199,20 @@ def _sync_update_source(source_id, data): return to_dict(source) -def _sync_get_today_feed(): +def _sync_get_today_feed(limit: int = 50, offset: int = 0): with create_session() as session: from sqlalchemy import select result = session.execute( select(FeedItem) .where(FeedItem.status.in_(["discovered", "ranked", "presented"])) .order_by(FeedItem.rank_score.desc()) - .limit(20) + .limit(limit) + .offset(offset) ) return [to_dict(i) for i in result.scalars().all()] -def _sync_list_feed(source_id=None, status=None): +def _sync_list_feed(source_id=None, status=None, limit: int = 50, offset: int = 0): with create_session() as session: from sqlalchemy import select query = select(FeedItem) @@ -196,7 +220,7 @@ def _sync_list_feed(source_id=None, status=None): query = query.where(FeedItem.source_id == source_id) if status: query = query.where(FeedItem.status == status) - query = query.order_by(FeedItem.rank_score.desc()) + query = query.order_by(FeedItem.rank_score.desc()).limit(limit).offset(offset) result = session.execute(query) return [to_dict(i) for i in result.scalars().all()] @@ -226,87 +250,90 @@ def _sync_update_feed_item(item_id, data): # Async wrappers using run_in_executor -loop = asyncio.get_event_loop async def asyncio_get_projects(): - return await loop().run_in_executor(None, _sync_list_projects) + return await asyncio.get_running_loop().run_in_executor(None, _sync_list_projects) async def asyncio_create_project(data): - return await loop().run_in_executor(None, partial(_sync_create_project, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_project, data)) async def asyncio_get_project(project_id): - return await loop().run_in_executor(None, partial(_sync_get_project, project_id)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_project, project_id)) + + +async def asyncio_update_project(project_id, data): + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_project, project_id, data)) async def asyncio_list_intents(project_id): - return await loop().run_in_executor(None, partial(_sync_list_intents, project_id)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_intents, project_id)) async def asyncio_create_intent(data): - return await loop().run_in_executor(None, partial(_sync_create_intent, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_intent, data)) async def asyncio_update_intent(intent_id, data): - return await loop().run_in_executor(None, partial(_sync_update_intent, intent_id, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_intent, intent_id, data)) async def asyncio_list_papers(library_status=None, read_status=None): - return await loop().run_in_executor(None, partial(_sync_list_papers, library_status, read_status)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_papers, library_status, read_status)) async def asyncio_create_paper(data): - return await loop().run_in_executor(None, partial(_sync_create_paper, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_paper, data)) async def asyncio_get_paper(paper_id): - return await loop().run_in_executor(None, partial(_sync_get_paper, paper_id)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_paper, paper_id)) async def asyncio_update_paper(paper_id, data): - return await loop().run_in_executor(None, partial(_sync_update_paper, paper_id, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_paper, paper_id, data)) async def asyncio_list_claims(project_id=None, support_level=None): - return await loop().run_in_executor(None, partial(_sync_list_claims, project_id, support_level)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_claims, project_id, support_level)) async def asyncio_create_claim(data): - return await loop().run_in_executor(None, partial(_sync_create_claim, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_claim, data)) async def asyncio_update_claim(claim_id, data): - return await loop().run_in_executor(None, partial(_sync_update_claim, claim_id, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_claim, claim_id, data)) async def asyncio_list_sources(): - return await loop().run_in_executor(None, _sync_list_sources) + return await asyncio.get_running_loop().run_in_executor(None, _sync_list_sources) async def asyncio_create_source(data): - return await loop().run_in_executor(None, partial(_sync_create_source, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_source, data)) async def asyncio_update_source(source_id, data): - return await loop().run_in_executor(None, partial(_sync_update_source, source_id, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_source, source_id, data)) -async def asyncio_get_today_feed(): - return await loop().run_in_executor(None, _sync_get_today_feed) +async def asyncio_get_today_feed(limit: int = 50, offset: int = 0): + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_today_feed, limit, offset)) -async def asyncio_list_feed(source_id=None, status=None): - return await loop().run_in_executor(None, partial(_sync_list_feed, source_id, status)) +async def asyncio_list_feed(source_id=None, status=None, limit: int = 50, offset: int = 0): + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_list_feed, source_id, status, limit, offset)) async def asyncio_create_feed_item(data): - return await loop().run_in_executor(None, partial(_sync_create_feed_item, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_feed_item, data)) async def asyncio_update_feed_item(item_id, data): - return await loop().run_in_executor(None, partial(_sync_update_feed_item, item_id, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_feed_item, item_id, data)) # StyleAsset helpers @@ -362,20 +389,29 @@ def _sync_delete_style(style_id): async def asyncio_list_styles(): - return await loop().run_in_executor(None, _sync_list_styles) + return await asyncio.get_running_loop().run_in_executor(None, _sync_list_styles) async def asyncio_create_style(data): - return await loop().run_in_executor(None, partial(_sync_create_style, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_create_style, data)) async def asyncio_get_style(style_id): - return await loop().run_in_executor(None, partial(_sync_get_style, style_id)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_get_style, style_id)) async def asyncio_update_style(style_id, data): - return await loop().run_in_executor(None, partial(_sync_update_style, style_id, data)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_update_style, style_id, data)) async def asyncio_delete_style(style_id): - return await loop().run_in_executor(None, partial(_sync_delete_style, style_id)) + return await asyncio.get_running_loop().run_in_executor(None, partial(_sync_delete_style, style_id)) + + +async def asyncio_run_feed_discovery(categories, keywords, limit_per_keyword): + """Run feed discovery using academic APIs (Semantic Scholar, CrossRef).""" + def _sync_run(): + from custom_nodes.research.feed_discovery import run_discovery + return run_discovery(categories, keywords, limit_per_keyword) + + return await asyncio.get_running_loop().run_in_executor(None, _sync_run) diff --git a/research_api/routes/research_routes.py b/research_api/routes/research_routes.py index 1d64c77c3..cc1924fc5 100644 --- a/research_api/routes/research_routes.py +++ b/research_api/routes/research_routes.py @@ -1,9 +1,19 @@ """Research API routes using aiohttp.""" +import os from aiohttp import web + +from research_app.config import resolve_external_asset_path +from research_app.integrations import ( + ensure_default_jobs, + get_default_keywords, + get_feed_categories, + get_scheduler, +) from research_api.routes._db_helpers import ( asyncio_get_projects, asyncio_create_project, asyncio_get_project, + asyncio_update_project, asyncio_list_intents, asyncio_create_intent, asyncio_update_intent, @@ -26,13 +36,19 @@ from research_api.routes._db_helpers import ( asyncio_get_style, asyncio_update_style, asyncio_delete_style, + asyncio_run_feed_discovery, ) +# In-memory store for latest evidence graph (per session) +_latest_evidence_graph = {} + class ResearchRoutes: def __init__(self): self.routes: web.RouteTableDef = web.RouteTableDef() self._app: web.Application = None + self.scheduler = get_scheduler() + self._jobs_registered = False def get_app(self) -> web.Application: if self._app is None: @@ -61,6 +77,15 @@ class ResearchRoutes: return web.json_response({"error": "Not found"}, status=404) return web.json_response(project) + @self.routes.patch("/research/projects/{project_id}") + async def update_project(request): + project_id = request.match_info["project_id"] + data = await request.json() + project = await asyncio_update_project(project_id, data) + if not project: + return web.json_response({"error": "Not found"}, status=404) + return web.json_response(project) + @self.routes.get("/research/projects/{project_id}/intents") async def list_intents(request): project_id = request.match_info["project_id"] @@ -115,6 +140,92 @@ class ResearchRoutes: return web.json_response({"error": "Not found"}, status=404) return web.json_response(paper) + @self.routes.get("/research/papers/{paper_id}/content") + async def get_paper_content(request): + paper_id = request.match_info["paper_id"] + paper = await asyncio_get_paper(paper_id) + if not paper: + return web.json_response({"error": "Not found"}, status=404) + + content = {"type": paper.get("asset_type", "paper")} + + if paper.get("asset_type") == "paper" and paper.get("local_path"): + import os + local_path = paper.get("local_path") + + # Extract folder name from local_path for image URLs + # local_path is like C:/r-assests/papers/FolderName or /c/r-assests/papers/FolderName + folder_name = os.path.basename(local_path.rstrip('/\\')) + content["folder_name"] = folder_name + + # Read meta.json if exists + meta_path = os.path.join(local_path, "meta.json") + if os.path.exists(meta_path): + import json + with open(meta_path, 'r', encoding='utf-8') as f: + content["meta"] = json.load(f) + + # Read paper.md if exists + md_path = os.path.join(local_path, "paper.md") + if os.path.exists(md_path): + with open(md_path, 'r', encoding='utf-8') as f: + content["markdown"] = f.read() + + # List images + img_path = os.path.join(local_path, "images") + if os.path.exists(img_path): + content["images"] = [f for f in os.listdir(img_path) if f.endswith(('.jpg', '.png', '.gif', '.jpeg'))] + + elif paper.get("asset_type") in ("figure", "table") and paper.get("local_path"): + import os + content["local_path"] = paper.get("local_path") + if os.path.exists(paper.get("local_path")): + content["filename"] = os.path.basename(paper.get("local_path")) + + return web.json_response(content) + + @self.routes.get("/research/assets/{type}/{asset_id}") + async def get_asset(request): + """Get a specific asset by type and ID.""" + asset_type = request.match_info["type"] + asset_id = request.match_info["asset_id"] + + if asset_type == "paper": + paper = await asyncio_get_paper(asset_id) + if not paper: + return web.json_response({"error": "Not found"}, status=404) + return web.json_response({"type": "paper", **paper}) + elif asset_type == "claim": + # asyncio_get_claim not yet implemented in _db_helpers + return web.json_response({"error": "Not implemented"}, status=501) + elif asset_type == "style": + style = await asyncio_get_style(asset_id) + if not style: + return web.json_response({"error": "Not found"}, status=404) + return web.json_response({"type": "style", **style}) + else: + return web.json_response({"error": "Unknown asset type"}, status=400) + + @self.routes.get("/research/assets/{path:.*}") + async def get_asset_file(request): + """Serve asset files from local paths.""" + file_path = request.match_info["path"] + full_path = resolve_external_asset_path(file_path) + if full_path is None: + return web.json_response({"error": "Forbidden"}, status=403) + + if not full_path.exists(): + return web.json_response({"error": "Not found"}, status=404) + + if full_path.is_dir(): + # Return directory listing + files = os.listdir(full_path) + return web.json_response({"type": "directory", "files": files}) + + # Return file + from aiohttp.web_fileresponse import FileResponse + return FileResponse(full_path) + # Claims @self.routes.get("/research/claims/") async def list_claims(request): @@ -162,14 +273,18 @@ class ResearchRoutes: # Feed @self.routes.get("/research/feed/today") async def get_today_feed(request): - items = await asyncio_get_today_feed() + limit = int(request.query.get("limit", 50)) + offset = int(request.query.get("offset", 0)) + items = await asyncio_get_today_feed(limit=limit, offset=offset) return web.json_response(items) @self.routes.get("/research/feed/") async def list_feed(request): source_id = request.query.get("source_id") status = request.query.get("status") - items = await asyncio_list_feed(source_id, status) + limit = int(request.query.get("limit", 50)) + offset = int(request.query.get("offset", 0)) + items = await asyncio_list_feed(source_id, status, limit=limit, offset=offset) return web.json_response(items) @self.routes.post("/research/feed/") @@ -187,6 +302,16 @@ class ResearchRoutes: return web.json_response({"error": "Not found"}, status=404) return web.json_response(item) + @self.routes.post("/research/feed/discover") + async def discover_feed(request): + """Discover new papers from configured journal sources using academic APIs.""" + data = await request.json() if request.can_read_body else {} + categories = data.get("categories", get_feed_categories()) + keywords = data.get("keywords", get_default_keywords()) + limit_per_keyword = data.get("limit_per_keyword", 3) + result = await asyncio_run_feed_discovery(categories, keywords, limit_per_keyword) + return web.json_response(result) + # Styles @self.routes.get("/research/assets/styles/") async def list_styles(request): @@ -223,3 +348,116 @@ class ResearchRoutes: if not result: return web.json_response({"error": "Not found"}, status=404) return web.json_response({"status": "deleted"}) + + # Unified asset listing by type + @self.routes.get("/research/assets/") + async def list_assets(request): + """List assets filtered by type: ?type=paper|claim|style""" + asset_type = request.query.get("type") + + if asset_type == "paper" or not asset_type: + papers = await asyncio_list_papers( + library_status=request.query.get("library_status"), + read_status=request.query.get("read_status") + ) + return web.json_response([{"type": "paper", **p} for p in papers]) + elif asset_type == "claim": + claims = await asyncio_list_claims( + project_id=request.query.get("project_id"), + support_level=request.query.get("support_level") + ) + return web.json_response([{"type": "claim", **c} for c in claims]) + elif asset_type == "style": + styles = await asyncio_list_styles() + return web.json_response([{"type": "style", **s} for s in styles]) + else: + return web.json_response({"error": "Unknown asset type"}, status=400) + + # Evidence Graph API + @self.routes.get("/research/api/graph/latest") + async def get_latest_graph(request): + """Get the latest evidence graph from node execution.""" + return web.json_response(_latest_evidence_graph) + + @self.routes.post("/research/api/graph/latest") + async def set_latest_graph(request): + """Set the latest evidence graph from node execution.""" + global _latest_evidence_graph + data = await request.json() + _latest_evidence_graph = data + return web.json_response({"status": "ok"}) + + # Automation routes + @self.routes.get("/research/automation/jobs/") + async def list_jobs(request): + jobs = self.scheduler.list_jobs() + return web.json_response(jobs) + + @self.routes.get("/research/automation/jobs/{job_id}") + async def get_job(request): + job_id = request.match_info["job_id"] + job = self.scheduler.get_job(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({ + "id": job.id, + "name": job.name, + "schedule": { + "interval_hours": job.schedule.interval_hours, + "cron_hour": job.schedule.cron_hour, + "cron_minute": job.schedule.cron_minute, + "enabled": job.schedule.enabled, + }, + "last_run": job.last_run.isoformat() if job.last_run else None, + "last_result": str(job.last_result) if job.last_result else None, + "last_error": job.error, + "run_count": job.run_count, + }) + + @self.routes.post("/research/automation/jobs/{job_id}/run") + async def trigger_job(request): + job_id = request.match_info["job_id"] + result = self.scheduler.trigger_now(job_id) + if result["status"] == "error" and "not found" in result.get("message", ""): + return web.json_response(result, status=404) + return web.json_response(result) + + @self.routes.patch("/research/automation/jobs/{job_id}") + async def update_job(request): + job_id = request.match_info["job_id"] + job = self.scheduler.get_job(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + data = await request.json() + if "enabled" in data: + job.schedule.enabled = data["enabled"] + if "interval_hours" in data: + job.schedule.interval_hours = float(data["interval_hours"]) + if "cron_hour" in data: + job.schedule.cron_hour = data["cron_hour"] + if "cron_minute" in data: + job.schedule.cron_minute = data["cron_minute"] + return web.json_response({"status": "ok", "job_id": job_id}) + + @self.routes.post("/research/automation/scheduler/start") + async def start_scheduler(request): + self.scheduler.start() + return web.json_response({"status": "started"}) + + @self.routes.post("/research/automation/scheduler/stop") + async def stop_scheduler(request): + self.scheduler.stop() + return web.json_response({"status": "stopped"}) + + @self.routes.get("/research/automation/scheduler/status") + async def scheduler_status(request): + jobs = self.scheduler.list_jobs() + return web.json_response({ + "running": True, + "jobs_count": len(jobs), + "jobs": jobs, + }) + + if not self._jobs_registered: + ensure_default_jobs() + self._jobs_registered = True