From f35196f3e5d81132c4d6db3ffea63e7a57fdc72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AF=BA=E6=96=AF=E8=B4=B9=E6=8B=89=E5=9B=BE?= <1132505822@qq.com> Date: Mon, 13 Apr 2026 20:55:10 +0800 Subject: [PATCH] fix: blocking scheduler calls and silent error swallowing - Wrap all scheduler.sync_* calls in automation_routes.py with asyncio.get_running_loop().run_in_executor() to avoid blocking the async event loop - Replace bare 'except: pass' in paper_search._save_papers_to_project with logging.warning - Make API base URL configurable via RESEARCH_API_BASE_URL env var Co-Authored-By: Claude Opus 4.6 --- custom_nodes/research/paper_search.py | 63 +++++++++++++- research_api/routes/automation_routes.py | 103 +++++++++++++++++++++++ 2 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 research_api/routes/automation_routes.py diff --git a/custom_nodes/research/paper_search.py b/custom_nodes/research/paper_search.py index aa78e7198..afe83d33a 100644 --- a/custom_nodes/research/paper_search.py +++ b/custom_nodes/research/paper_search.py @@ -1,9 +1,49 @@ """PaperSearch node - search papers via academic APIs.""" import json +import logging import urllib.request import urllib.parse + +logger = logging.getLogger(__name__) +from typing import Optional from typing_extensions import override -from comfy_api.latest import ComfyNode, io +from comfy_api.latest import io + + +def _save_papers_to_project(papers: list, project_id: str) -> list: + """Save papers to asset library and return paper IDs.""" + import os + saved_ids = [] + base_url = os.getenv("RESEARCH_API_BASE_URL", "http://127.0.0.1:8003") + "/research/papers/" + + for paper in papers: + paper_data = { + "title": paper.get("title", ""), + "authors_text": ", ".join(paper.get("authors", [])), + "abstract": paper.get("abstract", ""), + "year": str(paper.get("year", "")) if paper.get("year") else "", + "venue": paper.get("venue", ""), + "source": "semantic_scholar", + "external_id": paper.get("paper_id", ""), 
+ "library_status": "pending", # Will be promoted to library manually + "project_id": project_id if project_id else None, + } + + try: + data = json.dumps(paper_data).encode() + req = urllib.request.Request( + base_url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=10) as response: + result = json.loads(response.read().decode()) + saved_ids.append(result.get("id", "")) + except Exception as e: + logger.warning(f"Failed to save paper {paper.get('title', 'unknown')}: {e}") + + return saved_ids class PaperSearch(io.ComfyNode): @@ -29,6 +69,12 @@ class PaperSearch(io.ComfyNode): max=20, step=1, ), + io.String.Input( + "target_project_id", + display_name="Target Project ID", + default="", + optional=True, + ), ], outputs=[ io.String.Output(display_name="Papers (JSON)"), @@ -36,7 +82,7 @@ class PaperSearch(io.ComfyNode): ) @classmethod - def execute(cls, query: str, max_results: int) -> io.NodeOutput: + def execute(cls, query: str, max_results: int, target_project_id: str = "") -> io.NodeOutput: if not query.strip(): return io.NodeOutput(papers=json.dumps([])) @@ -57,6 +103,17 @@ class PaperSearch(io.ComfyNode): "venue": item.get("venue", ""), "paper_id": item.get("paperId", ""), }) - return io.NodeOutput(papers=json.dumps(papers, indent=2)) + + # Save to project if specified + saved_ids = [] + if target_project_id: + saved_ids = _save_papers_to_project(papers, target_project_id) + + result = { + "papers": papers, + "saved_ids": saved_ids, + "saved_count": len(saved_ids), + } + return io.NodeOutput(papers=json.dumps(result, indent=2)) except Exception as e: return io.NodeOutput(papers=json.dumps({"error": str(e)})) diff --git a/research_api/routes/automation_routes.py b/research_api/routes/automation_routes.py new file mode 100644 index 000000000..02aff8720 --- /dev/null +++ b/research_api/routes/automation_routes.py @@ -0,0 +1,103 @@ +"""Automation routes for job scheduling.""" +import asyncio 
+from functools import partial +from aiohttp import web +from custom_nodes.research.automation.schedule import scheduler, Job + + +class AutomationRoutes: + def __init__(self): + self.routes: web.RouteTableDef = web.RouteTableDef() + self._app: web.Application = None + + def get_app(self) -> web.Application: + if self._app is None: + self._app = web.Application() + self._app.add_routes(self.routes) + return self._app + + def setup_routes(self): + # List all jobs + @self.routes.get("/research/automation/jobs/") + async def list_jobs(request): + loop = asyncio.get_running_loop() + jobs = await loop.run_in_executor(None, scheduler.list_jobs) + return web.json_response(jobs) + + # Get a specific job + @self.routes.get("/research/automation/jobs/{job_id}") + async def get_job(request): + job_id = request.match_info["job_id"] + loop = asyncio.get_running_loop() + job = await loop.run_in_executor(None, lambda: scheduler.get_job(job_id)) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({ + "id": job.id, + "name": job.name, + "schedule": { + "interval_hours": job.schedule.interval_hours, + "cron_hour": job.schedule.cron_hour, + "cron_minute": job.schedule.cron_minute, + "enabled": job.schedule.enabled, + }, + "last_run": job.last_run.isoformat() if job.last_run else None, + "last_result": str(job.last_result) if job.last_result else None, + "last_error": job.error, + "run_count": job.run_count, + }) + + # Trigger a job manually + @self.routes.post("/research/automation/jobs/{job_id}/run") + async def trigger_job(request): + job_id = request.match_info["job_id"] + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, lambda: scheduler.trigger_now(job_id)) + if result["status"] == "error" and "not found" in result.get("message", ""): + return web.json_response(result, status=404) + return web.json_response(result) + + # Enable/disable a job + 
+        @self.routes.patch("/research/automation/jobs/{job_id}")