
第十三章 Deep Research Agent

⚠️ 时效性说明:本章涉及前沿模型/价格/榜单等信息,可能随版本快速变化;请以论文原文、官方发布页和 API 文档为准。

构建能够自主进行深度研究、信息综合与报告生成的AI智能体

📌 定位说明:本章对标 hello-agents "自动化深度研究智能体"专题。我们将从架构设计到完整实现,构建一个Mini Deep Research Agent——它能将用户的开放式问题分解为子问题,自主在网络上搜索、阅读、推理,最终生成带引用的结构化研究报告。



📖 本章概览

| 主题 | 内容 | 预计学时 |
| --- | --- | --- |
| 13.1 Deep Research Agent概述 | 什么是Deep Research,与普通RAG的区别 | 1小时 |
| 13.2 架构设计 | 核心模块、状态机、数据流 | 1.5小时 |
| 13.3 Research Planner实现 | 问题分解、研究计划、动态调整 | 1.5小时 |
| 13.4 Web搜索与内容获取 | 搜索API集成、内容提取、并行优化 | 1.5小时 |
| 13.5 内容阅读与信息提取 | 长文档处理、关键信息抽取、引用追踪 | 1.5小时 |
| 13.6 知识综合与推理 | 多源融合、矛盾检测、置信度评估 | 1小时 |
| 13.7 报告生成 | 结构化报告、带引用输出、多格式导出 | 1小时 |
| 13.8 完整实现:Mini Deep Research Agent | 全部模块集成、流式输出、完整可运行代码 | 3小时 |
| 13.9 优化与进阶 | 搜索策略、预算控制、并行化、人机协作 | 1小时 |
| 13.10 与商业产品对比 | OpenAI/Perplexity/Gemini对比分析 | 0.5小时 |



1. Deep Research Agent概述

1.1 什么是Deep Research

2024-2025年,多家公司相继推出"Deep Research"功能:

Text Only
Deep Research 产品时间线
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
2024.12  Google Gemini Deep Research (Gemini 2.0 Flash Thinking)
2025.02  OpenAI Deep Research (o3 + 工具调用)
2025.03  Perplexity Deep Research (多步搜索 + 推理)
2025.04  Grok Deep Research (xAI)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Deep Research的核心定义:一个能够自主执行多步骤研究任务的AI Agent——它接收一个开放式问题,自动规划研究路径、搜索信息、阅读文档、交叉验证、最终输出一份带引用的综合研究报告。

1.2 与普通RAG的根本区别

Text Only
普通RAG(单轮被动检索):
  用户提问 → 检索Top-K文档 → 拼接上下文 → LLM生成回答

  特点: 被动的、一次性的、依赖已有知识库
  局限: 无法发现知识库中不存在的信息

Deep Research Agent(多轮主动探索):
  用户提问 → 规划子问题 → 搜索1 → 阅读 → 发现新线索
           → 搜索2 → 深入阅读 → 交叉验证
           → 搜索3 → 补充缺失信息 → 综合推理
           → 生成带引用的研究报告

  特点: 主动的、迭代的、能发现新信息

| 维度 | 普通RAG | Deep Research Agent |
| --- | --- | --- |
| 检索方式 | 单轮向量检索 | 多轮主动搜索 |
| 信息来源 | 固定知识库 | 全网实时搜索 |
| 推理深度 | 浅层(拼接后生成) | 深层(规划→搜索→阅读→推理→写作) |
| 自主性 | 无(被动响应) | 高(自主决策搜索什么、读什么) |
| 输出形式 | 简短回答 | 结构化研究报告 |
| 处理时间 | 秒级 | 分钟级 |

1.3 核心能力环

Deep Research Agent的五大核心能力形成一个闭环:

Text Only
         ┌─────────┐
         │  规划    │ ← 用户问题
         │ Planning │
         └────┬────┘
              │ 子问题列表
         ┌─────────┐
         │  搜索    │ ← 互联网/知识库
         │Searching │
         └────┬────┘
              │ 搜索结果
         ┌─────────┐
         │  阅读    │ ← 网页/文档内容
         │ Reading  │
         └────┬────┘
              │ 提取的信息
         ┌─────────┐      ┌─────────┐
         │  推理    │─────→│  写作    │→ 研究报告
         │Reasoning │      │ Writing  │
         └────┬────┘      └─────────┘
              │ 发现新问题?
         返回"规划"阶段(迭代)

2. 架构设计

2.1 整体架构

Text Only
┌──────────────────────────────────────────────────┐
│               Deep Research Agent                 │
│                                                    │
│  ┌──────────────────────────────────────────────┐ │
│  │            Research Orchestrator              │ │
│  │         (状态机 + 循环控制)                    │ │
│  └──┬──────┬──────┬──────┬──────┬───────────────┘ │
│     │      │      │      │      │                  │
│  ┌──▼──┐┌──▼──┐┌──▼──┐┌──▼──┐┌──▼───┐            │
│  │Plan ││Sear ││Read ││Syn  ││Write │            │
│  │ner  ││cher ││er   ││thesi││r     │            │
│  │     ││     ││     ││zer  ││      │            │
│  └─────┘└─────┘└─────┘└─────┘└──────┘            │
│                                                    │
│  ┌──────────────────────────────────────────────┐ │
│  │            Research State Store               │ │
│  │  (研究计划/已搜索URL/提取的信息/引用列表)       │ │
│  └──────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘

2.2 核心数据模型

Python
"""Deep Research Agent — 数据模型定义"""

from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime

class ResearchPhase(Enum):
    """研究阶段状态机"""
    PLANNING = "planning"
    SEARCHING = "searching"
    READING = "reading"
    SYNTHESIZING = "synthesizing"
    WRITING = "writing"
    REVIEWING = "reviewing"
    COMPLETE = "complete"

@dataclass
class SubQuestion:
    """子问题"""
    id: int
    question: str
    priority: int = 1               # 1-5, 5最高
    status: str = "pending"          # pending / searching / answered / failed
    search_queries: list[str] = field(default_factory=list)
    findings: list[str] = field(default_factory=list)
    sources: list[str] = field(default_factory=list)

@dataclass
class SearchResult:
    """搜索结果"""
    url: str
    title: str
    snippet: str
    score: float = 0.0
    fetched: bool = False

@dataclass
class ExtractedInfo:
    """从网页提取的信息"""
    source_url: str
    source_title: str
    content: str
    key_facts: list[str] = field(default_factory=list)
    relevance_score: float = 0.0
    extracted_at: str = field(default_factory=lambda: datetime.now().isoformat())  # lambda包装确保每次实例化取当前时间

@dataclass
class Citation:
    """引用"""
    id: int
    url: str
    title: str
    accessed_at: str
    snippet: str = ""

@dataclass
class ResearchState:
    """研究状态——贯穿整个研究流程的全局状态"""
    query: str
    phase: ResearchPhase = ResearchPhase.PLANNING
    sub_questions: list[SubQuestion] = field(default_factory=list)
    search_results: list[SearchResult] = field(default_factory=list)
    extracted_infos: list[ExtractedInfo] = field(default_factory=list)
    citations: list[Citation] = field(default_factory=list)
    visited_urls: set[str] = field(default_factory=set)
    report: str = ""
    iteration: int = 0
    max_iterations: int = 3
    token_budget: int = 100_000      # Token预算上限
    tokens_used: int = 0

2.3 状态机流转

Text Only
状态转移图:

  START → PLANNING ──→ SEARCHING ──→ READING ──→ SYNTHESIZING
              ▲                                      │
              │ YES(还有未回答的子问题?)              ├─ NO ──→ WRITING ──→ REVIEWING ──→ COMPLETE
              └──────────────────────────────────────┘
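
为直观起见,下面给出驱动上述状态机的一个极简编排循环草图。注意这只是示意:各阶段处理函数(handlers)由调用方提供,其签名以及"回到PLANNING即计一次迭代"的约定均为本文假设。

Python
from collections.abc import Awaitable, Callable

# 阶段处理函数:读写ResearchState,返回下一个阶段(需调用方自行实现)
PhaseHandler = Callable[[ResearchState], Awaitable[ResearchPhase]]

async def run_research_loop(
    state: ResearchState,
    handlers: dict[ResearchPhase, PhaseHandler],
) -> ResearchState:
    """按状态机驱动研究流程,直到COMPLETE"""
    while state.phase != ResearchPhase.COMPLETE:
        next_phase = await handlers[state.phase](state)
        if next_phase == ResearchPhase.PLANNING:
            # 每次回到规划阶段计一次迭代,防止无限循环
            state.iteration += 1
            if state.iteration >= state.max_iterations:
                next_phase = ResearchPhase.WRITING  # 迭代预算耗尽,强制进入写作
        state.phase = next_phase
    return state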

3. Research Planner实现

3.1 将用户问题分解为子问题

Research Planner是Deep Research Agent的"大脑"——它将用户的开放式问题拆解为可搜索的具体子问题。

Python
"""Research Planner — 问题分解与计划生成"""

import json
from openai import AsyncOpenAI

client = AsyncOpenAI()

PLANNER_SYSTEM_PROMPT = """你是一位资深的研究规划师。你的任务是将用户的研究问题分解为3-6个具体的子问题。

每个子问题应该:
1. 足够具体,可以通过网络搜索找到答案
2. 覆盖问题的不同方面(定义、现状、技术细节、对比、趋势等)
3. 按逻辑顺序排列(基础→进阶)
4. 包含推荐的搜索查询词

输出JSON格式:
{
    "research_plan": {
        "summary": "研究计划简述",
        "sub_questions": [
            {
                "id": 1,
                "question": "子问题描述",
                "priority": 5,
                "search_queries": ["搜索词1", "搜索词2"]
            }
        ]
    }
}"""

async def create_research_plan(query: str) -> list[SubQuestion]:
    """将用户问题分解为子问题,返回研究计划"""
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
            {"role": "user", "content": f"请为以下研究问题制定计划:\n\n{query}"},
        ],
        response_format={"type": "json_object"},
        temperature=0.7,
    )

    plan_data = json.loads(response.choices[0].message.content)
    sub_questions = []
    for sq in plan_data["research_plan"]["sub_questions"]:
        sub_questions.append(SubQuestion(
            id=sq["id"],
            question=sq["question"],
            priority=sq.get("priority", 3),
            search_queries=sq.get("search_queries", [sq["question"]]),
        ))

    return sub_questions
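
调用方式示意(假设已设置 OPENAI_API_KEY 环境变量,示例问题仅作演示):

Python
import asyncio

async def demo_plan():
    sub_qs = await create_research_plan("RAG与长上下文LLM各自的适用场景是什么?")
    for sq in sub_qs:
        print(f"Q{sq.id} (优先级{sq.priority}): {sq.question}")
        print(f"    搜索词: {sq.search_queries}")

asyncio.run(demo_plan())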

3.2 动态调整研究计划

在研究过程中,Agent可能发现新的重要方向需要探索。Planner需要能够动态更新计划:

Python
REPLAN_PROMPT = """基于当前的研究进展,评估是否需要调整研究计划。

已完成的子问题及其发现:
{completed_findings}

尚未完成的子问题:
{pending_questions}

请判断:
1. 是否有重要方向被遗漏?
2. 已有发现是否暗示了新的值得探索的子问题?
3. 哪些未完成的子问题可以跳过(因为已被其他发现覆盖)?

输出JSON:
{{
    "add_questions": [...],
    "skip_questions": [子问题id列表],
    "reasoning": "调整原因"
}}"""

async def replan(
    state: ResearchState,
) -> tuple[list[SubQuestion], list[int]]:
    """根据已有发现动态调整研究计划

    Returns:
        (new_questions, skip_ids): 新增的子问题和要跳过的子问题ID
    """
    completed = "\n".join(
        f"- Q{sq.id}: {sq.question}\n  发现: {'; '.join(sq.findings[:3])}"
        for sq in state.sub_questions if sq.status == "answered"
    )
    pending = "\n".join(
        f"- Q{sq.id}: {sq.question}"
        for sq in state.sub_questions if sq.status == "pending"
    )

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "你是研究计划调整助手。"},
            {"role": "user", "content": REPLAN_PROMPT.format(
                completed_findings=completed or "暂无",
                pending_questions=pending or "暂无",
            )},
        ],
        response_format={"type": "json_object"},
        temperature=0.5,
    )

    result = json.loads(response.choices[0].message.content)

    new_questions = []
    max_id = max((sq.id for sq in state.sub_questions), default=0)
    for i, q in enumerate(result.get("add_questions", []), start=max_id + 1):
        new_questions.append(SubQuestion(
            id=i,
            question=q if isinstance(q, str) else q.get("question", ""),  # 三元+isinstance:兼容字符串和字典两种输入格式
            priority=3,
            search_queries=[q] if isinstance(q, str) else q.get("search_queries", []),
        ))

    skip_ids = result.get("skip_questions", [])
    return new_questions, skip_ids
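
replan 的返回值需要应用回研究状态。一种做法如下(其中 "skipped" 状态值是为演示新引入的,前文未定义):

Python
async def apply_replan(state: ResearchState) -> None:
    """把replan的结果合并进研究状态"""
    new_qs, skip_ids = await replan(state)
    state.sub_questions.extend(new_qs)
    for sq in state.sub_questions:
        # 只跳过尚未处理的子问题,已回答的保留其发现
        if sq.id in skip_ids and sq.status == "pending":
            sq.status = "skipped"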

4. Web搜索与内容获取

4.1 Tavily搜索集成

Tavily 是专为AI Agent设计的搜索API,返回结构化结果,非常适合Deep Research场景。

Python
"""Web Searcher — 搜索API集成与内容获取"""

import asyncio
import httpx
from tavily import AsyncTavilyClient

class WebSearcher:
    """Web搜索器:支持Tavily和备用的httpx直接搜索"""

    def __init__(self, tavily_api_key: str):
        self.tavily = AsyncTavilyClient(api_key=tavily_api_key)
        self.http_client = httpx.AsyncClient(
            timeout=15.0,
            follow_redirects=True,
            headers={"User-Agent": "DeepResearchAgent/1.0"},
        )

    async def search(
        self,
        query: str,
        max_results: int = 5,
        search_depth: str = "advanced",
    ) -> list[SearchResult]:
        """执行搜索,返回结构化结果

        注意:生产环境应添加速率限制(如 asyncio.Semaphore 或令牌桶),
        避免短时间内发送大量请求导致API配额耗尽或被封禁。
        """
        try:
            response = await self.tavily.search(
                query=query,
                max_results=max_results,
                search_depth=search_depth,     # "basic" 或 "advanced"
                include_raw_content=False,
            )
            results = []
            for item in response.get("results", []):
                results.append(SearchResult(
                    url=item["url"],
                    title=item.get("title", ""),
                    snippet=item.get("content", ""),
                    score=item.get("score", 0.0),
                ))
            return results
        except Exception as e:
            print(f"[搜索错误] {query}: {e}")
            return []

    async def search_multiple(
        self,
        queries: list[str],
        max_results_per_query: int = 3,
    ) -> list[SearchResult]:
        """并行执行多个搜索查询"""
        tasks = [
            self.search(q, max_results=max_results_per_query)
            for q in queries
        ]
        all_results_nested = await asyncio.gather(*tasks)

        # 合并并去重(按URL)
        seen_urls: set[str] = set()
        merged: list[SearchResult] = []
        for results in all_results_nested:
            for r in results:
                if r.url not in seen_urls:
                    seen_urls.add(r.url)
                    merged.append(r)

        # 按相关性排序
        merged.sort(key=lambda x: x.score, reverse=True)  # 合并去重后按分数降序排列
        return merged

    async def close(self):
        await self.http_client.aclose()
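
search 的注释中提到生产环境应添加速率限制。下面是基于 asyncio.Semaphore 的最小示意(RateLimitedSearcher 与并发上限数值均为示例性假设):

Python
import asyncio

class RateLimitedSearcher(WebSearcher):
    """限制并发搜索数的示例子类"""

    def __init__(self, tavily_api_key: str, max_concurrency: int = 3):
        super().__init__(tavily_api_key)
        self._sem = asyncio.Semaphore(max_concurrency)  # 同时最多 max_concurrency 个在途请求

    async def search(
        self,
        query: str,
        max_results: int = 5,
        search_depth: str = "advanced",
    ) -> list[SearchResult]:
        async with self._sem:  # 先取得许可,再真正发起搜索
            return await super().search(query, max_results=max_results, search_depth=search_depth)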

4.2 网页内容提取与清洗

Python
"""Content Reader — 网页内容提取与清洗"""

import re
import httpx

async def fetch_page_content(
    url: str,
    client: httpx.AsyncClient | None = None,
    max_chars: int = 15_000,
) -> str | None:
    """获取网页内容并清洗为纯文本

    Args:
        url: 目标URL
        client: 可复用的httpx客户端
        max_chars: 最大字符数(截断保护)
    """
    _client = client or httpx.AsyncClient(timeout=15.0, follow_redirects=True)
    try:
        resp = await _client.get(url)
        resp.raise_for_status()
        html = resp.text
    except Exception as e:
        print(f"[获取失败] {url}: {e}")
        return None
    finally:
        if client is None:
            await _client.aclose()

    return clean_html(html, max_chars)

def clean_html(html: str, max_chars: int = 15_000) -> str:
    """将HTML清洗为纯文本"""
    # 移除script和style标签及其内容(这些不包含有用信息)
    html = re.sub(r"<script[^>]*>.*?</script>", "", html, flags=re.DOTALL | re.IGNORECASE)
    html = re.sub(r"<style[^>]*>.*?</style>", "", html, flags=re.DOTALL | re.IGNORECASE)
    # 移除HTML注释
    html = re.sub(r"<!--.*?-->", "", html, flags=re.DOTALL)
    # 移除所有HTML标签,保留纯文本
    text = re.sub(r"<[^>]+>", " ", html)
    # 解码常见HTML实体(如 &amp; → &)
    text = text.replace("&amp;", "&").replace("&lt;", "<").replace("&gt;", ">")
    text = text.replace("&nbsp;", " ").replace("&quot;", '"')
    # 合并多余空白字符为单个空格
    text = re.sub(r"\s+", " ", text).strip()
    # 截断保护:防止过长文本耗尽Token预算
    if len(text) > max_chars:
        text = text[:max_chars] + "\n\n[内容已截断...]"
    return text

4.3 URL去重与质量过滤

Python
from urllib.parse import urlparse

# 低质量域名黑名单
LOW_QUALITY_DOMAINS = {
    "pinterest.com", "quora.com", "facebook.com",
    "instagram.com", "tiktok.com", "twitter.com",
}

def filter_search_results(
    results: list[SearchResult],
    visited_urls: set[str],
    max_results: int = 8,
) -> list[SearchResult]:
    """过滤低质量和已访问的搜索结果,提升后续阅读效率"""
    filtered = []
    for r in results:
        # 跳过已访问的URL,避免重复处理
        if r.url in visited_urls:
            continue
        # 跳过社交媒体等低信息密度站点
        domain = urlparse(r.url).netloc.replace("www.", "")
        if domain in LOW_QUALITY_DOMAINS:
            continue
        # 跳过二进制文件格式(难以解析为纯文本)
        if r.url.endswith((".pdf", ".ppt", ".pptx", ".doc", ".docx")):
            continue
        filtered.append(r)

    return filtered[:max_results]

5. 内容阅读与信息提取

5.1 长文档分块处理

当网页内容过长时,需要分块让LLM处理:

Python
def chunk_text(text: str, chunk_size: int = 4000, overlap: int = 200) -> list[str]:
    """将长文本分块

    Args:
        text: 原始文本
        chunk_size: 每块字符数
        overlap: 块之间的重叠字符数
    """
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        # 尝试在句号处断开(兼顾中英文句号)
        if end < len(text):
            last_period = max(
                text.rfind("。", start + chunk_size // 2, end),
                text.rfind(".", start + chunk_size // 2, end),
            )
            if last_period > start:
                end = last_period + 1
        chunks.append(text[start:end])
        start = end - overlap

    return chunks
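
一个简单的调用示例,便于理解块大小与重叠(overlap)的效果:

Python
text = "这是一个示例句子。" * 1200          # 约10800字符的长文本
chunks = chunk_text(text, chunk_size=4000, overlap=200)
print(len(chunks), [len(c) for c in chunks])  # 块数与各块长度;相邻块首尾约有200字符重叠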

5.2 LLM-based关键信息提取

Python
EXTRACT_PROMPT = """你是一位研究助手。请从以下网页内容中提取与研究问题相关的关键信息。

研究问题:{question}

网页内容:
{content}

请输出JSON格式:
{{
    "key_facts": ["事实1", "事实2", ...],
    "relevance_score": 0.0 到 1.0 之间的相关度评分,
    "summary": "50字以内的核心摘要"
}}

注意:只提取与研究问题直接相关的信息,忽略广告、导航等无关内容。"""

async def extract_info_from_content(
    content: str,
    question: str,
    source_url: str,
    source_title: str,
) -> ExtractedInfo:
    """使用LLM从网页内容中提取关键信息"""
    chunks = chunk_text(content, chunk_size=6000)

    all_facts: list[str] = []
    best_relevance = 0.0

    for chunk in chunks[:3]:  # 最多处理3个块(预算控制)
        response = await client.chat.completions.create(
            model="gpt-4o-mini",  # 使用较小模型降低成本
            messages=[
                {"role": "system", "content": "你是一位精确的信息提取助手。"},
                {"role": "user", "content": EXTRACT_PROMPT.format(
                    question=question,
                    content=chunk,
                )},
            ],
            response_format={"type": "json_object"},
            temperature=0.2,
        )

        result = json.loads(response.choices[0].message.content)
        all_facts.extend(result.get("key_facts", []))
        relevance = result.get("relevance_score", 0.0)
        best_relevance = max(best_relevance, relevance)

    return ExtractedInfo(
        source_url=source_url,
        source_title=source_title,
        content=content[:2000],  # 保留前2000字符作为参考
        key_facts=all_facts,
        relevance_score=best_relevance,
    )

5.3 事实核查与交叉验证

Python
FACT_CHECK_PROMPT = """你是一位事实核查专家。请对以下声明进行交叉验证。

声明:{claim}

来源1的说法:{source1}
来源2的说法:{source2}

请判断:
1. 两个来源是否一致?
2. 如果不一致,哪个更可信?为什么?
3. 置信度评分(0-1)

输出JSON:
{{
    "consistent": true/false,
    "confidence": 0.0-1.0,
    "reasoning": "判断理由",
    "verified_fact": "经核查后的事实陈述"
}}"""

async def cross_validate(
    claim: str,
    sources: list[ExtractedInfo],
) -> dict:
    """交叉验证一个事实声明"""
    if len(sources) < 2:
        return {"consistent": True, "confidence": 0.5, "verified_fact": claim}

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "你是事实核查专家。"},
            {"role": "user", "content": FACT_CHECK_PROMPT.format(
                claim=claim,
                source1=sources[0].key_facts[:3],
                source2=sources[1].key_facts[:3],
            )},
        ],
        response_format={"type": "json_object"},
        temperature=0.2,
    )
    return json.loads(response.choices[0].message.content)

5.4 引用追踪

Python
class CitationTracker:
    """引用追踪器——管理所有引用来源"""

    def __init__(self):
        self.citations: list[Citation] = []
        self._url_to_id: dict[str, int] = {}

    def add_citation(self, url: str, title: str, snippet: str = "") -> int:
        """添加引用,返回引用编号"""
        if url in self._url_to_id:
            return self._url_to_id[url]

        cid = len(self.citations) + 1
        self.citations.append(Citation(
            id=cid,
            url=url,
            title=title,
            accessed_at=datetime.now().strftime("%Y-%m-%d"),
            snippet=snippet,
        ))
        self._url_to_id[url] = cid
        return cid

    def format_reference_list(self) -> str:
        """生成引用列表(Markdown格式)"""
        lines = ["## 参考资料\n"]
        for c in self.citations:
            lines.append(f"[{c.id}] [{c.title}]({c.url}) — 访问日期: {c.accessed_at}")
        return "\n".join(lines)

    def inline_cite(self, citation_id: int) -> str:
        """生成行内引用标记"""
        return f"[{citation_id}]"

6. 知识综合与推理

6.1 多源信息融合

Python
SYNTHESIZE_PROMPT = """你是一位研究分析师。请综合以下多个来源的信息,回答研究子问题。

子问题:{question}

来源信息:
{sources_text}

要求:
1. 综合所有来源的关键发现
2. 标注每个关键事实的来源编号(如 [1], [2])
3. 如果来源之间存在矛盾,请明确指出并给出你的判断
4. 给出对该子问题的综合回答

输出JSON:
{{
    "answer": "综合回答(带引用标注)",
    "key_findings": ["发现1 [来源编号]", "发现2 [来源编号]"],
    "contradictions": ["来源A说...,但来源B说..."],
    "confidence": 0.0-1.0,
    "gaps": ["尚未覆盖的方面"]
}}"""

async def synthesize_for_question(
    question: str,
    infos: list[ExtractedInfo],
    citation_tracker: CitationTracker,
) -> dict:
    """为一个子问题综合多源信息"""
    # 构建来源文本,同时注册引用
    sources_parts = []
    for info in infos:
        cid = citation_tracker.add_citation(
            url=info.source_url,
            title=info.source_title,
            snippet="; ".join(info.key_facts[:2]),  # 取前两条关键事实作为引用摘要
        )
        facts_text = "\n".join(f"  - {f}" for f in info.key_facts)
        sources_parts.append(
            f"来源[{cid}] ({info.source_title}):\n{facts_text}"
        )

    sources_text = "\n\n".join(sources_parts)

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "你是一位严谨的研究分析师。"},
            {"role": "user", "content": SYNTHESIZE_PROMPT.format(
                question=question,
                sources_text=sources_text,
            )},
        ],
        response_format={"type": "json_object"},
        temperature=0.3,
    )
    return json.loads(response.choices[0].message.content)

6.2 矛盾检测与解决

Python
async def detect_contradictions(
    findings: list[dict],
) -> list[dict]:
    """检测不同子问题的发现之间的矛盾"""
    all_facts = []
    for f in findings:
        all_facts.extend(f.get("key_findings", []))

    if len(all_facts) < 2:
        return []

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "你是矛盾检测专家。"},
            {"role": "user", "content": f"""请检查以下研究发现之间是否存在矛盾:

{chr(10).join(f'- {fact}' for fact in all_facts)}

输出JSON:
{{
    "contradictions": [
        {{
            "fact_a": "事实A",
            "fact_b": "与之矛盾的事实B",
            "resolution": "建议的解决方式",
            "keep": "A 或 B 或 both"
        }}
    ]
}}"""},
        ],
        response_format={"type": "json_object"},
        temperature=0.2,
    )
    result = json.loads(response.choices[0].message.content)
    return result.get("contradictions", [])

6.3 置信度评估

Python
def compute_overall_confidence(findings: list[dict]) -> float:
    """计算研究的总体置信度

    综合考虑三个因素:
    - 各子问题的置信度均值(基础分)
    - 矛盾数量(扣分项:矛盾越多置信度越低)
    - 关键发现数量(加分项:以发现数近似来源覆盖度,越多越可靠)
    """
    if not findings:
        return 0.0

    # 基础分:各子问题置信度的算术平均
    confidences = [f.get("confidence", 0.5) for f in findings]
    avg_confidence = sum(confidences) / len(confidences)

    # 矛盾惩罚:每个矛盾扣分0.05,最多扣0.2
    total_contradictions = sum(
        len(f.get("contradictions", [])) for f in findings
    )
    contradiction_penalty = min(total_contradictions * 0.05, 0.2)

    # 发现加分:以关键发现数量近似来源覆盖度,最多加0.1
    total_findings = sum(
        len(f.get("key_findings", [])) for f in findings
    )
    source_bonus = min(total_findings * 0.01, 0.1)

    # 结果限制在[0, 1]范围内
    return max(0.0, min(1.0, avg_confidence - contradiction_penalty + source_bonus))
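
用一个小例子验证该公式的计算过程:

Python
findings = [
    {"confidence": 0.8, "key_findings": ["f1", "f2", "f3"], "contradictions": []},
    {"confidence": 0.6, "key_findings": ["f4", "f5"],
     "contradictions": ["来源A与来源B在发布时间上不一致"]},
]
# 基础分 (0.8+0.6)/2 = 0.70;矛盾惩罚 1*0.05 = 0.05;发现加分 5*0.01 = 0.05
print(compute_overall_confidence(findings))  # ≈ 0.70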

7. 报告生成

7.1 结构化报告模板

Python
REPORT_PROMPT = """你是一位专业的研究报告撰写人。请基于以下研究发现,撰写一份结构化的研究报告。

原始研究问题:{query}

各子问题的研究发现:
{findings_text}

矛盾与冲突:
{contradictions_text}

总体置信度:{confidence:.0%}

要求:
1. 使用Markdown格式
2. 包含以下结构:
   - 摘要(Executive Summary):3-5句话概述核心发现
   - 详细分析:按主题组织,非按子问题组织
   - 关键发现:列表形式
   - 局限性与不确定性
   - 结论
3. 所有关键事实必须标注引用来源(如 [1], [2])
4. 语言专业、客观、简洁
5. 如果存在矛盾信息,在报告中明确说明

请直接输出Markdown格式的报告内容(不要包裹在JSON中)。"""

async def generate_report(
    query: str,
    findings: list[dict],
    contradictions: list[dict],
    confidence: float,
    citation_tracker: CitationTracker,
) -> str:
    """生成结构化研究报告"""
    findings_text = ""
    for i, f in enumerate(findings, 1):
        findings_text += f"\n### 子问题 {i}\n"
        findings_text += f"回答: {f.get('answer', 'N/A')}\n"
        findings_text += "关键发现:\n"
        for kf in f.get("key_findings", []):
            findings_text += f"  - {kf}\n"

    contradictions_text = "无明显矛盾" if not contradictions else "\n".join(
        f"- {c['fact_a']} vs {c['fact_b']}(建议: {c['resolution']})"
        for c in contradictions
    )

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "你是一位专业研究报告撰写人。"},
            {"role": "user", "content": REPORT_PROMPT.format(
                query=query,
                findings_text=findings_text,
                contradictions_text=contradictions_text,
                confidence=confidence,
            )},
        ],
        temperature=0.4,
        max_tokens=4000,
    )

    report = response.choices[0].message.content
    # 追加引用列表
    report += "\n\n---\n\n" + citation_tracker.format_reference_list()
    return report

7.2 多格式输出

Python
import json as json_lib

def export_report(report: str, format: str = "markdown") -> str:
    """将报告导出为不同格式

    Args:
        report: Markdown格式的报告内容
        format: 输出格式 - "markdown", "html", "json"
    """
    if format == "markdown":
        return report

    elif format == "html":
        # 简易Markdown→HTML转换(生产环境建议用markdown库)
        html = report
        # 标题转换:### → <h3>, ## → <h2>, # → <h1>
        html = re.sub(r"^### (.+)$", r"<h3>\1</h3>", html, flags=re.MULTILINE)
        html = re.sub(r"^## (.+)$", r"<h2>\1</h2>", html, flags=re.MULTILINE)
        html = re.sub(r"^# (.+)$", r"<h1>\1</h1>", html, flags=re.MULTILINE)
        # 粗体和斜体:**text** → <strong>, *text* → <em>
        html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
        html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
        # 列表项:- text → <li>text</li>
        html = re.sub(r"^- (.+)$", r"<li>\1</li>", html, flags=re.MULTILINE)
        # 段落分隔:双换行转为段落标签
        html = re.sub(r"\n\n", r"</p><p>", html)
        return f"<html><body><p>{html}</p></body></html>"

    elif format == "json":
        # 将报告按##标题拆分为结构化JSON
        sections = re.split(r"^## ", report, flags=re.MULTILINE)
        result = {"title": sections[0].strip() if sections else "", "sections": []}
        for s in sections[1:]:
            lines = s.strip().split("\n", 1)
            result["sections"].append({
                "heading": lines[0].strip(),
                "content": lines[1].strip() if len(lines) > 1 else "",
            })
        return json_lib.dumps(result, ensure_ascii=False, indent=2)

    else:
        raise ValueError(f"不支持的格式: {format}")
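
使用示例(以一段极简的Markdown文本演示三种格式):

Python
sample = "# 示例报告\n\n## 摘要\n这是摘要。\n\n## 结论\n这是结论。"
print(export_report(sample))            # 原样返回Markdown
print(export_report(sample, "html"))    # 简易HTML
print(export_report(sample, "json"))    # 按##标题拆分的结构化JSON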

8. 完整实现:Mini Deep Research Agent

以下是集成所有模块的完整可运行Agent实现。

8.1 依赖安装

Bash
pip install openai tavily-python httpx

8.2 完整Agent代码

Python
"""
Mini Deep Research Agent — 完整实现
====================================
一个能够自主规划、搜索、阅读、推理、撰写研究报告的Agent。

依赖:pip install openai tavily-python httpx
环境变量:OPENAI_API_KEY, TAVILY_API_KEY
"""

from __future__ import annotations

import asyncio
import json
import os
import re
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from urllib.parse import urlparse

import httpx
from openai import AsyncOpenAI
from tavily import AsyncTavilyClient

# ─────────────────────────── 数据模型 ───────────────────────────

class ResearchPhase(Enum):
    PLANNING = "planning"
    SEARCHING = "searching"
    READING = "reading"
    SYNTHESIZING = "synthesizing"
    WRITING = "writing"
    COMPLETE = "complete"

@dataclass
class SubQuestion:
    id: int
    question: str
    priority: int = 3
    status: str = "pending"
    search_queries: list[str] = field(default_factory=list)
    findings: list[str] = field(default_factory=list)
    sources: list[str] = field(default_factory=list)

@dataclass
class SearchResult:
    url: str
    title: str
    snippet: str
    score: float = 0.0

@dataclass
class ExtractedInfo:
    source_url: str
    source_title: str
    key_facts: list[str] = field(default_factory=list)
    relevance_score: float = 0.0

@dataclass
class Citation:
    id: int
    url: str
    title: str
    accessed_at: str

@dataclass
class ResearchState:
    query: str
    phase: ResearchPhase = ResearchPhase.PLANNING
    sub_questions: list[SubQuestion] = field(default_factory=list)
    extracted_infos: list[ExtractedInfo] = field(default_factory=list)
    findings: list[dict] = field(default_factory=list)
    visited_urls: set[str] = field(default_factory=set)
    report: str = ""
    iteration: int = 0
    max_iterations: int = 3

# ─────────────────────────── 引用追踪 ───────────────────────────

class CitationTracker:
    def __init__(self):
        self.citations: list[Citation] = []
        self._url_map: dict[str, int] = {}

    def add(self, url: str, title: str) -> int:
        if url in self._url_map:
            return self._url_map[url]
        cid = len(self.citations) + 1
        self.citations.append(Citation(
            id=cid, url=url, title=title,
            accessed_at=datetime.now().strftime("%Y-%m-%d"),
        ))
        self._url_map[url] = cid
        return cid

    def format_references(self) -> str:
        lines = ["\n## 参考资料\n"]
        for c in self.citations:
            lines.append(f"[{c.id}] [{c.title}]({c.url})")
        return "\n".join(lines)

# ─────────────────────────── 工具函数 ───────────────────────────

LOW_QUALITY_DOMAINS = {"pinterest.com", "quora.com", "facebook.com", "tiktok.com"}

def clean_html(html: str, max_chars: int = 12_000) -> str:
    html = re.sub(r"<script[^>]*>.*?</script>", "", html, flags=re.DOTALL | re.I)
    html = re.sub(r"<style[^>]*>.*?</style>", "", html, flags=re.DOTALL | re.I)
    text = re.sub(r"<[^>]+>", " ", html)
    text = text.replace("&amp;", "&").replace("&nbsp;", " ")
    text = re.sub(r"\s+", " ", text).strip()
    return text[:max_chars]

def chunk_text(text: str, size: int = 5000, overlap: int = 200) -> list[str]:
    if len(text) <= size:
        return [text]
    chunks, start = [], 0
    while start < len(text):
        end = min(start + size, len(text))
        chunks.append(text[start:end])
        start = end - overlap
    return chunks

# ─────────────────────────── 核心Agent ──────────────────────────

class DeepResearchAgent:
    """Mini Deep Research Agent"""

    def __init__(
        self,
        openai_api_key: str | None = None,
        tavily_api_key: str | None = None,
        model: str = "gpt-4o",
        fast_model: str = "gpt-4o-mini",
        max_iterations: int = 3,
    ):
        self.llm = AsyncOpenAI(api_key=openai_api_key)
        self.tavily = AsyncTavilyClient(
            api_key=tavily_api_key or os.getenv("TAVILY_API_KEY", ""),
        )
        self.http = httpx.AsyncClient(timeout=15, follow_redirects=True)
        self.model = model
        self.fast_model = fast_model
        self.max_iterations = max_iterations
        self.citations = CitationTracker()

    # ────────── 1. 规划 ──────────

    async def plan(self, query: str) -> list[SubQuestion]:
        self._log("📋 规划研究方案...")
        resp = await self.llm.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": (
                    "你是研究规划师。将用户问题分解为3-5个可搜索的子问题。"
                    "输出JSON: {\"sub_questions\": [{\"id\": 1, \"question\": \"...\", "
                    "\"priority\": 1-5, \"search_queries\": [\"...\"]}]}"
                )},
                {"role": "user", "content": query},
            ],
            response_format={"type": "json_object"},
            temperature=0.7,
        )
        data = json.loads(resp.choices[0].message.content)
        sqs = []
        for sq in data["sub_questions"]:
            sqs.append(SubQuestion(
                id=sq["id"],
                question=sq["question"],
                priority=sq.get("priority", 3),
                search_queries=sq.get("search_queries", [sq["question"]]),
            ))
        self._log(f"   → 生成 {len(sqs)} 个子问题")
        return sqs

    # ────────── 2. 搜索 ──────────

    async def search(
        self,
        queries: list[str],
        visited: set[str],
        per_query: int = 3,
    ) -> list[SearchResult]:
        self._log(f"🔍 并行搜索 {len(queries)} 个查询...")
        tasks = [
            self.tavily.search(q, max_results=per_query, search_depth="advanced")
            for q in queries
        ]
        raw_results = await asyncio.gather(*tasks, return_exceptions=True)

        seen: set[str] = set()
        results: list[SearchResult] = []
        for raw in raw_results:
            if isinstance(raw, Exception):  # gather(return_exceptions=True)会将异常作为结果返回,此处跳过
                continue
            for item in raw.get("results", []):
                url = item["url"]
                domain = urlparse(url).netloc.replace("www.", "")
                if (url in visited or url in seen
                        or domain in LOW_QUALITY_DOMAINS):
                    continue
                seen.add(url)
                results.append(SearchResult(
                    url=url,
                    title=item.get("title", ""),
                    snippet=item.get("content", ""),
                    score=item.get("score", 0.0),
                ))
        results.sort(key=lambda r: r.score, reverse=True)
        self._log(f"   → 获得 {len(results)} 个唯一结果")
        return results[:10]

    # ────────── 3. 阅读 ──────────

    async def read_and_extract(
        self,
        result: SearchResult,
        question: str,
    ) -> ExtractedInfo | None:
        try:
            resp = await self.http.get(result.url)
            resp.raise_for_status()
            text = clean_html(resp.text)
        except Exception:
            return None

        if len(text) < 100:
            return None

        chunks = chunk_text(text)
        all_facts: list[str] = []
        best_rel = 0.0

        for chunk in chunks[:2]:
            llm_resp = await self.llm.chat.completions.create(
                model=self.fast_model,
                messages=[
                    {"role": "system", "content": "从网页文本中提取与问题相关的关键事实。"
                     "输出JSON: {\"facts\": [\"...\"], \"relevance\": 0.0-1.0}"},
                    {"role": "user", "content": (
                        f"问题: {question}\n\n网页内容:\n{chunk}"
                    )},
                ],
                response_format={"type": "json_object"},
                temperature=0.2,
            )
            data = json.loads(llm_resp.choices[0].message.content)
            all_facts.extend(data.get("facts", []))
            best_rel = max(best_rel, data.get("relevance", 0.0))

        if not all_facts:
            return None

        return ExtractedInfo(
            source_url=result.url,
            source_title=result.title,
            key_facts=all_facts,
            relevance_score=best_rel,
        )

    async def read_multiple(
        self,
        results: list[SearchResult],
        question: str,
        max_pages: int = 4,
    ) -> list[ExtractedInfo]:
        self._log(f"📖 阅读 {min(len(results), max_pages)} 个网页...")
        tasks = [
            self.read_and_extract(r, question)
            for r in results[:max_pages]
        ]
        infos_raw = await asyncio.gather(*tasks)
        infos = [i for i in infos_raw if i is not None]  # 过滤提取失败(返回None)的结果
        self._log(f"   → 成功提取 {len(infos)} 个来源的信息")
        return infos

    # ────────── 4. 综合 ──────────

    async def synthesize(
        self,
        question: str,
        infos: list[ExtractedInfo],
    ) -> dict:
        sources_text = ""
        for info in infos:
            cid = self.citations.add(info.source_url, info.source_title)
            facts = "\n".join(f"  - {f}" for f in info.key_facts)
            sources_text += f"\n来源[{cid}] ({info.source_title}):\n{facts}\n"

        resp = await self.llm.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": (
                    "综合多个来源回答研究子问题。标注引用来源编号。"
                    "输出JSON: {\"answer\": \"...\", \"key_findings\": [\"...\"], "
                    "\"confidence\": 0.0-1.0, \"gaps\": [\"...\"]}"
                )},
                {"role": "user", "content": f"子问题: {question}\n\n{sources_text}"},
            ],
            response_format={"type": "json_object"},
            temperature=0.3,
        )
        return json.loads(resp.choices[0].message.content)

    # ────────── 5. 报告 ──────────

    async def write_report(self, query: str, findings: list[dict]) -> str:
        self._log("📝 撰写研究报告...")
        findings_text = ""
        for i, f in enumerate(findings, 1):
            findings_text += f"\n### 子问题 {i}\n"
            findings_text += f"回答: {f.get('answer', 'N/A')}\n"
            for kf in f.get("key_findings", []):
                findings_text += f"  - {kf}\n"

        resp = await self.llm.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": (
                    "你是专业研究报告撰写人。基于研究发现撰写Markdown格式报告。"
                    "结构:摘要→详细分析→关键发现→局限性→结论。"
                    "所有关键事实标注引用编号 [1], [2] 等。"
                )},
                {"role": "user", "content": (
                    f"研究问题:{query}\n\n研究发现:\n{findings_text}"
                )},
            ],
            temperature=0.4,
            max_tokens=4000,
        )
        report = resp.choices[0].message.content
        report += "\n\n---\n" + self.citations.format_references()
        return report

    # ────────── 主循环 ──────────

    async def research(self, query: str) -> str:
        """执行完整的深度研究流程,返回研究报告"""
        self._log(f"🚀 开始研究: {query}\n")
        state = ResearchState(query=query, max_iterations=self.max_iterations)

        # Phase 1: 规划
        state.sub_questions = await self.plan(query)
        state.phase = ResearchPhase.SEARCHING

        # Phase 2-4: 对每个子问题执行 搜索→阅读→综合
        for sq in state.sub_questions:
            self._log(f"\n── 子问题 {sq.id}: {sq.question}")
            sq.status = "searching"

            # 搜索
            results = await self.search(
                sq.search_queries, state.visited_urls,
            )
            for r in results:
                state.visited_urls.add(r.url)

            # 阅读
            infos = await self.read_multiple(results, sq.question)
            state.extracted_infos.extend(infos)

            # 综合
            if infos:
                self._log("🧠 综合分析...")
                finding = await self.synthesize(sq.question, infos)
                state.findings.append(finding)
                sq.findings = finding.get("key_findings", [])
                sq.status = "answered"
                self._log(f"   → 置信度: {finding.get('confidence', 0):.0%}")
            else:
                sq.status = "failed"
                self._log("   → 未能获取有效信息")

        # Phase 5: 撰写报告
        state.phase = ResearchPhase.WRITING
        state.report = await self.write_report(query, state.findings)
        state.phase = ResearchPhase.COMPLETE

        self._log("\n✅ 研究完成!")
        return state.report

    async def close(self):
        await self.http.aclose()

    @staticmethod
    def _log(msg: str):
        print(msg)

# ─────────────────────────── 运行入口 ───────────────────────────

async def main():
    agent = DeepResearchAgent(
        max_iterations=2,
    )
    try:
        report = await agent.research(
            "2025年大语言模型在代码生成领域的最新进展和主要挑战是什么?"
        )
        print("\n" + "=" * 60)
        print(report)

        # 保存报告
        with open("research_report.md", "w", encoding="utf-8") as f:
            f.write(report)
        print("\n📄 报告已保存至 research_report.md")
    finally:
        await agent.close()

if __name__ == "__main__":
    asyncio.run(main())

8.3 运行示例

Text Only
🚀 开始研究: 2025年大语言模型在代码生成领域的最新进展和主要挑战是什么?

📋 规划研究方案...
   → 生成 4 个子问题

── 子问题 1: 2025年代码生成领域有哪些主要的大语言模型?
🔍 并行搜索 2 个查询...
   → 获得 8 个唯一结果
📖 阅读 4 个网页...
   → 成功提取 3 个来源的信息
🧠 综合分析...
   → 置信度: 85%

── 子问题 2: 这些模型在代码生成任务上的性能基准和评测结果如何?
🔍 并行搜索 2 个查询...
   → 获得 6 个唯一结果
📖 阅读 4 个网页...
   → 成功提取 4 个来源的信息
🧠 综合分析...
   → 置信度: 80%

── 子问题 3: 当前代码生成面临的主要技术挑战是什么?
...

── 子问题 4: 代码生成技术的未来发展趋势是什么?
...

📝 撰写研究报告...

✅ 研究完成!

8.4 支持流式输出研究进度

Python
import asyncio
from collections.abc import AsyncIterator  # 异步迭代器抽象基类,用于注解包含yield的async def(异步生成器)

class StreamingResearchAgent(DeepResearchAgent):
    """支持流式输出进度的研究Agent"""

    async def research_stream(
        self,
        query: str,
    ) -> AsyncIterator[dict]:
        """流式执行研究,通过yield返回进度事件"""
        yield {"event": "start", "query": query}

        # 规划
        yield {"event": "phase", "phase": "planning"}
        sub_questions = await self.plan(query)
        yield {
            "event": "plan_ready",
            "sub_questions": [
                {"id": sq.id, "question": sq.question}
                for sq in sub_questions
            ],
        }

        state = ResearchState(query=query)
        state.sub_questions = sub_questions
        findings: list[dict] = []

        for sq in sub_questions:
            yield {"event": "sub_question_start", "id": sq.id, "question": sq.question}

            # 搜索
            yield {"event": "searching", "queries": sq.search_queries}
            results = await self.search(sq.search_queries, state.visited_urls)
            for r in results:
                state.visited_urls.add(r.url)
            yield {"event": "search_done", "count": len(results)}

            # 阅读
            yield {"event": "reading", "count": min(len(results), 4)}
            infos = await self.read_multiple(results, sq.question)
            yield {"event": "read_done", "extracted": len(infos)}

            # 综合
            if infos:
                yield {"event": "synthesizing"}
                finding = await self.synthesize(sq.question, infos)
                findings.append(finding)
                yield {
                    "event": "synthesized",
                    "confidence": finding.get("confidence", 0),
                    "key_findings": finding.get("key_findings", [])[:3],
                }

        # 写报告
        yield {"event": "phase", "phase": "writing"}
        report = await self.write_report(query, findings)
        yield {"event": "complete", "report": report}

# 使用示例
async def stream_demo():
    agent = StreamingResearchAgent()
    try:
        async for event in agent.research_stream("AI Agent的最新发展"):
            if event["event"] == "phase":
                print(f"\n📎 阶段: {event['phase']}")
            elif event["event"] == "sub_question_start":
                print(f"\n🔎 子问题 {event['id']}: {event['question']}")
            elif event["event"] == "search_done":
                print(f"   搜索到 {event['count']} 个结果")
            elif event["event"] == "synthesized":
                print(f"   置信度: {event['confidence']:.0%}")
            elif event["event"] == "complete":
                print(f"\n📄 报告长度: {len(event['report'])} 字符")
    finally:
        await agent.close()

9. 优化与进阶

9.1 搜索策略对比

Text Only
三种搜索策略:

BFS(广度优先):
  Q1 → [搜索1a, 搜索1b, 搜索1c]
  Q2 → [搜索2a, 搜索2b, 搜索2c]
  Q3 → [搜索3a, 搜索3b, 搜索3c]
  → 然后统一阅读和综合
  优点: 覆盖面广,并行度高
  缺点: 可能浪费预算在低价值方向

DFS(深度优先):
  Q1 → 搜索 → 阅读 → 发现新线索 → 深入搜索 → ...
  然后 Q2 → ...
  优点: 可以深入挖掘
  缺点: 串行执行,速度慢

混合策略(推荐):
  Round 1: BFS对所有子问题进行初步搜索
  Round 2: 根据Round 1的发现,DFS深入最有价值的方向
  Round 3: 补充搜索覆盖信息空白

Python
async def hybrid_search_strategy(
    agent: DeepResearchAgent,
    sub_questions: list[SubQuestion],
    state: ResearchState,
) -> list[dict]:
    """混合搜索策略实现"""
    all_findings: list[dict] = []

    # Round 1: BFS — 广度优先,快速了解全貌
    round1_tasks = []
    for sq in sub_questions:
        round1_tasks.append(
            agent.search(sq.search_queries[:1], state.visited_urls, per_query=3)
        )
    round1_results = await asyncio.gather(*round1_tasks)

    # 记录(子问题, 其发现在all_findings中的下标),避免Round 2配对错位
    answered: list[tuple[SubQuestion, int]] = []

    for sq, results in zip(sub_questions, round1_results):  # 子问题与对应搜索结果一一配对
        for r in results:
            state.visited_urls.add(r.url)
        infos = await agent.read_multiple(results, sq.question, max_pages=2)
        if infos:
            state.extracted_infos.extend(infos)  # 留存,供Round 2合并重综合
            finding = await agent.synthesize(sq.question, infos)
            all_findings.append(finding)
            answered.append((sq, len(all_findings) - 1))
            sq.status = "answered"
            sq.findings = finding.get("key_findings", [])

    # Round 2: DFS — 对置信度低且存在信息空白的子问题深入搜索
    for sq, idx in answered:
        finding = all_findings[idx]
        confidence = finding.get("confidence", 0)
        gaps = finding.get("gaps", [])
        if confidence < 0.6 and gaps:
            # 针对信息空白进行深入搜索
            deep_queries = gaps[:2]
            deep_results = await agent.search(deep_queries, state.visited_urls)
            for r in deep_results:
                state.visited_urls.add(r.url)
            deep_infos = await agent.read_multiple(deep_results, sq.question)
            if deep_infos:
                # 合并新旧信息重新综合
                combined_infos = state.extracted_infos + deep_infos
                relevant = [
                    info for info in combined_infos
                    if info.relevance_score > 0.3
                ]
                all_findings[idx] = await agent.synthesize(sq.question, relevant[-6:])

    return all_findings

9.2 预算控制

Python
@dataclass
class ResearchBudget:
    """研究预算管理"""
    max_llm_calls: int = 30
    max_search_queries: int = 15
    max_pages_to_read: int = 20
    max_tokens: int = 150_000

    # 当前消耗
    llm_calls_used: int = 0
    search_queries_used: int = 0
    pages_read: int = 0
    tokens_used: int = 0

    @property
    def remaining_llm_calls(self) -> int:
        return max(0, self.max_llm_calls - self.llm_calls_used)

    @property
    def remaining_searches(self) -> int:
        return max(0, self.max_search_queries - self.search_queries_used)

    def can_search(self) -> bool:
        return self.search_queries_used < self.max_search_queries

    def can_read(self) -> bool:
        return self.pages_read < self.max_pages_to_read

    def can_call_llm(self) -> bool:
        return self.llm_calls_used < self.max_llm_calls

    def record_llm_call(self, tokens: int = 0):
        self.llm_calls_used += 1
        self.tokens_used += tokens

    def record_search(self, count: int = 1):
        self.search_queries_used += count

    def record_page_read(self, count: int = 1):
        self.pages_read += count

    def summary(self) -> str:
        return (
            f"预算使用: LLM {self.llm_calls_used}/{self.max_llm_calls}, "
            f"搜索 {self.search_queries_used}/{self.max_search_queries}, "
            f"阅读 {self.pages_read}/{self.max_pages_to_read}, "
            f"Token {self.tokens_used:,}/{self.max_tokens:,}"
        )
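
将预算对象接入Agent的一种方式,是在子类中于每次调用前检查并记账(BudgetedAgent 为假设性示例,这里只演示搜索环节):

Python
class BudgetedAgent(DeepResearchAgent):
    """在搜索前检查并记录预算消耗的示例子类"""

    def __init__(self, budget: ResearchBudget, **kwargs):
        super().__init__(**kwargs)
        self.budget = budget

    async def search(
        self,
        queries: list[str],
        visited: set[str],
        per_query: int = 3,
    ) -> list[SearchResult]:
        if not self.budget.can_search():
            self._log("⚠️ 搜索预算耗尽,跳过本轮搜索")
            return []
        self.budget.record_search(len(queries))
        return await super().search(queries, visited, per_query=per_query)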

9.3 人机协作模式

Python
class HumanInTheLoopResearchAgent(DeepResearchAgent):
    """人机协作的研究Agent——在关键决策点征求人类意见"""

    async def research_with_human(self, query: str) -> str:
        state = ResearchState(query=query)

        # 1. 规划 — 让人类确认研究计划
        sub_questions = await self.plan(query)
        print("\n📋 研究计划:")
        for sq in sub_questions:
            print(f"  {sq.id}. {sq.question} (优先级: {sq.priority})")

        # 注意:input()会阻塞事件循环;生产环境可改用 await asyncio.to_thread(input, ...)
        user_input = input("\n请确认计划 (回车确认 / 输入修改意见): ").strip()
        if user_input:
            # 根据人类反馈修改计划
            sub_questions = await self._adjust_plan(sub_questions, user_input)

        state.sub_questions = sub_questions

        # 2. 执行搜索 & 阅读
        for sq in state.sub_questions:
            results = await self.search(sq.search_queries, state.visited_urls)
            for r in results:
                state.visited_urls.add(r.url)
            infos = await self.read_multiple(results, sq.question)

            if infos:
                finding = await self.synthesize(sq.question, infos)
                state.findings.append(finding)
                sq.status = "answered"

        # 3. 报告前确认
        print("\n📊 研究发现摘要:")
        for i, f in enumerate(state.findings, 1):
            print(f"  {i}. 置信度 {f.get('confidence', 0):.0%}")
            for kf in f.get("key_findings", [])[:2]:
                print(f"     - {kf}")

        user_input = input("\n是否需要补充搜索?(回车继续 / 输入补充方向): ").strip()
        if user_input:
            # 额外搜索
            extra_results = await self.search([user_input], state.visited_urls)
            infos = await self.read_multiple(extra_results, user_input)
            if infos:
                finding = await self.synthesize(user_input, infos)
                state.findings.append(finding)

        # 4. 生成报告
        report = await self.write_report(query, state.findings)
        return report

    async def _adjust_plan(
        self,
        original: list[SubQuestion],
        feedback: str,
    ) -> list[SubQuestion]:
        resp = await self.llm.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": (
                    "根据用户反馈调整研究计划。输出JSON: "
                    "{\"sub_questions\": [{\"id\": 1, \"question\": \"...\", "
                    "\"search_queries\": [\"...\"]}]}"
                )},
                {"role": "user", "content": (
                    f"原计划:\n"
                    + "\n".join(f"{sq.id}. {sq.question}" for sq in original)
                    + f"\n\n用户反馈: {feedback}"
                )},
            ],
            response_format={"type": "json_object"},
        )
        data = json.loads(resp.choices[0].message.content)
        return [
            SubQuestion(
                id=sq["id"], question=sq["question"],
                search_queries=sq.get("search_queries", []),
            )
            for sq in data["sub_questions"]
        ]

10. 与商业产品对比

10.1 主要商业产品

| 特性 | OpenAI Deep Research | Gemini Deep Research | Perplexity Pro | 我们的Mini实现 |
| --- | --- | --- | --- | --- |
| 底层模型 | o3 + 工具调用 | Gemini 2.0 Flash Thinking | 自研推理模型 | GPT-4o |
| 搜索能力 | 内置Web浏览器 | Google搜索直接集成 | 自建搜索索引 | Tavily API |
| 研究深度 | 5-30分钟深度研究 | 多步搜索+思考 | 多轮推理搜索 | 1-5分钟 |
| 报告质量 | 学术级长报告 | 结构化报告 | 带引用的详细回答 | Markdown报告 |
| 推理能力 | o3级深度推理 | Flash Thinking | 中等 | GPT-4o级别 |
| 成本 | $200/月(Pro) | $20/月(AI Premium) | $20/月(Pro) | API按量计费 |
| 可定制性 | 低 | 低 | 低 | 完全可定制 |
| 数据隐私 | 数据经过OpenAI | 数据经过Google | 数据经过Perplexity | 可私有化部署 |

10.2 我们的优势与不足

Text Only
✅ 优势:
  - 完全可控:搜索策略、模型选择、输出格式都可自定义
  - 可私有化部署:敏感数据不外传
  - 可集成:嵌入到现有系统/工作流
  - 可扩展:自建知识库、接入内部API
  - 成本灵活:按需使用、可控预算

❌ 不足:
  - 搜索质量:Tavily < Google/Bing原生搜索
  - 推理深度:GPT-4o < o3专用推理模型
  - 网页解析:简单正则 < 专业渲染引擎
  - 工程成熟度:原型 vs 生产级系统

10.3 弥合差距的方向

  1. 搜索增强:接入多个搜索引擎(Tavily + SerpAPI + Brave),对结果取并集并交叉验证
  2. 推理增强:使用o3-mini / Claude 3.5 Sonnet等推理更强的模型
  3. 内容提取增强:集成trafilatura或Jina Reader API进行更精确的网页解析(见下方示意)
  4. 迭代深化:增加迭代轮数,支持"追问"式深入搜索
  5. 评估体系:建立自动化评估流程,持续衡量报告质量
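
以第3条为例,trafilatura 可以直接替换本章的简易正则清洗来提取正文。示意如下(需额外 pip install trafilatura;函数名为演示用):

Python
import trafilatura

def extract_main_text(html: str) -> str | None:
    """用trafilatura从HTML中提取正文,替代clean_html的正则方案"""
    return trafilatura.extract(html, include_comments=False)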

11. 练习题

练习1:增加Brave搜索后端(基础)

为WebSearcher添加Brave Search API作为备选搜索后端,当Tavily配额用尽时自动切换。

提示:

Python
# Brave Search API
# https://api.search.brave.com/res/v1/web/search?q=...
async def search_brave(query: str, api_key: str) -> list[SearchResult]:
    ...

练习2:实现记忆式研究(中级)

让Agent能够记住之前研究过的主题,当用户再次搜索相近主题时复用已有发现,减少API调用。

要求:

- 将每次研究成果持久化存储(如JSON文件)
- 新研究开始时检索历史发现
- 让Planner在制定计划时考虑已有知识

练习3:构建研究Agent评估系统(高级)

设计一套系统来评估Deep Research Agent的输出质量。

评估维度建议:

- 事实准确性(抽样人工核查)
- 引用覆盖率(关键声明是否有引用)
- 信息完整性(是否回答了所有子问题)
- 报告结构质量(LLM-as-Judge评分)

练习4:实现知识图谱增强(进阶)

在综合阶段构建一个简易知识图谱(使用networkx),将提取的实体和关系可视化,并用图谱辅助矛盾检测。

Python
import networkx as nx

class ResearchKnowledgeGraph:
    """研究知识图谱"""
    def __init__(self):
        self.graph = nx.DiGraph()

    def add_fact(self, subject: str, relation: str, obj: str, source: str):
        """添加一个事实三元组"""
        ...

    def find_contradictions(self) -> list[dict]:
        """检测图谱中的矛盾"""
        ...

    def visualize(self, output_path: str = "knowledge_graph.html"):
        """可视化知识图谱"""
        ...

📝 本章小结

本章系统学习了Deep Research Agent的核心知识:

  1. ✅ 理解了Deep Research Agent与传统RAG的本质区别(主动多轮探索 vs 被动单轮检索)
  2. ✅ 掌握了Deep Research Agent的状态机架构设计(planning→searching→reading→synthesizing→writing)
  3. ✅ 实现了Research Planner(问题分解、子问题生成、动态重新规划)
  4. ✅ 实现了基于Tavily API的Web搜索模块
  5. ✅ 实现了网页内容提取与清洗(HTML解析、正文提取、噪音过滤)
  6. ✅ 实现了LLM驱动的信息抽取(结构化提取、事实校验、交叉验证)
  7. ✅ 实现了引用追踪系统(来源标注、引用链管理)
  8. ✅ 实现了知识综合引擎(矛盾检测、置信度评分、信息融合)
  9. ✅ 实现了多格式报告生成(Markdown结构化输出、带引用的学术风格报告)
  10. ✅ 完成了Mini Deep Research Agent的完整实现(含流式输出支持)
  11. ✅ 掌握了混合搜索策略(BFS广度优先 + DFS深度优先)
  12. ✅ 理解了预算控制机制(Token预算、API调用限制、分级模型策略)
  13. ✅ 了解了Human-in-the-loop模式(人工审核节点、交互式研究流程)
  14. ✅ 对比了主流商业产品(OpenAI Deep Research、Gemini、Perplexity)

✅ 学习检查清单

  • 能解释Deep Research Agent与传统RAG的核心区别
  • 能设计Deep Research Agent的状态机架构
  • 能实现问题分解与动态重新规划功能
  • 能集成Tavily等搜索API实现Web搜索
  • 能实现网页内容提取与清洗
  • 能用LLM进行结构化信息抽取
  • 能实现引用追踪和来源管理
  • 能实现知识综合(含矛盾检测和置信度评分)
  • 能生成带引用的结构化研究报告
  • 能实现研究过程的流式输出
  • 能设计预算控制和提前终止策略
  • 了解主流Deep Research商业产品的架构差异


📚 参考资料

  1. OpenAI "Deep Research" (2025) — 基于o3的深度研究Agent
  2. Google "Gemini Deep Research" (2024) — Gemini 2.0 Flash Thinking驱动
  3. Perplexity AI — AI搜索与研究引擎
  4. Tavily AI Search API — 专为AI Agent设计的搜索API
  5. Assisting in Writing Wikipedia-like Articles From Scratch with Large Language Models (STORM, 2024)
  6. GPT Researcher — 开源Deep Research Agent实现
  7. LangChain "Research Assistant" 模板 — 研究Agent参考架构

📝 面试额外练习

Q1: Deep Research Agent与普通RAG的核心区别是什么?

核心区别在于“主动多轮探索” vs “被动单轮检索”。RAG是检索一次、生成一次;Deep Research会根据初步发现动态调整搜索策略,多轮迭代直到信息充分。其他区别包括:问题分解能力、矛盾检测、引用追踪、综合报告生成。

Q2: 如何控制Deep Research Agent的成本?

关键策略:① Token预算:为每个研究任务设定Token上限。② API调用限制:限制搜索次数和页面读取数。③ 分级模型:规划用强模型,提取用弱模型。④ 缓存:相同查询缓存结果。⑤ 提前终止:信息充分时主动停止搜索。

Q3: 如何评估Deep Research Agent的输出质量?

多维度评估:① 完整性:是否覆盖所有子问题。② 准确性:事实是否正确、引用是否有效。③ 一致性:是否存在内部矛盾。④ 时效性:信息是否过时。可用LLM-as-Judge自动评估,也可与人工审核结合。

🔗 下一步

下一章我们将学习生成式Agent与仿真,探索如何构建能够模拟人类行为的Agent。

继续学习: 14-生成式Agent与仿真


祝你学习愉快! 🎉


最后更新日期:2026-02-12 | 适用版本:AI Agent开发实战教程 v2026