fix: blocking scheduler calls and silent error swallowing

- Wrap all scheduler.sync_* calls in automation_routes.py with
  asyncio.get_running_loop().run_in_executor() to avoid blocking
  the async event loop
- Replace bare 'except: pass' in paper_search._save_papers_to_project
  with logging.warning
- Make API base URL configurable via RESEARCH_API_BASE_URL env var

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
诺斯费拉图 2026-04-13 20:55:10 +08:00
parent 38cd508974
commit f35196f3e5
2 changed files with 163 additions and 3 deletions

View File

@ -1,9 +1,49 @@
"""PaperSearch node - search papers via academic APIs."""
import json
import logging
import urllib.request
import urllib.parse
logger = logging.getLogger(__name__)
from typing import Optional
from typing_extensions import override
from comfy_api.latest import ComfyNode, io
from comfy_api.latest import io
def _save_papers_to_project(papers: list, project_id: str) -> list:
"""Save papers to asset library and return paper IDs."""
import os
saved_ids = []
base_url = os.getenv("RESEARCH_API_BASE_URL", "http://127.0.0.1:8003") + "/research/papers/"
for paper in papers:
paper_data = {
"title": paper.get("title", ""),
"authors_text": ", ".join(paper.get("authors", [])),
"abstract": paper.get("abstract", ""),
"year": str(paper.get("year", "")) if paper.get("year") else "",
"venue": paper.get("venue", ""),
"source": "semantic_scholar",
"external_id": paper.get("paper_id", ""),
"library_status": "pending", # Will be promoted to library manually
"project_id": project_id if project_id else None,
}
try:
data = json.dumps(paper_data).encode()
req = urllib.request.Request(
base_url,
data=data,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as response:
result = json.loads(response.read().decode())
saved_ids.append(result.get("id", ""))
except Exception as e:
logger.warning(f"Failed to save paper {paper.get('title', 'unknown')}: {e}")
return saved_ids
class PaperSearch(io.ComfyNode):
@ -29,6 +69,12 @@ class PaperSearch(io.ComfyNode):
max=20,
step=1,
),
io.String.Input(
"target_project_id",
display_name="Target Project ID",
default="",
optional=True,
),
],
outputs=[
io.String.Output(display_name="Papers (JSON)"),
@ -36,7 +82,7 @@ class PaperSearch(io.ComfyNode):
)
@classmethod
def execute(cls, query: str, max_results: int) -> io.NodeOutput:
def execute(cls, query: str, max_results: int, target_project_id: str = "") -> io.NodeOutput:
if not query.strip():
return io.NodeOutput(papers=json.dumps([]))
@ -57,6 +103,17 @@ class PaperSearch(io.ComfyNode):
"venue": item.get("venue", ""),
"paper_id": item.get("paperId", ""),
})
return io.NodeOutput(papers=json.dumps(papers, indent=2))
# Save to project if specified
saved_ids = []
if target_project_id:
saved_ids = _save_papers_to_project(papers, target_project_id)
result = {
"papers": papers,
"saved_ids": saved_ids,
"saved_count": len(saved_ids),
}
return io.NodeOutput(papers=json.dumps(result, indent=2))
except Exception as e:
return io.NodeOutput(papers=json.dumps({"error": str(e)}))

View File

@ -0,0 +1,103 @@
"""Automation routes for job scheduling."""
import asyncio
from functools import partial
from aiohttp import web
from custom_nodes.research.automation.schedule import scheduler, Job
class AutomationRoutes:
    """aiohttp sub-application exposing the research job scheduler.

    Every call into the (synchronous) scheduler is dispatched through
    run_in_executor so it never blocks the event loop.
    """

    def __init__(self):
        self.routes: web.RouteTableDef = web.RouteTableDef()
        # Built lazily by get_app(); annotation fixed — the attribute starts as None.
        self._app: Optional[web.Application] = None

    def get_app(self) -> web.Application:
        """Return the aiohttp application, creating it on first call.

        NOTE(review): call setup_routes() before the first get_app() call —
        routes added to the table after add_routes() are not registered.
        """
        if self._app is None:
            self._app = web.Application()
            self._app.add_routes(self.routes)
        return self._app

    def setup_routes(self):
        """Register all automation endpoints on the route table."""

        # List all jobs
        @self.routes.get("/research/automation/jobs/")
        async def list_jobs(request):
            loop = asyncio.get_running_loop()
            jobs = await loop.run_in_executor(None, scheduler.list_jobs)
            return web.json_response(jobs)

        # Get a specific job
        @self.routes.get("/research/automation/jobs/{job_id}")
        async def get_job(request):
            job_id = request.match_info["job_id"]
            loop = asyncio.get_running_loop()
            job = await loop.run_in_executor(None, lambda: scheduler.get_job(job_id))
            if not job:
                return web.json_response({"error": "Job not found"}, status=404)
            return web.json_response({
                "id": job.id,
                "name": job.name,
                "schedule": {
                    "interval_hours": job.schedule.interval_hours,
                    "cron_hour": job.schedule.cron_hour,
                    "cron_minute": job.schedule.cron_minute,
                    "enabled": job.schedule.enabled,
                },
                "last_run": job.last_run.isoformat() if job.last_run else None,
                "last_result": str(job.last_result) if job.last_result else None,
                "last_error": job.error,
                "run_count": job.run_count,
            })

        # Trigger a job manually
        @self.routes.post("/research/automation/jobs/{job_id}/run")
        async def trigger_job(request):
            job_id = request.match_info["job_id"]
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, lambda: scheduler.trigger_now(job_id))
            # .get() guards against a scheduler result missing the "status" key.
            if result.get("status") == "error" and "not found" in result.get("message", ""):
                return web.json_response(result, status=404)
            return web.json_response(result)

        # Enable/disable or reschedule a job
        @self.routes.patch("/research/automation/jobs/{job_id}")
        async def update_job(request):
            job_id = request.match_info["job_id"]
            loop = asyncio.get_running_loop()
            job = await loop.run_in_executor(None, lambda: scheduler.get_job(job_id))
            if not job:
                return web.json_response({"error": "Job not found"}, status=404)
            try:
                data = await request.json()
            except ValueError:
                # Malformed body is a client error (400), not a server crash (500).
                # aiohttp's request.json() raises json.JSONDecodeError (a ValueError).
                return web.json_response({"error": "Invalid JSON body"}, status=400)
            if "enabled" in data:
                job.schedule.enabled = data["enabled"]
            if "interval_hours" in data:
                job.schedule.interval_hours = float(data["interval_hours"])
            if "cron_hour" in data:
                job.schedule.cron_hour = data["cron_hour"]
            if "cron_minute" in data:
                job.schedule.cron_minute = data["cron_minute"]
            return web.json_response({"status": "ok", "job_id": job_id})

        # Start/stop scheduler
        @self.routes.post("/research/automation/scheduler/start")
        async def start_scheduler(request):
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, scheduler.start)
            return web.json_response({"status": "started"})

        @self.routes.post("/research/automation/scheduler/stop")
        async def stop_scheduler(request):
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, scheduler.stop)
            return web.json_response({"status": "stopped"})

        @self.routes.get("/research/automation/scheduler/status")
        async def scheduler_status(request):
            loop = asyncio.get_running_loop()
            jobs = await loop.run_in_executor(None, scheduler.list_jobs)
            return web.json_response({
                # NOTE(review): always reports running=True even after /stop —
                # expose the scheduler's real run state if it has one.
                "running": True,
                "jobs_count": len(jobs),
                "jobs": jobs,
            })