03 - 工作流设计¶
工作流概念、节点类型、连接方式
📖 章节概述¶
本章将深入探讨Dify工作流的设计方法,包括工作流的核心概念、各种节点类型的高级用法、连接方式的设计模式、性能优化技巧以及实际应用场景。通过详细的代码示例和实践指导,帮助读者掌握复杂工作流的设计和实现。
🎯 学习目标¶
完成本章后,你将能够:
- 深入理解工作流的核心概念和架构原理
- 熟练掌握各种节点类型的高级配置
- 设计高效、可维护的复杂工作流
- 掌握工作流性能优化的方法
- 理解工作流的最佳实践和设计模式
- 能够解决实际业务中的复杂问题
1. 工作流概念深度解析¶
1.1 工作流架构原理¶
技术架构: Dify工作流采用DAG(有向无环图)架构,这是一种数据流处理模型,具有以下特点:
- 节点(Node):代表一个独立的处理单元
- 边(Edge):代表数据流向和依赖关系
- 无环(Acyclic):确保流程的可预测性和终止性
- 并行执行:支持多个节点并行处理
核心组件:
Python
class WorkflowGraph:
    """In-memory DAG (directed acyclic graph) model of a workflow.

    Nodes are kept in a dict keyed by node id; edges are stored both as a
    flat list (for serialization) and as an adjacency list (for traversal).
    """

    def __init__(self):
        self.nodes = {}      # node dict {node_id: node_data}
        self.edges = []      # flat edge list
        self.adjacency = {}  # adjacency list {node_id: [successor ids]}

    def add_node(self, node_id, node_type, node_data):
        """Register a node with its id, type and configuration payload."""
        self.nodes[node_id] = {
            "id": node_id,
            "type": node_type,
            "data": node_data
        }
        self.adjacency[node_id] = []

    def add_edge(self, source_id, target_id, edge_type="sequential"):
        """Add a directed edge source -> target.

        Raises:
            KeyError: if ``source_id`` was never registered via ``add_node``.
                Checking up front keeps ``self.edges`` and ``self.adjacency``
                consistent (previously the edge was appended before the
                adjacency lookup failed, corrupting the graph).
        """
        if source_id not in self.adjacency:
            raise KeyError(source_id)
        self.edges.append({
            "id": f"{source_id}-{target_id}",
            "source": source_id,
            "target": target_id,
            "type": edge_type
        })
        self.adjacency[source_id].append(target_id)

    def validate(self):
        """Check that the workflow graph is a DAG.

        Uses DFS with a recursion stack to detect back edges (cycles).

        Returns:
            (bool, str): validity flag and a human-readable message.
        """
        visited = set()
        recursion_stack = set()  # nodes on the current DFS path

        def has_cycle(node_id):
            visited.add(node_id)
            recursion_stack.add(node_id)
            for neighbor in self.adjacency.get(node_id, []):
                if neighbor not in visited:
                    if has_cycle(neighbor):
                        return True
                elif neighbor in recursion_stack:
                    # Back edge to an ancestor on the current path => cycle.
                    return True
            recursion_stack.remove(node_id)
            return False

        for node_id in self.nodes:
            if node_id not in visited:
                if has_cycle(node_id):
                    return False, "工作流存在循环依赖"
        return True, "工作流验证通过"

    def topological_sort(self):
        """Compute an execution order with Kahn's algorithm.

        Repeatedly removes nodes whose in-degree is zero. A deque makes the
        front-pop O(1), whereas the previous ``list.pop(0)`` was O(n).

        Returns:
            (list | None, str): ordered node ids and a message, or
            ``(None, message)`` when the graph contains a cycle.
        """
        from collections import deque

        in_degree = {node_id: 0 for node_id in self.nodes}
        # Count incoming edges per node.
        for edge in self.edges:
            in_degree[edge["target"]] += 1

        # Seed the queue with every node that has no incoming edge.
        queue = deque(
            node_id for node_id, degree in in_degree.items() if degree == 0
        )
        result = []
        while queue:
            node_id = queue.popleft()
            result.append(node_id)
            for neighbor in self.adjacency.get(node_id, []):
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        if len(result) != len(self.nodes):
            return None, "工作流存在循环"
        return result, "排序成功"
# Usage example
if __name__ == "__main__":
    graph = WorkflowGraph()
    # Add nodes: each has a unique id, a type (start/llm/end, ...) and config data.
    graph.add_node("start", "start", {"title": "开始"})
    graph.add_node("process", "llm", {"title": "处理"})
    graph.add_node("end", "end", {"title": "结束"})
    # Add edges: define the data flow between nodes (source -> target).
    graph.add_edge("start", "process")
    graph.add_edge("process", "end")
    # Validate DAG legality (cycle detection).
    is_valid, message = graph.validate()
    print(f"验证结果: {is_valid}, {message}")
    # Topological sort: determine the execution order of the nodes.
    order, message = graph.topological_sort()
    print(f"执行顺序: {order}")
1.2 工作流类型详解¶
1.2.1 顺序工作流¶
特点:节点按顺序执行,每个节点的输出作为下一个节点的输入
应用场景:简单的线性处理流程
代码示例:
Python
class SequentialWorkflow:
    """Sequential (linear) workflow: each node's output feeds the next node."""

    def __init__(self, api_key):
        # Keep the Dify API key and prebuilt auth headers for later API calls.
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """Build the sequential workflow definition.

        Four-stage pipeline: start -> preprocess (code node) -> LLM analysis
        -> format (code node) -> end.
        """
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",  # start node: declares the workflow's input variables
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "input_text",
                                    "label": "输入文本",
                                    "type": "paragraph",
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "preprocess",
                        "type": "code",  # code node: Python executed in Dify's secure sandbox
                        "data": {
                            "title": "预处理",
                            "code": """
def main(input_text: str) -> dict: # 代码节点入口函数,参数名需与上游变量匹配
    # 文本清洗
    cleaned = input_text.strip().lower()
    # 分词
    words = cleaned.split()
    return {
        "cleaned_text": cleaned,
        "word_count": len(words),
        "words": words
    }
""",
                            "outputs": [
                                {"variable": "cleaned_text", "type": "string"},
                                {"variable": "word_count", "type": "number"},
                                {"variable": "words", "type": "array"}
                            ]
                        }
                    },
                    {
                        "id": "analyze",
                        "type": "llm",  # LLM node: runs inference on a large language model
                        "data": {
                            "title": "分析",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"  # chat mode supports multi-turn prompt format
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个文本分析专家。"
                                },
                                {
                                    "role": "user",
                                    # Dify variable syntax: {{#nodeId.variable#}} references upstream output
                                    "text": """请分析以下文本:
文本:{{#preprocess.cleaned_text#}}
词数:{{#preprocess.word_count#}}
请提供:
1. 情感倾向(正面/负面/中性)
2. 关键词提取
3. 主题分类"""
                                }
                            ]
                        }
                    },
                    {
                        "id": "format",
                        "type": "code",
                        "data": {
                            "title": "格式化",
                            "code": """
import json
def main(llm_output: str) -> dict:
    # 尝试解析JSON
    try:
        result = json.loads(llm_output)
    except:
        result = {"raw_output": llm_output}
    return result
""",
                            "outputs": [
                                {"variable": "formatted_result", "type": "object"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",  # end node: declares the workflow's final outputs
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    # [node id, variable name] selects an upstream output
                                    "value_selector": ["format", "formatted_result"],
                                    "variable": "result"
                                }
                            ]
                        }
                    }
                ],
                # Edges define data flow: source node output feeds the target node.
                "edges": [
                    {"id": "start-preprocess", "source": "start", "target": "preprocess"},
                    {"id": "preprocess-analyze", "source": "preprocess", "target": "analyze"},
                    {"id": "analyze-format", "source": "analyze", "target": "format"},
                    {"id": "format-end", "source": "format", "target": "end"}
                ]
            }
        }
1.2.2 分支工作流¶
特点:根据条件选择不同的执行路径
应用场景:需要根据输入或中间结果进行决策的场景
代码示例:
Python
class BranchingWorkflow:
    """Branching workflow: routes execution down different paths by condition."""

    def __init__(self, api_key):
        # Keep the Dify API key and ready-to-use auth headers for API calls.
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """Build the branching workflow definition.

        Routes on query_type to one of four handlers:
        knowledge Q&A / code generation / translation / summarization.
        """
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "user_query",
                                    "label": "用户查询",
                                    "type": "paragraph",
                                    "required": True
                                },
                                {
                                    "variable": "query_type",
                                    "label": "查询类型",
                                    "type": "select",
                                    "required": True,
                                    "options": ["知识问答", "代码生成", "翻译", "总结"]
                                }
                            ]
                        }
                    },
                    {
                        "id": "classify",
                        "type": "if-else",  # condition node: routes flow to one branch
                        "data": {
                            "title": "分类",
                            "cases": [  # each case defines one branch and its match condition
                                {
                                    "case_id": "qa",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "知识问答"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "code",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "代码生成"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "translate",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "翻译"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "summarize",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "总结"
                                        }
                                    ],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "qa_handler",
                        "type": "knowledge-retrieval",  # retrieves from imported datasets
                        "data": {
                            "title": "知识问答处理",
                            "dataset_ids": ["qa_dataset_id"],  # linked knowledge-base ids
                            "retrieval_mode": "multiple",  # multiple: joint retrieval over several datasets
                            "multiple_retrieval_config": {
                                "top_k": 3,  # keep the 3 most relevant hits
                                "score_threshold": 0.5  # drop hits below this similarity score
                            }
                        }
                    },
                    {
                        "id": "code_handler",
                        "type": "llm",
                        "data": {
                            "title": "代码生成处理",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个专业的程序员,擅长编写高质量代码。"
                                },
                                {
                                    "role": "user",
                                    "text": "请为以下需求编写代码:\n{{#start.user_query#}}\n\n要求:\n1. 代码清晰易读\n2. 添加注释\n3. 包含错误处理"
                                }
                            ]
                        }
                    },
                    {
                        "id": "translate_handler",
                        "type": "llm",
                        "data": {
                            "title": "翻译处理",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个专业的翻译,擅长中英互译。"
                                },
                                {
                                    "role": "user",
                                    "text": "请翻译以下文本:\n{{#start.user_query#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "summarize_handler",
                        "type": "llm",
                        "data": {
                            "title": "总结处理",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个专业的文本总结专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请总结以下文本:\n{{#start.user_query#}}\n\n要求:\n1. 提取关键信息\n2. 保持简洁\n3. 结构清晰"
                                }
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["qa_handler", "context"],
                                    "variable": "qa_result"
                                },
                                {
                                    "value_selector": ["code_handler", "text"],
                                    "variable": "code_result"
                                },
                                {
                                    "value_selector": ["translate_handler", "text"],
                                    "variable": "translate_result"
                                },
                                {
                                    "value_selector": ["summarize_handler", "text"],
                                    "variable": "summarize_result"
                                }
                            ]
                        }
                    }
                ],
                # sourceHandle selects the if-else branch; it matches a case_id above.
                "edges": [
                    {"id": "start-classify", "source": "start", "target": "classify"},
                    {"id": "classify-qa", "source": "classify", "target": "qa_handler", "sourceHandle": "qa"},
                    {"id": "classify-code", "source": "classify", "target": "code_handler", "sourceHandle": "code"},
                    {"id": "classify-translate", "source": "classify", "target": "translate_handler", "sourceHandle": "translate"},
                    {"id": "classify-summarize", "source": "classify", "target": "summarize_handler", "sourceHandle": "summarize"},
                    {"id": "qa-end", "source": "qa_handler", "target": "end"},
                    {"id": "code-end", "source": "code_handler", "target": "end"},
                    {"id": "translate-end", "source": "translate_handler", "target": "end"},
                    {"id": "summarize-end", "source": "summarize_handler", "target": "end"}
                ]
            }
        }
1.2.3 并行工作流¶
特点:多个节点同时执行,最后合并结果
应用场景:需要同时处理多个独立任务的场景
代码示例:
Python
class ParallelWorkflow:
    """Parallel workflow: several nodes run at the same time, results merged at the end."""

    def __init__(self, api_key):
        # Keep the Dify API key and ready-to-use auth headers for API calls.
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """Build the parallel workflow definition.

        Fan-out / fan-in pattern: the start node triggers several LLM tasks in
        parallel; a final code node merges their results.
        """
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "text",
                                    "label": "待分析文本",
                                    "type": "paragraph",
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "sentiment",
                        "type": "llm",
                        "data": {
                            "title": "情感分析",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个情感分析专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请分析以下文本的情感倾向(正面/负面/中性):\n{{#start.text#}}\n\n只返回情感倾向。"
                                }
                            ]
                        }
                    },
                    {
                        "id": "keywords",
                        "type": "llm",
                        "data": {
                            "title": "关键词提取",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个关键词提取专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请从以下文本中提取5个关键词:\n{{#start.text#}}\n\n以JSON格式返回:{\"keywords\": [\"关键词1\", \"关键词2\", ...]}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "summary",
                        "type": "llm",
                        "data": {
                            "title": "文本摘要",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个文本摘要专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请为以下文本生成一个简短摘要(不超过100字):\n{{#start.text#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "merge",
                        "type": "code",
                        "data": {
                            "title": "合并结果",
                            "code": """
import json
def main(sentiment: str, keywords: str, summary: str) -> dict:
    # 解析关键词
    try: # try/except捕获异常
        keywords_data = json.loads(keywords)
        keyword_list = keywords_data.get("keywords", [])
    except:
        keyword_list = []
    # 合并结果
    result = {
        "sentiment": sentiment.strip(),
        "keywords": keyword_list,
        "summary": summary.strip()
    }
    return result
""",
                            "outputs": [
                                {"variable": "merged_result", "type": "object"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["merge", "merged_result"],
                                    "variable": "analysis_result"
                                }
                            ]
                        }
                    }
                ],
                # Fan-out: start feeds 3 LLM nodes in parallel; fan-in: all 3 converge on merge.
                "edges": [
                    {"id": "start-sentiment", "source": "start", "target": "sentiment"},
                    {"id": "start-keywords", "source": "start", "target": "keywords"},
                    {"id": "start-summary", "source": "start", "target": "summary"},
                    {"id": "sentiment-merge", "source": "sentiment", "target": "merge"},
                    {"id": "keywords-merge", "source": "keywords", "target": "merge"},
                    {"id": "summary-merge", "source": "summary", "target": "merge"},
                    {"id": "merge-end", "source": "merge", "target": "end"}
                ]
            }
        }
1.2.4 循环工作流¶
特点:重复执行某些节点,直到满足条件
应用场景:需要迭代处理的场景
代码示例:
Python
class LoopWorkflow:
    """Loop-style workflow: repeats processing until a stop condition is met."""

    def __init__(self, api_key):
        # Keep the Dify API key and ready-to-use auth headers for API calls.
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """Build the loop-style workflow definition.

        Iterative-refinement pattern: initialize -> condition check -> LLM
        processing -> state update, until the max iteration count is reached.
        """
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "target_text",
                                    "label": "目标文本",
                                    "type": "paragraph",
                                    "required": True
                                },
                                {
                                    "variable": "max_iterations",
                                    "label": "最大迭代次数",
                                    "type": "number",
                                    "default": 5,
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "initialize",
                        "type": "code",
                        "data": {
                            "title": "初始化",
                            "code": """
def main(target_text: str, max_iterations: int) -> dict:
    # 初始化迭代状态:当前文本、迭代计数器、完成标志
    return {
        "current_text": target_text,
        "iteration": 0,
        "max_iterations": max_iterations,
        "is_complete": False
    }
""",  # "outputs" declares the variables the code node exposes downstream
                            "outputs": [
                                {"variable": "current_text", "type": "string"},
                                {"variable": "iteration", "type": "number"},
                                {"variable": "max_iterations", "type": "number"},
                                {"variable": "is_complete", "type": "boolean"}
                            ]
                        }
                    },
                    {
                        "id": "check_condition",
                        "type": "if-else",
                        "data": {
                            "title": "检查条件",
                            "cases": [
                                {
                                    "case_id": "continue",
                                    "conditions": [
                                        {
                                            "variable_selector": ["initialize", "is_complete"],
                                            "comparison_operator": "is",
                                            "value": False
                                        },
                                        {
                                            "variable_selector": ["initialize", "iteration"],
                                            "comparison_operator": "<",
                                            "value": "{{#initialize.max_iterations#}}"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    # Empty conditions: acts as the default (else) branch.
                                    "case_id": "break",
                                    "conditions": [],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "process",
                        "type": "llm",
                        "data": {
                            "title": "处理文本",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个文本优化专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请优化以下文本,使其更加简洁清晰:\n{{#initialize.current_text#}}\n\n只返回优化后的文本,不要添加其他说明。"
                                }
                            ]
                        }
                    },
                    {
                        "id": "update",
                        "type": "code",
                        "data": {
                            "title": "更新状态",
                            "code": """
def main(processed_text: str, iteration: int, max_iterations: int) -> dict:
    # 更新迭代状态:递增计数器,判断是否达到终止条件
    new_iteration = iteration + 1
    is_complete = new_iteration >= max_iterations # 达到最大次数时标记完成
    return {
        "current_text": processed_text,
        "iteration": new_iteration,
        "max_iterations": max_iterations,
        "is_complete": is_complete
    }
""",
                            "outputs": [
                                {"variable": "current_text", "type": "string"},
                                {"variable": "iteration", "type": "number"},
                                {"variable": "max_iterations", "type": "number"},
                                {"variable": "is_complete", "type": "boolean"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["initialize", "current_text"],
                                    "variable": "final_text"
                                },
                                {
                                    "value_selector": ["initialize", "iteration"],
                                    "variable": "total_iterations"
                                }
                            ]
                        }
                    }
                ],
                "edges": [
                    {"id": "start-initialize", "source": "start", "target": "initialize"},
                    {"id": "initialize-check", "source": "initialize", "target": "check_condition"},
                    {"id": "check-process", "source": "check_condition", "target": "process", "sourceHandle": "continue"},
                    {"id": "check-end", "source": "check_condition", "target": "end", "sourceHandle": "break"},
                    {"id": "process-update", "source": "process", "target": "update"},
                    {"id": "update-end", "source": "update", "target": "end"}
                ]
                # Note: Dify workflows are DAGs (directed acyclic graphs) and do not
                # support cyclic edges. For true iteration, use Dify's built-in
                # "Iteration" node type instead.
            }
        }
2. 节点类型高级用法¶
2.1 LLM节点高级配置¶
2.1.1 上下文管理¶
Python
def create_llm_with_context(node_id, context_config):
    """Build an LLM node definition with context injection and optional memory.

    Args:
        node_id: Unique id for the node.
        context_config: Dict with required key "variables" (context variable
            selectors, e.g. knowledge-retrieval outputs) plus optional keys
            "max_tokens", "max_segments", "memory_enabled", "memory_size".

    Returns:
        A Dify LLM node definition dict.
    """
    # Context block: injects external information (e.g. retrieval results)
    # into the LLM prompt.
    context_block = {
        "enabled": True,
        "variable_selector": context_config["variables"],
        "max_tokens": context_config.get("max_tokens", 2000),
        "max_segments": context_config.get("max_segments", 10)
    }
    # Memory block: conversation history window (effective in Chatflow apps).
    memory_block = {
        "enabled": context_config.get("memory_enabled", False),
        "window": {
            "enabled": True,
            "size": context_config.get("memory_size", 10)
        }
    }
    return {
        "id": node_id,
        "type": "llm",
        "data": {
            "title": "LLM处理",
            "model": {
                "provider": "openai",
                "name": "gpt-4o-mini",
                "mode": "chat"
            },
            "prompt_template": [
                {"role": "system", "text": "你是一个专业的助手。"},
                {"role": "user", "text": "{{#user_input#}}"}
            ],
            "context": context_block,
            "memory": memory_block
        }
    }
# Usage example
llm_node = create_llm_with_context(
    node_id="llm_with_context",
    context_config={
        "variables": ["kb_1.context", "user_history"],
        "max_tokens": 3000,
        "max_segments": 15,
        "memory_enabled": True,
        "memory_size": 5
    }
)
2.1.2 多模型切换¶
Python
def create_multi_model_llm(node_id, model_fallback):
    """Build an LLM node that records a primary model and one fallback entry.

    Args:
        node_id: Unique id for the node.
        model_fallback: Dict with "primary" ({"provider", "name"}) and
            "fallback" (possibly empty list of the same shape).

    Returns:
        A Dify LLM node definition dict; "fallback_model" is the first
        fallback entry, or None when the fallback list is empty.
    """
    primary = model_fallback["primary"]
    fallbacks = model_fallback["fallback"]
    fallback_entry = fallbacks[0] if fallbacks else None
    return {
        "id": node_id,
        "type": "llm",
        "data": {
            "title": "多模型LLM",
            "model": {
                "provider": primary["provider"],
                "name": primary["name"],
                "mode": "chat",
                "completion_params": {
                    "temperature": 0.7,
                    "max_tokens": 2048
                }
            },
            "fallback_model": fallback_entry,
            # Note: the Dify LLM node has no native fallback-model list;
            # real model fallback is built with a branch plus several LLM nodes.
            "prompt_template": [
                {"role": "system", "text": "你是一个专业的助手。"},
                {"role": "user", "text": "{{#user_input#}}"}
            ]
        }
    }
# Usage example
llm_node = create_multi_model_llm(
    node_id="multi_model_llm",
    model_fallback={
        "primary": {
            "provider": "openai",
            "name": "gpt-4o"
        },
        "fallback": [
            {
                "provider": "openai",
                "name": "gpt-4o-mini"
            },
            {
                "provider": "anthropic",
                "name": "claude-3-sonnet-20240229"
            }
        ]
    }
)
2.2 知识库节点高级配置¶
2.2.1 混合检索策略¶
Python
def create_hybrid_retrieval(node_id, retrieval_config):
    """Build a hybrid-retrieval knowledge node.

    Hybrid retrieval = vector (semantic) search + keyword search, with a
    reranking model to improve the result ordering.

    Args:
        node_id: Unique id for the node.
        retrieval_config: Dict with required "dataset_ids" plus optional
            "top_k", "score_threshold", "reranking_enable",
            "vector_weight", "keyword_weight".
    """
    cfg = retrieval_config
    retrieval_settings = {
        "top_k": cfg.get("top_k", 5),
        "score_threshold": cfg.get("score_threshold", 0.5),
        # Reranking re-scores the candidate hits for better ordering.
        "reranking_enable": cfg.get("reranking_enable", True),
        "reranking_model": {
            "reranking_provider": "cohere",
            "reranking_model_name": "rerank-multilingual-v3.0"
        },
        # Weighting between semantic similarity and exact keyword matching.
        "weights": {
            "vector_weight": cfg.get("vector_weight", 0.7),
            "keyword_weight": cfg.get("keyword_weight", 0.3)
        }
    }
    return {
        "id": node_id,
        "type": "knowledge-retrieval",
        "data": {
            "title": "混合检索",
            "dataset_ids": cfg["dataset_ids"],
            "retrieval_mode": "multiple",  # joint retrieval over several datasets
            "multiple_retrieval_config": retrieval_settings
        }
    }
# Usage example
kb_node = create_hybrid_retrieval(
    node_id="hybrid_kb",
    retrieval_config={
        "dataset_ids": ["dataset_1", "dataset_2"],
        "top_k": 10,
        "score_threshold": 0.6,
        "reranking_enable": True,
        "vector_weight": 0.8,
        "keyword_weight": 0.2
    }
)
2.2.2 动态过滤¶
Python
def create_filtered_retrieval(node_id, filter_config):
    """Build a knowledge-retrieval node with metadata-based filtering.

    Filters documents by metadata conditions (category, language, date, ...).

    Args:
        node_id: Unique id for the node.
        filter_config: Dict with required "dataset_ids", "filter_by" and
            "filters", plus optional "top_k" and "score_threshold".
    """
    retrieval_settings = {
        "top_k": filter_config.get("top_k", 5),
        "score_threshold": filter_config.get("score_threshold", 0.5),
        "filtering": {
            "filter_by": filter_config["filter_by"],
            "filters": filter_config["filters"]
        }
    }
    return {
        "id": node_id,
        "type": "knowledge-retrieval",
        "data": {
            "title": "过滤检索",
            "dataset_ids": filter_config["dataset_ids"],
            "retrieval_mode": "multiple",
            "multiple_retrieval_config": retrieval_settings
        }
    }
# Usage example
kb_node = create_filtered_retrieval(
    node_id="filtered_kb",
    filter_config={
        "dataset_ids": ["dataset_1"],
        "top_k": 5,
        "score_threshold": 0.6,
        "filter_by": "metadata",
        # Metadata filter conditions: exact match plus comparison operators.
        "filters": [
            {
                "key": "category",
                "value": "技术文档"  # exact match on the document category
            },
            {
                "key": "language",
                "value": "中文"
            },
            {
                "key": "date",
                "operator": ">=",  # comparison operators >=, <=, >, < are supported
                "value": "2024-01-01"
            }
        ]
    }
)
2.3 代码节点高级用法¶
2.3.1 外部库调用¶
Python
def create_code_with_libraries(node_id, code_config):
    """Build a code-execution node from a configuration dict.

    Args:
        node_id: Unique id for the node.
        code_config: Dict with required "code" and "outputs", plus optional
            "environment_variables" (default {}) and "timeout" (default 30s).
    """
    node_data = {
        "title": "代码执行",
        "code": code_config["code"],
        "outputs": code_config["outputs"],
        "environment_variables": code_config.get("environment_variables", {}),
        "timeout": code_config.get("timeout", 30)
    }
    return {"id": node_id, "type": "code", "data": node_data}
# 使用示例 - 基本数据处理(Dify沙箱环境支持的操作)
code_node = create_code_with_libraries(
node_id="data_processing",
code_config={
"code": """
import json
def main(data: str, operation: str) -> dict:
# 解析输入数据
# 注意:Dify代码节点运行在安全沙箱中
# 不支持numpy、pandas等外部库,不支持文件I/O操作
# 可使用的标准库:json, math, re, datetime等
data_dict = json.loads(data)
result = {}
values = list(data_dict.values())
if operation == "mean":
# 手动计算平均值
if values and all(isinstance(v, (int, float)) for v in values): # isinstance检查对象类型
result = {"mean": sum(values) / len(values)}
elif operation == "sum":
if values and all(isinstance(v, (int, float)) for v in values):
result = {"sum": sum(values)}
elif operation == "count":
result = {"count": len(data_dict)}
return {
"result": result,
"keys": list(data_dict.keys()),
"count": len(data_dict)
}
""",
"outputs": [
{"variable": "result", "type": "object"},
{"variable": "keys", "type": "array"},
{"variable": "count", "type": "number"}
],
"timeout": 60
}
)
2.3.2 外部服务调用¶
Python
def create_http_request_node(node_id, http_config):
    """
    Build an HTTP request node definition.

    Note: Dify code nodes run in a secure sandbox with no file I/O; calls to
    external services should go through an HTTP request node, not a code node.

    Args:
        node_id: Unique id for the node.
        http_config: Dict with required "url" plus optional "method"
            (default "POST"), "headers", "body" and "timeout" (default 30s).
    """
    request_data = {
        "title": "外部服务调用",
        "method": http_config.get("method", "POST"),
        "url": http_config["url"],  # URL may contain Dify variables, e.g. {{#start.api_url#}}
        "headers": http_config.get("headers", {}),
        "body": http_config.get("body", {}),  # body also supports variable references
        "timeout": http_config.get("timeout", 30)  # request timeout in seconds
    }
    # "http-request" is Dify's built-in node type for calling external REST APIs.
    return {"id": node_id, "type": "http-request", "data": request_data}
# Usage example
http_node = create_http_request_node(
    node_id="api_call",
    http_config={
        "url": "https://api.example.com/process",
        "method": "POST",
        "headers": {"Content-Type": "application/json"},
        "body": {"data": "{{#start.input_data#}}"},
        "timeout": 30
    }
)
3. 工作流设计模式¶
3.1 责任链模式¶
应用场景:需要多个节点依次处理数据的场景
Python
class ResponsibilityChainWorkflow:
    """Chain-of-responsibility workflow: nodes process the data in sequence."""

    def build(self):
        """Build the chain-of-responsibility workflow definition.

        Chain: validator -> condition check -> processor1 (cleaning) ->
        processor2 (enrichment); validation failures go to an error branch.
        """
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "input_data",
                                    "label": "输入数据",
                                    "type": "paragraph",
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "validator",
                        "type": "code",
                        "data": {
                            "title": "验证器",
                            "code": """
def main(input_data: str) -> dict:
    # 验证数据
    if not input_data or len(input_data) < 10:
        return {
            "valid": False,
            "error": "输入数据太短",
            "data": None
        }
    return {
        "valid": True,
        "error": None,
        "data": input_data
    }
""",
                            "outputs": [
                                {"variable": "valid", "type": "boolean"},
                                {"variable": "error", "type": "string"},
                                {"variable": "data", "type": "string"}
                            ]
                        }
                    },
                    {
                        "id": "check_valid",
                        "type": "if-else",
                        "data": {
                            "title": "检查验证",
                            "cases": [
                                {
                                    "case_id": "valid",
                                    "conditions": [
                                        {
                                            "variable_selector": ["validator", "valid"],
                                            "comparison_operator": "is",
                                            "value": True
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    # Empty conditions: default (else) branch.
                                    "case_id": "invalid",
                                    "conditions": [],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "processor1",
                        "type": "llm",
                        "data": {
                            "title": "处理器1",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个数据清洗专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请清洗以下数据:\n{{#validator.data#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "processor2",
                        "type": "llm",
                        "data": {
                            "title": "处理器2",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个数据增强专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请增强以下数据:\n{{#processor1.text#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "error_handler",
                        "type": "code",
                        "data": {
                            "title": "错误处理",
                            "code": """
def main(error: str) -> dict:
    return {
        "status": "error",
        "message": error,
        "result": None
    }
""",
                            "outputs": [
                                {"variable": "status", "type": "string"},
                                {"variable": "message", "type": "string"},
                                {"variable": "result", "type": "string"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["processor2", "text"],
                                    "variable": "result"
                                },
                                {
                                    "value_selector": ["error_handler", "status"],
                                    "variable": "status"
                                }
                            ]
                        }
                    }
                ],
                "edges": [
                    {"id": "start-validator", "source": "start", "target": "validator"},
                    {"id": "validator-check", "source": "validator", "target": "check_valid"},
                    {"id": "check-processor1", "source": "check_valid", "target": "processor1", "sourceHandle": "valid"},
                    {"id": "check-error", "source": "check_valid", "target": "error_handler", "sourceHandle": "invalid"},
                    {"id": "processor1-processor2", "source": "processor1", "target": "processor2"},
                    {"id": "processor2-end", "source": "processor2", "target": "end"},
                    {"id": "error-end", "source": "error_handler", "target": "end"}
                ]
            }
        }
3.2 策略模式¶
应用场景:需要根据不同条件选择不同处理策略的场景
Python
class StrategyWorkflow:
    """Strategy-pattern workflow: the user's choice selects a processing strategy."""

    def build(self):
        """Build the strategy-pattern workflow definition.

        Routes to one of three differently-configured LLM nodes based on the
        selected strategy (fast / accurate / balanced).
        """
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "input",
                                    "label": "输入",
                                    "type": "paragraph",
                                    "required": True
                                },
                                {
                                    "variable": "strategy",
                                    "label": "策略",
                                    "type": "select",
                                    "required": True,
                                    "options": ["快速", "准确", "平衡"]
                                }
                            ]
                        }
                    },
                    {
                        "id": "select_strategy",
                        "type": "if-else",
                        "data": {
                            "title": "选择策略",
                            "cases": [
                                {
                                    "case_id": "fast",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "strategy"],
                                            "comparison_operator": "is",
                                            "value": "快速"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "accurate",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "strategy"],
                                            "comparison_operator": "is",
                                            "value": "准确"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "balanced",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "strategy"],
                                            "comparison_operator": "is",
                                            "value": "平衡"
                                        }
                                    ],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "fast_strategy",
                        "type": "llm",
                        "data": {
                            "title": "快速策略",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",  # lightweight model, fast inference
                                "mode": "chat",
                                "completion_params": {
                                    "temperature": 0.3,  # low temperature = more deterministic output
                                    "max_tokens": 500  # cap output length for faster responses
                                }
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "快速处理,简洁回答。"
                                },
                                {
                                    "role": "user",
                                    "text": "{{#start.input#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "accurate_strategy",
                        "type": "llm",
                        "data": {
                            "title": "准确策略",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o",  # higher-capability model, better quality
                                "mode": "chat",
                                "completion_params": {
                                    "temperature": 0.1,  # near-deterministic, suits precise analysis
                                    "max_tokens": 2000  # allow longer, detailed answers
                                }
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "仔细分析,详细回答。"
                                },
                                {
                                    "role": "user",
                                    "text": "{{#start.input#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "balanced_strategy",
                        "type": "llm",
                        "data": {
                            "title": "平衡策略",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat",
                                "completion_params": {
                                    "temperature": 0.7,  # medium temperature: creativity vs accuracy trade-off
                                    "max_tokens": 1000  # medium output length
                                }
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "平衡速度和准确性。"
                                },
                                {
                                    "role": "user",
                                    "text": "{{#start.input#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["fast_strategy", "text"],
                                    "variable": "fast_result"
                                },
                                {
                                    "value_selector": ["accurate_strategy", "text"],
                                    "variable": "accurate_result"
                                },
                                {
                                    "value_selector": ["balanced_strategy", "text"],
                                    "variable": "balanced_result"
                                }
                            ]
                        }
                    }
                ],
                "edges": [
                    {"id": "start-select", "source": "start", "target": "select_strategy"},
                    {"id": "select-fast", "source": "select_strategy", "target": "fast_strategy", "sourceHandle": "fast"},
                    {"id": "select-accurate", "source": "select_strategy", "target": "accurate_strategy", "sourceHandle": "accurate"},
                    {"id": "select-balanced", "source": "select_strategy", "target": "balanced_strategy", "sourceHandle": "balanced"},
                    {"id": "fast-end", "source": "fast_strategy", "target": "end"},
                    {"id": "accurate-end", "source": "accurate_strategy", "target": "end"},
                    {"id": "balanced-end", "source": "balanced_strategy", "target": "end"}
                ]
            }
        }
4. 性能优化¶
4.1 缓存策略¶
Python
def create_cached_workflow():
    """Build a workflow definition demonstrating a cache-lookup pattern.

    Flow: start -> check_cache (code) -> [miss] process (LLM) -> save_cache
    (code) -> end; a cache hit short-circuits straight to end.

    NOTE(review): check_cache is a *code* node, yet the edges below use
    sourceHandle "true"/"false", which normally requires an if-else node —
    confirm against the Dify schema before using this as-is.
    """
    return {
        "graph": {
            "nodes": [
                {
                    "id": "start",
                    "type": "start",
                    "data": {
                        "title": "开始",
                        "variables": [
                            {
                                "variable": "query",
                                "label": "查询",
                                "type": "paragraph",
                                "required": True
                            }
                        ]
                    }
                },
                {
                    "id": "check_cache",
                    "type": "code",
                    "data": {
                        "title": "检查缓存",
                        "code": """
import hashlib
# ⚠️ 重要提示:Dify代码节点在沙盒中运行,
# 变量不会在不同的工作流运行之间持久化。
# 生产环境中应通过HTTP请求节点调用外部缓存服务
#(如Redis)来实现真正的缓存功能。
# 以下代码仅演示缓存键生成逻辑。
def main(query: str) -> dict:
    # 生成缓存键
    cache_key = hashlib.md5(query.encode()).hexdigest()
    # 在实际应用中,这里应通过HTTP节点
    # 调用Redis等外部缓存服务查询缓存
    return {
        "cached": False,
        "result": None,
        "cache_key": cache_key
    }
""",
                        "outputs": [
                            {"variable": "cached", "type": "boolean"},
                            {"variable": "result", "type": "string"},
                            {"variable": "cache_key", "type": "string"}
                        ]
                    }
                },
                {
                    "id": "process",
                    "type": "llm",
                    "data": {
                        "title": "处理",
                        "model": {
                            "provider": "openai",
                            "name": "gpt-4o-mini",
                            "mode": "chat"
                        },
                        "prompt_template": [
                            {
                                "role": "system",
                                "text": "你是一个专业的助手。"
                            },
                            {
                                "role": "user",
                                "text": "{{#start.query#}}"
                            }
                        ]
                    }
                },
                {
                    "id": "save_cache",
                    "type": "code",
                    "data": {
                        "title": "保存缓存",
                        "code": """
def main(cache_key: str, result: str) -> dict:
    # ⚠️ 注意:此处cache_store在沙盒中不持久化
    # 生产环境应通过HTTP请求节点将结果保存到Redis等外部缓存
    return {
        "saved": True,
        "cache_key": cache_key,
        "result": result
    }
""",
                        "outputs": [
                            {"variable": "saved", "type": "boolean"},
                            {"variable": "result", "type": "string"}
                        ]
                    }
                },
                {
                    "id": "end",
                    "type": "end",
                    "data": {
                        "title": "结束",
                        "outputs": [
                            {
                                "value_selector": ["check_cache", "result"],
                                "variable": "cached_result"
                            },
                            {
                                "value_selector": ["save_cache", "result"],
                                "variable": "processed_result"
                            }
                        ]
                    }
                }
            ],
            "edges": [
                {"id": "start-check", "source": "start", "target": "check_cache"},
                {"id": "check-process", "source": "check_cache", "target": "process", "sourceHandle": "false"},
                {"id": "process-save", "source": "process", "target": "save_cache"},
                {"id": "check-end", "source": "check_cache", "target": "end", "sourceHandle": "true"},
                {"id": "save-end", "source": "save_cache", "target": "end"}
            ]
        }
    }
4.2 批处理优化¶
Python
def create_batch_workflow():
    """Build a workflow definition demonstrating batched processing.

    Flow: start -> parse (code node splits the query list into fixed-size
    batches) -> process_batch (LLM) -> end.
    """
    return {
        "graph": {
            "nodes": [
                {
                    "id": "start",
                    "type": "start",
                    "data": {
                        "title": "开始",
                        "variables": [
                            {
                                "variable": "queries",
                                "label": "查询列表",
                                "type": "paragraph",
                                "required": True
                            }
                        ]
                    }
                },
                {
                    "id": "parse",
                    "type": "code",
                    "data": {
                        "title": "解析输入",
                        "code": """
import json
def main(queries: str) -> dict:
    # 解析JSON数组格式的查询列表
    query_list = json.loads(queries) # json.loads将JSON字符串转为Python对象
    # 按固定大小分批,避免单次LLM请求过大导致超时或token溢出
    batch_size = 5
    batches = [
        query_list[i:i + batch_size] # 列表切片实现分批
        for i in range(0, len(query_list), batch_size)
    ]
    return {
        "batches": batches, # 分批后的查询二维列表
        "total": len(query_list), # 总查询数
        "batch_count": len(batches) # 批次数量
    }
""",
                        "outputs": [
                            {"variable": "batches", "type": "array"},
                            {"variable": "total", "type": "number"},
                            {"variable": "batch_count", "type": "number"}
                        ]
                    }
                },
                {
                    "id": "process_batch",
                    "type": "llm",
                    "data": {
                        "title": "处理批次",
                        "model": {
                            "provider": "openai",
                            "name": "gpt-4o-mini",
                            "mode": "chat"
                        },
                        "prompt_template": [
                            {
                                "role": "system",
                                "text": "你是一个批量处理专家。"
                            },
                            {
                                "role": "user",
                                "text": "请处理以下查询列表,返回JSON格式的结果:\n{{#parse.batches#}}"
                            }
                        ]
                    }
                },
                {
                    "id": "end",
                    "type": "end",
                    "data": {
                        "title": "结束",
                        "outputs": [
                            {
                                "value_selector": ["process_batch", "text"],
                                "variable": "results"
                            }
                        ]
                    }
                }
            ],
            "edges": [
                {"id": "start-parse", "source": "start", "target": "parse"},
                {"id": "parse-process", "source": "parse", "target": "process_batch"},
                {"id": "process-end", "source": "process_batch", "target": "end"}
            ]
        }
    }
5. 最佳实践¶
✅ 推荐做法¶
- 模块化设计
  - 将复杂流程分解为多个模块
  - 每个模块负责单一功能
  - 便于维护和调试
- 充分测试
  - 测试每个节点
  - 测试整个工作流
  - 边界条件测试
- 错误处理
  - 添加异常处理节点
  - 提供友好的错误提示
  - 记录详细日志
- 性能优化
  - 合理使用缓存
  - 优化节点顺序
  - 减少不必要的计算
❌ 避免做法¶
- 过度复杂
  - 保持工作流简洁
  - 避免不必要的节点
  - 优化流程
- 硬编码
  - 使用变量和参数
  - 提高灵活性
  - 便于维护
- 忽视安全
  - 验证用户输入
  - 保护敏感信息
  - 实施访问控制
6. 常见问题¶
Q1: 如何调试复杂的工作流?¶
A: 调试技巧: - 使用调试模式查看每个节点的输出 - 添加日志节点记录中间结果 - 逐步测试每个节点 - 使用测试数据验证逻辑
Q2: 如何优化工作流性能?¶
A: 优化方法: - 减少不必要的节点 - 优化节点顺序 - 使用缓存减少重复计算 - 选择合适的模型
Q3: 如何处理长文本?¶
A: 处理方法: - 使用知识库节点 - 分块处理 - 提取关键信息 - 使用摘要生成
7. 总结¶
本章深入介绍了工作流设计的核心内容,包括:
- 工作流架构:DAG架构、拓扑排序、循环检测
- 工作流类型:顺序、分支、并行、循环
- 节点高级用法:上下文管理、多模型切换、混合检索
- 设计模式:责任链模式、策略模式
- 性能优化:缓存策略、批处理优化
通过本章的学习,你应该能够设计复杂的Dify工作流了。
8. 下一步¶
继续学习04-数据源集成,深入了解数据源的集成和管理。
最后更新日期:2026-02-12 适用版本:Dify实战教程 v2026