跳转至

03 - 工作流设计

Dify工作流设计示意图

工作流概念、节点类型、连接方式

📖 章节概述

本章将深入探讨Dify工作流的设计方法,包括工作流的核心概念、各种节点类型的高级用法、连接方式的设计模式、性能优化技巧以及实际应用场景。通过详细的代码示例和实践指导,帮助读者掌握复杂工作流的设计和实现。

🎯 学习目标

完成本章后,你将能够:

  • 深入理解工作流的核心概念和架构原理
  • 熟练掌握各种节点类型的高级配置
  • 设计高效、可维护的复杂工作流
  • 掌握工作流性能优化的方法
  • 理解工作流的最佳实践和设计模式
  • 能够解决实际业务中的复杂问题

1. 工作流概念深度解析

1.1 工作流架构原理

技术架构: Dify工作流采用DAG(有向无环图)架构,这是一种数据流处理模型,具有以下特点:

  • 节点(Node):代表一个独立的处理单元
  • 边(Edge):代表数据流向和依赖关系
  • 无环(Acyclic):确保流程的可预测性和终止性
  • 并行执行:支持多个节点并行处理

核心组件

Python
class WorkflowGraph:
    """工作流图数据结构"""

    def __init__(self):
        self.nodes = {}  # 节点字典 {node_id: node_data}
        self.edges = []  # 边列表
        self.adjacency = {}  # 邻接表

    def add_node(self, node_id, node_type, node_data):
        """添加节点"""
        self.nodes[node_id] = {
            "id": node_id,
            "type": node_type,
            "data": node_data
        }
        self.adjacency[node_id] = []

    def add_edge(self, source_id, target_id, edge_type="sequential"):
        """添加边"""
        self.edges.append({
            "id": f"{source_id}-{target_id}",
            "source": source_id,
            "target": target_id,
            "type": edge_type
        })
        self.adjacency[source_id].append(target_id)

    def validate(self):
        """验证工作流是否为DAG"""
        # 使用DFS(深度优先搜索)检测有向图中是否存在环
        visited = set()
        recursion_stack = set()  # 追踪当前递归路径上的节点,用于检测回边

        def has_cycle(node_id):
            visited.add(node_id)
            recursion_stack.add(node_id)

            for neighbor in self.adjacency.get(node_id, []):
                if neighbor not in visited:
                    if has_cycle(neighbor):
                        return True
                elif neighbor in recursion_stack:
                    return True

            recursion_stack.remove(node_id)
            return False

        for node_id in self.nodes:
            if node_id not in visited:
                if has_cycle(node_id):
                    return False, "工作流存在循环依赖"

        return True, "工作流验证通过"

    def topological_sort(self):
        """拓扑排序"""
        # 基于Kahn算法:反复移除入度为0的节点,确定工作流节点的执行顺序
        in_degree = {node_id: 0 for node_id in self.nodes}

        # 计算入度
        for edge in self.edges:
            in_degree[edge["target"]] += 1

        # 找到入度为0的节点
        queue = [node_id for node_id, degree in in_degree.items() if degree == 0]
        result = []

        while queue:
            node_id = queue.pop(0)
            result.append(node_id)

            for neighbor in self.adjacency.get(node_id, []):
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        if len(result) != len(self.nodes):
            return None, "工作流存在循环"

        return result, "排序成功"

# 使用示例
if __name__ == "__main__":
    graph = WorkflowGraph()

    # 添加节点:每个节点有唯一ID、类型(start/llm/end等)和配置数据
    graph.add_node("start", "start", {"title": "开始"})
    graph.add_node("process", "llm", {"title": "处理"})
    graph.add_node("end", "end", {"title": "结束"})

    # 添加边:定义节点间的数据流向(source → target)
    graph.add_edge("start", "process")
    graph.add_edge("process", "end")

    # 验证DAG合法性(无环检测)
    is_valid, message = graph.validate()
    print(f"验证结果: {is_valid}, {message}")

    # 拓扑排序:确定节点执行的先后顺序
    order, message = graph.topological_sort()
    print(f"执行顺序: {order}")

1.2 工作流类型详解

1.2.1 顺序工作流

特点:节点按顺序执行,每个节点的输出作为下一个节点的输入

应用场景:简单的线性处理流程

代码示例

Python
class SequentialWorkflow:
    """顺序工作流"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """构建顺序工作流"""
        # 四阶段流水线:开始 → 预处理(代码节点) → LLM分析 → 格式化(代码节点) → 结束
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",  # 开始节点:定义工作流的输入变量
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "input_text",
                                    "label": "输入文本",
                                    "type": "paragraph",
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "preprocess",
                        "type": "code",  # 代码节点:运行在Dify安全沙箱中的Python代码
                        "data": {
                            "title": "预处理",
                            "code": """
def main(input_text: str) -> dict:  # 代码节点入口函数,参数名需与上游变量匹配
    # 文本清洗
    cleaned = input_text.strip().lower()
    # 分词
    words = cleaned.split()
    return {
        "cleaned_text": cleaned,
        "word_count": len(words),
        "words": words
    }
""",
                            "outputs": [
                                {"variable": "cleaned_text", "type": "string"},
                                {"variable": "word_count", "type": "number"},
                                {"variable": "words", "type": "array"}
                            ]
                        }
                    },
                    {
                        "id": "analyze",
                        "type": "llm",  # LLM节点:调用大语言模型进行推理
                        "data": {
                            "title": "分析",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"  # chat模式支持多轮对话格式的prompt
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个文本分析专家。"
                                },
                                {
                                    "role": "user",
                                    # Dify变量引用语法:{{#节点ID.变量名#}},引用上游节点的输出
                                    "text": """请分析以下文本:
文本:{{#preprocess.cleaned_text#}}
词数:{{#preprocess.word_count#}}

请提供:
1. 情感倾向(正面/负面/中性)
2. 关键词提取
3. 主题分类"""
                                }
                            ]
                        }
                    },
                    {
                        "id": "format",
                        "type": "code",
                        "data": {
                            "title": "格式化",
                            "code": """
import json

def main(llm_output: str) -> dict:
    # 尝试解析JSON
    try:
        result = json.loads(llm_output)
    except:
        result = {"raw_output": llm_output}

    return result
""",
                            "outputs": [
                                {"variable": "formatted_result", "type": "object"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",  # 结束节点:定义工作流的最终输出
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["format", "formatted_result"],  # [节点ID, 变量名] 选择上游输出
                                    "variable": "result"
                                }
                            ]
                        }
                    }
                ],
                # 边定义数据流向:source节点的输出传递给target节点作为输入
                "edges": [
                    {"id": "start-preprocess", "source": "start", "target": "preprocess"},
                    {"id": "preprocess-analyze", "source": "preprocess", "target": "analyze"},
                    {"id": "analyze-format", "source": "analyze", "target": "format"},
                    {"id": "format-end", "source": "format", "target": "end"}
                ]
            }
        }

1.2.2 分支工作流

特点:根据条件选择不同的执行路径

应用场景:需要根据输入或中间结果进行决策的场景

代码示例

Python
class BranchingWorkflow:
    """分支工作流"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """构建分支工作流"""
        # 根据query_type条件路由到不同处理器:知识问答/代码生成/翻译/总结
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "user_query",
                                    "label": "用户查询",
                                    "type": "paragraph",
                                    "required": True
                                },
                                {
                                    "variable": "query_type",
                                    "label": "查询类型",
                                    "type": "select",
                                    "required": True,
                                    "options": ["知识问答", "代码生成", "翻译", "总结"]
                                }
                            ]
                        }
                    },
                    {
                        "id": "classify",
                        "type": "if-else",  # 条件分支节点:根据条件将流程路由到不同分支
                        "data": {
                            "title": "分类",
                            "cases": [  # 每个case定义一个分支及其匹配条件
                                {
                                    "case_id": "qa",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "知识问答"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "code",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "代码生成"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "translate",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "翻译"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "summarize",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "query_type"],
                                            "comparison_operator": "is",
                                            "value": "总结"
                                        }
                                    ],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "qa_handler",
                        "type": "knowledge-retrieval",  # 知识库检索节点:从已导入的数据集中检索相关内容
                        "data": {
                            "title": "知识问答处理",
                            "dataset_ids": ["qa_dataset_id"],  # 关联的知识库ID列表
                            "retrieval_mode": "multiple",  # multiple模式:从多个数据集联合检索
                            "multiple_retrieval_config": {
                                "top_k": 3,  # 返回最相关的前3条结果
                                "score_threshold": 0.5  # 相似度阈值,低于此值的结果被过滤
                            }
                        }
                    },
                    {
                        "id": "code_handler",
                        "type": "llm",
                        "data": {
                            "title": "代码生成处理",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个专业的程序员,擅长编写高质量代码。"
                                },
                                {
                                    "role": "user",
                                    "text": "请为以下需求编写代码:\n{{#start.user_query#}}\n\n要求:\n1. 代码清晰易读\n2. 添加注释\n3. 包含错误处理"
                                }
                            ]
                        }
                    },
                    {
                        "id": "translate_handler",
                        "type": "llm",
                        "data": {
                            "title": "翻译处理",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个专业的翻译,擅长中英互译。"
                                },
                                {
                                    "role": "user",
                                    "text": "请翻译以下文本:\n{{#start.user_query#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "summarize_handler",
                        "type": "llm",
                        "data": {
                            "title": "总结处理",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个专业的文本总结专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请总结以下文本:\n{{#start.user_query#}}\n\n要求:\n1. 提取关键信息\n2. 保持简洁\n3. 结构清晰"
                                }
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["qa_handler", "context"],
                                    "variable": "qa_result"
                                },
                                {
                                    "value_selector": ["code_handler", "text"],
                                    "variable": "code_result"
                                },
                                {
                                    "value_selector": ["translate_handler", "text"],
                                    "variable": "translate_result"
                                },
                                {
                                    "value_selector": ["summarize_handler", "text"],
                                    "variable": "summarize_result"
                                }
                            ]
                        }
                    }
                ],
                # sourceHandle指定if-else节点的具体分支输出,对应cases中的case_id
                "edges": [
                    {"id": "start-classify", "source": "start", "target": "classify"},
                    {"id": "classify-qa", "source": "classify", "target": "qa_handler", "sourceHandle": "qa"},
                    {"id": "classify-code", "source": "classify", "target": "code_handler", "sourceHandle": "code"},
                    {"id": "classify-translate", "source": "classify", "target": "translate_handler", "sourceHandle": "translate"},
                    {"id": "classify-summarize", "source": "classify", "target": "summarize_handler", "sourceHandle": "summarize"},
                    {"id": "qa-end", "source": "qa_handler", "target": "end"},
                    {"id": "code-end", "source": "code_handler", "target": "end"},
                    {"id": "translate-end", "source": "translate_handler", "target": "end"},
                    {"id": "summarize-end", "source": "summarize_handler", "target": "end"}
                ]
            }
        }

1.2.3 并行工作流

特点:多个节点同时执行,最后合并结果

应用场景:需要同时处理多个独立任务的场景

代码示例

Python
class ParallelWorkflow:
    """并行工作流"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """构建并行工作流"""
        # 扇出-扇入模式:开始节点同时触发多个并行LLM任务,最后由代码节点合并结果
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "text",
                                    "label": "待分析文本",
                                    "type": "paragraph",
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "sentiment",
                        "type": "llm",
                        "data": {
                            "title": "情感分析",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个情感分析专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请分析以下文本的情感倾向(正面/负面/中性):\n{{#start.text#}}\n\n只返回情感倾向。"
                                }
                            ]
                        }
                    },
                    {
                        "id": "keywords",
                        "type": "llm",
                        "data": {
                            "title": "关键词提取",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个关键词提取专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请从以下文本中提取5个关键词:\n{{#start.text#}}\n\n以JSON格式返回:{\"keywords\": [\"关键词1\", \"关键词2\", ...]}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "summary",
                        "type": "llm",
                        "data": {
                            "title": "文本摘要",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个文本摘要专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请为以下文本生成一个简短摘要(不超过100字):\n{{#start.text#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "merge",
                        "type": "code",
                        "data": {
                            "title": "合并结果",
                            "code": """
import json

def main(sentiment: str, keywords: str, summary: str) -> dict:
    # 解析关键词
    try:  # try/except捕获异常
        keywords_data = json.loads(keywords)
        keyword_list = keywords_data.get("keywords", [])
    except:
        keyword_list = []

    # 合并结果
    result = {
        "sentiment": sentiment.strip(),
        "keywords": keyword_list,
        "summary": summary.strip()
    }

    return result
""",
                            "outputs": [
                                {"variable": "merged_result", "type": "object"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["merge", "merged_result"],
                                    "variable": "analysis_result"
                                }
                            ]
                        }
                    }
                ],
                # 并行扇出:start同时连接3个LLM节点并行执行;扇入:3个节点汇聚到merge
                "edges": [
                    {"id": "start-sentiment", "source": "start", "target": "sentiment"},
                    {"id": "start-keywords", "source": "start", "target": "keywords"},
                    {"id": "start-summary", "source": "start", "target": "summary"},
                    {"id": "sentiment-merge", "source": "sentiment", "target": "merge"},
                    {"id": "keywords-merge", "source": "keywords", "target": "merge"},
                    {"id": "summary-merge", "source": "summary", "target": "merge"},
                    {"id": "merge-end", "source": "merge", "target": "end"}
                ]
            }
        }

1.2.4 循环工作流

特点:重复执行某些节点,直到满足条件

应用场景:需要迭代处理的场景

代码示例

Python
class LoopWorkflow:
    """循环工作流"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def build(self):
        """构建循环工作流"""
        # 迭代优化模式:初始化 → 条件检查 → LLM处理 → 状态更新,直到达到最大迭代次数
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "target_text",
                                    "label": "目标文本",
                                    "type": "paragraph",
                                    "required": True
                                },
                                {
                                    "variable": "max_iterations",
                                    "label": "最大迭代次数",
                                    "type": "number",
                                    "default": 5,
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "initialize",
                        "type": "code",
                        "data": {
                            "title": "初始化",
                            "code": """
def main(target_text: str, max_iterations: int) -> dict:
    # 初始化迭代状态:当前文本、迭代计数器、完成标志
    return {
        "current_text": target_text,
        "iteration": 0,
        "max_iterations": max_iterations,
        "is_complete": False
    }
""",  # outputs定义代码节点向下游暴露的变量
                            "outputs": [
                                {"variable": "current_text", "type": "string"},
                                {"variable": "iteration", "type": "number"},
                                {"variable": "max_iterations", "type": "number"},
                                {"variable": "is_complete", "type": "boolean"}
                            ]
                        }
                    },
                    {
                        "id": "check_condition",
                        "type": "if-else",
                        "data": {
                            "title": "检查条件",
                            "cases": [
                                {
                                    "case_id": "continue",
                                    "conditions": [
                                        {
                                            "variable_selector": ["initialize", "is_complete"],
                                            "comparison_operator": "is",
                                            "value": False
                                        },
                                        {
                                            "variable_selector": ["initialize", "iteration"],
                                            "comparison_operator": "<",
                                            "value": "{{#initialize.max_iterations#}}"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "break",
                                    "conditions": [],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "process",
                        "type": "llm",
                        "data": {
                            "title": "处理文本",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个文本优化专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请优化以下文本,使其更加简洁清晰:\n{{#initialize.current_text#}}\n\n只返回优化后的文本,不要添加其他说明。"
                                }
                            ]
                        }
                    },
                    {
                        "id": "update",
                        "type": "code",
                        "data": {
                            "title": "更新状态",
                            "code": """
def main(processed_text: str, iteration: int, max_iterations: int) -> dict:
    # 更新迭代状态:递增计数器,判断是否达到终止条件
    new_iteration = iteration + 1
    is_complete = new_iteration >= max_iterations  # 达到最大次数时标记完成

    return {
        "current_text": processed_text,
        "iteration": new_iteration,
        "max_iterations": max_iterations,
        "is_complete": is_complete
    }
""",
                            "outputs": [
                                {"variable": "current_text", "type": "string"},
                                {"variable": "iteration", "type": "number"},
                                {"variable": "max_iterations", "type": "number"},
                                {"variable": "is_complete", "type": "boolean"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["initialize", "current_text"],
                                    "variable": "final_text"
                                },
                                {
                                    "value_selector": ["initialize", "iteration"],
                                    "variable": "total_iterations"
                                }
                            ]
                        }
                    }
                ],
                "edges": [
                    {"id": "start-initialize", "source": "start", "target": "initialize"},
                    {"id": "initialize-check", "source": "initialize", "target": "check_condition"},
                    {"id": "check-process", "source": "check_condition", "target": "process", "sourceHandle": "continue"},
                    {"id": "check-end", "source": "check_condition", "target": "end", "sourceHandle": "break"},
                    {"id": "process-update", "source": "process", "target": "update"},
                    {"id": "update-end", "source": "update", "target": "end"}
                ]
                # 注意:Dify工作流基于DAG(有向无环图),不支持循环边
                # 要实现迭代处理,请使用Dify内置的“迭代”节点类型
            }
        }

2. 节点类型高级用法

2.1 LLM节点高级配置

2.1.1 上下文管理

Python
def create_llm_with_context(node_id, context_config):
    """创建带上下文的LLM节点"""
    return {
        "id": node_id,
        "type": "llm",
        "data": {
            "title": "LLM处理",
            "model": {
                "provider": "openai",
                "name": "gpt-4o-mini",
                "mode": "chat"
            },
            "prompt_template": [
                {
                    "role": "system",
                    "text": "你是一个专业的助手。"
                },
                {
                    "role": "user",
                    "text": "{{#user_input#}}"
                }
            ],
            # 上下文配置:将知识库检索结果等外部信息注入LLM提示词
            "context": {
                "enabled": True,
                "variable_selector": context_config["variables"],  # 关联的上下文变量(如知识库检索结果)
                "max_tokens": context_config.get("max_tokens", 2000),  # 上下文最大token数
                "max_segments": context_config.get("max_segments", 10)  # 最大检索片段数
            },
            # 对话记忆配置:仅在Chatflow类型工作流中生效
            "memory": {
                "enabled": context_config.get("memory_enabled", False),
                "window": {
                    "enabled": True,
                    "size": context_config.get("memory_size", 10)  # 保留最近N轮对话历史
                }
            }
        }
    }

# 使用示例
llm_node = create_llm_with_context(
    node_id="llm_with_context",
    context_config={
        "variables": ["kb_1.context", "user_history"],
        "max_tokens": 3000,
        "max_segments": 15,
        "memory_enabled": True,
        "memory_size": 5
    }
)

2.1.2 多模型切换

Python
def create_multi_model_llm(node_id, model_fallback):
    """创建多模型切换的LLM节点"""
    return {
        "id": node_id,
        "type": "llm",
        "data": {
            "title": "多模型LLM",
            "model": {
                "provider": model_fallback["primary"]["provider"],
                "name": model_fallback["primary"]["name"],
                "mode": "chat",
                "completion_params": {
                    "temperature": 0.7,
                    "max_tokens": 2048
                }
            },
            "fallback_model": model_fallback["fallback"][0] if model_fallback["fallback"] else None,
            # 注意:Dify LLM节点不直接支持fallback_models列表
            # 可以通过条件分支+多个LLM节点实现模型回退逻辑
            "prompt_template": [
                {
                    "role": "system",
                    "text": "你是一个专业的助手。"
                },
                {
                    "role": "user",
                    "text": "{{#user_input#}}"
                }
            ]
        }
    }

# 使用示例
llm_node = create_multi_model_llm(
    node_id="multi_model_llm",
    model_fallback={
        "primary": {
            "provider": "openai",
            "name": "gpt-4o"
        },
        "fallback": [
            {
                "provider": "openai",
                "name": "gpt-4o-mini"
            },
            {
                "provider": "anthropic",
                "name": "claude-3-sonnet-20240229"
            }
        ]
    }
)

2.2 知识库节点高级配置

2.2.1 混合检索策略

Python
def create_hybrid_retrieval(node_id, retrieval_config):
    """创建混合检索节点"""
    # 混合检索 = 向量语义检索 + 关键词检索,配合重排序模型提升检索质量
    return {
        "id": node_id,
        "type": "knowledge-retrieval",
        "data": {
            "title": "混合检索",
            "dataset_ids": retrieval_config["dataset_ids"],
            "retrieval_mode": "multiple",  # single=单数据集检索, multiple=多数据集联合检索
            "multiple_retrieval_config": {
                "top_k": retrieval_config.get("top_k", 5),
                "score_threshold": retrieval_config.get("score_threshold", 0.5),
                "reranking_enable": retrieval_config.get("reranking_enable", True),  # 启用重排序优化结果排名
                "reranking_model": {
                    "reranking_provider": "cohere",
                    "reranking_model_name": "rerank-multilingual-v3.0"  # 多语言重排序模型
                },
                # 混合检索权重:语义相似度与关键词精确匹配的权重分配
                "weights": {
                    "vector_weight": retrieval_config.get("vector_weight", 0.7),  # 向量语义检索权重
                    "keyword_weight": retrieval_config.get("keyword_weight", 0.3)  # 关键词匹配权重
                }
            }
        }
    }

# 使用示例
kb_node = create_hybrid_retrieval(
    node_id="hybrid_kb",
    retrieval_config={
        "dataset_ids": ["dataset_1", "dataset_2"],
        "top_k": 10,
        "score_threshold": 0.6,
        "reranking_enable": True,
        "vector_weight": 0.8,
        "keyword_weight": 0.2
    }
)

2.2.2 动态过滤

Python
def create_filtered_retrieval(node_id, filter_config):
    """创建带过滤的检索节点"""
    # 基于元数据(metadata)过滤检索,可按分类、语言、日期等条件筛选文档
    return {
        "id": node_id,
        "type": "knowledge-retrieval",
        "data": {
            "title": "过滤检索",
            "dataset_ids": filter_config["dataset_ids"],
            "retrieval_mode": "multiple",
            "multiple_retrieval_config": {
                "top_k": filter_config.get("top_k", 5),
                "score_threshold": filter_config.get("score_threshold", 0.5),
                "filtering": {
                    "filter_by": filter_config["filter_by"],
                    "filters": filter_config["filters"]
                }
            }
        }
    }

# 使用示例
kb_node = create_filtered_retrieval(
    node_id="filtered_kb",
    filter_config={
        "dataset_ids": ["dataset_1"],
        "top_k": 5,
        "score_threshold": 0.6,
        "filter_by": "metadata",
        # 元数据过滤条件:支持精确匹配和比较运算符
        "filters": [
            {
                "key": "category",
                "value": "技术文档"  # 精确匹配文档分类
            },
            {
                "key": "language",
                "value": "中文"
            },
            {
                "key": "date",
                "operator": ">=",  # 支持 >=, <=, >, < 等比较运算符
                "value": "2024-01-01"
            }
        ]
    }
)

2.3 代码节点高级用法

2.3.1 外部库调用

Python
def create_code_with_libraries(node_id, code_config):
    """创建可调用外部库的代码节点"""
    return {
        "id": node_id,
        "type": "code",
        "data": {
            "title": "代码执行",
            "code": code_config["code"],
            "outputs": code_config["outputs"],
            "environment_variables": code_config.get("environment_variables", {}),
            "timeout": code_config.get("timeout", 30)
        }
    }

# 使用示例 - 基本数据处理(Dify沙箱环境支持的操作)
code_node = create_code_with_libraries(
    node_id="data_processing",
    code_config={
        "code": """
import json

def main(data: str, operation: str) -> dict:
    # 解析输入数据
    # 注意:Dify代码节点运行在安全沙箱中
    # 不支持numpy、pandas等外部库,不支持文件I/O操作
    # 可使用的标准库:json, math, re, datetime等
    data_dict = json.loads(data)

    result = {}
    values = list(data_dict.values())

    if operation == "mean":
        # 手动计算平均值
        if values and all(isinstance(v, (int, float)) for v in values):  # isinstance检查对象类型
            result = {"mean": sum(values) / len(values)}
    elif operation == "sum":
        if values and all(isinstance(v, (int, float)) for v in values):
            result = {"sum": sum(values)}
    elif operation == "count":
        result = {"count": len(data_dict)}

    return {
        "result": result,
        "keys": list(data_dict.keys()),
        "count": len(data_dict)
    }
""",
        "outputs": [
            {"variable": "result", "type": "object"},
            {"variable": "keys", "type": "array"},
            {"variable": "count", "type": "number"}
        ],
        "timeout": 60
    }
)

2.3.2 外部服务调用

Python
def create_http_request_node(node_id, http_config):
    """
    创建HTTP请求节点

    注意:Dify代码节点运行在安全沙箱中,不支持文件I/O操作
    如需调用外部服务,应使用HTTP请求节点而非代码节点
    """
    return {
        "id": node_id,
        "type": "http-request",  # Dify内置HTTP请求节点,用于调用外部REST API
        "data": {
            "title": "外部服务调用",
            "method": http_config.get("method", "POST"),
            "url": http_config["url"],  # URL支持Dify变量模板,如 {{#start.api_url#}}
            "headers": http_config.get("headers", {}),
            "body": http_config.get("body", {}),  # 请求体支持变量引用
            "timeout": http_config.get("timeout", 30)  # 请求超时时间(秒)
        }
    }

# 使用示例
http_node = create_http_request_node(
    node_id="api_call",
    http_config={
        "url": "https://api.example.com/process",
        "method": "POST",
        "headers": {"Content-Type": "application/json"},
        "body": {"data": "{{#start.input_data#}}"},
        "timeout": 30
    }
)

3. 工作流设计模式

3.1 责任链模式

应用场景:需要多个节点依次处理数据的场景

Python
class ResponsibilityChainWorkflow:
    """责任链模式工作流"""

    def build(self):
        """构建责任链工作流"""
        # 责任链:验证器 → 条件判断 → 处理器1(清洗) → 处理器2(增强);验证失败走错误处理分支
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "input_data",
                                    "label": "输入数据",
                                    "type": "paragraph",
                                    "required": True
                                }
                            ]
                        }
                    },
                    {
                        "id": "validator",
                        "type": "code",
                        "data": {
                            "title": "验证器",
                            "code": """
def main(input_data: str) -> dict:
    # 验证数据
    if not input_data or len(input_data) < 10:
        return {
            "valid": False,
            "error": "输入数据太短",
            "data": None
        }

    return {
        "valid": True,
        "error": None,
        "data": input_data
    }
""",
                            "outputs": [
                                {"variable": "valid", "type": "boolean"},
                                {"variable": "error", "type": "string"},
                                {"variable": "data", "type": "string"}
                            ]
                        }
                    },
                    {
                        "id": "check_valid",
                        "type": "if-else",
                        "data": {
                            "title": "检查验证",
                            "cases": [
                                {
                                    "case_id": "valid",
                                    "conditions": [
                                        {
                                            "variable_selector": ["validator", "valid"],
                                            "comparison_operator": "is",
                                            "value": True
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "invalid",
                                    "conditions": [],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "processor1",
                        "type": "llm",
                        "data": {
                            "title": "处理器1",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个数据清洗专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请清洗以下数据:\n{{#validator.data#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "processor2",
                        "type": "llm",
                        "data": {
                            "title": "处理器2",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat"
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "你是一个数据增强专家。"
                                },
                                {
                                    "role": "user",
                                    "text": "请增强以下数据:\n{{#processor1.text#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "error_handler",
                        "type": "code",
                        "data": {
                            "title": "错误处理",
                            "code": """
def main(error: str) -> dict:
    return {
        "status": "error",
        "message": error,
        "result": None
    }
""",
                            "outputs": [
                                {"variable": "status", "type": "string"},
                                {"variable": "message", "type": "string"},
                                {"variable": "result", "type": "string"}
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["processor2", "text"],
                                    "variable": "result"
                                },
                                {
                                    "value_selector": ["error_handler", "status"],
                                    "variable": "status"
                                }
                            ]
                        }
                    }
                ],
                "edges": [
                    {"id": "start-validator", "source": "start", "target": "validator"},
                    {"id": "validator-check", "source": "validator", "target": "check_valid"},
                    {"id": "check-processor1", "source": "check_valid", "target": "processor1", "sourceHandle": "valid"},
                    {"id": "check-error", "source": "check_valid", "target": "error_handler", "sourceHandle": "invalid"},
                    {"id": "processor1-processor2", "source": "processor1", "target": "processor2"},
                    {"id": "processor2-end", "source": "processor2", "target": "end"},
                    {"id": "error-end", "source": "error_handler", "target": "end"}
                ]
            }
        }

3.2 策略模式

应用场景:需要根据不同条件选择不同处理策略的场景

Python
class StrategyWorkflow:
    """策略模式工作流"""

    def build(self):
        """构建策略模式工作流"""
        # 策略模式:根据用户选择的策略(快速/准确/平衡)路由到不同配置的LLM节点
        return {
            "graph": {
                "nodes": [
                    {
                        "id": "start",
                        "type": "start",
                        "data": {
                            "title": "开始",
                            "variables": [
                                {
                                    "variable": "input",
                                    "label": "输入",
                                    "type": "paragraph",
                                    "required": True
                                },
                                {
                                    "variable": "strategy",
                                    "label": "策略",
                                    "type": "select",
                                    "required": True,
                                    "options": ["快速", "准确", "平衡"]
                                }
                            ]
                        }
                    },
                    {
                        "id": "select_strategy",
                        "type": "if-else",
                        "data": {
                            "title": "选择策略",
                            "cases": [
                                {
                                    "case_id": "fast",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "strategy"],
                                            "comparison_operator": "is",
                                            "value": "快速"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "accurate",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "strategy"],
                                            "comparison_operator": "is",
                                            "value": "准确"
                                        }
                                    ],
                                    "logical_operator": "and"
                                },
                                {
                                    "case_id": "balanced",
                                    "conditions": [
                                        {
                                            "variable_selector": ["start", "strategy"],
                                            "comparison_operator": "is",
                                            "value": "平衡"
                                        }
                                    ],
                                    "logical_operator": "and"
                                }
                            ]
                        }
                    },
                    {
                        "id": "fast_strategy",
                        "type": "llm",
                        "data": {
                            "title": "快速策略",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",  # 轻量模型,推理速度快
                                "mode": "chat",
                                "completion_params": {
                                    "temperature": 0.3,  # 低温度 = 更确定性输出,减少推理时间
                                    "max_tokens": 500  # 限制输出长度以加快响应
                                }
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "快速处理,简洁回答。"
                                },
                                {
                                    "role": "user",
                                    "text": "{{#start.input#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "accurate_strategy",
                        "type": "llm",
                        "data": {
                            "title": "准确策略",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o",  # 高性能模型,推理质量更高
                                "mode": "chat",
                                "completion_params": {
                                    "temperature": 0.1,  # 极低温度 = 最大确定性,适合精确分析任务
                                    "max_tokens": 2000  # 允许更长输出以提供详细分析
                                }
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "仔细分析,详细回答。"
                                },
                                {
                                    "role": "user",
                                    "text": "{{#start.input#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "balanced_strategy",
                        "type": "llm",
                        "data": {
                            "title": "平衡策略",
                            "model": {
                                "provider": "openai",
                                "name": "gpt-4o-mini",
                                "mode": "chat",
                                "completion_params": {
                                    "temperature": 0.7,  # 中等温度 = 兼顾创造性与准确性
                                    "max_tokens": 1000  # 中等输出长度
                                }
                            },
                            "prompt_template": [
                                {
                                    "role": "system",
                                    "text": "平衡速度和准确性。"
                                },
                                {
                                    "role": "user",
                                    "text": "{{#start.input#}}"
                                }
                            ]
                        }
                    },
                    {
                        "id": "end",
                        "type": "end",
                        "data": {
                            "title": "结束",
                            "outputs": [
                                {
                                    "value_selector": ["fast_strategy", "text"],
                                    "variable": "fast_result"
                                },
                                {
                                    "value_selector": ["accurate_strategy", "text"],
                                    "variable": "accurate_result"
                                },
                                {
                                    "value_selector": ["balanced_strategy", "text"],
                                    "variable": "balanced_result"
                                }
                            ]
                        }
                    }
                ],
                "edges": [
                    {"id": "start-select", "source": "start", "target": "select_strategy"},
                    {"id": "select-fast", "source": "select_strategy", "target": "fast_strategy", "sourceHandle": "fast"},
                    {"id": "select-accurate", "source": "select_strategy", "target": "accurate_strategy", "sourceHandle": "accurate"},
                    {"id": "select-balanced", "source": "select_strategy", "target": "balanced_strategy", "sourceHandle": "balanced"},
                    {"id": "fast-end", "source": "fast_strategy", "target": "end"},
                    {"id": "accurate-end", "source": "accurate_strategy", "target": "end"},
                    {"id": "balanced-end", "source": "balanced_strategy", "target": "end"}
                ]
            }
        }

4. 性能优化

4.1 缓存策略

Python
def create_cached_workflow():
    """创建带缓存的工作流"""
    return {
        "graph": {
            "nodes": [
                {
                    "id": "start",
                    "type": "start",
                    "data": {
                        "title": "开始",
                        "variables": [
                            {
                                "variable": "query",
                                "label": "查询",
                                "type": "paragraph",
                                "required": True
                            }
                        ]
                    }
                },
                {
                    "id": "check_cache",
                    "type": "code",
                    "data": {
                        "title": "检查缓存",
                        "code": """
import hashlib

# ⚠️ 重要提示:Dify代码节点在沙盒中运行,
# 变量不会在不同的工作流运行之间持久化。
# 生产环境中应通过HTTP请求节点调用外部缓存服务
#(如Redis)来实现真正的缓存功能。
# 以下代码仅演示缓存键生成逻辑。

def main(query: str) -> dict:
    # 生成缓存键
    cache_key = hashlib.md5(query.encode()).hexdigest()

    # 在实际应用中,这里应通过HTTP节点
    # 调用Redis等外部缓存服务查询缓存
    return {
        "cached": False,
        "result": None,
        "cache_key": cache_key
    }
""",
                        "outputs": [
                            {"variable": "cached", "type": "boolean"},
                            {"variable": "result", "type": "string"},
                            {"variable": "cache_key", "type": "string"}
                        ]
                    }
                },
                {
                    "id": "process",
                    "type": "llm",
                    "data": {
                        "title": "处理",
                        "model": {
                            "provider": "openai",
                            "name": "gpt-4o-mini",
                            "mode": "chat"
                        },
                        "prompt_template": [
                            {
                                "role": "system",
                                "text": "你是一个专业的助手。"
                            },
                            {
                                "role": "user",
                                "text": "{{#start.query#}}"
                            }
                        ]
                    }
                },
                {
                    "id": "save_cache",
                    "type": "code",
                    "data": {
                        "title": "保存缓存",
                        "code": """
def main(cache_key: str, result: str) -> dict:
    # ⚠️ 注意:此处cache_store在沙盒中不持久化
    # 生产环境应通过HTTP请求节点将结果保存到Redis等外部缓存

    return {
        "saved": True,
        "cache_key": cache_key,
        "result": result
    }
""",
                        "outputs": [
                            {"variable": "saved", "type": "boolean"},
                            {"variable": "result", "type": "string"}
                        ]
                    }
                },
                {
                    "id": "end",
                    "type": "end",
                    "data": {
                        "title": "结束",
                        "outputs": [
                            {
                                "value_selector": ["check_cache", "result"],
                                "variable": "cached_result"
                            },
                            {
                                "value_selector": ["save_cache", "result"],
                                "variable": "processed_result"
                            }
                        ]
                    }
                }
            ],
            "edges": [
                {"id": "start-check", "source": "start", "target": "check_cache"},
                {"id": "check-process", "source": "check_cache", "target": "process", "sourceHandle": "false"},
                {"id": "process-save", "source": "process", "target": "save_cache"},
                {"id": "check-end", "source": "check_cache", "target": "end", "sourceHandle": "true"},
                {"id": "save-end", "source": "save_cache", "target": "end"}
            ]
        }
    }

4.2 批处理优化

Python
def create_batch_workflow():
    """创建批处理工作流"""
    return {
        "graph": {
            "nodes": [
                {
                    "id": "start",
                    "type": "start",
                    "data": {
                        "title": "开始",
                        "variables": [
                            {
                                "variable": "queries",
                                "label": "查询列表",
                                "type": "paragraph",
                                "required": True
                            }
                        ]
                    }
                },
                {
                    "id": "parse",
                    "type": "code",
                    "data": {
                        "title": "解析输入",
                        "code": """
import json

def main(queries: str) -> dict:
    # 解析JSON数组格式的查询列表
    query_list = json.loads(queries)  # json.loads将JSON字符串转为Python对象

    # 按固定大小分批,避免单次LLM请求过大导致超时或token溢出
    batch_size = 5
    batches = [
        query_list[i:i + batch_size]  # 列表切片实现分批
        for i in range(0, len(query_list), batch_size)
    ]

    return {
        "batches": batches,       # 分批后的查询二维列表
        "total": len(query_list), # 总查询数
        "batch_count": len(batches)  # 批次数量
    }
""",
                        "outputs": [
                            {"variable": "batches", "type": "array"},
                            {"variable": "total", "type": "number"},
                            {"variable": "batch_count", "type": "number"}
                        ]
                    }
                },
                {
                    "id": "process_batch",
                    "type": "llm",
                    "data": {
                        "title": "处理批次",
                        "model": {
                            "provider": "openai",
                            "name": "gpt-4o-mini",
                            "mode": "chat"
                        },
                        "prompt_template": [
                            {
                                "role": "system",
                                "text": "你是一个批量处理专家。"
                            },
                            {
                                "role": "user",
                                "text": "请处理以下查询列表,返回JSON格式的结果:\n{{#parse.batches#}}"
                            }
                        ]
                    }
                },
                {
                    "id": "end",
                    "type": "end",
                    "data": {
                        "title": "结束",
                        "outputs": [
                            {
                                "value_selector": ["process_batch", "text"],
                                "variable": "results"
                            }
                        ]
                    }
                }
            ],
            "edges": [
                {"id": "start-parse", "source": "start", "target": "parse"},
                {"id": "parse-process", "source": "parse", "target": "process_batch"},
                {"id": "process-end", "source": "process_batch", "target": "end"}
            ]
        }
    }

5. 最佳实践

✅ 推荐做法

  1. 模块化设计
  2. 将复杂流程分解为多个模块
  3. 每个模块负责单一功能
  4. 便于维护和调试

  5. 充分测试

  6. 测试每个节点
  7. 测试整个工作流
  8. 边界条件测试

  9. 错误处理

  10. 添加异常处理节点
  11. 提供友好的错误提示
  12. 记录详细日志

  13. 性能优化

  14. 合理使用缓存
  15. 优化节点顺序
  16. 减少不必要的计算

❌ 避免做法

  1. 过度复杂
  2. 保持工作流简洁
  3. 避免不必要的节点
  4. 优化流程

  5. 硬编码

  6. 使用变量和参数
  7. 提高灵活性
  8. 便于维护

  9. 忽视安全

  10. 验证用户输入
  11. 保护敏感信息
  12. 实施访问控制

6. 常见问题

Q1: 如何调试复杂的工作流?

A: 调试技巧: - 使用调试模式查看每个节点的输出 - 添加日志节点记录中间结果 - 逐步测试每个节点 - 使用测试数据验证逻辑

Q2: 如何优化工作流性能?

A: 优化方法: - 减少不必要的节点 - 优化节点顺序 - 使用缓存减少重复计算 - 选择合适的模型

Q3: 如何处理长文本?

A: 处理方法: - 使用知识库节点 - 分块处理 - 提取关键信息 - 使用摘要生成

7. 总结

本章深入介绍了工作流设计的核心内容,包括:

  1. 工作流架构:DAG架构、拓扑排序、循环检测
  2. 工作流类型:顺序、分支、并行、循环
  3. 节点高级用法:上下文管理、多模型切换、混合检索
  4. 设计模式:责任链模式、策略模式
  5. 性能优化:缓存策略、批处理优化

通过本章的学习,你应该能够设计复杂的Dify工作流了。

8. 下一步

继续学习04-数据源集成,深入了解数据源的集成和管理。


最后更新日期:2026-02-12 适用版本:Dify实战教程 v2026