
04 - Data Source Integration

Dify data source integration architecture diagram

Data source configuration, data preprocessing, data management

📖 Chapter Overview

This chapter takes a deep look at integrating data sources into Dify, covering data source types, the data preprocessing pipeline, knowledge base management, vector retrieval and its optimization, and practical application scenarios. Detailed code examples and hands-on guidance will help you master the core skills of data source integration.

🎯 Learning Objectives

After completing this chapter, you will be able to:

  • Understand the types and characteristics of Dify data sources in depth
  • Master the core techniques of data preprocessing
  • Create and manage knowledge bases with confidence
  • Understand how vector retrieval works and how to optimize it
  • Integrate multiple data sources into Dify applications
  • Manage and improve data quality

1. Data Source Types in Detail

1.1 Text Data

1.1.1 Document Types

Supported document formats:

  • PDF documents
  • Word documents (.docx)
  • Plain text (.txt)
  • Markdown (.md)
  • HTML pages

Technical principle: Dify uses dedicated document parsers to extract the text content, preserves basic formatting information, and then cleans and chunks the text.

Code Example - Document Upload

Python
import requests
import json
import os
from typing import List, Dict

class DocumentUploader:
    """Document uploader"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}"
        }
        self.base_url = "https://api.dify.ai/v1"

    def create_dataset(self, name: str, description: str = "") -> Dict:
        """Create a knowledge base"""
        url = f"{self.base_url}/datasets"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "name": name,
            "description": description,
            "permission": "only_me",
            "data_source_type": "upload_file"
        }

        try:
            response = requests.post(url, headers=headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def upload_document(self, dataset_id: str, file_path: str,
                       indexing_technique: str = "high_quality") -> Dict:
        """
        Upload a document to a knowledge base

        Args:
            dataset_id: knowledge base ID
            file_path: file path
            indexing_technique: indexing technique (high_quality/economy)
        """
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-file"

        # Check that the file exists
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}"}

        # Prepare the file
        file_name = os.path.basename(file_path)

        try:
            with open(file_path, 'rb') as f:  # `with` manages the resource and ensures the file is closed
                files = {
                    'file': (file_name, f)
                }
                data = {
                    'indexing_technique': indexing_technique,
                    'process_rule': json.dumps({
                        "mode": "automatic"
                    })
                }

                response = requests.post(
                    url,
                    headers=self.headers,
                    files=files,
                    data=data
                )
                response.raise_for_status()
                return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def batch_upload(self, dataset_id: str, directory: str,
                   file_extensions: List[str] = None) -> List[Dict]:
        """
        Batch-upload documents

        Args:
            dataset_id: knowledge base ID
            directory: directory path
            file_extensions: list of file extensions
        """
        if file_extensions is None:
            file_extensions = ['.pdf', '.docx', '.txt', '.md']

        results = []

        # Walk the directory tree
        for root, dirs, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)

                # Check the file extension
                if any(file.lower().endswith(ext) for ext in file_extensions):
                    result = self.upload_document(dataset_id, file_path)
                    results.append({
                        "file": file_path,
                        "result": result
                    })

        return results

    def get_document_status(self, dataset_id: str, document_id: str) -> Dict:
        """Get the processing status of a document"""
        url = f"{self.base_url}/datasets/{dataset_id}/documents/{document_id}"

        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

# Usage example
if __name__ == "__main__":
    uploader = DocumentUploader(api_key="your_api_key_here")

    # Create a knowledge base
    dataset = uploader.create_dataset(
        name="Technical Docs",
        description="Stores all technology-related documents"
    )
    print(f"Knowledge base created! ID: {dataset.get('id')}")

    # Upload a single document
    result = uploader.upload_document(
        dataset_id=dataset["id"],
        file_path="docs/python_guide.pdf"
    )
    print(f"Document upload result: {result}")

    # Batch upload
    results = uploader.batch_upload(
        dataset_id=dataset["id"],
        directory="docs/",
        file_extensions=['.pdf', '.md']
    )
    print(f"Batch upload finished, {len(results)} files processed")

1.1.2 Web Page Content

Technical principle: Content is imported by URL. In the Dify web UI, the page text is fetched automatically and HTML tags and irrelevant content are stripped; via the API, you fetch the page yourself and import the text (see the note in the code below).

Code Example - Web Import

Python
import requests
import xml.etree.ElementTree as ET
from typing import List, Dict

class WebImporter:
    """Web content importer"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def import_from_url(self, dataset_id: str, url: str,
                      indexing_technique: str = "high_quality") -> Dict:
        """
        Create a document from text content (fetch the page first, then import)

        Note: Dify does not support creating documents directly from a URL via the API.
        URL import is available in the web UI; via the API, fetch the page content
        yourself and import it through the create-by-text endpoint.

        Args:
            dataset_id: knowledge base ID
            url: web page URL
            indexing_technique: indexing technique
        """
        # Fetch the page content first
        try:
            web_response = requests.get(url, timeout=30)
            web_response.raise_for_status()
            web_content = web_response.text  # raw HTML; consider cleaning it first (see section 2.1)
        except requests.exceptions.RequestException as e:
            return {"error": f"Failed to fetch page: {str(e)}"}

        # Create the document via the text endpoint
        endpoint = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"

        payload = {
            "name": f"web_import_{url[:50]}",  # slicing ([start:end]) keeps the name short
            "text": web_content,
            "indexing_technique": indexing_technique,
            "process_rule": {
                "mode": "automatic"
            }
        }

        try:
            response = requests.post(endpoint, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def batch_import_urls(self, dataset_id: str, urls: List[str]) -> List[Dict]:
        """Batch-import multiple URLs"""
        results = []

        for url in urls:
            result = self.import_from_url(dataset_id, url)
            results.append({
                "url": url,
                "result": result
            })

        return results

    def import_sitemap(self, dataset_id: str, sitemap_url: str,
                      max_pages: int = 100) -> List[Dict]:
        """
        Import from a sitemap

        Args:
            dataset_id: knowledge base ID
            sitemap_url: sitemap URL
            max_pages: maximum number of pages to import
        """
        try:
            # Fetch the sitemap
            response = requests.get(sitemap_url, timeout=30)
            response.raise_for_status()

            # Parse the XML
            root = ET.fromstring(response.content)

            # Extract the URLs
            namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
            urls = [loc.text for loc in root.findall('.//ns:loc', namespace)]

            # Cap the number of pages
            urls = urls[:max_pages]

            # Batch-import
            return self.batch_import_urls(dataset_id, urls)

        except Exception as e:
            return [{"error": str(e)}]

# Usage example
if __name__ == "__main__":
    importer = WebImporter(api_key="your_api_key_here")

    # Create a knowledge base (reusing DocumentUploader's create_dataset method)
    uploader = DocumentUploader(api_key="your_api_key_here")
    dataset = uploader.create_dataset(
        name="Blog Articles",
        description="Stores blog articles"
    )

    # Import a single URL
    result = importer.import_from_url(
        dataset_id=dataset["id"],
        url="https://example.com/blog/article-1"
    )
    print(f"URL import result: {result}")

    # Batch import
    urls = [
        "https://example.com/blog/article-1",
        "https://example.com/blog/article-2",
        "https://example.com/blog/article-3"
    ]
    results = importer.batch_import_urls(dataset["id"], urls)
    print(f"Batch import finished, {len(results)} URLs processed")

1.2 External Data Import

1.2.1 Exporting and Importing Database Data

Note: Dify does not support direct database connections as a data source. The supported source types are file upload, text import, and Notion sync. To bring database content in, export it as text first and then import it through the API.

Supported data source types:

  • File upload (PDF, Word, TXT, Markdown, etc.)
  • Text import (pass text content directly through the API)
  • Notion sync

Code Example - Database Export and Import

Python
import requests
import pymysql
import psycopg2
from typing import List, Dict
import json

class DatabaseExporter:
    """
    Database exporter

    Note: Dify does not support databases as a data source directly.
    This class exports database content as text and imports it into a
    knowledge base through the Dify API.
    """

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def create_dataset_from_mysql(self, name: str, host: str,
                                database: str, user: str,
                                password: str, query: str,
                                description: str = "") -> Dict:
        """
        Create a knowledge base from MySQL

        Args:
            name: knowledge base name
            host: database host
            database: database name
            user: username
            password: password
            query: SQL query
            description: description
        """
        # Create the knowledge base first
        dataset = self.create_dataset(name, description)
        if "error" in dataset:
            return dataset

        dataset_id = dataset["id"]

        # Run the query
        try:
            connection = pymysql.connect(
                host=host,
                database=database,
                user=user,
                password=password
            )

            with connection.cursor() as cursor:
                cursor.execute(query)
                results = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]

                # Convert the rows to dictionaries
                documents = []
                for row in results:
                    doc = {}
                    for col, val in zip(columns, row):  # zip iterates several iterables in parallel
                        doc[col] = str(val)
                    documents.append(doc)

                # Upload to the knowledge base
                return self.upload_text_data(dataset_id, documents)

        except Exception as e:
            return {"error": str(e)}
        finally:
            if 'connection' in locals():
                connection.close()

    def create_dataset_from_postgres(self, name: str, host: str,
                                   database: str, user: str,
                                   password: str, query: str,
                                   description: str = "") -> Dict:
        """Create a knowledge base from PostgreSQL"""
        # Create the knowledge base first
        dataset = self.create_dataset(name, description)
        if "error" in dataset:
            return dataset

        dataset_id = dataset["id"]

        # Run the query
        try:
            connection = psycopg2.connect(
                host=host,
                database=database,
                user=user,
                password=password
            )

            with connection.cursor() as cursor:
                cursor.execute(query)
                results = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]

                # Convert the rows to dictionaries
                documents = []
                for row in results:
                    doc = {}
                    for col, val in zip(columns, row):
                        doc[col] = str(val)
                    documents.append(doc)

                # Upload to the knowledge base
                return self.upload_text_data(dataset_id, documents)

        except Exception as e:
            return {"error": str(e)}
        finally:
            if 'connection' in locals():
                connection.close()

    def upload_text_data(self, dataset_id: str, data: List[Dict]) -> Dict:
        """Upload text data"""
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"

        # Serialize the rows as text
        text_content = "\n\n".join([
            json.dumps(doc, ensure_ascii=False) for doc in data  # json.dumps turns a Python object into a JSON string
        ])

        payload = {
            "name": "database_export",
            "text": text_content,
            "indexing_technique": "high_quality",
            "process_rule": {
                "mode": "automatic"
            }
        }

        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def create_dataset(self, name: str, description: str = "") -> Dict:
        """Create a knowledge base"""
        url = f"{self.base_url}/datasets"

        payload = {
            "name": name,
            "description": description,
            "permission": "only_me",
            "data_source_type": "upload_file"
        }

        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

# Usage example
if __name__ == "__main__":
    exporter = DatabaseExporter(api_key="your_api_key_here")

    # Export from MySQL and import into a knowledge base
    result = exporter.create_dataset_from_mysql(
        name="Product Catalog",
        host="localhost",
        database="products",
        user="root",
        password="password",
        query="SELECT * FROM products WHERE category = 'electronics'",
        description="Electronics product information"
    )
    print(f"MySQL export result: {result}")

    # Create a knowledge base from PostgreSQL
    result = exporter.create_dataset_from_postgres(
        name="User Directory",
        host="localhost",
        database="users",
        user="postgres",
        password="password",
        query="SELECT * FROM users WHERE status = 'active'",
        description="Active user information"
    )
    print(f"PostgreSQL export result: {result}")

1.2.2 CSV/Excel Files

Technical principle: Parse a CSV or Excel file and either turn each row into its own document or merge everything into a single document.

Code Example - CSV/Excel Import

Python
import requests
import pandas as pd
from typing import List, Dict, Union

class FileDataImporter:
    """File data importer"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def create_dataset(self, name: str, description: str = "") -> Dict:
        """Create a knowledge base"""
        url = f"{self.base_url}/datasets"
        payload = {"name": name, "description": description}
        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def import_csv(self, dataset_id: str, file_path: str,
                 row_as_document: bool = True) -> Dict:
        """
        Import a CSV file

        Args:
            dataset_id: knowledge base ID
            file_path: CSV file path
            row_as_document: treat each row as its own document
        """
        try:
            # Read the CSV
            df = pd.read_csv(file_path)

            if row_as_document:
                # Each row becomes its own document
                documents = []
                for idx, row in df.iterrows():
                    doc_text = "\n".join([
                        f"{col}: {val}" for col, val in row.items()
                    ])
                    documents.append({
                        "name": f"row_{idx}",
                        "text": doc_text
                    })

                return self.upload_multiple_documents(dataset_id, documents)
            else:
                # The whole file becomes one document
                text_content = df.to_string(index=False)
                return self.upload_text_data(dataset_id, text_content)

        except Exception as e:
            return {"error": str(e)}

    def import_excel(self, dataset_id: str, file_path: str,
                    sheet_name: Union[str, int] = 0,
                    row_as_document: bool = True) -> Dict:
        """
        Import an Excel file

        Args:
            dataset_id: knowledge base ID
            file_path: Excel file path
            sheet_name: worksheet name or index
            row_as_document: treat each row as its own document
        """
        try:
            # Read the Excel file
            df = pd.read_excel(file_path, sheet_name=sheet_name)

            if row_as_document:
                # Each row becomes its own document
                documents = []
                for idx, row in df.iterrows():
                    doc_text = "\n".join([
                        f"{col}: {val}" for col, val in row.items()
                    ])
                    documents.append({
                        "name": f"row_{idx}",
                        "text": doc_text
                    })

                return self.upload_multiple_documents(dataset_id, documents)
            else:
                # The whole file becomes one document
                text_content = df.to_string(index=False)
                return self.upload_text_data(dataset_id, text_content)

        except Exception as e:
            return {"error": str(e)}

    def upload_multiple_documents(self, dataset_id: str,
                                  documents: List[Dict]) -> Dict:
        """Upload multiple documents"""
        results = []

        for doc in documents:
            url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"

            payload = {
                "name": doc["name"],
                "text": doc["text"],
                "indexing_technique": "high_quality",
                "process_rule": {
                    "mode": "automatic"
                }
            }

            try:
                response = requests.post(url, headers=self.headers, json=payload)
                response.raise_for_status()
                results.append(response.json())
            except requests.exceptions.RequestException as e:
                results.append({"error": str(e)})

        return {
            "total": len(documents),
            "success": sum(1 for r in results if "error" not in r),
            "results": results
        }

    def upload_text_data(self, dataset_id: str, text: str) -> Dict:
        """Upload text data"""
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"

        payload = {
            "name": "imported_data",
            "text": text,
            "indexing_technique": "high_quality",
            "process_rule": {
                "mode": "automatic"
            }
        }

        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

# Usage example
if __name__ == "__main__":
    importer = FileDataImporter(api_key="your_api_key_here")

    # Create a knowledge base
    dataset = importer.create_dataset(
        name="Sales Data",
        description="Sales records"
    )

    # Import a CSV
    result = importer.import_csv(
        dataset_id=dataset["id"],
        file_path="data/sales.csv",
        row_as_document=True
    )
    print(f"CSV import result: {result}")

    # Import an Excel file
    result = importer.import_excel(
        dataset_id=dataset["id"],
        file_path="data/products.xlsx",
        sheet_name="Sheet1",
        row_as_document=True
    )
    print(f"Excel import result: {result}")

2. Data Preprocessing

2.1 Text Cleaning

Technical principle: Text cleaning is a key step in data preprocessing; it includes stripping HTML tags, removing special characters, and normalizing the text.

Code Example - Text Cleaning

Python
import re
import html
from typing import List, Dict

class TextCleaner:
    """Text cleaner"""

    def __init__(self):
        # Common Chinese and English punctuation, written as the body of a regex
        # character class (no enclosing brackets, so it can be embedded safely)
        self.punctuation = r""",.;:!?“”‘’(){}\[\]<>|\\/~`@#$%^&*\-+="""

    def clean_html(self, text: str) -> str:
        """Strip HTML tags"""
        # Remove HTML tags with a regular expression
        clean_text = re.sub(r'<[^>]+>', '', text)
        # Decode HTML entities
        clean_text = html.unescape(clean_text)
        return clean_text

    def remove_special_chars(self, text: str, keep_punctuation: bool = True) -> str:
        """Remove special characters"""
        if keep_punctuation:
            # Keep word characters, whitespace, CJK characters, and punctuation
            pattern = r'[^\w\s\u4e00-\u9fff' + self.punctuation + r']'
        else:
            # Drop punctuation as well
            pattern = r'[^\w\s\u4e00-\u9fff]'

        clean_text = re.sub(pattern, '', text)
        return clean_text

    def normalize_whitespace(self, text: str) -> str:
        """Normalize whitespace"""
        # Collapse runs of whitespace into a single space
        clean_text = re.sub(r'\s+', ' ', text)
        # Trim leading and trailing whitespace
        clean_text = clean_text.strip()
        return clean_text

    def remove_urls(self, text: str) -> str:
        """Remove URLs"""
        url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
        clean_text = re.sub(url_pattern, '', text)
        return clean_text

    def remove_emails(self, text: str) -> str:
        """Remove email addresses"""
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        clean_text = re.sub(email_pattern, '', text)
        return clean_text

    def clean(self, text: str, options: Dict = None) -> str:
        """
        Combined cleaning

        Args:
            text: text to clean
            options: cleaning options
        """
        if options is None:
            options = {
                "remove_html": True,
                "remove_special_chars": True,
                "keep_punctuation": True,
                "normalize_whitespace": True,
                "remove_urls": True,
                "remove_emails": True
            }

        clean_text = text

        if options.get("remove_html"):
            clean_text = self.clean_html(clean_text)

        # Strip URLs and emails before removing special characters, otherwise
        # the separators those patterns rely on may already be gone
        if options.get("remove_urls"):
            clean_text = self.remove_urls(clean_text)

        if options.get("remove_emails"):
            clean_text = self.remove_emails(clean_text)

        if options.get("remove_special_chars"):
            clean_text = self.remove_special_chars(
                clean_text,
                options.get("keep_punctuation", True)
            )

        # Normalize whitespace last so gaps left by earlier steps collapse
        if options.get("normalize_whitespace"):
            clean_text = self.normalize_whitespace(clean_text)

        return clean_text

    def batch_clean(self, texts: List[str], options: Dict = None) -> List[str]:
        """Batch-clean a list of texts"""
        return [self.clean(text, options) for text in texts]

# Usage example
if __name__ == "__main__":
    cleaner = TextCleaner()

    # Sample text
    text = """
    <p>This is a sample text!</p>
    Visit https://example.com for more information.
    Contact email: info@example.com
    """

    # Clean the text
    clean_text = cleaner.clean(text)
    print(f"Cleaned text:\n{clean_text}")

    # Batch cleaning
    texts = [
        "<div>First text</div>",
        "<span>Second text</span>",
        "<p>Third text</p>"
    ]

    clean_texts = cleaner.batch_clean(texts)
    for i, clean_text in enumerate(clean_texts):
        print(f"Text {i+1}: {clean_text}")

2.2 Chunking

Technical principle: Long text is split into semantically coherent chunks, each carrying enough context to be retrieved and understood on its own.

Chunking strategies:

  1. Fixed-size chunking: split by character or token count
  2. Semantic chunking: split along semantic units such as paragraphs and sentences
  3. Overlapping chunking: adjacent chunks share an overlap, preserving context continuity

Code Example - Chunking

Python
from typing import List, Dict
import re

class TextChunker:
    """Text chunker"""

    def chunk_by_size(self, text: str, chunk_size: int = 500,
                     overlap: int = 50) -> List[str]:
        """
        Fixed-size chunking

        Args:
            text: text to chunk
            chunk_size: chunk size (characters)
            overlap: overlap size
        """
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")

        chunks = []
        start = 0

        while start < len(text):
            end = start + chunk_size
            chunk = text[start:end]
            chunks.append(chunk)

            # Advance the start position, keeping the overlap
            start = end - overlap

        return chunks

    def chunk_by_sentence(self, text: str, max_sentences: int = 5,
                        overlap_sentences: int = 1) -> List[str]:
        """
        Sentence-based chunking

        Args:
            text: text to chunk
            max_sentences: maximum sentences per chunk
            overlap_sentences: number of overlapping sentences
        """
        if overlap_sentences >= max_sentences:
            raise ValueError("overlap_sentences must be smaller than max_sentences")

        # Split into sentences; \s* (rather than \s+) also handles CJK text,
        # where sentence-ending punctuation is not followed by a space
        sentences = [s for s in re.split(r'(?<=[.!?。!?])\s*', text) if s.strip()]

        chunks = []
        start = 0

        while start < len(sentences):
            end = start + max_sentences
            chunk_sentences = sentences[start:end]
            chunk = ' '.join(chunk_sentences)
            chunks.append(chunk)

            # Advance the start position, keeping the overlap
            start = end - overlap_sentences

        return chunks

    def chunk_by_paragraph(self, text: str, max_paragraphs: int = 3,
                        overlap_paragraphs: int = 1) -> List[str]:
        """
        Paragraph-based chunking

        Args:
            text: text to chunk
            max_paragraphs: maximum paragraphs per chunk
            overlap_paragraphs: number of overlapping paragraphs
        """
        if overlap_paragraphs >= max_paragraphs:
            raise ValueError("overlap_paragraphs must be smaller than max_paragraphs")

        # Split into paragraphs
        paragraphs = re.split(r'\n\s*\n', text.strip())

        chunks = []
        start = 0

        while start < len(paragraphs):
            end = start + max_paragraphs
            chunk_paragraphs = paragraphs[start:end]
            chunk = '\n\n'.join(chunk_paragraphs)
            chunks.append(chunk)

            # Advance the start position, keeping the overlap
            start = end - overlap_paragraphs

        return chunks

    def chunk_semantic(self, text: str, max_chunk_size: int = 1000,
                     min_chunk_size: int = 200) -> List[str]:
        """
        Semantic chunking (paragraph-based)

        Args:
            text: text to chunk
            max_chunk_size: maximum chunk size
            min_chunk_size: minimum chunk size
        """
        chunks = []
        current_chunk = ""

        # Split into paragraphs
        paragraphs = re.split(r'\n\s*\n', text.strip())

        for paragraph in paragraphs:
            # Start a new chunk if adding this paragraph would exceed the limit
            if len(current_chunk) + len(paragraph) > max_chunk_size:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                    current_chunk = ""

            # Append the paragraph
            if current_chunk:
                current_chunk += "\n\n" + paragraph
            else:
                current_chunk = paragraph

        # Append the final chunk
        if current_chunk:
            chunks.append(current_chunk.strip())

        # Merge a trailing chunk below the minimum size into its predecessor
        if len(chunks) > 1 and len(chunks[-1]) < min_chunk_size:
            chunks[-2] = chunks[-2] + "\n\n" + chunks.pop()

        return chunks

    def chunk_with_metadata(self, text: str, chunk_size: int = 500,
                           overlap: int = 50) -> List[Dict]:
        """
        Chunking with metadata

        Args:
            text: text to chunk
            chunk_size: chunk size
            overlap: overlap size
        """
        chunks = self.chunk_by_size(text, chunk_size, overlap)

        chunks_with_metadata = []
        for i, chunk in enumerate(chunks):  # enumerate yields index and value together
            chunks_with_metadata.append({
                "chunk_id": i,
                "chunk_text": chunk,
                "chunk_size": len(chunk),
                "chunk_index": i,
                "total_chunks": len(chunks)
            })

        return chunks_with_metadata

# Usage example
if __name__ == "__main__":
    chunker = TextChunker()

    # Sample text
    text = """
    Artificial intelligence (AI) is a branch of computer science devoted to building systems that perform tasks normally requiring human intelligence.

    Machine learning is a subset of AI that lets computers learn from data without being explicitly programmed.
    Deep learning is a special form of machine learning that uses multi-layer neural networks modeled on the structure of the human brain.

    Natural language processing (NLP) is another major branch of AI; it enables computers to understand, interpret, and generate human language.
    Computer vision allows computers to extract meaningful information from images and video.

    AI has a wide range of applications, including medical diagnosis, autonomous driving, financial analysis, and intelligent customer service.
    As the technology matures, AI will play an important role in ever more domains.
    """

    # Fixed-size chunking
    chunks = chunker.chunk_by_size(text, chunk_size=200, overlap=30)
    print(f"Fixed-size chunking ({len(chunks)} chunks):")
    for i, chunk in enumerate(chunks):
        print(f"\n{i+1}: {chunk[:100]}...")

    # Sentence-based chunking
    chunks = chunker.chunk_by_sentence(text, max_sentences=3, overlap_sentences=1)
    print(f"\nSentence-based chunking ({len(chunks)} chunks):")
    for i, chunk in enumerate(chunks):
        print(f"\n{i+1}: {chunk[:100]}...")

    # Chunking with metadata
    chunks = chunker.chunk_with_metadata(text, chunk_size=200, overlap=30)
    print(f"\nChunks with metadata:")
    for chunk in chunks:
        print(f"Chunk {chunk['chunk_id']}: size={chunk['chunk_size']}")

3. Knowledge Base Management

3.1 Creating a Knowledge Base

Code Example - Complete Knowledge Base Management

Python
import requests
from typing import List, Dict, Optional

class KnowledgeBaseManager:
    """Knowledge base manager"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def create_dataset(self, name: str, description: str = "",
                     permission: str = "only_me") -> Dict:
        """
        Create a knowledge base

        Args:
            name: knowledge base name
            description: description
            permission: permission (only_me/all_team_members/all_users)
        """
        url = f"{self.base_url}/datasets"

        payload = {
            "name": name,
            "description": description,
            "permission": permission,
            "data_source_type": "upload_file"
        }

        try:  # try/except catches request errors
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def list_datasets(self, page: int = 1, limit: int = 20) -> Dict:
        """List knowledge bases"""
        url = f"{self.base_url}/datasets"
        params = {
            "page": page,
            "limit": limit
        }

        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def get_dataset(self, dataset_id: str) -> Dict:
        """Get knowledge base details"""
        url = f"{self.base_url}/datasets/{dataset_id}"

        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def update_dataset(self, dataset_id: str, name: str = None,
                     description: str = None) -> Dict:
        """
        Update knowledge base metadata

        Note: Dify does not expose an API for updating knowledge base metadata.
        The name, description, and similar fields must be edited in the Dify web UI.
        """
        print(f"Edit knowledge base metadata in the Dify web UI: dataset_id={dataset_id}")
        return {"message": "Edit knowledge base metadata in the Dify web UI"}

    def delete_dataset(self, dataset_id: str) -> Dict:
        """Delete a knowledge base"""
        url = f"{self.base_url}/datasets/{dataset_id}"

        try:
            response = requests.delete(url, headers=self.headers)
            response.raise_for_status()
            return {"message": "Deleted successfully"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def list_documents(self, dataset_id: str, page: int = 1,
                     limit: int = 20) -> Dict:
        """List the documents in a knowledge base"""
        url = f"{self.base_url}/datasets/{dataset_id}/documents"
        params = {
            "page": page,
            "limit": limit
        }

        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def get_document(self, dataset_id: str, document_id: str) -> Dict:
        """Get document details"""
        url = f"{self.base_url}/datasets/{dataset_id}/documents/{document_id}"

        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def delete_document(self, dataset_id: str, document_id: str) -> Dict:
        """Delete a document"""
        url = f"{self.base_url}/datasets/{dataset_id}/documents/{document_id}"

        try:
            response = requests.delete(url, headers=self.headers)
            response.raise_for_status()
            return {"message": "Deleted successfully"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def update_indexing(self, dataset_id: str,
                       indexing_technique: str = "high_quality") -> Dict:
        """
        Update the indexing technique

        Note: Dify does not expose an API for switching indexing techniques.
        The technique is chosen when the knowledge base is created and managed
        afterwards in the web UI. Supported techniques:
        - high_quality: vectorized with an embedding model
        - economy: keyword index
        """
        print(f"Manage knowledge base indexing in the Dify web UI: dataset_id={dataset_id}")
        return {"message": "The indexing technique is set at creation time and managed in the web UI"}

# Usage example
if __name__ == "__main__":
    manager = KnowledgeBaseManager(api_key="your_api_key_here")

    # Create a knowledge base
    dataset = manager.create_dataset(
        name="Technical Documentation",
        description="Stores technology-related documents",
        permission="only_me"
    )
    print(f"Knowledge base created! ID: {dataset.get('id')}")

    # List all knowledge bases
    datasets = manager.list_datasets()
    print(f"There are {len(datasets.get('data', []))} knowledge bases")

    # Get knowledge base details
    if "id" in dataset:
        detail = manager.get_dataset(dataset["id"])
        print(f"Knowledge base details: {detail}")

    # List documents
    if "id" in dataset:
        documents = manager.list_documents(dataset["id"])
        print(f"The knowledge base contains {len(documents.get('data', []))} documents")

3.2 Retrieval Strategies

3.2.1 Vector Retrieval

Technical principle: Texts are converted into vector representations, and retrieval works by computing vector similarity.

Code Example - Vector Retrieval Configuration

Python
from typing import Dict

class VectorRetrievalConfig:
    """Vector retrieval configuration"""

    @staticmethod
    def create_config(top_k: int = 5, score_threshold: float = 0.5,
                    reranking: bool = False) -> Dict:
        """
        Create a vector retrieval configuration

        Args:
            top_k: number of results to return
            score_threshold: similarity threshold
            reranking: whether to rerank
        """
        return {
            "retrieval_mode": "single",
            "top_k": top_k,
            "score_threshold": score_threshold,
            "reranking_enable": reranking,
            "reranking_model": {
                "reranking_provider": "cohere",
                "reranking_model_name": "rerank-english-v2.0"
            } if reranking else None
        }

# Usage example
config = VectorRetrievalConfig.create_config(
    top_k=10,
    score_threshold=0.6,
    reranking=True
)
print(f"Vector retrieval config: {config}")

3.2.2 Keyword Retrieval

Technical principle: Retrieval is based on keyword matching, which suits scenarios that require exact matches.

Code Example - Keyword Retrieval Configuration

Python
from typing import Dict

class KeywordRetrievalConfig:
    """Keyword retrieval configuration"""

    @staticmethod
    def create_config(top_k: int = 5,
                    match_type: str = "fuzzy") -> Dict:
        """
        Create a keyword retrieval configuration

        Args:
            top_k: number of results to return
            match_type: match type (fuzzy/exact)
        """
        return {
            "retrieval_mode": "single",
            "top_k": top_k,
            "keyword_search_config": {
                "keyword_search_type": match_type
            }
        }

# Usage example
config = KeywordRetrievalConfig.create_config(
    top_k=10,
    match_type="fuzzy"
)
print(f"Keyword retrieval config: {config}")

3.2.3 Hybrid Retrieval

Technical principle: Vector retrieval and keyword retrieval are combined, balancing semantic understanding with exact matching.

Code Example - Hybrid Retrieval Configuration

Python
from typing import Dict

class HybridRetrievalConfig:
    """Hybrid retrieval configuration"""

    @staticmethod  # a @staticmethod needs no instance
    def create_config(top_k: int = 5, score_threshold: float = 0.5,
                    vector_weight: float = 0.7,
                    keyword_weight: float = 0.3,
                    reranking: bool = False) -> Dict:
        """
        Create a hybrid retrieval configuration

        Args:
            top_k: number of results to return
            score_threshold: similarity threshold
            vector_weight: weight of vector retrieval
            keyword_weight: weight of keyword retrieval
            reranking: whether to rerank
        """
        return {
            "retrieval_mode": "multiple",
            "top_k": top_k,
            "score_threshold": score_threshold,
            "reranking_enable": reranking,
            "reranking_model": {
                "reranking_provider": "cohere",
                "reranking_model_name": "rerank-english-v2.0"
            } if reranking else None,
            "weights": {
                "vector_weight": vector_weight,
                "keyword_weight": keyword_weight
            }
        }

# Usage example
config = HybridRetrievalConfig.create_config(
    top_k=10,
    score_threshold=0.6,
    vector_weight=0.8,
    keyword_weight=0.2,
    reranking=True
)
print(f"Hybrid retrieval config: {config}")

4. Exercises

Basic Exercises

  1. Create a knowledge base
  2. Upload documents
  3. Configure parameters
  4. Test retrieval

Advanced Exercises

  1. Build an enterprise knowledge base
     • Integrate multiple data sources
     • Optimize the retrieval strategy
     • Implement intelligent Q&A

  2. Develop a document management system
     • Batch-import documents
     • Classify and tag automatically
     • Provide smart retrieval

5. Best Practices

✅ Recommended Practices

  1. Data quality
     • Clean the data
     • Remove noise
     • Standardize formats

  2. Chunking strategy
     • Chunk sensibly
     • Keep chunks semantically complete
     • Avoid excessive overlap

  3. Retrieval optimization
     • Choose a suitable retrieval strategy
     • Tune the similarity threshold
     • Use reranking to improve quality

❌ Practices to Avoid

  1. Too much data
     • Select only relevant data
     • Avoid redundancy
     • Clean up regularly

  2. Poor chunking
     • Avoid chunks that are too small or too large
     • Preserve semantic integrity
     • Account for context

6. FAQ

Q1: How do I choose an appropriate chunk size?

A: Suggestions:

  • Short texts: 200-500 characters
  • Medium texts: 500-1000 characters
  • Long texts: 1000-2000 characters
  • Overlap: typically 10-20% of the chunk size
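As a quick worked example of these guidelines: an 800-character chunk with 15% overlap carries a 120-character overlap. The tiny helper below turns the guidelines into code; the length thresholds (2000 and 20000 characters) are arbitrary assumptions for illustration, not recommendations from Dify.

Python
def chunk_params(text_length: int) -> dict:
    """Pick a chunk size from the guidelines above and derive a 15% overlap.
    The text-length thresholds are illustrative assumptions only."""
    if text_length < 2000:
        chunk_size = 300     # short text: 200-500 characters
    elif text_length < 20000:
        chunk_size = 800     # medium text: 500-1000 characters
    else:
        chunk_size = 1500    # long text: 1000-2000 characters
    return {"chunk_size": chunk_size, "overlap": int(chunk_size * 0.15)}

print(chunk_params(50000))  # {'chunk_size': 1500, 'overlap': 225}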

Q2: How do I improve retrieval accuracy?

A: Optimization methods:

  • Use hybrid retrieval
  • Adjust the weight ratio
  • Enable reranking
  • Tune the similarity threshold

Q3: How do I handle multilingual data?

A: Approaches:

  • Split knowledge bases by language
  • Use a multilingual embedding model
  • Add language tags
  • Configure retrieval strategies per language

7. Summary

This chapter covered the core aspects of data source integration:

  1. Data source types: text data and structured data
  2. Data preprocessing: text cleaning and chunking
  3. Knowledge base management: create, query, update, delete
  4. Retrieval strategies: vector, keyword, and hybrid retrieval

With this chapter behind you, you should be able to integrate a wide range of data sources into your Dify applications.

8. Next Steps

Continue with 05 - Model Configuration and Optimization to learn more about configuring and tuning models.


Last updated: 2026-02-12 | Applies to: Dify Hands-On Tutorial v2026