diff --git a/ReportEngine/agent.py b/ReportEngine/agent.py index 5246787c2..6622deaeb 100644 --- a/ReportEngine/agent.py +++ b/ReportEngine/agent.py @@ -374,7 +374,14 @@ def _initialize_nodes(self): fallback_llm_clients=self.json_rescue_clients, error_log_dir=self.config.JSON_ERROR_LOG_DIR, ) - + self.fast_chapter_node = FastChapterGenerationNode( + self.llm_client, + self.validator, + self.chapter_storage, + fallback_llm_clients=self.json_rescue_clients, + error_log_dir=self.config.JSON_ERROR_LOG_DIR, + ) + self.model_discriminator = ModelCapabilityDiscriminator(self.config.REPORT_ENGINE_MODEL_NAME or "") def generate_report(self, query: str, reports: List[Any], forum_logs: str = "", custom_template: str = "", save_report: bool = True, stream_handler: Optional[Callable[[str, Dict[str, Any]], None]] = None) -> str: @@ -531,145 +538,166 @@ def emit(event_type: str, payload: Dict[str, Any]): self._persist_planning_artifacts(run_dir, layout_design, word_plan, template_overview) emit('stage', {'stage': 'storage_ready', 'run_dir': str(run_dir)}) - chapters = [] - chapter_max_attempts = max( - self._CONTENT_SPARSE_MIN_ATTEMPTS, self.config.CHAPTER_JSON_MAX_ATTEMPTS - ) - total_chapters = len(sections) # 总章节数 - completed_chapters = 0 # 已完成章节数 - - for section in sections: - logger.info(f"生成章节: {section.title}") - emit('chapter_status', { - 'chapterId': section.chapter_id, - 'title': section.title, - 'status': 'running' - }) - # 章节流式回调:把LLM返回的delta透传给SSE,便于前端实时渲染 - def chunk_callback(delta: str, meta: Dict[str, Any], section_ref: TemplateSection = section): - """ - 章节内容流式回调。 - - Args: - delta: LLM最新输出的增量文本。 - meta: 节点回传的章节元数据,兜底时使用。 - section_ref: 默认指向当前章节,保证在缺失元信息时也能定位。 - """ - emit('chapter_chunk', { - 'chapterId': meta.get('chapterId') or section_ref.chapter_id, - 'title': meta.get('title') or section_ref.title, - 'delta': delta + is_strong_model = self.model_discriminator.is_strong_model() + logger.info(f"模型能力判定: {'强模型' if is_strong_model else '弱模型'} (MODEL={self.config.REPORT_ENGINE_MODEL_NAME})") + + if is_strong_model: + # === 高速自适应通道 (Fast Track) === + try: + chapters = self.fast_chapter_node.run_continuous( + sections, + generation_context, + run_dir, + stream_handler=chunk_callback # 复用回调 + ) + emit('progress', {'progress': 90, 'message': '强模型连续生成完成'}) + except Exception as e: + logger.error(f"强模型生成失败,尝试回退到分章节模式: {e}") + # 回退到普通模式 + is_strong_model = False + + if not is_strong_model: + # === 安全分治通道 (Safe Lane) === + chapters = [] + chapter_max_attempts = max( + self._CONTENT_SPARSE_MIN_ATTEMPTS, self.config.CHAPTER_JSON_MAX_ATTEMPTS + ) + total_chapters = len(sections) # 总章节数 + completed_chapters = 0 # 已完成章节数 + for section in sections: + logger.info(f"生成章节: {section.title}") + emit('chapter_status', { + 'chapterId': section.chapter_id, + 'title': section.title, + 'status': 'running' }) + # 章节流式回调:把LLM返回的delta透传给SSE,便于前端实时渲染 + def chunk_callback(delta: str, meta: Dict[str, Any], section_ref: TemplateSection = section): + """ + 章节内容流式回调。 + + + + Args: + delta: LLM最新输出的增量文本。 + meta: 节点回传的章节元数据,兜底时使用。 + section_ref: 默认指向当前章节,保证在缺失元信息时也能定位。 + """ + emit('chapter_chunk', { + 'chapterId': meta.get('chapterId') or section_ref.chapter_id, + 'title': meta.get('title') or section_ref.title, + 'delta': delta + }) - chapter_payload: Dict[str, Any] | None = None - attempt = 1 - best_sparse_candidate: Dict[str, Any] | None = None - best_sparse_score = -1 - fallback_used = False - while attempt <= chapter_max_attempts: - try: - chapter_payload = self.chapter_generation_node.run( - section, - generation_context, - run_dir, - stream_callback=chunk_callback - ) - break - except (ChapterJsonParseError, ChapterContentError) as structured_error: - error_kind = ( - "content_sparse" if isinstance(structured_error, ChapterContentError) else "json_parse" - ) - readable_label = "内容密度异常" if error_kind == "content_sparse" else "JSON解析失败" - if isinstance(structured_error, ChapterContentError): - candidate = getattr(structured_error, "chapter_payload", None) - candidate_score = getattr(structured_error, "body_characters", 0) or 0 - if isinstance(candidate, dict) and candidate_score >= 0: - if candidate_score > best_sparse_score: - best_sparse_candidate = deepcopy(candidate) - best_sparse_score = candidate_score - will_fallback = ( - isinstance(structured_error, ChapterContentError) - and attempt >= chapter_max_attempts - and attempt >= self._CONTENT_SPARSE_MIN_ATTEMPTS - and best_sparse_candidate is not None - ) - logger.warning( - "章节 {title} {label}(第 {attempt}/{total} 次尝试): {error}", - title=section.title, - label=readable_label, - attempt=attempt, - total=chapter_max_attempts, - error=structured_error, - ) - status_value = 'retrying' if attempt < chapter_max_attempts or will_fallback else 'error' - status_payload = { - 'chapterId': section.chapter_id, - 'title': section.title, - 'status': status_value, - 'attempt': attempt, - 'error': str(structured_error), - 'reason': error_kind, - } - if will_fallback: - status_payload['warning'] = 'content_sparse_fallback_pending' - emit('chapter_status', status_payload) - if will_fallback: + chapter_payload: Dict[str, Any] | None = None + attempt = 1 + best_sparse_candidate: Dict[str, Any] | None = None + best_sparse_score = -1 + fallback_used = False + while attempt <= chapter_max_attempts: + try: + chapter_payload = self.chapter_generation_node.run( + section, + generation_context, + run_dir, + stream_callback=chunk_callback + ) + break + except (ChapterJsonParseError, ChapterContentError) as structured_error: + error_kind = ( + "content_sparse" if isinstance(structured_error, ChapterContentError) else "json_parse" + ) + readable_label = "内容密度异常" if error_kind == "content_sparse" else "JSON解析失败" + if isinstance(structured_error, ChapterContentError): + candidate = getattr(structured_error, "chapter_payload", None) + candidate_score = getattr(structured_error, "body_characters", 0) or 0 + if isinstance(candidate, dict) and candidate_score >= 0: + if candidate_score > best_sparse_score: + best_sparse_candidate = deepcopy(candidate) + best_sparse_score = candidate_score + will_fallback = ( + isinstance(structured_error, ChapterContentError) + and attempt >= chapter_max_attempts + and attempt >= self._CONTENT_SPARSE_MIN_ATTEMPTS + and best_sparse_candidate is not None + ) logger.warning( - "章节 {title} 达到最大尝试次数,保留字数最多(约 {score} 字)的版本作为兜底输出", + "章节 {title} {label}(第 {attempt}/{total} 次尝试): {error}", title=section.title, - score=best_sparse_score, + label=readable_label, + attempt=attempt, + total=chapter_max_attempts, + error=structured_error, ) - chapter_payload = self._finalize_sparse_chapter(best_sparse_candidate) - fallback_used = True - break - if attempt >= chapter_max_attempts: - raise - attempt += 1 - continue - except Exception as chapter_error: - if not self._should_retry_inappropriate_content_error(chapter_error): - raise - logger.warning( - "章节 {title} 触发内容安全限制(第 {attempt}/{total} 次尝试),准备重新生成: {error}", - title=section.title, - attempt=attempt, - total=chapter_max_attempts, - error=chapter_error, + status_value = 'retrying' if attempt < chapter_max_attempts or will_fallback else 'error' + status_payload = { + 'chapterId': section.chapter_id, + 'title': section.title, + 'status': status_value, + 'attempt': attempt, + 'error': str(structured_error), + 'reason': error_kind, + } + if will_fallback: + status_payload['warning'] = 'content_sparse_fallback_pending' + emit('chapter_status', status_payload) + if will_fallback: + logger.warning( + "章节 {title} 达到最大尝试次数,保留字数最多(约 {score} 字)的版本作为兜底输出", + title=section.title, + score=best_sparse_score, + ) + chapter_payload = self._finalize_sparse_chapter(best_sparse_candidate) + fallback_used = True + break + if attempt >= chapter_max_attempts: + raise + attempt += 1 + continue + except Exception as chapter_error: + if not self._should_retry_inappropriate_content_error(chapter_error): + raise + logger.warning( + "章节 {title} 触发内容安全限制(第 {attempt}/{total} 次尝试),准备重新生成: {error}", + title=section.title, + attempt=attempt, + total=chapter_max_attempts, + error=chapter_error, + ) + emit('chapter_status', { + 'chapterId': section.chapter_id, + 'title': section.title, + 'status': 'retrying' if attempt < chapter_max_attempts else 'error', + 'attempt': attempt, + 'error': str(chapter_error), + 'reason': 'content_filter' + }) + if attempt >= chapter_max_attempts: + raise + attempt += 1 + continue + if chapter_payload is None: + raise ChapterJsonParseError( + f"{section.title} 章节JSON在 {chapter_max_attempts} 次尝试后仍无法解析" ) - emit('chapter_status', { - 'chapterId': section.chapter_id, - 'title': section.title, - 'status': 'retrying' if attempt < chapter_max_attempts else 'error', - 'attempt': attempt, - 'error': str(chapter_error), - 'reason': 'content_filter' - }) - if attempt >= chapter_max_attempts: - raise - attempt += 1 - continue - if chapter_payload is None: - raise ChapterJsonParseError( - f"{section.title} 章节JSON在 {chapter_max_attempts} 次尝试后仍无法解析" - ) - chapters.append(chapter_payload) - completed_chapters += 1 # 更新已完成章节数 - # 计算当前进度:20% + 80% * (已完成章节数 / 总章节数),四舍五入 - chapter_progress = 20 + round(80 * completed_chapters / total_chapters) - emit('progress', { - 'progress': chapter_progress, - 'message': f'章节 {completed_chapters}/{total_chapters} 已完成' - }) - completion_status = { - 'chapterId': section.chapter_id, - 'title': section.title, - 'status': 'completed', - 'attempt': attempt, - } - if fallback_used: - completion_status['warning'] = 'content_sparse_fallback' - completion_status['warningMessage'] = self._CONTENT_SPARSE_WARNING_TEXT - emit('chapter_status', completion_status) + chapters.append(chapter_payload) + completed_chapters += 1 # 更新已完成章节数 + # 计算当前进度:20% + 80% * (已完成章节数 / 总章节数),四舍五入 + chapter_progress = 20 + round(80 * completed_chapters / total_chapters) + emit('progress', { + 'progress': chapter_progress, + 'message': f'章节 {completed_chapters}/{total_chapters} 已完成' + }) + completion_status = { + 'chapterId': section.chapter_id, + 'title': section.title, + 'status': 'completed', + 'attempt': attempt, + } + if fallback_used: + completion_status['warning'] = 'content_sparse_fallback' + completion_status['warningMessage'] = self._CONTENT_SPARSE_WARNING_TEXT + emit('chapter_status', completion_status) document_ir = self.document_composer.build_document( report_id, diff --git a/ReportEngine/nodes/fast_chapter_generation_node.py b/ReportEngine/nodes/fast_chapter_generation_node.py new file mode 100644 index 000000000..bb3ac14ed --- /dev/null +++ b/ReportEngine/nodes/fast_chapter_generation_node.py @@ -0,0 +1,189 @@ +from __future__ import annotations +import json +import re +from pathlib import Path +from typing import Any, Dict, List, Optional, Callable + +from loguru import logger +from .base_node import BaseNode +from ..core import TemplateSection, ChapterStorage +from ..ir import IRValidator +from .stream_supervisor import StreamSupervisor +from ..prompts import ( + SYSTEM_PROMPT_CHAPTER_JSON, + build_chapter_user_prompt +) +from ..nodes.chapter_generation_node import ChapterGenerationNode, ChapterJsonParseError + + +class FastChapterGenerationNode(ChapterGenerationNode): + """ + 双轨制 - 强模型自适应生成节点。 + + 特点: + 1. **有状态 (Stateful)**: 维护 conversation history,让模型知道前几章写了什么。 + 2. **流式监管 (Supervised)**: 使用 StreamSupervisor 实时监控,防止长生成崩溃。 + 3. **兼容输出**: 最终仍产出标准 Chapter JSON 落盘,适配现有 Renderer。 + """ + + def __init__( + self, + llm_client, + validator: IRValidator, + storage: ChapterStorage, + fallback_llm_clients: Optional[List[Any]] = None, + error_log_dir: Optional[str | Path] = None, + ): + super().__init__(llm_client, validator, storage, fallback_llm_clients, error_log_dir) + self.conversation_history: List[Dict[str, str]] = [] # 维护多轮对话历史 + + def run_continuous( + self, + sections: List[TemplateSection], + context: Dict[str, Any], + run_dir: Path, + stream_handler: Optional[Callable[[str, Dict[str, Any]], None]] = None, + **kwargs + ) -> List[Dict[str, Any]]: + """ + 连续生成所有章节。 + + 不同于基类的 run() 处理单章,此方法在一个 Session 中依次生成所有章节, + 并将前序章节的摘要或全文保留在 Context 中,实现"全局连贯"。 + """ + logger.info("[FastTrack] 启动强模型连续生成模式") + generated_chapters = [] + + # 初始化系统提示词(只设一次,作为对话背景) + # 注意:这里我们复用 SYSTEM_PROMPT_CHAPTER_JSON,但在多轮对话中可能需要调整 + # 为了简单起见,我们将在每轮 user message 中强化上下文 + + self.conversation_history = [] + + for i, section in enumerate(sections): + logger.info(f"[FastTrack] 正在生成第 {i + 1}/{len(sections)} 章: {section.title}") + + # 1. 构造当前章的 Prompt + # 我们复用 _build_payload,但会注入"前情提要" + llm_payload = self._build_payload(section, context) + + # 增强 Payload:注入前序章节的总结或内容(如果不太长) + if generated_chapters: + previous_summaries = [] + for prev in generated_chapters[-2:]: # 只带最近2章,避免 context 爆炸(即使是强模型也要省) + prev_title = prev.get('title', 'Unknown') + # 简单提取前章的某些关键点,或者直接告诉模型"接上文" + previous_summaries.append(f"已完成章节《{prev_title}》。") + + llm_payload['globalContext']['previousContext'] = "\n".join(previous_summaries) + llm_payload['globalContext']['instruction'] = "请保持与前文的逻辑连贯性,承接上文风格。" + + user_message = build_chapter_user_prompt(llm_payload) + + # 2. 调用 LLM (带历史记录) + # 注意:目前的 LLMClient.stream_invoke 通常是无状态的单次调用。 + # 要实现多轮,我们需要手动把 history 拼进去,或者 LLMClient 支持 messages 列表。 + # 假设 LLMClient.stream_invoke 接受 str prompt。 + # 对于强模型,我们可以把 history 拼成 text,或者如果底层支持 list[dict],最好传 list。 + # 这里为了兼容性,我们将"历史"作为 text 附在本次 prompt 前面,模拟多轮。 + + # *更优解*:每次生成完,把 content 存入 self.conversation_history + # 下一次 prompt = history + new_user_message + + # 但考虑到 Prompt 长度和 Token 费用,我们在 User Prompt 里包含"前情提要"可能更经济, + # 而不是把整个对话记录发过去。强模型有 128k context,发过去也行。 + # 让我们采用:User Prompt + (Optional) Previous Chapter Content Summary + + # 3. 流式生成 + 监控 + chapter_dir = self.storage.begin_chapter(run_dir, { + "chapterId": section.chapter_id, "slug": section.slug, "title": section.title + }) + + raw_text = self._stream_llm_with_supervisor( + user_message, + chapter_dir, + stream_handler, + section_meta={"chapterId": section.chapter_id, "title": section.title}, + **kwargs + ) + + # 4. 解析与后处理 (复用基类逻辑) + # 强模型虽然强,也需要解析 JSON + try: + chapter_json = self._parse_chapter(raw_text) + self._sanitize_chapter_blocks(chapter_json) + # 补全元数据 + chapter_json['chapterId'] = section.chapter_id + chapter_json['title'] = section.title + + # 落盘 + self.storage.persist_chapter(run_dir, chapter_json, chapter_json, None) + generated_chapters.append(chapter_json) + + # 记录到历史(供下一轮参考) + self.conversation_history.append({"role": "user", "content": user_message}) + self.conversation_history.append({"role": "assistant", "content": raw_text}) + + except Exception as e: + logger.error(f"[FastTrack] 章节 {section.title} 解析失败: {e}") + # 强模型失败了,是否降级? + # 这里简单处理:抛出异常,外层可能会捕获。 + # 或者回退到基类的重试逻辑。 + raise e + + return generated_chapters + + def _stream_llm_with_supervisor( + self, + user_message: str, + chapter_dir: Path, + stream_callback: Optional[Callable[[str, Dict[str, Any]], None]], + section_meta: Dict[str, Any], + **kwargs + ) -> str: + """ + 带监控的流式生成。 + """ + supervisor = StreamSupervisor() + chunks = [] + + # 构造完整的 messages 列表用于强模型 + # 这里假设 LLMClient 能够处理 messages 或者我们手动拼成 prompt string + # 简单起见,我们假设 base_url 对应的 backend 能处理长文本 prompt + + # 如果 self.conversation_history 非空,拼接到 user_message 前面? + # 这是一个简化实现。理想情况下应修改 LLMClient 支持 messages=[] 参数。 + # 这里我们假设 user_message 包含了足够上下文。 + + full_prompt = user_message + if self.conversation_history: + # 简易拼接历史,模拟 coherent context + history_text = "\n\n".join( + [f"{msg['role'].upper()}: {msg['content'][:500]}..." for msg in self.conversation_history]) + full_prompt = f"Previous conversation history:\n{history_text}\n\nCurrent Request:\n{user_message}" + + with self.storage.capture_stream(chapter_dir) as stream_fp: + stream = self.llm_client.stream_invoke( + SYSTEM_PROMPT_CHAPTER_JSON, # 系统提示词 + full_prompt, + temperature=kwargs.get("temperature", 0.2), + top_p=kwargs.get("top_p", 0.95), + ) + + for delta in stream: + # 1. 监控 + if not supervisor.push(delta): + logger.warning("[FastTrack] 监控到生成异常,尝试中断或标记(目前策略:继续但记录警告)") + # 在更高级实现中,这里可以 raise StopGeneration 异常并触发重试 + + # 2. 落盘与回调 + stream_fp.write(delta) + chunks.append(delta) + if stream_callback: + try: + stream_callback(delta, section_meta) + except Exception: + pass + + return "".join(chunks) + diff --git a/ReportEngine/nodes/stream_supervisor.py b/ReportEngine/nodes/stream_supervisor.py new file mode 100644 index 000000000..b5c2e7962 --- /dev/null +++ b/ReportEngine/nodes/stream_supervisor.py @@ -0,0 +1,61 @@ +import re +from typing import List, Optional, Callable +from loguru import logger + + +class StreamSupervisor: + """ + 流式监控器(滑动窗口校验)。 + + 在强模型进行长文本生成时,实时监控输出流: + 1. 维护滑动窗口缓冲; + 2. 检测关键格式错误(如未闭合的 Markdown/JSON); + 3. (可选) 异步调用小模型校验逻辑一致性。 + """ + + def __init__(self, window_size: int = 2000): + self.window_size = window_size + self.buffer = "" + self.total_generated = 0 + self._last_check_pos = 0 + + def push(self, chunk: str) -> bool: + """ + 推入新生成的文本块。 + + Returns: + bool: 如果检测到严重错误建议中断,返回 False;否则返回 True。 + """ + self.buffer += chunk + self.total_generated += len(chunk) + + # 简单的滑动窗口维护:只保留最近 window_size 长度用于正则检查 + if len(self.buffer) > self.window_size * 2: + self.buffer = self.buffer[-self.window_size:] + + # 每积累一定量才检查一次,避免过度消耗 CPU + if len(self.buffer) - self._last_check_pos > 100: + self._last_check_pos = len(self.buffer) + if not self._quick_format_check(self.buffer): + return False + + return True + + def _quick_format_check(self, text: str) -> bool: + """ + 快速格式检查(启发式)。 + """ + # 1. 检查是否存在严重的 JSON 格式断裂(如果正在生成 JSON) + # 这里假设生成的是 Markdown + JSON 混合,主要防范的是 Markdown 结构 + # 例如:代码块标记不匹配 ``` 数量奇偶校验(虽不严谨,但作滑动窗口参考) + # 但流式生成中,代码块未闭合是正常的,所以不能简单数数。 + + # 检查是否出现了大量的重复字符(模型崩溃常见特征) + if len(text) > 200: + last_100 = text[-100:] + if len(set(last_100)) < 5: # 极低熵,可能在重复 "......" + logger.warning("StreamSupervisor: 检测到可能的重复生成循环") + return False + + return True + diff --git a/ReportEngine/utils/model_discriminator.py b/ReportEngine/utils/model_discriminator.py new file mode 100644 index 000000000..389f1c620 --- /dev/null +++ b/ReportEngine/utils/model_discriminator.py @@ -0,0 +1,35 @@ +from typing import Set + + +class ModelCapabilityDiscriminator: + """ + 模型能力判别器。 + + 根据模型名称或动态探测结果,判定当前 LLM 是否具备"强模型"特质(超长上下文、复杂指令遵循)。 + 用于在 ReportAgent 中切换"高速自适应通道"与"安全分治通道"。 + """ + + # 强模型白名单(支持部分匹配) + STRONG_MODEL_KEYWORDS = { + "gpt-4", + "claude-3-opus", + "claude-3-5-sonnet", + "gemini-1.5-pro", + "qwen-max", + "deepseek-chat" # DeepSeek V3/R1 is strong enough + } + + def __init__(self, model_name: str): + self.model_name = model_name.lower() + + def is_strong_model(self) -> bool: + """ + 判断是否为强模型。 + + 目前采用静态规则匹配,未来可扩展动态探测逻辑。 + """ + for keyword in self.STRONG_MODEL_KEYWORDS: + if keyword in self.model_name: + return True + return False +