04 - 数据源集成¶
数据源配置、数据预处理、数据管理
📖 章节概述¶
本章将深入介绍如何在Dify中集成各种数据源,包括数据源类型、数据预处理流程、知识库管理、向量检索技术以及实际应用场景。通过详细的代码示例和实践指导,帮助读者掌握数据源集成的核心技能。
🎯 学习目标¶
完成本章后,你将能够:
- 深入理解Dify数据源的类型和特点
- 掌握数据预处理的核心技术
- 熟练创建和管理知识库
- 理解向量检索的原理和优化方法
- 能够集成多种数据源到Dify应用
- 掌握数据质量管理和优化技巧
1. 数据源类型详解¶
1.1 文本数据¶
1.1.1 文档类型¶
支持的文档格式: - PDF文档 - Word文档(.docx) - 纯文本(.txt) - Markdown(.md) - HTML网页
技术原理: Dify使用专门的文档解析器提取文本内容,保留基本的格式信息,然后进行文本清洗和分块处理。
代码示例 - 文档上传:
import requests
import json
import os
from typing import List, Dict
class DocumentUploader:
    """Client for uploading files into Dify knowledge bases (datasets).

    Wraps the Dify v1 REST API. Every method returns the parsed JSON
    response on success or ``{"error": ...}`` on failure.
    """

    def __init__(self, api_key):
        # Only the bearer token lives in the shared headers: multipart
        # uploads must let `requests` compute the Content-Type itself.
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}"
        }
        self.base_url = "https://api.dify.ai/v1"

    def create_dataset(self, name: str, description: str = "") -> Dict:
        """Create a new knowledge base and return the API response."""
        url = f"{self.base_url}/datasets"
        # JSON endpoint: needs an explicit Content-Type, unlike uploads.
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "name": name,
            "description": description,
            "permission": "only_me",
            "data_source_type": "upload_file",
        }
        try:
            response = requests.post(url, headers=headers, json=payload)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
        return response.json()

    def upload_document(self, dataset_id: str, file_path: str,
                        indexing_technique: str = "high_quality") -> Dict:
        """Upload one local file into a knowledge base.

        Args:
            dataset_id: target knowledge base ID
            file_path: path of the file to upload
            indexing_technique: "high_quality" or "economy"
        """
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-file"

        # Fail fast before opening a connection.
        if not os.path.exists(file_path):
            return {"error": f"文件不存在: {file_path}"}

        file_name = os.path.basename(file_path)
        form_fields = {
            'indexing_technique': indexing_technique,
            'process_rule': json.dumps({"mode": "automatic"}),
        }
        try:
            # `with` guarantees the file handle is closed.
            with open(file_path, 'rb') as fh:
                response = requests.post(
                    url,
                    headers=self.headers,
                    files={'file': (file_name, fh)},
                    data=form_fields,
                )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def batch_upload(self, dataset_id: str, directory: str,
                     file_extensions: List[str] = None) -> List[Dict]:
        """Upload every matching file found under *directory* (recursive).

        Returns one ``{"file": path, "result": response}`` entry per file.
        """
        extensions = (['.pdf', '.docx', '.txt', '.md']
                      if file_extensions is None else file_extensions)
        results = []
        for root, _subdirs, names in os.walk(directory):
            for name in names:
                if not any(name.lower().endswith(ext) for ext in extensions):
                    continue
                path = os.path.join(root, name)
                results.append({
                    "file": path,
                    "result": self.upload_document(dataset_id, path),
                })
        return results

    def get_document_status(self, dataset_id: str, document_id: str) -> Dict:
        """Fetch the indexing status / details of one document."""
        url = f"{self.base_url}/datasets/{dataset_id}/documents/{document_id}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
        return response.json()
# Usage example: create a dataset, then upload one file and a directory.
if __name__ == "__main__":
    uploader = DocumentUploader(api_key="your_api_key_here")

    # Step 1: create the knowledge base.
    dataset = uploader.create_dataset(
        name="技术文档库",
        description="存储所有技术相关的文档",
    )
    print(f"知识库创建成功!ID: {dataset.get('id')}")

    # Step 2: upload a single document.
    result = uploader.upload_document(
        dataset_id=dataset["id"],
        file_path="docs/python_guide.pdf",
    )
    print(f"文档上传结果: {result}")

    # Step 3: bulk-upload everything under docs/ that matches.
    results = uploader.batch_upload(
        dataset_id=dataset["id"],
        directory="docs/",
        file_extensions=['.pdf', '.md'],
    )
    print(f"批量上传完成,共处理 {len(results)} 个文件")
1.1.2 网页内容¶
技术原理: 通过URL导入网页内容,Dify会自动抓取网页文本,去除HTML标签和无关内容。
代码示例 - 网页导入:
class WebImporter:
    """Imports web page content into a Dify knowledge base.

    Dify's API has no "create document from URL" endpoint; URL import with
    HTML cleanup exists only in the web UI. This class therefore fetches
    the page itself and pushes the raw text through the
    ``create-by-text`` endpoint.
    """

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def import_from_url(self, dataset_id: str, url: str,
                        indexing_technique: str = "high_quality") -> Dict:
        """Fetch *url* and create a text document from its body.

        NOTE: the raw response text (including HTML markup) is uploaded
        as-is; no tag stripping is performed here.

        Args:
            dataset_id: target knowledge base ID
            url: page URL to fetch
            indexing_technique: "high_quality" or "economy"
        """
        # Fetch the page first; bail out on any network error.
        try:
            web_response = requests.get(url, timeout=30)
            web_response.raise_for_status()
            web_content = web_response.text
        except requests.exceptions.RequestException as e:
            return {"error": f"网页抓取失败: {str(e)}"}

        # Create the document via the text endpoint.
        endpoint = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"
        payload = {
            "name": f"web_import_{url[:50]}",  # truncate to keep names short
            "text": web_content,
            "indexing_technique": indexing_technique,
            "process_rule": {
                "mode": "automatic"
            }
        }
        try:
            response = requests.post(endpoint, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def batch_import_urls(self, dataset_id: str, urls: List[str]) -> List[Dict]:
        """Import several URLs; returns one {"url", "result"} entry each."""
        results = []
        for url in urls:
            result = self.import_from_url(dataset_id, url)
            results.append({
                "url": url,
                "result": result
            })
        return results

    def import_sitemap(self, dataset_id: str, sitemap_url: str,
                       max_pages: int = 100) -> List[Dict]:
        """Import the first *max_pages* URLs listed in a sitemap.xml.

        Args:
            dataset_id: target knowledge base ID
            sitemap_url: URL of the sitemap XML
            max_pages: cap on the number of pages imported
        """
        import xml.etree.ElementTree as ET
        try:
            # Fix: use the same timeout as import_from_url so a dead
            # sitemap host cannot hang the whole batch indefinitely.
            response = requests.get(sitemap_url, timeout=30)
            response.raise_for_status()
            # Parse the sitemap XML and collect <loc> entries.
            root = ET.fromstring(response.content)
            namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
            urls = [loc.text for loc in root.findall('.//ns:loc', namespace)]
            # Respect the page cap, then delegate to the batch importer.
            return self.batch_import_urls(dataset_id, urls[:max_pages])
        except Exception as e:
            return [{"error": str(e)}]
# Usage example: import single and multiple URLs into a fresh dataset.
if __name__ == "__main__":
    importer = WebImporter(api_key="your_api_key_here")

    # Dataset creation is provided by DocumentUploader.
    uploader = DocumentUploader(api_key="your_api_key_here")
    dataset = uploader.create_dataset(
        name="博客文章库",
        description="存储博客文章",
    )

    # Import one page.
    result = importer.import_from_url(
        dataset_id=dataset["id"],
        url="https://example.com/blog/article-1",
    )
    print(f"URL导入结果: {result}")

    # Import a list of pages.
    urls = [
        "https://example.com/blog/article-1",
        "https://example.com/blog/article-2",
        "https://example.com/blog/article-3",
    ]
    results = importer.batch_import_urls(dataset["id"], urls)
    print(f"批量导入完成,共处理 {len(results)} 个URL")
1.2 外部数据导入¶
1.2.1 数据库数据导出再导入¶
说明: Dify不直接支持数据库连接作为数据源。支持的数据源类型包括:文件上传、文本导入和Notion同步。 如需导入数据库数据,可先将数据库内容导出为文本,再通过API导入。
支持的数据源类型: - 文件上传(PDF、Word、TXT、Markdown等) - 文本导入(通过API直接传入文本内容) - Notion同步
代码示例 - 数据库集成:
import pymysql
import psycopg2
from typing import List, Dict, Any
import json
class DatabaseExporter:
    """Exports relational data into a Dify knowledge base.

    Dify does not support databases as a native data source; this class
    runs a SQL query, serializes each row to JSON and uploads the result
    through the ``create-by-text`` endpoint.
    """

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def create_dataset_from_mysql(self, name: str, host: str,
                                  database: str, user: str,
                                  password: str, query: str,
                                  description: str = "") -> Dict:
        """Create a knowledge base from a MySQL query result.

        Args:
            name: knowledge base name
            host: database host
            database: database name
            user: username
            password: password
            query: SQL query to export
            description: knowledge base description
        """
        return self._export(
            name, description, query,
            # Deferred so no connection is opened if dataset creation fails.
            lambda: pymysql.connect(host=host, database=database,
                                    user=user, password=password),
        )

    def create_dataset_from_postgres(self, name: str, host: str,
                                     database: str, user: str,
                                     password: str, query: str,
                                     description: str = "") -> Dict:
        """Create a knowledge base from a PostgreSQL query result."""
        return self._export(
            name, description, query,
            lambda: psycopg2.connect(host=host, database=database,
                                     user=user, password=password),
        )

    def _export(self, name: str, description: str, query: str, connect) -> Dict:
        """Shared pipeline: create dataset, run query, upload the rows.

        The MySQL and PostgreSQL paths were copy-pasted duplicates; they
        now differ only in the `connect` factory passed in.
        """
        dataset = self.create_dataset(name, description)
        if "error" in dataset:
            return dataset

        connection = None
        try:
            connection = connect()
            with connection.cursor() as cursor:
                cursor.execute(query)
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
            # One dict per row, every value stringified for JSON export.
            documents = [
                {col: str(val) for col, val in zip(columns, row)}
                for row in rows
            ]
            return self.upload_text_data(dataset["id"], documents)
        except Exception as e:
            return {"error": str(e)}
        finally:
            # Robust cleanup: no reliance on `'connection' in locals()`.
            if connection is not None:
                connection.close()

    def upload_text_data(self, dataset_id: str, data: List[Dict]) -> Dict:
        """Upload a list of row dicts as one JSON-lines text document."""
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"
        # One JSON object per row, blank-line separated.
        text_content = "\n\n".join(
            json.dumps(doc, ensure_ascii=False) for doc in data
        )
        payload = {
            "name": "database_export",
            "text": text_content,
            "indexing_technique": "high_quality",
            "process_rule": {
                "mode": "automatic"
            }
        }
        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def create_dataset(self, name: str, description: str = "") -> Dict:
        """Create an empty knowledge base and return the API response."""
        url = f"{self.base_url}/datasets"
        payload = {
            "name": name,
            "description": description,
            "permission": "only_me",
            "data_source_type": "upload_file"
        }
        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
# Usage example: export MySQL and PostgreSQL query results into Dify.
if __name__ == "__main__":
    integrator = DatabaseExporter(api_key="your_api_key_here")

    # MySQL -> knowledge base.
    result = integrator.create_dataset_from_mysql(
        name="产品信息库",
        host="localhost",
        database="products",
        user="root",
        password="password",
        query="SELECT * FROM products WHERE category = 'electronics'",
        description="电子产品信息",
    )
    print(f"MySQL集成结果: {result}")

    # PostgreSQL -> knowledge base.
    result = integrator.create_dataset_from_postgres(
        name="用户信息库",
        host="localhost",
        database="users",
        user="postgres",
        password="password",
        query="SELECT * FROM users WHERE status = 'active'",
        description="活跃用户信息",
    )
    print(f"PostgreSQL集成结果: {result}")
1.2.2 CSV/Excel文件¶
技术原理: 解析CSV或Excel文件,将每行数据转换为独立的文档,或合并为单个文档。
代码示例 - CSV/Excel导入:
import pandas as pd
from typing import List, Dict
class FileDataImporter:
    """Imports CSV/Excel files into a Dify knowledge base.

    Each row can become its own document, or the whole table can be
    uploaded as one document.
    """

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def create_dataset(self, name: str, description: str = "") -> Dict:
        """Create an empty knowledge base and return the API response."""
        url = f"{self.base_url}/datasets"
        payload = {"name": name, "description": description}
        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def import_csv(self, dataset_id: str, file_path: str,
                   row_as_document: bool = True) -> Dict:
        """Import a CSV file.

        Args:
            dataset_id: target knowledge base ID
            file_path: CSV file path
            row_as_document: True -> one document per row
        """
        try:
            df = pd.read_csv(file_path)
            return self._import_frame(dataset_id, df, row_as_document)
        except Exception as e:
            return {"error": str(e)}

    def import_excel(self, dataset_id: str, file_path: str,
                     sheet_name: str = 0,
                     row_as_document: bool = True) -> Dict:
        """Import an Excel worksheet.

        Args:
            dataset_id: target knowledge base ID
            file_path: Excel file path
            sheet_name: sheet name or index
            row_as_document: True -> one document per row
        """
        try:
            df = pd.read_excel(file_path, sheet_name=sheet_name)
            return self._import_frame(dataset_id, df, row_as_document)
        except Exception as e:
            return {"error": str(e)}

    def _import_frame(self, dataset_id: str, df, row_as_document: bool) -> Dict:
        """Upload a DataFrame; shared by the CSV and Excel paths.

        The per-row document construction was duplicated verbatim in
        import_csv and import_excel; it lives here now.
        """
        if row_as_document:
            documents = []
            for idx, row in df.iterrows():
                # "column: value" lines, one per column.
                doc_text = "\n".join(
                    f"{col}: {val}" for col, val in row.items()
                )
                documents.append({
                    "name": f"row_{idx}",
                    "text": doc_text
                })
            return self.upload_multiple_documents(dataset_id, documents)
        # Whole table as one plain-text document.
        return self.upload_text_data(dataset_id, df.to_string(index=False))

    def upload_multiple_documents(self, dataset_id: str,
                                  documents: List[Dict]) -> Dict:
        """Create one text document per entry; returns a summary dict."""
        results = []
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"
        for doc in documents:
            payload = {
                "name": doc["name"],
                "text": doc["text"],
                "indexing_technique": "high_quality",
                "process_rule": {
                    "mode": "automatic"
                }
            }
            try:
                response = requests.post(url, headers=self.headers, json=payload)
                response.raise_for_status()
                results.append(response.json())
            except requests.exceptions.RequestException as e:
                results.append({"error": str(e)})
        return {
            "total": len(documents),
            "success": sum(1 for r in results if "error" not in r),
            "results": results
        }

    def upload_text_data(self, dataset_id: str, text: str) -> Dict:
        """Create a single text document from *text*."""
        url = f"{self.base_url}/datasets/{dataset_id}/document/create-by-text"
        payload = {
            "name": "imported_data",
            "text": text,
            "indexing_technique": "high_quality",
            "process_rule": {
                "mode": "automatic"
            }
        }
        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}
# Usage example: create a dataset, then import CSV and Excel files.
if __name__ == "__main__":
    importer = FileDataImporter(api_key="your_api_key_here")

    dataset = importer.create_dataset(
        name="销售数据",
        description="销售记录",
    )

    # Row-per-document CSV import.
    result = importer.import_csv(
        dataset_id=dataset["id"],
        file_path="data/sales.csv",
        row_as_document=True,
    )
    print(f"CSV导入结果: {result}")

    # Row-per-document Excel import from a named sheet.
    result = importer.import_excel(
        dataset_id=dataset["id"],
        file_path="data/products.xlsx",
        sheet_name="Sheet1",
        row_as_document=True,
    )
    print(f"Excel导入结果: {result}")
2. 数据预处理¶
2.1 文本清洗¶
技术原理: 文本清洗是数据预处理的重要步骤,包括去除HTML标签、特殊字符、标准化文本等。
代码示例 - 文本清洗:
import re
import html
from typing import List, Dict
class TextCleaner:
    """Text cleaning utilities: HTML stripping, character filtering,
    whitespace normalization, URL/email removal."""

    def __init__(self):
        # Punctuation that `remove_special_chars` may preserve, stored as
        # a pre-escaped character SEQUENCE (no surrounding brackets).
        # Bug fix: the original stored a complete "[...]" character class
        # here and concatenated it inside another class, which closed the
        # outer class early and left "\u4e00-\u9fff]" dangling outside —
        # a malformed pattern that mangled CJK/special-char filtering.
        self.punctuation = re.escape(",.;:!?\"'“”‘’(){}[]<>|\\/~`@#$%^&*-+=")

    def clean_html(self, text: str) -> str:
        """Strip HTML tags and decode HTML entities."""
        clean_text = re.sub(r'<[^>]+>', '', text)
        return html.unescape(clean_text)

    def remove_special_chars(self, text: str, keep_punctuation: bool = True) -> str:
        """Drop characters that are not word chars, whitespace, CJK, or
        (optionally) common punctuation."""
        if keep_punctuation:
            # Escaped punctuation embeds safely inside the class now.
            pattern = r'[^\w\s' + self.punctuation + r'\u4e00-\u9fff]'
        else:
            pattern = r'[^\w\s\u4e00-\u9fff]'
        return re.sub(pattern, '', text)

    def normalize_whitespace(self, text: str) -> str:
        """Collapse runs of whitespace to single spaces and trim ends."""
        return re.sub(r'\s+', ' ', text).strip()

    def remove_urls(self, text: str) -> str:
        """Remove http/https URLs."""
        url_pattern = r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
        return re.sub(url_pattern, '', text)

    def remove_emails(self, text: str) -> str:
        """Remove email addresses."""
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        return re.sub(email_pattern, '', text)

    def clean(self, text: str, options: Dict = None) -> str:
        """Run the full cleaning pipeline.

        Args:
            text: text to clean
            options: per-step toggles; all steps enabled by default
        """
        if options is None:
            options = {
                "remove_html": True,
                "remove_special_chars": True,
                "keep_punctuation": True,
                "normalize_whitespace": True,
                "remove_urls": True,
                "remove_emails": True
            }
        clean_text = text
        if options.get("remove_html"):
            clean_text = self.clean_html(clean_text)
        if options.get("remove_special_chars"):
            clean_text = self.remove_special_chars(
                clean_text,
                options.get("keep_punctuation", True)
            )
        if options.get("normalize_whitespace"):
            clean_text = self.normalize_whitespace(clean_text)
        if options.get("remove_urls"):
            clean_text = self.remove_urls(clean_text)
        if options.get("remove_emails"):
            clean_text = self.remove_emails(clean_text)
        return clean_text

    def batch_clean(self, texts: List[str], options: Dict = None) -> List[str]:
        """Clean a list of texts with the same options."""
        return [self.clean(text, options) for text in texts]
# Usage example: clean one text, then a batch.
if __name__ == "__main__":
    cleaner = TextCleaner()

    # A messy sample mixing HTML, a URL and an email address.
    text = """
    <p>这是一个示例文本!</p>
    访问 https://example.com 了解更多信息。
    联系邮箱: info@example.com
    """
    clean_text = cleaner.clean(text)
    print(f"清洗后的文本:\n{clean_text}")

    # Batch cleaning.
    texts = [
        "<div>第一个文本</div>",
        "<span>第二个文本</span>",
        "<p>第三个文本</p>",
    ]
    clean_texts = cleaner.batch_clean(texts)
    for i, clean_text in enumerate(clean_texts):
        print(f"文本{i+1}: {clean_text}")
2.2 分块处理¶
技术原理: 将长文本分割成语义完整的块,每个块包含足够的上下文信息,便于检索和理解。
分块策略: 1. 固定大小分块:按字符数或token数分割 2. 语义分块:基于段落、句子等语义单元分割 3. 重叠分块:块之间有重叠,保持上下文连续性
代码示例 - 分块处理:
from typing import List, Dict
import re
class TextChunker:
    """Splits long text into retrieval-friendly chunks."""

    def __init__(self):
        pass

    def chunk_by_size(self, text: str, chunk_size: int = 500,
                      overlap: int = 50) -> List[str]:
        """Fixed-size chunking with overlap.

        Args:
            text: text to split
            chunk_size: chunk length in characters
            overlap: characters shared between adjacent chunks

        Bug fix: the step is clamped to >= 1 so overlap >= chunk_size can
        no longer cause an infinite loop.
        """
        step = max(1, chunk_size - overlap)
        return [text[start:start + chunk_size]
                for start in range(0, len(text), step)]

    def chunk_by_sentence(self, text: str, max_sentences: int = 5,
                          overlap_sentences: int = 1) -> List[str]:
        """Sentence-based chunking with overlap.

        Args:
            text: text to split
            max_sentences: sentences per chunk
            overlap_sentences: sentences shared between adjacent chunks
        """
        # Split after sentence-ending punctuation (ASCII and CJK).
        sentences = re.split(r'(?<=[.!?。!?])\s+', text)
        # Clamp the step to avoid an infinite loop (same fix as above).
        step = max(1, max_sentences - overlap_sentences)
        return [' '.join(sentences[i:i + max_sentences])
                for i in range(0, len(sentences), step)]

    def chunk_by_paragraph(self, text: str, max_paragraphs: int = 3,
                           overlap_paragraphs: int = 1) -> List[str]:
        """Paragraph-based chunking with overlap.

        Args:
            text: text to split
            max_paragraphs: paragraphs per chunk
            overlap_paragraphs: paragraphs shared between adjacent chunks
        """
        paragraphs = re.split(r'\n\s*\n', text.strip())
        step = max(1, max_paragraphs - overlap_paragraphs)
        return ['\n\n'.join(paragraphs[i:i + max_paragraphs])
                for i in range(0, len(paragraphs), step)]

    def chunk_semantic(self, text: str, max_chunk_size: int = 1000,
                       min_chunk_size: int = 200) -> List[str]:
        """Greedy paragraph packing up to *max_chunk_size* characters.

        Args:
            text: text to split
            max_chunk_size: soft upper bound per chunk (a single paragraph
                longer than this still becomes one oversized chunk)
            min_chunk_size: an undersized trailing chunk is folded into
                its predecessor (the original accepted but ignored this)
        """
        chunks = []
        current = ""
        for paragraph in re.split(r'\n\s*\n', text.strip()):
            # Flush the current chunk before it would overflow.
            if current and len(current) + len(paragraph) > max_chunk_size:
                chunks.append(current.strip())
                current = ""
            current = paragraph if not current else current + "\n\n" + paragraph
        if current:
            chunks.append(current.strip())
        # Honor min_chunk_size: merge a too-small final chunk backwards.
        if len(chunks) >= 2 and len(chunks[-1]) < min_chunk_size:
            tail = chunks.pop()
            chunks[-1] = chunks[-1] + "\n\n" + tail
        return chunks

    def chunk_with_metadata(self, text: str, chunk_size: int = 500,
                            overlap: int = 50) -> List[Dict]:
        """Fixed-size chunking that wraps each chunk with metadata.

        Args:
            text: text to split
            chunk_size: chunk length in characters
            overlap: characters shared between adjacent chunks
        """
        chunks = self.chunk_by_size(text, chunk_size, overlap)
        total = len(chunks)
        return [
            {
                "chunk_id": i,
                "chunk_text": chunk,
                "chunk_size": len(chunk),
                "chunk_index": i,
                "total_chunks": total,
            }
            for i, chunk in enumerate(chunks)
        ]
# Usage example: try the size, sentence and metadata chunkers.
if __name__ == "__main__":
    chunker = TextChunker()

    text = """
    人工智能(AI)是计算机科学的一个分支,致力于创建能够执行通常需要人类智能的任务的系统。
    机器学习是AI的一个子集,它使计算机能够从数据中学习,而无需显式编程。
    深度学习是机器学习的一种特殊形式,它使用多层神经网络来模拟人脑的结构。
    自然语言处理(NLP)是AI的另一个重要分支,它使计算机能够理解、解释和生成人类语言。
    计算机视觉则使计算机能够从图像和视频中提取有意义的信息。
    AI的应用非常广泛,包括医疗诊断、自动驾驶、金融分析、智能客服等。
    随着技术的发展,AI将在更多领域发挥重要作用。
    """

    # Fixed-size chunks.
    chunks = chunker.chunk_by_size(text, chunk_size=200, overlap=30)
    print(f"按大小分块(共{len(chunks)}块):")
    for i, chunk in enumerate(chunks):
        print(f"\n块{i+1}: {chunk[:100]}...")

    # Sentence-based chunks.
    chunks = chunker.chunk_by_sentence(text, max_sentences=3, overlap_sentences=1)
    print(f"\n按句子分块(共{len(chunks)}块):")
    for i, chunk in enumerate(chunks):
        print(f"\n块{i+1}: {chunk[:100]}...")

    # Chunks wrapped with metadata.
    chunks = chunker.chunk_with_metadata(text, chunk_size=200, overlap=30)
    print(f"\n带元数据的分块:")
    for chunk in chunks:
        print(f"块{chunk['chunk_id']}: 大小={chunk['chunk_size']}")
3. 知识库管理¶
3.1 创建知识库¶
代码示例 - 完整的知识库管理:
import requests
from typing import List, Dict, Optional
class KnowledgeBaseManager:
    """CRUD-style wrapper around the Dify datasets API.

    Methods return parsed JSON on success or ``{"error": ...}`` on
    failure; operations Dify only exposes in its web UI return an
    explanatory message instead.
    """

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        self.base_url = "https://api.dify.ai/v1"

    def _get_json(self, path: str, params: Dict = None) -> Dict:
        """Shared GET helper: fetch *path* and return parsed JSON."""
        try:
            response = requests.get(f"{self.base_url}{path}",
                                    headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def create_dataset(self, name: str, description: str = "",
                       permission: str = "only_me") -> Dict:
        """Create a knowledge base.

        Args:
            name: knowledge base name
            description: description text
            permission: one of only_me / all_team_members / all_users
        """
        payload = {
            "name": name,
            "description": description,
            "permission": permission,
            "data_source_type": "upload_file"
        }
        try:
            response = requests.post(f"{self.base_url}/datasets",
                                     headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def list_datasets(self, page: int = 1, limit: int = 20) -> Dict:
        """List knowledge bases (paginated)."""
        return self._get_json("/datasets", {"page": page, "limit": limit})

    def get_dataset(self, dataset_id: str) -> Dict:
        """Fetch one knowledge base's details."""
        return self._get_json(f"/datasets/{dataset_id}")

    def update_dataset(self, dataset_id: str, name: str = None,
                       description: str = None) -> Dict:
        """Stub: Dify exposes no API for editing dataset metadata.

        Name/description changes must be made in the Dify web UI.
        """
        print(f"请在Dify Web界面修改知识库信息: dataset_id={dataset_id}")
        return {"message": "请在Dify Web界面修改知识库信息"}

    def delete_dataset(self, dataset_id: str) -> Dict:
        """Delete a knowledge base."""
        try:
            response = requests.delete(f"{self.base_url}/datasets/{dataset_id}",
                                       headers=self.headers)
            response.raise_for_status()
            return {"message": "删除成功"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def list_documents(self, dataset_id: str, page: int = 1,
                       limit: int = 20) -> Dict:
        """List documents inside a knowledge base (paginated)."""
        return self._get_json(f"/datasets/{dataset_id}/documents",
                              {"page": page, "limit": limit})

    def get_document(self, dataset_id: str, document_id: str) -> Dict:
        """Fetch one document's details."""
        return self._get_json(f"/datasets/{dataset_id}/documents/{document_id}")

    def delete_document(self, dataset_id: str, document_id: str) -> Dict:
        """Delete one document from a knowledge base."""
        url = f"{self.base_url}/datasets/{dataset_id}/documents/{document_id}"
        try:
            response = requests.delete(url, headers=self.headers)
            response.raise_for_status()
            return {"message": "删除成功"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def update_indexing(self, dataset_id: str,
                        indexing_technique: str = "high_quality") -> Dict:
        """Stub: Dify exposes no API for switching indexing technique.

        The technique ("high_quality" = embedding vectors, "economy" =
        keyword index) is fixed at dataset creation; manage it in the UI.
        """
        print(f"请在Dify Web界面管理知识库索引: dataset_id={dataset_id}")
        return {"message": "索引技术在创建知识库时指定,后续在Web界面管理"}
# Usage example: create, list and inspect knowledge bases.
if __name__ == "__main__":
    manager = KnowledgeBaseManager(api_key="your_api_key_here")

    dataset = manager.create_dataset(
        name="技术文档",
        description="存储技术相关文档",
        permission="only_me",
    )
    print(f"知识库创建成功!ID: {dataset.get('id')}")

    datasets = manager.list_datasets()
    print(f"共有 {len(datasets.get('data', []))} 个知识库")

    # Detail / document queries only make sense if creation succeeded.
    if "id" in dataset:
        detail = manager.get_dataset(dataset["id"])
        print(f"知识库详情: {detail}")

    if "id" in dataset:
        documents = manager.list_documents(dataset["id"])
        print(f"知识库中有 {len(documents.get('data', []))} 个文档")
3.2 检索策略¶
3.2.1 向量检索¶
技术原理: 将文本转换为向量表示,通过计算向量相似度进行检索。
代码示例 - 向量检索配置:
class VectorRetrievalConfig:
    """Builds vector (semantic) retrieval configuration dicts."""

    @staticmethod
    def create_config(top_k: int = 5, score_threshold: float = 0.5,
                      reranking: bool = False) -> Dict:
        """Build a vector-retrieval configuration.

        Args:
            top_k: number of results to return
            score_threshold: minimum similarity score
            reranking: whether to rerank results with Cohere
        """
        reranking_model = None
        if reranking:
            reranking_model = {
                "reranking_provider": "cohere",
                "reranking_model_name": "rerank-english-v2.0",
            }
        return {
            "retrieval_mode": "single",
            "top_k": top_k,
            "score_threshold": score_threshold,
            "reranking_enable": reranking,
            "reranking_model": reranking_model,
        }
# Usage example: vector retrieval with reranking enabled.
config = VectorRetrievalConfig.create_config(top_k=10, score_threshold=0.6, reranking=True)
print(f"向量检索配置: {config}")
3.2.2 关键词检索¶
技术原理: 基于关键词匹配进行检索,适合精确匹配场景。
代码示例 - 关键词检索配置:
class KeywordRetrievalConfig:
    """Builds keyword (exact/fuzzy match) retrieval configuration dicts."""

    @staticmethod
    def create_config(top_k: int = 5,
                      match_type: str = "fuzzy") -> Dict:
        """Build a keyword-retrieval configuration.

        Args:
            top_k: number of results to return
            match_type: "fuzzy" or "exact"
        """
        config = {
            "retrieval_mode": "single",
            "top_k": top_k,
        }
        config["keyword_search_config"] = {"keyword_search_type": match_type}
        return config
# Usage example: fuzzy keyword retrieval returning ten hits.
config = KeywordRetrievalConfig.create_config(top_k=10, match_type="fuzzy")
print(f"关键词检索配置: {config}")
3.2.3 混合检索¶
技术原理: 结合向量检索和关键词检索,平衡语义理解和精确匹配。
代码示例 - 混合检索配置:
class HybridRetrievalConfig:
    """Builds hybrid (vector + keyword) retrieval configuration dicts."""

    @staticmethod
    def create_config(top_k: int = 5, score_threshold: float = 0.5,
                      vector_weight: float = 0.7,
                      keyword_weight: float = 0.3,
                      reranking: bool = False) -> Dict:
        """Build a hybrid-retrieval configuration.

        Args:
            top_k: number of results to return
            score_threshold: minimum similarity score
            vector_weight: weight of the vector-search score
            keyword_weight: weight of the keyword-search score
            reranking: whether to rerank results with Cohere
        """
        reranking_model = None
        if reranking:
            reranking_model = {
                "reranking_provider": "cohere",
                "reranking_model_name": "rerank-english-v2.0",
            }
        return {
            "retrieval_mode": "multiple",
            "top_k": top_k,
            "score_threshold": score_threshold,
            "reranking_enable": reranking,
            "reranking_model": reranking_model,
            "weights": {
                "vector_weight": vector_weight,
                "keyword_weight": keyword_weight,
            },
        }
# Usage example: hybrid retrieval biased toward vector similarity.
config = HybridRetrievalConfig.create_config(
    top_k=10,
    score_threshold=0.6,
    vector_weight=0.8,
    keyword_weight=0.2,
    reranking=True,
)
print(f"混合检索配置: {config}")
4. 练习题¶
基础练习¶
- 创建知识库
- 上传文档
- 配置参数
- 测试检索
进阶练习¶
- 构建企业知识库
- 集成多种数据源
- 优化检索策略
- 实现智能问答
- 开发文档管理系统
- 批量导入文档
- 自动分类标签
- 智能检索
5. 最佳实践¶
✅ 推荐做法¶
- 数据质量
- 清洗数据
- 去除噪声
- 标准化格式
- 分块策略
- 合理分块
- 保持语义完整
- 避免重叠过多
- 检索优化
检索优化
- 选择合适的检索策略
- 调整相似度阈值
- 使用重排序提高质量
❌ 避免做法¶
- 数据过多
- 选择相关数据
- 避免冗余
- 定期清理
- 分块不当
- 避免过小或过大的块
- 保持语义完整性
- 考虑上下文
6. 常见问题¶
Q1: 如何选择合适的分块大小?¶
A: 选择建议: - 短文本:200-500字符 - 中等文本:500-1000字符 - 长文本:1000-2000字符 - 考虑重叠:通常重叠10-20%
Q2: 如何提高检索准确率?¶
A: 优化方法: - 使用混合检索 - 调整权重比例 - 启用重排序 - 优化相似度阈值
Q3: 如何处理多语言数据?¶
A: 处理方法: - 按语言分知识库 - 使用多语言嵌入模型 - 添加语言标签 - 分别配置检索策略
7. 总结¶
本章深入介绍了数据源集成的核心内容,包括:
- 数据源类型:文本数据、结构化数据
- 数据预处理:文本清洗、分块处理
- 知识库管理:创建、查询、更新、删除
- 检索策略:向量检索、关键词检索、混合检索
通过本章的学习,你应该能够熟练集成各种数据源到Dify应用了。
8. 下一步¶
继续学习05-模型配置与优化,深入了解模型配置和优化技巧。
最后更新日期:2026-02-12 适用版本:Dify实战教程 v2026