diff --git a/README.md b/README.md index 348ef8a..81ee52c 100644 --- a/README.md +++ b/README.md @@ -159,7 +159,7 @@ cd LyraNote ./lyra local # start local dev mode ``` -The CLI automatically: detects/starts database containers → creates a Python venv → installs dependencies → runs DB migrations → starts FastAPI, Celery Worker, and Next.js Dev Server in parallel. +The CLI automatically: detects/starts database containers → creates a Python venv → installs dependencies → runs DB migrations → starts FastAPI, Celery Worker, Celery Beat, and Next.js Dev Server in parallel. Press `Ctrl+C` to stop local processes; database containers are unaffected. @@ -167,7 +167,7 @@ Press `Ctrl+C` to stop local processes; database containers are unaffected. ### Option 2 — Docker Compose (All-in-one) -Runs everything — frontend, backend, worker, and all infrastructure — in containers. Good for a quick full-stack preview or self-hosted server deployment. +Runs everything — frontend, backend, worker, beat, and all infrastructure — in containers. Good for a quick full-stack preview or self-hosted server deployment. **1. Configure environment variables** @@ -218,7 +218,7 @@ Deploy the backend via Docker Compose on your server, and the frontend separatel ```bash ./lyra init # generates root .env (choose "production server" mode) -docker compose -f docker-compose.prod.yml up -d db redis minio minio-init api worker +docker compose -f docker-compose.prod.yml up -d db redis minio minio-init api worker beat ``` Open host Nginx ports `80`/`443` in your firewall. Do not expose `3000`/`8000` publicly; they are loopback-only. diff --git a/README.zh-CN.md b/README.zh-CN.md index 39be4f6..38e97c5 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -159,7 +159,7 @@ cd LyraNote ./lyra local # 启动本地开发模式 ``` -CLI 会自动:检测/启动数据库容器 → 创建 Python 虚拟环境 → 安装依赖 → 执行数据库迁移 → 并行启动 FastAPI、Celery Worker 和 Next.js Dev Server。 +CLI 会自动:检测/启动数据库容器 → 创建 Python 虚拟环境 → 安装依赖 → 执行数据库迁移 → 并行启动 FastAPI、Celery Worker、Celery Beat 和 Next.js Dev Server。 按 `Ctrl+C` 停止本地进程,数据库容器不受影响。 @@ -218,7 +218,7 @@ CLI 会自动:检测/启动数据库容器 → 创建 Python 虚拟环境 → ```bash ./lyra init # 生成根目录 .env(选择「生产服务器」模式) -docker compose -f docker-compose.prod.yml up -d db redis minio minio-init api worker +docker compose -f docker-compose.prod.yml up -d db redis minio minio-init api worker beat ``` 确保防火墙开放宿主机 Nginx 的 `80`/`443`;业务端口仅监听回环,勿对公网放行 3000/8000。 diff --git a/apps/api/app/agents/rag/retrieval.py b/apps/api/app/agents/rag/retrieval.py index 948e128..12e3b3d 100644 --- a/apps/api/app/agents/rag/retrieval.py +++ b/apps/api/app/agents/rag/retrieval.py @@ -94,6 +94,12 @@ def _mmr_filter(chunks: list[dict], threshold: float = MMR_SIMILARITY_THRESHOLD) # DB search functions # --------------------------------------------------------------------------- +def _uuid_or_none(value: UUID | str | None) -> UUID | None: + if value is None: + return None + return value if isinstance(value, UUID) else UUID(str(value)) + + async def _vector_search( query_vec: list[float], notebook_id: str | None, @@ -101,6 +107,9 @@ async def _vector_search( top_k: int, global_search: bool, user_id: UUID | None, + *, + exclude_notebook_id: UUID | None = None, + source_id: UUID | None = None, ) -> list[dict]: """Vector-only search. Returns top candidates with updated_at for recency scoring.""" candidate_k = min(200, max(RERANK_CANDIDATE_K * 2, top_k * 4)) @@ -133,6 +142,11 @@ async def _vector_search( Chunk.notebook_id == UUID(notebook_id) if notebook_id else text("1=1") ) + if exclude_notebook_id is not None: + stmt = stmt.where(Chunk.notebook_id != exclude_notebook_id) + if source_id is not None: + stmt = stmt.where(Chunk.source_id == source_id) + result = await db.execute(stmt) return [ { @@ -160,6 +174,9 @@ async def _fts_search( top_k: int, global_search: bool, user_id: UUID | None, + *, + exclude_notebook_id: UUID | None = None, + source_id: UUID | None = None, ) -> list[dict]: """ PostgreSQL FTS search via plainto_tsquery (handles Chinese via 'simple' config). @@ -192,6 +209,11 @@ async def _fts_search( Chunk.notebook_id == UUID(notebook_id) if notebook_id else text("1=1") ) + if exclude_notebook_id is not None: + stmt = stmt.where(Chunk.notebook_id != exclude_notebook_id) + if source_id is not None: + stmt = stmt.where(Chunk.source_id == source_id) + result = await db.execute(stmt) rows = result.all() if not rows: @@ -412,6 +434,8 @@ async def retrieve_chunks( user_id: UUID | None = None, history: list[dict] | None = None, _precomputed_variants: list[str] | None = None, + exclude_notebook_id: UUID | str | None = None, + source_id: UUID | str | None = None, ) -> list[dict]: """ Full RAG retrieval pipeline: @@ -427,6 +451,8 @@ async def retrieve_chunks( - global_search=False: restrict to chunks in `notebook_id` - global_search=True: search across ALL notebooks owned by `user_id` + - exclude_notebook_id: with global_search, drop chunks in this notebook + - source_id: only chunks from this source """ import asyncio @@ -439,6 +465,8 @@ async def retrieve_chunks( top_k=top_k, global_search=global_search, user_id=user_id, + exclude_notebook_id=_uuid_or_none(exclude_notebook_id), + source_id=_uuid_or_none(source_id), ) # ── Step 1 & 2: Variant generation + primary embed IN PARALLEL ─────── diff --git a/apps/api/app/agents/writing/composer.py b/apps/api/app/agents/writing/composer.py index e95af7f..1a7a252 100644 --- a/apps/api/app/agents/writing/composer.py +++ b/apps/api/app/agents/writing/composer.py @@ -338,11 +338,20 @@ async def compose_answer( user_memories: list[dict] | None = None, notebook_summary: dict | None = None, db: "AsyncSession | None" = None, + *, + extra_graph_context: str | None = None, ) -> tuple[str, list[dict]]: """Non-streaming: return (answer_text, citations).""" from app.providers.llm import chat context, citations = _build_context(chunks) + eg = (extra_graph_context or "").strip() + if eg: + context = ( + f"## 结构化知识关联(图谱)\n{eg}\n\n---\n\n{context}" + if context + else f"## 结构化知识关联(图谱)\n{eg}" + ) messages = await _build_messages(query, context, history, user_memories, notebook_summary, db) answer = await chat(messages) return answer, citations diff --git a/apps/api/app/domains/ai/routers/knowledge.py b/apps/api/app/domains/ai/routers/knowledge.py index 9855be6..61a1a86 100644 --- a/apps/api/app/domains/ai/routers/knowledge.py +++ b/apps/api/app/domains/ai/routers/knowledge.py @@ -2,12 +2,12 @@ from uuid import UUID -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException from sqlalchemy import select from app.dependencies import CurrentUser, DbDep from app.domains.ai.schemas import CrossNotebookChunk, CrossNotebookOut -from app.models import Chunk, Notebook, Source, NotebookSummary +from app.models import Chunk, Notebook, NotebookSummary from app.schemas.response import ApiResponse, success router = APIRouter() @@ -18,8 +18,17 @@ response_model=ApiResponse[CrossNotebookOut], ) async def get_related_knowledge(notebook_id: UUID, current_user: CurrentUser, db: DbDep): - """Find related content in other notebooks based on the current notebook's summary.""" - from app.providers.embedding import embed_query + """Find related content in other notebooks based on the current notebook's hybrid RAG search.""" + from app.agents.rag.retrieval import retrieve_chunks + + own = await db.execute( + select(Notebook.id).where( + Notebook.id == notebook_id, + Notebook.user_id == current_user.id, + ) + ) + if own.scalar_one_or_none() is None: + raise HTTPException(status_code=404, detail="Notebook not found") summary_result = await db.execute( select(NotebookSummary.summary_md).where(NotebookSummary.notebook_id == notebook_id) @@ -28,43 +37,42 @@ async def get_related_knowledge(notebook_id: UUID, current_user: CurrentUser, db if not summary_md or len(summary_md.strip()) < 20: return success(CrossNotebookOut(chunks=[])) - query_vec = await embed_query(summary_md[:300]) + q = summary_md[:500].strip() + raw = await retrieve_chunks( + q, + None, + db, + top_k=5, + global_search=True, + user_id=current_user.id, + exclude_notebook_id=notebook_id, + _precomputed_variants=[q], + ) + if not raw: + return success(CrossNotebookOut(chunks=[])) - stmt = ( - select( - Chunk.id, - Chunk.content, - Chunk.source_id, - Chunk.notebook_id, - Source.title.label("source_title"), - Notebook.title.label("notebook_title"), - (1 - Chunk.embedding.cosine_distance(query_vec)).label("score"), - ) - .outerjoin(Source, Chunk.source_id == Source.id) + ids = [UUID(c["chunk_id"]) for c in raw] + meta_result = await db.execute( + select(Chunk.id, Chunk.notebook_id, Notebook.title.label("notebook_title")) .join(Notebook, Chunk.notebook_id == Notebook.id) - .where( - Notebook.user_id == current_user.id, - Chunk.notebook_id != notebook_id, - ((Source.status == "indexed") | (Chunk.source_type == "note")), - ) - .order_by(Chunk.embedding.cosine_distance(query_vec)) - .limit(10) + .where(Chunk.id.in_(ids)) ) - - result = await db.execute(stmt) - rows = result.all() + meta = { + str(r.id): (str(r.notebook_id), r.notebook_title or "未命名笔记本") + for r in meta_result.all() + } chunks = [ CrossNotebookChunk( - notebook_title=row.notebook_title or "未命名笔记本", - source_title=row.source_title or "📝 笔记", - excerpt=row.content[:300], - score=round(float(row.score), 3), - chunk_id=str(row.id), - notebook_id=str(row.notebook_id), + notebook_title=meta.get(c["chunk_id"], (None, "未命名笔记本"))[1], + source_title=c.get("source_title") or "📝 笔记", + excerpt=(c.get("excerpt") or c.get("content") or "")[:300], + score=round(float(c.get("score") or 0), 3), + chunk_id=str(c.get("chunk_id", "")), + notebook_id=meta.get(c["chunk_id"], ("",))[0] or "", ) - for row in rows - if float(row.score) >= 0.35 - ][:5] + for c in raw + if c.get("chunk_id") in meta + ] return success(CrossNotebookOut(chunks=chunks)) diff --git a/apps/api/app/domains/ai/routers/suggestions.py b/apps/api/app/domains/ai/routers/suggestions.py index 5b17e34..d92a207 100644 --- a/apps/api/app/domains/ai/routers/suggestions.py +++ b/apps/api/app/domains/ai/routers/suggestions.py @@ -164,7 +164,11 @@ async def get_context_greeting(notebook_id: UUID, current_user: CurrentUser, db: @router.get("/sources/{source_id}/suggestions", response_model=ApiResponse[SourceSuggestionsOut]) async def get_source_suggestions(source_id: UUID, current_user: CurrentUser, db: DbDep): """Generate suggested questions for a newly indexed source.""" - source_result = await db.execute(select(Source).where(Source.id == source_id)) + source_result = await db.execute( + select(Source) + .join(Notebook, Source.notebook_id == Notebook.id) + .where(Source.id == source_id, Notebook.user_id == current_user.id) + ) source = source_result.scalar_one_or_none() if source is None or source.status != "indexed": return success(SourceSuggestionsOut(summary=None, questions=[])) @@ -175,13 +179,28 @@ async def get_source_suggestions(source_id: UUID, current_user: CurrentUser, db: questions=source.metadata_["suggestions"], )) - chunks_result = await db.execute( - select(Chunk.content) - .where(Chunk.source_id == source_id) - .order_by(Chunk.chunk_index) - .limit(3) + from app.agents.rag.retrieval import retrieve_chunks + + q = f"{source.title or ''}\n{source.summary or ''}".strip()[:800] or "资料要点" + chunk_dicts = await retrieve_chunks( + q, + str(source.notebook_id), + db, + top_k=5, + user_id=current_user.id, + source_id=source.id, + _precomputed_variants=[q], ) - context = "\n".join(row[0][:500] for row in chunks_result.all()) + if chunk_dicts: + context = "\n".join((c.get("content") or "")[:500] for c in chunk_dicts) + else: + chunks_result = await db.execute( + select(Chunk.content) + .where(Chunk.source_id == source_id) + .order_by(Chunk.chunk_index) + .limit(3) + ) + context = "\n".join(row[0][:500] for row in chunks_result.all()) client = get_utility_client() try: diff --git a/apps/api/app/domains/ai/routers/writing.py b/apps/api/app/domains/ai/routers/writing.py index f1f6f29..7754945 100644 --- a/apps/api/app/domains/ai/routers/writing.py +++ b/apps/api/app/domains/ai/routers/writing.py @@ -3,7 +3,7 @@ import json from uuid import UUID -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException from fastapi.responses import StreamingResponse from sqlalchemy import select @@ -14,9 +14,9 @@ WritingContextOut, WritingContextRequest, ) -from app.models import Chunk, Source +from app.models import Notebook from app.schemas.response import ApiResponse, success -from app.providers.llm import get_client, get_utility_model, get_utility_client +from app.providers.llm import get_utility_model, get_utility_client router = APIRouter() @@ -64,43 +64,39 @@ async def generate(): @router.post("/ai/writing-context", response_model=ApiResponse[WritingContextOut]) async def get_writing_context(body: WritingContextRequest, current_user: CurrentUser, db: DbDep): """Return top-3 related knowledge chunks based on what the user is currently writing.""" - from app.providers.embedding import embed_query + from app.agents.rag.retrieval import retrieve_chunks text = body.text_around_cursor[:500] if len(text.strip()) < 20: return success(WritingContextOut(chunks=[])) - query_vec = await embed_query(text) - - stmt = ( - select( - Chunk.id, - Chunk.content, - Chunk.source_id, - Source.title.label("source_title"), - (1 - Chunk.embedding.cosine_distance(query_vec)).label("score"), - ) - .join(Source, Chunk.source_id == Source.id) - .where( - Source.status == "indexed", - Chunk.notebook_id == UUID(body.notebook_id), + nb_row = await db.execute( + select(Notebook.id).where( + Notebook.id == UUID(body.notebook_id), + Notebook.user_id == current_user.id, ) - .order_by(Chunk.embedding.cosine_distance(query_vec)) - .limit(12) ) - - result = await db.execute(stmt) - rows = result.all() + if nb_row.scalar_one_or_none() is None: + raise HTTPException(status_code=404, detail="Notebook not found") + + q = text.strip() + rows = await retrieve_chunks( + q, + body.notebook_id, + db, + top_k=3, + user_id=current_user.id, + _precomputed_variants=[q], + ) chunks = [ WritingContextChunk( - source_title=row.source_title or "未知来源", - excerpt=row.content[:300], - score=round(float(row.score), 3), - chunk_id=str(row.id), + source_title=r.get("source_title") or "未知来源", + excerpt=(r.get("excerpt") or r.get("content") or "")[:300], + score=round(float(r.get("score") or 0), 3), + chunk_id=str(r.get("chunk_id", "")), ) - for row in rows - if float(row.score) >= 0.35 - ][:3] + for r in rows + ] return success(WritingContextOut(chunks=chunks)) diff --git a/apps/api/app/domains/config/router.py b/apps/api/app/domains/config/router.py index fafcdb1..fb5e4f6 100644 --- a/apps/api/app/domains/config/router.py +++ b/apps/api/app/domains/config/router.py @@ -173,7 +173,7 @@ async def test_email(_current_user: CurrentUser, db: DbDep):

— LyraNote

""" - sent = await send_email( + result = await send_email( to=to, subject="LyraNote 测试邮件", html_body=html, @@ -181,8 +181,10 @@ async def test_email(_current_user: CurrentUser, db: DbDep): smtp_config=cfg, ) - if sent: + if result.ok: return success(TestEmailResult(ok=True, message=f"测试邮件已发送至 {to}")) + if result.error: + return success(TestEmailResult(ok=False, message=f"发送失败:{result.error}")) return success(TestEmailResult(ok=False, message="发送失败,请检查 SMTP 配置")) @@ -383,4 +385,4 @@ async def test_reranker_connection(_current_user: CurrentUser, db: DbDep): return success(TestRerankerResult(ok=True, model=model, message=f"Score {score}")) return success(TestRerankerResult(ok=True, model=model, message="OK")) except Exception as exc: - return success(TestRerankerResult(ok=False, model=model, message=str(exc)[:200])) \ No newline at end of file + return success(TestRerankerResult(ok=False, model=model, message=str(exc)[:200])) diff --git a/apps/api/app/logging_config.py b/apps/api/app/logging_config.py index 0b7e08b..40065af 100644 --- a/apps/api/app/logging_config.py +++ b/apps/api/app/logging_config.py @@ -141,8 +141,24 @@ def format(self, record: logging.LogRecord) -> str: return f"{self.DIM}{ts}{self.RESET} {color}{msg}{self.RESET}" -def _make_file_handler(logs_dir: Path, use_json: bool = False) -> TimedRotatingFileHandler: - """Create a daily file handler: each day gets its own YYYY-MM-DD.log file.""" +class LoggerPrefixFilter(logging.Filter): + """Allow only records emitted from specific logger prefixes.""" + + def __init__(self, *prefixes: str): + super().__init__() + self._prefixes = tuple(prefixes) + + def filter(self, record: logging.LogRecord) -> bool: + name = record.name + return any(name == prefix or name.startswith(f"{prefix}.") for prefix in self._prefixes) + + +def _make_file_handler( + logs_dir: Path, + use_json: bool = False, + filename_prefix: str = "", +) -> TimedRotatingFileHandler: + """Create a daily file handler: each day gets its own dated log file.""" import os import time @@ -154,9 +170,10 @@ class _DailyHandler(TimedRotatingFileHandler): def __init__(self, directory: Path): self._logs_dir = directory + self._filename_prefix = filename_prefix today = datetime.now().strftime("%Y-%m-%d") super().__init__( - filename=str(directory / f"{today}.log"), + filename=str(directory / f"{self._filename_prefix}{today}.log"), when="midnight", backupCount=30, encoding="utf-8", @@ -169,7 +186,9 @@ def doRollover(self) -> None: self.stream.close() self.stream = None # type: ignore[assignment] today = datetime.now().strftime("%Y-%m-%d") - self.baseFilename = os.path.abspath(str(self._logs_dir / f"{today}.log")) + self.baseFilename = os.path.abspath( + str(self._logs_dir / f"{self._filename_prefix}{today}.log") + ) self.stream = self._open() self.rolloverAt = self.computeRollover(int(time.time())) @@ -178,7 +197,7 @@ def doRollover(self) -> None: return handler -def setup_logging(debug: bool = False) -> None: +def setup_logging(debug: bool = False, logs_dir: Path | None = None) -> None: """Configure logging for the entire application.""" root = logging.getLogger() root.setLevel(logging.DEBUG if debug else logging.INFO) @@ -193,11 +212,21 @@ def setup_logging(debug: bool = False) -> None: # Daily rotating file handler — apps/api/logs/YYYY-MM-DD.log (WARNING+ only) # Production: JSON structured format; development: plain text - logs_dir = Path(__file__).resolve().parent.parent / "logs" + logs_dir = logs_dir or (Path(__file__).resolve().parent.parent / "logs") file_handler = _make_file_handler(logs_dir, use_json=not debug) file_handler.setLevel(logging.WARNING) root.addHandler(file_handler) + # Background task file handler — captures task lifecycle INFO logs separately. + task_file_handler = _make_file_handler( + logs_dir, + use_json=not debug, + filename_prefix="scheduled-tasks-", + ) + task_file_handler.setLevel(logging.INFO) + task_file_handler.addFilter(LoggerPrefixFilter("app.workers.tasks")) + root.addHandler(task_file_handler) + # Suppress noisy loggers for name in [ "sqlalchemy.engine", diff --git a/apps/api/app/providers/email.py b/apps/api/app/providers/email.py index 480c536..cdf6927 100644 --- a/apps/api/app/providers/email.py +++ b/apps/api/app/providers/email.py @@ -2,6 +2,7 @@ Email sending provider using SMTP configuration from app_config. """ +from dataclasses import dataclass import logging import ssl from email.mime.multipart import MIMEMultipart @@ -16,6 +17,20 @@ logger = logging.getLogger(__name__) +@dataclass(slots=True) +class EmailSendResult: + ok: bool + error: str | None = None + + +def _format_email_exception(exc: Exception) -> str: + """Return a stable, user-visible SMTP error message.""" + message = str(exc).strip() + if message: + return message + return exc.__class__.__name__ + + async def _get_smtp_config(db: AsyncSession) -> dict: keys = ["smtp_host", "smtp_port", "smtp_username", "smtp_password", "smtp_from"] result = await db.execute( @@ -31,7 +46,7 @@ async def send_email( text_body: str = "", db: AsyncSession | None = None, smtp_config: dict | None = None, -) -> bool: +) -> EmailSendResult: config = smtp_config or (await _get_smtp_config(db) if db else {}) host = config.get("smtp_host", "") @@ -43,14 +58,16 @@ async def send_email( from_addr = (config.get("smtp_from") or username or "").strip() if not host or not username: - logger.error("SMTP not configured: missing host or username") - return False + error = "SMTP not configured: missing host or username" + logger.error(error) + return EmailSendResult(ok=False, error=error) if not from_addr: - logger.error( + error = ( "SMTP not configured: empty smtp_from and username; set From address or SMTP user" ) - return False + logger.error(error) + return EmailSendResult(ok=False, error=error) msg = MIMEMultipart("alternative") msg["Subject"] = subject @@ -81,7 +98,7 @@ async def send_email( await smtp.send_message(msg) await smtp.quit() logger.info("Email sent to %s: %s", to, subject) - return True + return EmailSendResult(ok=True) except Exception as exc: logger.error("Failed to send email to %s: %s", to, exc) - return False + return EmailSendResult(ok=False, error=_format_email_exception(exc)) diff --git a/apps/api/app/services/conversation_service.py b/apps/api/app/services/conversation_service.py index 8bc3253..dbc1a78 100644 --- a/apps/api/app/services/conversation_service.py +++ b/apps/api/app/services/conversation_service.py @@ -167,17 +167,40 @@ async def send_message(self, conversation_id: UUID, content: str) -> Message: history = await self._load_history(conversation_id) + from app.agents.rag.graph_retrieval import graph_augmented_context from app.agents.rag.retrieval import retrieve_chunks from app.agents.writing.composer import compose_answer from app.agents.memory import get_user_memories, get_notebook_summary user_memories = await get_user_memories(self.user_id, self.db) notebook_summary = await get_notebook_summary(conv.notebook_id, self.db) - chunks = await retrieve_chunks(content, str(conv.notebook_id), self.db) + if conv.notebook_id: + chunks, graph_ctx = await asyncio.gather( + retrieve_chunks( + content, + str(conv.notebook_id), + self.db, + user_id=self.user_id, + ), + graph_augmented_context(content, str(conv.notebook_id), self.db), + ) + else: + chunks = await retrieve_chunks( + content, + None, + self.db, + global_search=True, + user_id=self.user_id, + ) + graph_ctx = "" answer, citations = await compose_answer( - content, chunks, history, + content, + chunks, + history, user_memories=user_memories, notebook_summary=notebook_summary, + db=self.db, + extra_graph_context=graph_ctx or None, ) assistant_msg = Message( diff --git a/apps/api/app/skills/builtin/summarize.py b/apps/api/app/skills/builtin/summarize.py index f29a3b2..d4c8420 100644 --- a/apps/api/app/skills/builtin/summarize.py +++ b/apps/api/app/skills/builtin/summarize.py @@ -71,7 +71,7 @@ async def execute(self, args: dict, ctx) -> str: async def _fetch_chunks_direct(ctx) -> list[dict]: """Direct DB fetch without similarity filter — used as a fallback.""" from uuid import UUID - from sqlalchemy import func, select + from sqlalchemy import select from app.models import Chunk, Source try: @@ -89,7 +89,7 @@ async def _fetch_chunks_direct(ctx) -> list[dict]: Chunk.notebook_id == UUID(ctx.notebook_id), (Source.status == "indexed") | (Chunk.source_type == "note"), ) - .order_by(func.random()) + .order_by(Chunk.created_at.desc()) .limit(10) ) rows = result.all() diff --git a/apps/api/app/workers/tasks/scheduler.py b/apps/api/app/workers/tasks/scheduler.py index 6ad28a4..0b94d74 100644 --- a/apps/api/app/workers/tasks/scheduler.py +++ b/apps/api/app/workers/tasks/scheduler.py @@ -14,6 +14,36 @@ from app.workers._helpers import _run_async, _task_db +def summarize_delivery_outcome(delivery_status: dict[str, object]) -> tuple[str | None, str | None]: + """Build user-facing delivery summary and error message from per-channel results.""" + summary_parts: list[str] = [] + issue_parts: list[str] = [] + + email_status = delivery_status.get("email") + email_error = delivery_status.get("email_error") + note_status = delivery_status.get("note") + + if email_status == "sent": + summary_parts.append("邮件已发送") + elif email_status == "failed": + summary_parts.append("邮件发送失败") + detail = str(email_error).strip() if email_error else "请检查 SMTP 配置" + issue_parts.append(f"邮件投递失败:{detail}") + elif email_status == "skipped_no_address": + summary_parts.append("缺少收件邮箱") + issue_parts.append("邮件投递失败:未配置收件邮箱") + + if note_status == "created": + summary_parts.append("已写入笔记") + elif note_status == "skipped_no_notebook": + summary_parts.append("未找到笔记本") + issue_parts.append("笔记投递失败:未找到可写入的系统笔记本") + + summary = ",".join(summary_parts) if summary_parts else None + issues = ";".join(issue_parts) if issue_parts else None + return summary, issues + + async def _fetch_rss_feeds(feed_urls: list[str], max_items: int = 10) -> list[dict]: """Fetch and parse RSS/Atom feeds, returning items in the same format as web search results.""" import logging @@ -199,11 +229,13 @@ async def _run(): from app.utils.markdown_email import markdown_to_email_html html = markdown_to_email_html(article_md, article_title) - sent = await send_email( + email_result = await send_email( to=email_to, subject=article_title, html_body=html, text_body=article_md, db=db, ) - delivery_status["email"] = "sent" if sent else "failed" + delivery_status["email"] = "sent" if email_result.ok else "failed" + if email_result.error: + delivery_status["email_error"] = email_result.error else: delivery_status["email"] = "skipped_no_address" @@ -238,18 +270,24 @@ async def _run(): delivery_status["note"] = "skipped_no_notebook" elapsed_ms = int((time.monotonic() - start_time) * 1000) + delivery_summary, delivery_issues = summarize_delivery_outcome(delivery_status) + result_summary = f"生成 {len(article_md)} 字文章,{len(all_sources)} 个来源" + if delivery_summary: + result_summary = f"{result_summary},{delivery_summary}" + run.status = "success" run.finished_at = datetime.now(timezone.utc) run.duration_ms = elapsed_ms - run.result_summary = f"生成 {len(article_md)} 字文章,{len(all_sources)} 个来源" + run.result_summary = result_summary + run.error_message = delivery_issues run.generated_content = article_md run.sources_count = len(all_sources) run.delivery_status = delivery_status task.run_count += 1 task.last_run_at = datetime.now(timezone.utc) - task.last_result = run.result_summary - task.last_error = None + task.last_result = result_summary + task.last_error = delivery_issues task.consecutive_failures = 0 try: @@ -265,7 +303,14 @@ async def _run(): pass await db.commit() - logger.info("Scheduled task %s executed successfully", task.name) + if delivery_issues: + logger.warning( + "Scheduled task %s completed with delivery issues: %s", + task.name, + delivery_issues, + ) + else: + logger.info("Scheduled task %s executed successfully", task.name) except Exception as exc: elapsed_ms = int((time.monotonic() - start_time) * 1000) diff --git a/apps/api/tests/integration/test_conversation_service.py b/apps/api/tests/integration/test_conversation_service.py index b5b1126..f843d2b 100644 --- a/apps/api/tests/integration/test_conversation_service.py +++ b/apps/api/tests/integration/test_conversation_service.py @@ -224,3 +224,84 @@ async def test_messages_isolated_between_conversations(self, db_session): msgs2 = await svc.list_messages(conv2.id) assert msgs2 == [] + + +# ── Non-streaming send_message (RAG + graph) ───────────────────────────────── + +@pytest.mark.skipif( + __import__("os").environ.get("DATABASE_URL", "sqlite").startswith("sqlite"), + reason="SQLite test DB cannot compile JSONB schema; set DATABASE_URL to PostgreSQL", +) +class TestConversationSendMessageRag: + """send Message wires retrieve_chunks, graph_augmented_context, and compose_answer.""" + + @pytest.mark.asyncio + async def test_send_message_passes_graph_context_to_compose(self, db_session, test_user, monkeypatch): + user, _ = test_user + nb = await _create_notebook(db_session, user.id) + svc = ConversationService(db_session, user.id) + conv = await svc.create(nb.id, title="RAG test") + + compose_kw: dict = {} + + async def fake_retrieve(*_a, **_k): + return [ + { + "chunk_id": "c1", + "source_id": "s1", + "source_title": "T", + "excerpt": "ex", + "content": "body", + "score": 0.5, + } + ] + + async def fake_graph(*_a, **_k): + return "GRAPHCTX" + + async def fake_compose(_q, _chunks, _hist, **kwargs): + compose_kw.update(kwargs) + return "composed", [] + + monkeypatch.setattr("app.agents.rag.retrieval.retrieve_chunks", fake_retrieve) + monkeypatch.setattr( + "app.agents.rag.graph_retrieval.graph_augmented_context", + fake_graph, + ) + monkeypatch.setattr("app.agents.writing.composer.compose_answer", fake_compose) + + msg = await svc.send_message(conv.id, "hello") + + assert msg.content == "composed" + assert compose_kw.get("extra_graph_context") == "GRAPHCTX" + + @pytest.mark.asyncio + async def test_send_message_global_conversation_skips_graph(self, db_session, test_user, monkeypatch): + user, _ = test_user + svc = ConversationService(db_session, user.id) + conv = await svc.create(None, title="global") + + compose_kw: dict = {} + + async def fake_retrieve(*_a, **k): + compose_kw["retrieve_kw"] = k + return [] + + async def fake_graph(*_a, **_k): + raise AssertionError("graph should not run for global conversation") + + async def fake_compose(_q, _chunks, _hist, **kwargs): + compose_kw.update(kwargs) + return "ok", [] + + monkeypatch.setattr("app.agents.rag.retrieval.retrieve_chunks", fake_retrieve) + monkeypatch.setattr( + "app.agents.rag.graph_retrieval.graph_augmented_context", + fake_graph, + ) + monkeypatch.setattr("app.agents.writing.composer.compose_answer", fake_compose) + + await svc.send_message(conv.id, "hi") + + assert compose_kw["retrieve_kw"].get("global_search") is True + assert compose_kw.get("extra_graph_context") is None diff --git a/apps/api/tests/unit/test_compose_answer_extra_graph.py b/apps/api/tests/unit/test_compose_answer_extra_graph.py new file mode 100644 index 0000000..11dd1ee --- /dev/null +++ b/apps/api/tests/unit/test_compose_answer_extra_graph.py @@ -0,0 +1,43 @@ +"""compose_answer: extra_graph_context is merged into the RAG context message.""" + +from __future__ import annotations + +import pytest + +from app.agents.writing.composer import compose_answer + + +@pytest.mark.asyncio +async def test_compose_answer_prepends_graph_section(monkeypatch): + captured: dict = {} + + async def fake_chat(messages): + captured["messages"] = messages + return "assistant reply" + + monkeypatch.setattr("app.providers.llm.chat", fake_chat) + + chunks = [ + { + "chunk_id": "c1", + "source_id": "s1", + "source_title": "Doc", + "excerpt": "ex", + "content": "chunk body", + "score": 0.9, + } + ] + + await compose_answer( + "user question", + chunks, + [], + extra_graph_context="Entity A --rel--> Entity B", + ) + + ref_msg = next( + m for m in captured["messages"] if m.get("role") == "user" and "参考资料" in m["content"] + ) + assert "结构化知识关联(图谱)" in ref_msg["content"] + assert "Entity A --rel--> Entity B" in ref_msg["content"] + assert "chunk body" in ref_msg["content"] diff --git a/apps/api/tests/unit/test_config_test_email.py b/apps/api/tests/unit/test_config_test_email.py index 3686b0d..737af42 100644 --- a/apps/api/tests/unit/test_config_test_email.py +++ b/apps/api/tests/unit/test_config_test_email.py @@ -20,6 +20,7 @@ # Alias: importing as `test_email` is picked up by pytest as a test function. from app.domains.config.router import test_email as config_test_email_handler +from app.providers.email import EmailSendResult class _CfgRow: @@ -67,7 +68,10 @@ async def test_ok_when_config_complete_and_send_succeeds(self): ] db = _ConfigFakeSession(rows) user = MagicMock() - with patch("app.providers.email.send_email", AsyncMock(return_value=True)) as send: + with patch( + "app.providers.email.send_email", + AsyncMock(return_value=EmailSendResult(ok=True)), + ) as send: resp = await config_test_email_handler(user, db) # type: ignore[arg-type] assert resp.code == 0 assert resp.data is not None @@ -108,7 +112,11 @@ async def test_fail_response_when_send_returns_false(self): _CfgRow("smtp_username", "u@example.com"), _CfgRow("smtp_password", "pw"), ] - with patch("app.providers.email.send_email", AsyncMock(return_value=False)): + with patch( + "app.providers.email.send_email", + AsyncMock(return_value=EmailSendResult(ok=False, error="auth failed")), + ): resp = await config_test_email_handler(MagicMock(), _ConfigFakeSession(rows)) # type: ignore[arg-type] assert resp.data is not None assert resp.data.ok is False + assert "auth failed" in resp.data.message diff --git a/apps/api/tests/unit/test_email_provider.py b/apps/api/tests/unit/test_email_provider.py index 94fdf7a..fba7f73 100644 --- a/apps/api/tests/unit/test_email_provider.py +++ b/apps/api/tests/unit/test_email_provider.py @@ -19,7 +19,7 @@ os.environ.setdefault("OPENAI_API_KEY", "sk-test") os.environ.setdefault("DEBUG", "false") -from app.providers.email import _get_smtp_config, send_email +from app.providers.email import _format_email_exception, _get_smtp_config, send_email def _smtp_instance_mock() -> MagicMock: @@ -35,7 +35,7 @@ def _smtp_instance_mock() -> MagicMock: class TestSendEmail: async def test_returns_false_when_host_missing(self): with patch("app.providers.email.aiosmtplib.SMTP") as smtp_cls: - ok = await send_email( + result = await send_email( to="a@b.com", subject="s", html_body="

x

", @@ -45,12 +45,13 @@ async def test_returns_false_when_host_missing(self): "smtp_from": "", }, ) - assert ok is False + assert result.ok is False + assert result.error == "SMTP not configured: missing host or username" smtp_cls.assert_not_called() async def test_returns_false_when_username_missing(self): with patch("app.providers.email.aiosmtplib.SMTP") as smtp_cls: - ok = await send_email( + result = await send_email( to="a@b.com", subject="s", html_body="

x

", @@ -59,13 +60,14 @@ async def test_returns_false_when_username_missing(self): "smtp_password": "p", }, ) - assert ok is False + assert result.ok is False + assert result.error == "SMTP not configured: missing host or username" smtp_cls.assert_not_called() async def test_returns_false_when_smtp_username_empty_string(self): """Early exit: missing username before from_addr is evaluated.""" with patch("app.providers.email.aiosmtplib.SMTP") as smtp_cls: - ok = await send_email( + result = await send_email( to="a@b.com", subject="s", html_body="

x

", @@ -75,13 +77,14 @@ async def test_returns_false_when_smtp_username_empty_string(self): "smtp_password": "p", }, ) - assert ok is False + assert result.ok is False + assert result.error == "SMTP not configured: missing host or username" smtp_cls.assert_not_called() async def test_returns_false_when_from_addr_empty_after_strip(self): """Hits `if not from_addr`: smtp_from is whitespace-only (truthy before .strip()).""" with patch("app.providers.email.aiosmtplib.SMTP") as smtp_cls: - ok = await send_email( + result = await send_email( to="a@b.com", subject="s", html_body="

x

", @@ -92,13 +95,14 @@ async def test_returns_false_when_from_addr_empty_after_strip(self): "smtp_from": " ", }, ) - assert ok is False + assert result.ok is False + assert result.error == "SMTP not configured: empty smtp_from and username; set From address or SMTP user" smtp_cls.assert_not_called() async def test_returns_false_when_whitespace_only_username_no_from(self): """from_addr = ('' or ' ' or '').strip() -> empty.""" with patch("app.providers.email.aiosmtplib.SMTP") as smtp_cls: - ok = await send_email( + result = await send_email( to="a@b.com", subject="s", html_body="

x

", @@ -108,13 +112,14 @@ async def test_returns_false_when_whitespace_only_username_no_from(self): "smtp_password": "p", }, ) - assert ok is False + assert result.ok is False + assert result.error == "SMTP not configured: empty smtp_from and username; set From address or SMTP user" smtp_cls.assert_not_called() async def test_smtp_from_empty_string_falls_back_to_username(self): inst = _smtp_instance_mock() with patch("app.providers.email.aiosmtplib.SMTP", return_value=inst) as smtp_cls: - ok = await send_email( + result = await send_email( to="to@example.com", subject="Subj", html_body="

h

", @@ -127,7 +132,8 @@ async def test_smtp_from_empty_string_falls_back_to_username(self): "smtp_from": "", }, ) - assert ok is True + assert result.ok is True + assert result.error is None smtp_cls.assert_called_once() _, kwargs = smtp_cls.call_args assert kwargs["hostname"] == "smtp.example.com" @@ -145,7 +151,7 @@ async def test_smtp_from_empty_string_falls_back_to_username(self): async def test_smtp_from_set_uses_config_value(self): inst = _smtp_instance_mock() with patch("app.providers.email.aiosmtplib.SMTP", return_value=inst): - ok = await send_email( + result = await send_email( to="to@example.com", subject="S", html_body="

x

", @@ -157,14 +163,14 @@ async def test_smtp_from_set_uses_config_value(self): "smtp_from": "Custom ", }, ) - assert ok is True + assert result.ok is True msg = inst.send_message.await_args[0][0] assert msg["From"] == "Custom " async def test_port_465_uses_tls_not_start_tls(self): inst = _smtp_instance_mock() with patch("app.providers.email.aiosmtplib.SMTP", return_value=inst) as smtp_cls: - ok = await send_email( + result = await send_email( to="t@e.com", subject="S", html_body="

x

", @@ -176,7 +182,7 @@ async def test_port_465_uses_tls_not_start_tls(self): "smtp_from": "", }, ) - assert ok is True + assert result.ok is True _, kwargs = smtp_cls.call_args assert kwargs["port"] == 465 assert kwargs["use_tls"] is True @@ -186,7 +192,7 @@ async def test_returns_false_on_smtp_error(self): inst = _smtp_instance_mock() inst.login = AsyncMock(side_effect=RuntimeError("auth failed")) with patch("app.providers.email.aiosmtplib.SMTP", return_value=inst): - ok = await send_email( + result = await send_email( to="t@e.com", subject="S", html_body="

x

", @@ -197,7 +203,8 @@ async def test_returns_false_on_smtp_error(self): "smtp_from": "u@e.com", }, ) - assert ok is False + assert result.ok is False + assert result.error == "auth failed" class _KeyValueResult: @@ -243,3 +250,8 @@ async def test_missing_keys_omitted_from_dict(self): sess = _FakeAsyncSession([("smtp_host", "only-host")]) cfg = await _get_smtp_config(sess) # type: ignore[arg-type] assert cfg == {"smtp_host": "only-host"} + + +def test_format_email_exception_falls_back_to_exception_type(): + assert _format_email_exception(RuntimeError("")) == "RuntimeError" + assert _format_email_exception(RuntimeError("auth failed")) == "auth failed" diff --git a/apps/api/tests/unit/test_logging_config.py b/apps/api/tests/unit/test_logging_config.py new file mode 100644 index 0000000..4b5ccdd --- /dev/null +++ b/apps/api/tests/unit/test_logging_config.py @@ -0,0 +1,45 @@ +""" +Regression tests for logging configuration. +""" + +from __future__ import annotations + +import logging +from datetime import datetime + +from app.logging_config import setup_logging + + +def test_setup_logging_writes_task_info_to_dedicated_file(tmp_path): + root = logging.getLogger() + original_handlers = list(root.handlers) + original_level = root.level + + try: + setup_logging(debug=True, logs_dir=tmp_path) + + logging.getLogger("app.workers.tasks.scheduler").info("scheduled task dispatched") + logging.getLogger("app.main").info("api info should stay out") + logging.getLogger("app.main").warning("api warning should stay in default log") + + today = datetime.now().strftime("%Y-%m-%d") + default_log = tmp_path / f"{today}.log" + task_log = tmp_path / f"scheduled-tasks-{today}.log" + + assert task_log.exists() + assert default_log.exists() + + task_content = task_log.read_text(encoding="utf-8") + default_content = default_log.read_text(encoding="utf-8") + + assert "scheduled task dispatched" in task_content + assert "api info should stay out" not in task_content + assert "api warning should stay in default log" in default_content + assert "scheduled task dispatched" not in default_content + finally: + for handler in list(root.handlers): + handler.close() + root.handlers.clear() + for handler in original_handlers: + root.addHandler(handler) + root.setLevel(original_level) diff --git a/apps/api/tests/unit/test_retrieve_routers_auth.py b/apps/api/tests/unit/test_retrieve_routers_auth.py new file mode 100644 index 0000000..fbaa3f1 --- /dev/null +++ b/apps/api/tests/unit/test_retrieve_routers_auth.py @@ -0,0 +1,59 @@ +"""HTTP coverage for retrieve-related routers (auth + missing notebook).""" + +from __future__ import annotations + +import os +import uuid + +import pytest +from httpx import AsyncClient + +requires_postgres = os.environ.get("DATABASE_URL", "sqlite").startswith("sqlite") is False +pytestmark = pytest.mark.skipif( + not requires_postgres, + reason="SQLite test DB cannot compile JSONB schema; set DATABASE_URL to PostgreSQL", +) + + +@pytest.mark.asyncio +async def test_writing_context_unknown_notebook_404(client: AsyncClient, auth_headers: dict): + r = await client.post( + "/api/v1/ai/writing-context", + json={ + "notebook_id": str(uuid.uuid4()), + "text_around_cursor": "x" * 30, + }, + headers=auth_headers, + ) + assert r.status_code == 404 + + +@pytest.mark.asyncio +async def test_related_knowledge_unknown_notebook_404(client: AsyncClient, auth_headers: dict): + r = await client.get( + f"/api/v1/notebooks/{uuid.uuid4()}/related-knowledge", + headers=auth_headers, + ) + assert r.status_code == 404 + + +@pytest.mark.asyncio +async def test_writing_context_short_text_skips_retrieval_no_404( + client: AsyncClient, auth_headers: dict, db_session, test_user +): + """Fewer than 20 non-whitespace chars returns empty chunks without notebook lookup.""" + from app.models import Notebook + + user, _ = test_user + nb = Notebook(id=uuid.uuid4(), user_id=user.id, title="NB") + db_session.add(nb) + await db_session.commit() + + r = await client.post( + "/api/v1/ai/writing-context", + json={"notebook_id": str(nb.id), "text_around_cursor": "short"}, + headers=auth_headers, + ) + assert r.status_code == 200 + body = r.json() + assert body.get("data", {}).get("chunks") == [] diff --git a/apps/api/tests/unit/test_scheduler_delivery.py b/apps/api/tests/unit/test_scheduler_delivery.py new file mode 100644 index 0000000..b72c78a --- /dev/null +++ b/apps/api/tests/unit/test_scheduler_delivery.py @@ -0,0 +1,41 @@ +""" +Unit tests for scheduled task delivery summaries. +""" + +from app.workers.tasks.scheduler import summarize_delivery_outcome + + +def test_summarize_delivery_outcome_reports_email_failure_reason(): + summary, issues = summarize_delivery_outcome( + { + "email": "failed", + "email_error": "SMTP not configured: missing host or username", + } + ) + + assert summary == "邮件发送失败" + assert issues == "邮件投递失败:SMTP not configured: missing host or username" + + +def test_summarize_delivery_outcome_reports_combined_success(): + summary, issues = summarize_delivery_outcome( + { + "email": "sent", + "note": "created", + } + ) + + assert summary == "邮件已发送,已写入笔记" + assert issues is None + + +def test_summarize_delivery_outcome_reports_multiple_issues(): + summary, issues = summarize_delivery_outcome( + { + "email": "skipped_no_address", + "note": "skipped_no_notebook", + } + ) + + assert summary == "缺少收件邮箱,未找到笔记本" + assert issues == "邮件投递失败:未配置收件邮箱;笔记投递失败:未找到可写入的系统笔记本" diff --git a/apps/web/messages/en.json b/apps/web/messages/en.json index 2db1ee2..5ae7e86 100644 --- a/apps/web/messages/en.json +++ b/apps/web/messages/en.json @@ -1033,7 +1033,12 @@ "newTaskCardLabel": "New Task", "historyTitle": "{name} · Execution History", "noHistory": "No execution records", - "sourcesCount": "{count} sources" + "sourcesCount": "{count} sources", + "deliveryEmailSent": "Email sent", + "deliveryEmailFailed": "Email failed", + "deliveryEmailSkippedNoAddress": "Missing recipient email", + "deliveryNoteCreated": "Saved to note", + "deliveryNoteSkippedNoNotebook": "No notebook available" }, "genui": { "kanbanStreaming": "Generating kanban...", diff --git a/apps/web/messages/zh.json b/apps/web/messages/zh.json index 1d46362..8008cb9 100644 --- a/apps/web/messages/zh.json +++ b/apps/web/messages/zh.json @@ -1034,7 +1034,12 @@ "newTaskCardLabel": "新建任务", "historyTitle": "{name} · 执行历史", "noHistory": "暂无执行记录", - "sourcesCount": "{count} 来源" + "sourcesCount": "{count} 来源", + "deliveryEmailSent": "邮件已发送", + "deliveryEmailFailed": "邮件发送失败", + "deliveryEmailSkippedNoAddress": "未配置收件邮箱", + "deliveryNoteCreated": "已写入笔记", + "deliveryNoteSkippedNoNotebook": "未找到投递笔记本" }, "genui": { "kanbanStreaming": "正在生成看板...", diff --git a/apps/web/src/features/tasks/task-delivery.test.ts b/apps/web/src/features/tasks/task-delivery.test.ts new file mode 100644 index 0000000..9b8d24d --- /dev/null +++ b/apps/web/src/features/tasks/task-delivery.test.ts @@ -0,0 +1,70 @@ +import { describe, expect, it } from "vitest"; + +import { getTaskDeliveryBadges } from "./task-delivery"; + +describe("getTaskDeliveryBadges", () => { + it("maps email failures with detail", () => { + expect( + getTaskDeliveryBadges({ + email: "failed", + email_error: "auth failed", + }), + ).toEqual([ + { + key: "deliveryEmailFailed", + tone: "error", + detail: "auth failed", + }, + ]); + }); + + it("falls back to a default detail when backend reason is missing", () => { + expect( + getTaskDeliveryBadges({ + email: "failed", + }), + ).toEqual([ + { + key: "deliveryEmailFailed", + tone: "error", + detail: "请检查 SMTP 配置或认证信息", + }, + ]); + }); + + it("maps successful multi-channel delivery", () => { + expect( + getTaskDeliveryBadges({ + email: "sent", + note: "created", + }), + ).toEqual([ + { + key: "deliveryEmailSent", + tone: "success", + }, + { + key: "deliveryNoteCreated", + tone: "success", + }, + ]); + }); + + it("maps skipped delivery states", () => { + expect( + getTaskDeliveryBadges({ + email: "skipped_no_address", + note: "skipped_no_notebook", + }), + ).toEqual([ + { + key: "deliveryEmailSkippedNoAddress", + tone: "muted", + }, + { + key: "deliveryNoteSkippedNoNotebook", + tone: "muted", + }, + ]); + }); +}); diff --git a/apps/web/src/features/tasks/task-delivery.ts b/apps/web/src/features/tasks/task-delivery.ts new file mode 100644 index 0000000..e69da6c --- /dev/null +++ b/apps/web/src/features/tasks/task-delivery.ts @@ -0,0 +1,42 @@ +import type { TaskRun } from "@/services/task-service"; + +export type DeliveryBadgeTone = "success" | "error" | "muted"; + +export type DeliveryBadge = { + key: string; + tone: DeliveryBadgeTone; + detail?: string; +}; + +export function getTaskDeliveryBadges( + deliveryStatus: TaskRun["delivery_status"], +): DeliveryBadge[] { + if (!deliveryStatus) return []; + + const badges: DeliveryBadge[] = []; + const emailStatus = deliveryStatus.email; + const emailError = deliveryStatus.email_error; + const noteStatus = deliveryStatus.note; + + if (emailStatus === "sent") { + badges.push({ key: "deliveryEmailSent", tone: "success" }); + } else if (emailStatus === "failed") { + badges.push({ + key: "deliveryEmailFailed", + tone: "error", + detail: typeof emailError === "string" && emailError.trim() + ? emailError + : "请检查 SMTP 配置或认证信息", + }); + } else if (emailStatus === "skipped_no_address") { + badges.push({ key: "deliveryEmailSkippedNoAddress", tone: "muted" }); + } + + if (noteStatus === "created") { + badges.push({ key: "deliveryNoteCreated", tone: "success" }); + } else if (noteStatus === "skipped_no_notebook") { + badges.push({ key: "deliveryNoteSkippedNoNotebook", tone: "muted" }); + } + + return badges; +} diff --git a/apps/web/src/features/tasks/task-history-dialog.tsx b/apps/web/src/features/tasks/task-history-dialog.tsx index ed6a9de..248cfb5 100644 --- a/apps/web/src/features/tasks/task-history-dialog.tsx +++ b/apps/web/src/features/tasks/task-history-dialog.tsx @@ -5,6 +5,7 @@ import { AlertCircle, CheckCircle2, Clock, Loader2, X } from "lucide-react"; import { AnimatePresence, m } from "framer-motion"; import { useTranslations } from "next-intl"; import { getTaskRuns, type TaskRun } from "@/services/task-service"; +import { getTaskDeliveryBadges } from "./task-delivery"; function formatDuration(ms: number | null) { if (!ms) return "—"; @@ -16,6 +17,7 @@ function RunItem({ run }: { run: TaskRun }) { const t = useTranslations("tasks"); const isSuccess = run.status === "success"; const isFailed = run.status === "failed"; + const deliveryBadges = getTaskDeliveryBadges(run.delivery_status); return (
@@ -43,6 +45,33 @@ function RunItem({ run }: { run: TaskRun }) { {run.error_message && (

{run.error_message}

)} + {deliveryBadges.length > 0 && ( +
+
+ {deliveryBadges.map((badge) => ( + + {t(badge.key)} + + ))} +
+ {deliveryBadges + .filter((badge) => badge.detail) + .map((badge) => ( +

+ {badge.detail} +

+ ))} +
+ )}
); } diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 324909b..8467c6f 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -124,6 +124,39 @@ services: condition: service_healthy command: celery -A app.workers.tasks.celery_app worker --loglevel=info --concurrency=4 + beat: + image: ghcr.io/linmoqc/lyranote/api:main + restart: unless-stopped + env_file: + - path: ./.env + required: false + environment: + DATABASE_URL: postgresql+asyncpg://lyranote:${POSTGRES_PASSWORD:-lyranote}@db:5432/lyranote + REDIS_URL: redis://redis:6379/0 + STORAGE_S3_ENDPOINT_URL: http://minio:9000 + STORAGE_S3_ACCESS_KEY: ${MINIO_ROOT_USER:-lyranote} + STORAGE_S3_SECRET_KEY: ${MINIO_ROOT_PASSWORD:-lyranote123} + STORAGE_S3_BUCKET: lyranote + STORAGE_S3_REGION: us-east-1 + APP_BASE_URL: ${APP_BASE_URL:-https://your-domain.com} + API_PREFIX: ${API_PREFIX:-/api/v1} + FRONTEND_URL: ${FRONTEND_URL:-https://your-domain.com} + CORS_ORIGINS: ${CORS_ORIGINS:-https://your-domain.com} + JWT_SECRET: ${JWT_SECRET:-} + JWT_EXPIRE_DAYS: ${JWT_EXPIRE_DAYS:-30} + GOOGLE_CLIENT_ID: ${GOOGLE_CLIENT_ID:-} + GOOGLE_CLIENT_SECRET: ${GOOGLE_CLIENT_SECRET:-} + GITHUB_CLIENT_ID: ${GITHUB_CLIENT_ID:-} + GITHUB_CLIENT_SECRET: ${GITHUB_CLIENT_SECRET:-} + MEMORY_MODE: ${MEMORY_MODE:-server} + DEBUG: ${DEBUG:-false} + volumes: + - storage_data:/app/storage + depends_on: + api: + condition: service_healthy + command: celery -A app.workers.tasks.celery_app beat --loglevel=info + web: image: ghcr.io/linmoqc/lyranote/web:main restart: unless-stopped diff --git a/docker-compose.yml b/docker-compose.yml index e2eaf94..4e1c899 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -110,6 +110,29 @@ services: condition: service_healthy command: celery -A app.workers.tasks.celery_app worker --loglevel=info --concurrency=4 + beat: + build: + context: ./apps/api + restart: unless-stopped + env_file: + - ./apps/api/.env + environment: + DATABASE_URL: postgresql+asyncpg://lyranote:lyranote@db:5432/lyranote + REDIS_URL: redis://redis:6379/0 + STORAGE_S3_ENDPOINT_URL: http://minio:9000 + STORAGE_S3_PUBLIC_URL: http://localhost:9000 + APP_BASE_URL: http://localhost:8000 + API_PREFIX: /api/v1 + FRONTEND_URL: http://localhost:3000 + CORS_ORIGINS: http://localhost:3000 + DEBUG: "false" + volumes: + - storage_data:/app/storage + depends_on: + api: + condition: service_healthy + command: celery -A app.workers.tasks.celery_app beat --loglevel=info + web: build: context: . diff --git a/docs/exec-plans/active/local-startup-cleanup.md b/docs/exec-plans/active/local-startup-cleanup.md new file mode 100644 index 0000000..233e779 --- /dev/null +++ b/docs/exec-plans/active/local-startup-cleanup.md @@ -0,0 +1,65 @@ +# Exec Plan: 本地启动预清理策略 + +**状态**: 进行中 +**创建时间**: 2026-03-27 +**完成时间**: — +**负责人**: Agent + +--- + +## 目标 + +为 `./lyra local` 增加启动前预清理策略,自动清掉 LyraNote 自己遗留的本地 API / Celery 进程,以及误跑的应用层 Docker 容器,避免旧 worker 抢占同一 Redis 队列。 + +--- + +## 背景 & 上下文 + +- 相关启动入口:`packages/cli/src/commands/local.js` +- 相关工具模块:`packages/cli/src/utils/proc.js` +- 相关状态页:`packages/cli/src/commands/status.js` +- 影响范围:CLI / 本地开发运维 + +--- + +## 任务分解 + +### CLI +- [ ] 在 `local.js` 中增加启动前预清理步骤 +- [ ] 仅清理 LyraNote 自己的旧进程与应用层容器,不动 DB/Redis/MinIO 数据层 +- [ ] 保留端口检查作为最后一道兜底 + +### 状态展示 +- [ ] 在 `status.js` 中纳入 `beat` 容器状态 + +### 测试 / 验证 +- [ ] 用 `node --check` 校验修改后的 CLI 脚本语法 +- [ ] 手动验证 `./lyra local` 的预清理行为 + +--- + +## 测试策略 + +**手动验证覆盖**: +- 启动前存在残留 `celery worker/beat` 时,`./lyra local` 会先清理再启动 +- 存在 `lyranote-api-1 / worker-1 / beat-1 / web-1` 容器时,仅应用层容器会被停止 +- DB/Redis/MinIO 容器与数据不受影响 + +**脚本验证**: +- `node --check packages/cli/src/commands/local.js` +- `node --check packages/cli/src/commands/status.js` + +--- + +## 验收标准(全部满足才算完成) + +- [ ] `./lyra local` 启动前会自动清理旧 API / worker / beat +- [ ] 不会误停数据层容器 +- [ ] `status` 可以显示 `beat` 容器 +- [ ] CLI 脚本语法校验通过 + +--- + +## 决策日志 + +- 2026-03-27: 预清理范围限定为 LyraNote 自身进程与应用层容器,避免为了清队列问题误伤数据库和缓存服务。 diff --git a/docs/exec-plans/active/scheduled-task-email-delivery-status.md b/docs/exec-plans/active/scheduled-task-email-delivery-status.md new file mode 100644 index 0000000..e20d83b --- /dev/null +++ b/docs/exec-plans/active/scheduled-task-email-delivery-status.md @@ -0,0 +1,70 @@ +# Exec Plan: 定时任务邮件投递状态修复 + +**状态**: 进行中 +**创建时间**: 2026-03-27 +**完成时间**: — +**负责人**: Agent + +--- + +## 目标 + +修复定时任务“内容生成成功但邮件未投递时仍显示成功”的误导状态,让邮件/笔记投递结果与失败原因能够在后端状态和前端界面上被明确看到。 + +--- + +## 背景 & 上下文 + +- 相关后端模块:`apps/api/app/workers/tasks/scheduler.py`、`apps/api/app/providers/email.py` +- 相关前端模块:`apps/web/src/features/tasks/task-card.tsx`、`apps/web/src/features/tasks/task-history-dialog.tsx` +- 影响范围:后端 / 前端 + +--- + +## 任务分解 + +### 后端 +- [ ] 为邮件发送结果补充结构化返回值与失败原因 +- [ ] 在定时任务执行结果中记录投递状态摘要与错误信息 +- [ ] 保持现有任务执行主流程兼容,不引入额外 API 破坏性变更 + +### 前端 +- [ ] 在执行历史中显示投递状态明细 +- [ ] 保持任务卡片对最近一次错误的可见性 +- [ ] 补充中英文文案 + +### 测试 +- [ ] 编写后端单元测试:覆盖投递状态汇总逻辑 +- [ ] 编写前端单元测试:覆盖投递状态展示辅助逻辑 +- [ ] 跑测试全绿:`pytest tests/unit/test_scheduler_delivery.py -v` + `pnpm test -- task-delivery.test.ts` + +--- + +## 测试策略 + +**后端单元测试覆盖**: +- 邮件投递失败时,能生成明确错误摘要 +- 多种投递状态组合时,能得到正确的结果摘要与错误信息 + +**前端单元测试覆盖**: +- `delivery_status` 中的 `sent / failed / skipped` 能映射为正确展示文案 key +- 含 `email_error` 时能显示错误详情 + +**测试文件位置**: +- `apps/api/tests/unit/test_scheduler_delivery.py` +- `apps/web/src/features/tasks/task-delivery.test.ts` + +--- + +## 验收标准(全部满足才算完成) + +- [ ] 邮件发送失败时,任务卡片能显示明确错误 +- [ ] 执行历史能显示投递状态而不是仅显示“生成成功” +- [ ] `pytest tests/unit/test_scheduler_delivery.py -v` 全绿 +- [ ] `pnpm test -- task-delivery.test.ts` 全绿 + +--- + +## 决策日志 + +- 2026-03-27: 采用“保留任务生成成功语义,但额外暴露投递失败原因”的方案,避免将所有投递问题都算作任务主流程失败,同时消除 UI 误导。 diff --git a/docs/exec-plans/active/scheduled-task-email-error-fallback.md b/docs/exec-plans/active/scheduled-task-email-error-fallback.md new file mode 100644 index 0000000..4bc7efe --- /dev/null +++ b/docs/exec-plans/active/scheduled-task-email-error-fallback.md @@ -0,0 +1,58 @@ +# Exec Plan: 定时任务邮件错误原因兜底 + +**状态**: 进行中 +**创建时间**: 2026-03-27 +**完成时间**: — +**负责人**: Agent + +--- + +## 目标 + +修复定时任务邮件发送失败时“没有显示任何原因”的问题,即使 SMTP 库抛出的异常消息为空,也要生成稳定、可读的错误说明。 + +--- + +## 背景 & 上下文 + +- 相关后端模块:`apps/api/app/providers/email.py` +- 相关前端模块:`apps/web/src/features/tasks/task-delivery.ts` +- 影响范围:后端 / 前端 + +--- + +## 任务分解 + +### 后端 +- [ ] 为 SMTP 异常生成稳定的错误消息回退 +- [ ] 保留现有错误传播链路,确保 `delivery_status.email_error` 不为空 + +### 前端 +- [ ] 当前端收到 `email=failed` 但无细节时,显示默认说明 + +### 测试 +- [ ] 编写后端单元测试:覆盖空异常消息回退 +- [ ] 编写前端单元测试:覆盖前端默认兜底展示 + +--- + +## 测试策略 + +**后端单元测试覆盖**: +- SMTP 异常消息为空时,返回异常类型名而不是空字符串 + +**前端单元测试覆盖**: +- `email=failed` 且没有 `email_error` 时,展示默认提示 + +--- + +## 验收标准(全部满足才算完成) + +- [ ] 邮件发送失败时,执行历史至少显示一个默认原因 +- [ ] 后端和前端对应测试通过 + +--- + +## 决策日志 + +- 2026-03-27: 即使真实异常消息为空,也优先展示异常类型名或默认提示,避免 UI 出现“失败但无原因”的空白状态。 diff --git a/docs/exec-plans/active/scheduled-task-observability-fix.md b/docs/exec-plans/active/scheduled-task-observability-fix.md new file mode 100644 index 0000000..95bce9a --- /dev/null +++ b/docs/exec-plans/active/scheduled-task-observability-fix.md @@ -0,0 +1,72 @@ +# Exec Plan: 定时任务可观测性修复 + +**状态**: 进行中 +**创建时间**: 2026-03-27 +**完成时间**: — +**负责人**: Agent + +--- + +## 目标 + +修复 LyraNote 定时任务“没有自动触发 / 没有稳定文件日志”的问题,让 Celery Beat 在本地与 Docker 部署中都能正常运行,并让后台任务的关键 `INFO` 日志可落盘用于排障与监控。 + +--- + +## 背景 & 上下文 + +- 相关设计文档:`docs/design-docs/scheduled-tasks.md` +- 相关后端模块:`apps/api/app/workers/celery_app.py`、`apps/api/app/workers/tasks/scheduler.py` +- 相关启动入口:`packages/cli/src/commands/local.js`、`docker-compose.yml`、`docker-compose.prod.yml` +- 影响范围:后端 / 运维启动链路 + +--- + +## 任务分解 + +### 后端 +- [ ] 调整 `apps/api/app/logging_config.py`,为后台任务增加独立的 `INFO` 文件日志落点 +- [ ] 保持现有通用告警日志策略不变,避免主日志文件被普通 `INFO` 淹没 + +### 启动链路 +- [ ] 修改 `packages/cli/src/commands/local.js`,本地开发时新增 `celery beat` 进程 +- [ ] 修改 `docker-compose.yml`,新增 `beat` 服务 +- [ ] 修改 `docker-compose.prod.yml`,新增 `beat` 服务 + +### 测试 +- [ ] 编写后端单元测试:验证后台任务 `INFO` 日志会写入专用日志文件,非任务 `INFO` 不会混入 +- [ ] 跑测试全绿:`pytest tests/unit/test_logging_config.py -v` + +--- + +## 测试策略 + +**单元测试覆盖**: +- `setup_logging`:验证会创建后台任务专用日志文件 +- `setup_logging`:验证 `app.workers.tasks.*` 的 `INFO` 日志会落盘 +- `setup_logging`:验证普通应用 `INFO` 日志不会进入后台任务日志文件 + +**手动验证覆盖**: +- 本地运行 `./lyra local` 时可看到单独的 Beat 进程 +- Docker Compose 启动后可看到 `beat` 服务 +- 触发定时任务后,`apps/api/logs/scheduled-tasks-YYYY-MM-DD.log` 出现执行记录 + +**测试文件位置**: +- `apps/api/tests/unit/test_logging_config.py` + +--- + +## 验收标准(全部满足才算完成) + +- [ ] 本地开发模式会启动 `celery beat` +- [ ] Docker 开发 / 生产编排中包含独立 `beat` 服务 +- [ ] 定时任务关键 `INFO` 日志可落到专用文件 +- [ ] `pytest tests/unit/test_logging_config.py -v` 全绿 +- [ ] `ruff check apps/api/app/logging_config.py apps/api/tests/unit/test_logging_config.py` 无报错 + +--- + +## 决策日志 + +- 2026-03-27: 采用“新增独立后台任务日志文件”而不是直接把全局文件日志降到 `INFO`,以避免 API 与第三方库常规日志淹没定时任务排障信号。 +- 2026-03-27: 采用独立 `beat` 进程,而不是 `worker -B`,以保持开发与生产拓扑一致并降低调度器单点耦合。 diff --git a/packages/cli/src/commands/local.js b/packages/cli/src/commands/local.js index a238081..233ca29 100644 --- a/packages/cli/src/commands/local.js +++ b/packages/cli/src/commands/local.js @@ -4,10 +4,106 @@ import fs from 'fs'; import path from 'path'; import chalk from 'chalk'; import { section, log, warn, info, printAccessInfo, waitTcp, tcpReady } from '../utils/ui.js'; -import { checkCommand, checkPort, prompt, exec } from '../utils/proc.js'; +import { checkCommand, checkPort, prompt, exec, execQ } from '../utils/proc.js'; import { ROOT_DIR, API_DIR, WEB_DIR } from '../utils/paths.js'; const VENV = path.join(API_DIR, '.venv'); +const APP_CONTAINER_NAMES = ['lyranote-api-1', 'lyranote-worker-1', 'lyranote-beat-1', 'lyranote-web-1']; + +function shQuote(value) { + return `'${String(value).replace(/'/g, `'\\''`)}'`; +} + +function listPidsByPattern(pattern) { + const out = execQ(`pgrep -f ${shQuote(pattern)}`); + return out.split(/\s+/).filter(Boolean); +} + +function describePid(pid) { + return execQ(`ps -p ${pid} -o command= 2>/dev/null`) || 'unknown'; +} + +async function terminatePids(pids, label) { + if (!pids.length) return; + + for (const pid of pids) { + try { + exec(`kill ${pid}`, { shell: true, stdio: 'ignore' }); + } catch { /* ignore */ } + } + + await new Promise((resolve) => setTimeout(resolve, 500)); + + const survivors = pids.filter((pid) => !!execQ(`ps -p ${pid} -o pid= 2>/dev/null`)); + for (const pid of survivors) { + try { + exec(`kill -9 ${pid}`, { shell: true, stdio: 'ignore' }); + } catch { /* ignore */ } + } + + log(`已清理 ${label} 残留进程 ${pids.join(', ')}`); +} + +async function cleanupWorkspacePort(port, label) { + const pid = execQ(`lsof -i :${port} -sTCP:LISTEN -t 2>/dev/null | head -1`); + if (!pid) return; + + const command = describePid(pid); + const cwd = execQ(`lsof -a -p ${pid} -d cwd -Fn 2>/dev/null | tail -n 1 | sed 's/^n//'`); + const belongsToWorkspace = command.includes(ROOT_DIR) || cwd.startsWith(ROOT_DIR); + + if (!belongsToWorkspace) return; + + warn(`${label} 检测到 LyraNote 残留端口占用 (PID ${pid})`); + await terminatePids([pid], `${label} 端口`); +} + +async function cleanupStaleProcesses() { + section('预清理残留进程'); + process.chdir(ROOT_DIR); + + const runningContainers = execQ("docker ps --format '{{.Names}}'") + .split('\n') + .map((name) => name.trim()) + .filter((name) => APP_CONTAINER_NAMES.includes(name)); + + if (runningContainers.length) { + warn(`检测到残留应用容器:${runningContainers.join(', ')}`); + try { exec('docker compose stop api worker beat web', { shell: true, cwd: ROOT_DIR, stdio: 'ignore' }); } catch { /* ignore */ } + try { exec('docker compose -f docker-compose.prod.yml stop api worker beat web', { shell: true, cwd: ROOT_DIR, stdio: 'ignore' }); } catch { /* ignore */ } + log('已停止残留应用层容器(保留数据层容器)'); + } else { + info('未检测到残留应用层容器'); + } + + const uvicorn = path.join(VENV, 'bin', 'uvicorn'); + const celery = path.join(VENV, 'bin', 'celery'); + const patterns = [ + { label: 'API', pids: listPidsByPattern(`${uvicorn} app.main:app`) }, + { label: 'Celery Worker', pids: listPidsByPattern(`${celery} -A app.workers.tasks.celery_app worker`) }, + { label: 'Celery Beat', pids: listPidsByPattern(`${celery} -A app.workers.tasks.celery_app beat`) }, + ]; + + let cleaned = false; + for (const item of patterns) { + if (!item.pids.length) continue; + cleaned = true; + const preview = item.pids + .slice(0, 2) + .map((pid) => describePid(pid)) + .filter(Boolean) + .join(' | '); + if (preview) warn(`${item.label} 残留命令:${preview}`); + await terminatePids(item.pids, item.label); + } + + await cleanupWorkspacePort(8000, 'FastAPI'); + await cleanupWorkspacePort(3000, 'Next.js'); + + if (!cleaned) { + info('未检测到残留 API / Worker / Beat 进程'); + } +} async function ensureEnv() { const envFile = path.join(API_DIR, '.env'); @@ -33,6 +129,7 @@ export async function startLocal() { checkCommand('docker'); await ensureEnv(); + await cleanupStaleProcesses(); // ── 数据层 ── section('检查数据层'); @@ -114,6 +211,13 @@ export async function startLocal() { args: ['-A', 'app.workers.tasks.celery_app', 'worker', '--loglevel=info', '--concurrency=2', '-n', 'worker@%h'], cwd: API_DIR, }, + { + name: 'beat', + label: 'Beat', + command: celery, + args: ['-A', 'app.workers.tasks.celery_app', 'beat', '--loglevel=info'], + cwd: API_DIR, + }, { name: 'web', label: 'Web', @@ -123,4 +227,3 @@ export async function startLocal() { }, ]); } - diff --git a/packages/cli/src/commands/logs.js b/packages/cli/src/commands/logs.js index e3eae79..2c6a419 100644 --- a/packages/cli/src/commands/logs.js +++ b/packages/cli/src/commands/logs.js @@ -14,6 +14,7 @@ export async function showLogs() { { name: '所有服务', value: '' }, { name: 'API', value: 'api' }, { name: 'Worker', value: 'worker' }, + { name: 'Beat', value: 'beat' }, { name: 'Web', value: 'web' }, { name: '数据库', value: 'db' }, { name: 'Redis', value: 'redis' }, diff --git a/packages/cli/src/commands/status.js b/packages/cli/src/commands/status.js index 1427c38..db88a50 100644 --- a/packages/cli/src/commands/status.js +++ b/packages/cli/src/commands/status.js @@ -4,6 +4,7 @@ import { execQ } from '../utils/proc.js'; const CONTAINERS = [ 'lyranote-api-1', 'lyranote-worker-1', + 'lyranote-beat-1', 'lyranote-web-1', 'lyranote-db-1', 'lyranote-redis-1',