跳转至

06 - 部署与发布

应用部署、 API 集成、监控管理

📖 章节概述

本章将深入介绍如何部署和发布 Dify 应用,包括云端部署、本地部署、 API 集成、监控管理、性能优化以及安全配置。通过详细的代码示例和实践指导,帮助读者掌握应用部署和发布的核心技能。

🎯 学习目标

完成本章后,你将能够:

  • 深入理解不同的部署方式和适用场景
  • 掌握 Docker 和 Kubernetes 部署方法
  • 熟练集成和使用 Dify API
  • 理解应用监控和日志管理
  • 能够优化应用性能和安全性
  • 掌握版本管理和灰度发布技巧

1. 部署方式

1.1 云端部署

⚠️ 部署说明:Dify 支持多种部署方式。本文于 2026-03-26 重新核实:GitHub Releases 当前可见最新稳定版为 v1.13.22026-03-26 发布),最新预发布版为 1.14.0-rc12026-03-26 发布)。部署时请优先阅读目标版本对应的官方 Release Notes 和升级指南。

1.1.1 Docker 部署

技术原理: Docker 容器化部署可以确保应用在不同环境中的一致性,简化部署流程。

代码示例 - Dockerfile

Docker
# Dify本地部署推荐使用官方docker-compose方式
# 以下为基于Dify API的自定义应用的Dockerfile示例
FROM python:3.11-slim  # FROM指定基础镜像

# 设置工作目录
WORKDIR /app  # WORKDIR设置工作目录

# 安装系统依赖
RUN apt-get update && apt-get install -y \  # RUN在构建时执行命令
    gcc \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .  # COPY将文件复制到镜像中

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 5001  # EXPOSE声明容器监听的端口

# 启动命令
CMD ["gunicorn", "--bind", "0.0.0.0:5001", "--workers", "4", "app:app"]  # CMD容器启动时执行的默认命令

代码示例 - Docker-compose.yml

YAML
# docker-compose.yml
# Dify官方推荐使用官方docker-compose部署
# 参考: https://github.com/langgenius/dify/blob/main/docker/docker-compose.yaml
# 以下为基于Dify API的自定义应用的docker-compose示例
# 注意: Docker Compose V2 不再需要 version 字段

# Dify 快速部署命令(2026年3月更新):
# git clone https://github.com/langgenius/dify.git
# cd dify/docker
# cp .env.example .env
# docker compose up -d

services:  # services定义各个服务容器
  # 自定义应用服务
  app:
    build: .
    ports:
      - "5001:5001"
    environment:
      - DIFY_API_KEY=your-dify-api-key
      - DIFY_BASE_URL=https://api.dify.ai/v1
    restart: unless-stopped

  # 如需本地部署Dify,请使用官方docker-compose文件
  # git clone https://github.com/langgenius/dify.git
  # cd dify/docker
  # cp .env.example .env
  # docker compose up -d

代码示例 - 部署脚本

Python
#!/usr/bin/env python3
"""
Dify应用部署脚本
"""

import os
import subprocess
import requests
from typing import Dict, Optional

class DifyDeployer:
    """Dify部署器"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.dify.ai/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def deploy_to_cloud(self, app_id: str,
                      environment: str = "production") -> Dict:
        """
        部署到云端

        注意:Dify不提供通过API部署应用的接口
        应用发布需在Dify Web界面完成:应用 -> 发布
        云端版本(cloud.dify.ai)创建并发布应用后即可通过API访问
        """
        print(f"请在Dify Web界面发布应用 {app_id}{environment} 环境")
        return {"message": "请在Dify Web界面发布应用"}

    def get_deployment_status(self, app_id: str) -> Dict:
        """
        获取部署状态

        注意:Dify不提供部署状态查询API
        可通过调用应用API验证应用是否正常运行
        """
        url = f"{self.base_url}/parameters"

        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return {"status": "running", "details": response.json()}
        except requests.exceptions.RequestException as e:
            return {"status": "error", "error": str(e)}

    def check_app_health(self, app_id: str) -> Dict:
        """检查应用健康状态"""
        # 通过发送测试请求检查应用是否正常
        url = f"{self.base_url}/parameters"

        try:
            response = requests.get(url, headers=self.headers)
            return {"healthy": response.status_code == 200}
        except requests.exceptions.RequestException:
            return {"healthy": False}

class DockerDeployer:
    """Docker部署器"""

    def __init__(self, project_name: str):
        self.project_name = project_name

    def build_image(self, dockerfile_path: str = "Dockerfile") -> bool:
        """
        构建Docker镜像

        Args:
            dockerfile_path: Dockerfile路径
        """
        try:
            cmd = [
                "docker", "build",
                "-t", f"{self.project_name}:latest",
                "-f", dockerfile_path,
                "."
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print(f"镜像构建成功: {self.project_name}:latest")
                return True
            else:
                print(f"镜像构建失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"构建镜像时出错: {str(e)}")
            return False

    def run_container(self, port: int = 5001,
                    env_vars: Dict = None) -> bool:
        """
        运行Docker容器

        Args:
            port: 端口号(Dify API默认端口5001)
            env_vars: 环境变量
        """
        try:
            cmd = ["docker", "run", "-d", "-p", f"{port}:5001"]

            # 添加环境变量
            if env_vars:
                for key, value in env_vars.items():
                    cmd.extend(["-e", f"{key}={value}"])

            cmd.append(f"{self.project_name}:latest")

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                container_id = result.stdout.strip()
                print(f"容器运行成功: {container_id}")
                print(f"访问地址: http://localhost:{port}")
                return True
            else:
                print(f"容器运行失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"运行容器时出错: {str(e)}")
            return False

    def deploy_with_compose(self, compose_file: str = "docker-compose.yml") -> bool:
        """
        使用docker-compose部署

        Args:
            compose_file: docker-compose文件路径
        """
        try:
            # 停止现有容器
            subprocess.run(
                ["docker", "compose", "-f", compose_file, "down"],
                capture_output=True
            )

            # 构建并启动
            cmd = ["docker", "compose", "-f", compose_file, "up", "-d", "--build"]
            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print("部署成功!")
                return True
            else:
                print(f"部署失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"部署时出错: {str(e)}")
            return False

    def get_container_logs(self, container_name: Optional[str] = None) -> str:
        """
        获取容器日志

        Args:
            container_name: 容器名称
        """
        try:
            if container_name:
                cmd = ["docker", "logs", container_name]
            else:
                cmd = ["docker", "compose", "logs"]

            result = subprocess.run(cmd, capture_output=True, text=True)
            return result.stdout

        except Exception as e:
            return f"获取日志时出错: {str(e)}"

    def stop_container(self, container_name: Optional[str] = None) -> bool:
        """
        停止容器

        Args:
            container_name: 容器名称
        """
        try:
            if container_name:
                cmd = ["docker", "stop", container_name]
            else:
                cmd = ["docker", "compose", "stop"]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print("容器停止成功")
                return True
            else:
                print(f"容器停止失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"停止容器时出错: {str(e)}")
            return False

# 使用示例
if __name__ == "__main__":
    # 部署到云端
    cloud_deployer = DifyDeployer(api_key="your_api_key_here")
    result = cloud_deployer.deploy_to_cloud(app_id="app_id_here")
    print(f"云端部署结果: {result}")

    # Docker部署
    docker_deployer = DockerDeployer(project_name="my-dify-app")

    # 构建镜像
    docker_deployer.build_image()

    # 运行容器
    env_vars = {
        "DATABASE_URL": "postgresql://user:pass@db:5432/dify",
        "REDIS_URL": "redis://redis:6379/0",
        "SECRET_KEY": "your-secret-key"
    }
    docker_deployer.run_container(port=5000, env_vars=env_vars)

    # 使用docker-compose部署
    docker_deployer.deploy_with_compose()

1.1.2 Kubernetes 部署

技术原理: Kubernetes 提供容器编排、自动扩缩容、负载均衡等功能,适合大规模生产环境。

代码示例 - Kubernetes 配置

YAML
# deployment.yaml
apiVersion: apps/v1  # apiVersion指定K8s API版本
kind: Deployment  # kind指定资源类型
metadata:
  name: dify-web
  labels:
    app: dify-web
spec:  # spec定义资源的期望状态
  replicas: 3
  selector:
    matchLabels:
      app: dify-web
  template:
    metadata:
      labels:
        app: dify-web
    spec:
      containers:
      - name: dify-web
        image: my-dify-app:latest
        ports:
        - containerPort: 5000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: dify-secrets
              key: database-url
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: dify-secrets
              key: redis-url
        - name: SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: dify-secrets
              key: secret-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 5000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 5000
          initialDelaySeconds: 10
          periodSeconds: 5

---  # YAML文档分隔符
# service.yaml
apiVersion: v1
kind: Service
metadata:
  name: dify-web-service
spec:
  selector:
    app: dify-web
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer

---
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: dify-web-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: dify-web
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

代码示例 - Kubernetes 部署脚本

Python
#!/usr/bin/env python3
"""
Kubernetes部署脚本
"""

import subprocess
from typing import Dict, List

class K8sDeployer:
    """Kubernetes部署器"""

    def __init__(self, namespace: str = "default"):
        self.namespace = namespace

    def apply_manifest(self, manifest_path: str) -> bool:
        """
        应用Kubernetes配置

        Args:
            manifest_path: 配置文件路径
        """
        try:
            cmd = [
                "kubectl", "apply",
                "-f", manifest_path,
                "-n", self.namespace
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print(f"配置应用成功: {manifest_path}")
                return True
            else:
                print(f"配置应用失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"应用配置时出错: {str(e)}")
            return False

    def get_pods(self) -> List[Dict]:
        """获取Pod列表"""
        try:
            cmd = [
                "kubectl", "get", "pods",
                "-n", self.namespace,
                "-o", "json"
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                import json
                data = json.loads(result.stdout)
                return data.get("items", [])
            else:
                return []

        except Exception as e:
            print(f"获取Pod列表时出错: {str(e)}")
            return []

    def get_pod_logs(self, pod_name: str,
                     tail: int = 100) -> str:
        """
        获取Pod日志

        Args:
            pod_name: Pod名称
            tail: 日志行数
        """
        try:
            cmd = [
                "kubectl", "logs",
                pod_name,
                "-n", self.namespace,
                "--tail", str(tail)
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)
            return result.stdout

        except Exception as e:
            return f"获取日志时出错: {str(e)}"

    def scale_deployment(self, deployment_name: str,
                       replicas: int) -> bool:
        """
        扩缩容Deployment

        Args:
            deployment_name: Deployment名称
            replicas: 副本数量
        """
        try:
            cmd = [
                "kubectl", "scale",
                "deployment", deployment_name,
                f"--replicas={replicas}",
                "-n", self.namespace
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print(f"扩缩容成功: {deployment_name} -> {replicas}")
                return True
            else:
                print(f"扩缩容失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"扩缩容时出错: {str(e)}")
            return False

    def rollout_restart(self, deployment_name: str) -> bool:
        """
        重启Deployment

        Args:
            deployment_name: Deployment名称
        """
        try:
            cmd = [
                "kubectl", "rollout",
                "restart", "deployment", deployment_name,
                "-n", self.namespace
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print(f"重启成功: {deployment_name}")
                return True
            else:
                print(f"重启失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"重启时出错: {str(e)}")
            return False

    def get_deployment_status(self, deployment_name: str) -> Dict:
        """
        获取Deployment状态

        Args:
            deployment_name: Deployment名称
        """
        try:
            cmd = [
                "kubectl", "get", "deployment", deployment_name,
                "-n", self.namespace,
                "-o", "json"
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                import json
                data = json.loads(result.stdout)
                status = data.get("status", {})
                return {
                    "ready_replicas": status.get("readyReplicas", 0),
                    "available_replicas": status.get("availableReplicas", 0),
                    "updated_replicas": status.get("updatedReplicas", 0),
                    "replicas": status.get("replicas", 0)
                }
            else:
                return {}

        except Exception as e:
            print(f"获取状态时出错: {str(e)}")
            return {}

# 使用示例
if __name__ == "__main__":
    deployer = K8sDeployer(namespace="dify")

    # 应用配置
    deployer.apply_manifest("deployment.yaml")
    deployer.apply_manifest("service.yaml")
    deployer.apply_manifest("hpa.yaml")

    # 获取Pod列表
    pods = deployer.get_pods()
    print(f"当前Pod数量: {len(pods)}")

    # 获取部署状态
    status = deployer.get_deployment_status("dify-web")
    print(f"部署状态: {status}")

    # 扩缩容
    deployer.scale_deployment("dify-web", replicas=5)

    # 重启
    deployer.rollout_restart("dify-web")

1.2 本地部署

技术原理: 本地部署适用于数据敏感、需要完全控制、或测试开发场景。

代码示例 - 本地部署脚本

Python
#!/usr/bin/env python3
"""
本地部署脚本
"""

import os
import subprocess
import shutil
from typing import Dict, Optional

class LocalDeployer:
    """本地部署器"""

    def __init__(self, install_dir: str = "~/dify"):
        self.install_dir = os.path.expanduser(install_dir)

    def clone_repository(self, repo_url: str = "https://github.com/langgenius/dify.git",
                       branch: str = "main") -> bool:
        """
        克隆代码仓库

        Args:
            repo_url: 仓库URL
            branch: 分支名称
        """
        try:
            # 创建目录
            os.makedirs(self.install_dir, exist_ok=True)

            # 克隆仓库
            cmd = [
                "git", "clone",
                "-b", branch,
                repo_url,
                self.install_dir
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print(f"代码克隆成功: {self.install_dir}")
                return True
            else:
                print(f"代码克隆失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"克隆代码时出错: {str(e)}")
            return False

    def install_dependencies(self) -> bool:
        """安装依赖"""
        try:
            # 安装Python依赖
            requirements_file = os.path.join(self.install_dir, "requirements.txt")
            if os.path.exists(requirements_file):
                cmd = ["pip", "install", "-r", requirements_file]
                result = subprocess.run(cmd, capture_output=True, text=True)

                if result.returncode != 0:
                    print(f"依赖安装失败: {result.stderr}")
                    return False

            print("依赖安装成功")
            return True

        except Exception as e:
            print(f"安装依赖时出错: {str(e)}")
            return False

    def configure_environment(self, config: Dict) -> bool:
        """
        配置环境

        Args:
            config: 配置字典
        """
        try:
            # 创建.env文件
            env_file = os.path.join(self.install_dir, ".env")

            with open(env_file, 'w') as f:
                for key, value in config.items():
                    f.write(f"{key}={value}\n")

            print(f"环境配置成功: {env_file}")
            return True

        except Exception as e:
            print(f"配置环境时出错: {str(e)}")
            return False

    def initialize_database(self) -> bool:
        """
        初始化数据库

        注意:Dify使用Flask框架,数据库迁移使用flask db命令
        本地部署推荐使用docker-compose方式,自动处理数据库初始化
        """
        try:
            # Dify使用Flask和Alembic进行数据库迁移
            cmd = ["flask", "db", "upgrade"]
            result = subprocess.run(cmd, cwd=self.install_dir, capture_output=True, text=True)

            if result.returncode != 0:
                print(f"数据库初始化失败: {result.stderr}")
                return False

            print("数据库初始化成功")
            return True

        except Exception as e:
            print(f"初始化数据库时出错: {str(e)}")
            return False

    def start_services(self) -> bool:
        """
        启动服务

        注意:Dify本地部署推荐使用docker-compose方式
        API服务默认端口为5001
        """
        try:
            # 使用docker-compose启动服务
            cmd = ["docker", "compose", "up", "-d"]

            result = subprocess.run(
                cmd,
                cwd=os.path.join(self.install_dir, "docker"),
                capture_output=True,
                text=True
            )

            if result.returncode == 0:
                print("服务启动成功")
                print("访问地址: http://localhost")
                print("API地址: http://localhost:5001")
                return True
            else:
                print(f"服务启动失败: {result.stderr}")
                return False

        except Exception as e:
            print(f"启动服务时出错: {str(e)}")
            return False

    def stop_services(self) -> bool:
        """停止服务"""
        try:
            cmd = ["docker", "compose", "down"]
            subprocess.run(
                cmd,
                cwd=os.path.join(self.install_dir, "docker"),
                capture_output=True
            )

            print("服务停止成功")
            return True

        except Exception as e:
            print(f"停止服务时出错: {str(e)}")
            return False

    def check_status(self) -> Dict:
        """检查服务状态"""
        try:
            # 检查端口是否监听
            cmd = "netstat -tuln | grep 5001"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

            is_running = "LISTEN" in result.stdout

            return {
                "running": is_running,
                "install_dir": self.install_dir
            }

        except Exception as e:
            return {
                "running": False,
                "error": str(e)
            }

# 使用示例
if __name__ == "__main__":
    deployer = LocalDeployer(install_dir="~/dify-local")

    # 克隆代码
    deployer.clone_repository()

    # 安装依赖
    deployer.install_dependencies()

    # 配置环境
    config = {
        "DATABASE_URL": "postgresql://dify:dify@localhost:5432/dify",
        "REDIS_URL": "redis://localhost:6379/0",
        "SECRET_KEY": "your-secret-key-here",
        "OPENAI_API_KEY": "your-openai-api-key"
    }
    deployer.configure_environment(config)

    # 初始化数据库
    deployer.initialize_database()

    # 启动服务
    deployer.start_services()

    # 检查状态
    status = deployer.check_status()
    print(f"服务状态: {status}")

2. API 集成

2.1 API 调用

代码示例 - 完整的 API 客户端

Python
import requests
import json
from typing import Dict, List, Optional, Generator
import time

class DifyAPIClient:
    """Dify API客户端"""

    def __init__(self, api_key: str, base_url: str = "https://api.dify.ai/v1"):
        """
        初始化API客户端

        Args:
            api_key: API密钥
            base_url: API基础URL
        """
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    # ========== 聊天API ==========

    def chat(self, app_id: str, query: str,
             conversation_id: Optional[str] = None,
             user: str = "default_user",
             inputs: Dict = None) -> Dict:
        """
        发送聊天消息

        Args:
            app_id: 应用ID
            query: 用户消息
            conversation_id: 会话ID
            user: 用户ID
            inputs: 额外输入
        """
        url = f"{self.base_url}/chat-messages"

        payload = {
            "inputs": inputs or {},
            "query": query,
            "response_mode": "blocking",
            "conversation_id": conversation_id,
            "user": user
        }

        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def chat_stream(self, app_id: str, query: str,
                   conversation_id: Optional[str] = None,
                   user: str = "default_user",
                   inputs: Dict = None) -> Generator[str, None, None]:
        """
        流式聊天

        Args:
            app_id: 应用ID
            query: 用户消息
            conversation_id: 会话ID
            user: 用户ID
            inputs: 额外输入
        """
        url = f"{self.base_url}/chat-messages"

        payload = {
            "inputs": inputs or {},
            "query": query,
            "response_mode": "streaming",
            "conversation_id": conversation_id,
            "user": user
        }

        try:
            response = requests.post(
                url,
                headers=self.headers,
                json=payload,
                stream=True
            )
            response.raise_for_status()

            for line in response.iter_lines():
                if line:
                    line = line.decode('utf-8')
                    if line.startswith('data: '):
                        data = json.loads(line[6:])  # 切片操作:[start:end:step]提取子序列
                        if data.get('event') == 'message':
                            yield data.get('answer', '')
                        elif data.get('event') == 'message_end':
                            break

        except requests.exceptions.RequestException as e:
            yield f"Error: {str(e)}"

    # ========== 文本补全API ==========

    def completion(self, app_id: str,
                  user: str = "default_user",
                  inputs: Dict = None) -> Dict:
        """
        文本补全(用于Completion类型应用)

        Args:
            app_id: 应用ID
            user: 用户ID
            inputs: 输入参数字典,键值对应应用定义的变量
        """
        url = f"{self.base_url}/completion-messages"

        payload = {
            "inputs": inputs or {},
            "response_mode": "blocking",
            "user": user
        }

        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def completion_stream(self, app_id: str,
                         user: str = "default_user",
                         inputs: Dict = None) -> Generator[str, None, None]:
        """
        流式文本补全

        Args:
            app_id: 应用ID
            user: 用户ID
            inputs: 输入参数字典
        """
        url = f"{self.base_url}/completion-messages"

        payload = {
            "inputs": inputs or {},
            "response_mode": "streaming",
            "user": user
        }

        try:
            response = requests.post(
                url,
                headers=self.headers,
                json=payload,
                stream=True
            )
            response.raise_for_status()

            for line in response.iter_lines():
                if line:
                    line = line.decode('utf-8')
                    if line.startswith('data: '):
                        data = json.loads(line[6:])
                        if data.get('event') == 'message':
                            yield data.get('answer', '')
                        elif data.get('event') == 'message_end':
                            break

        except requests.exceptions.RequestException as e:
            yield f"Error: {str(e)}"

    # ========== 工作流API ==========

    def run_workflow(self, app_id: str, inputs: Dict,
                   user: str = "default_user") -> Dict:
        """
        运行工作流

        Args:
            app_id: 应用ID
            inputs: 输入参数
            user: 用户ID
        """
        url = f"{self.base_url}/workflows/run"

        payload = {
            "inputs": inputs,
            "response_mode": "blocking",
            "user": user
        }

        try:
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    # ========== 对话历史API ==========

    def get_conversation_history(self, conversation_id: str,
                               limit: int = 20) -> Dict:
        """
        获取对话历史

        Args:
            conversation_id: 会话ID
            limit: 返回数量限制
        """
        url = f"{self.base_url}/messages"
        params = {
            "conversation_id": conversation_id,
            "limit": limit
        }

        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def delete_conversation(self, conversation_id: str,
                            user: str = "default_user") -> Dict:
        """
        删除对话

        Args:
            conversation_id: 会话ID
            user: 用户标识(必填)
        """
        url = f"{self.base_url}/conversations/{conversation_id}"

        try:
            response = requests.delete(
                url, headers=self.headers,
                json={"user": user}
            )
            response.raise_for_status()
            return {"message": "删除成功"}
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    # ========== 文件上传API ==========

    def upload_file(self, file_path: str, user: str = "default_user") -> Dict:
        """
        上传文件

        Args:
            file_path: 文件路径
            user: 用户ID
        """
        url = f"{self.base_url}/files/upload"
        headers = {
            "Authorization": f"Bearer {self.api_key}"
        }

        try:
            with open(file_path, 'rb') as f:  # with自动管理资源,确保文件正确关闭
                files = {'file': f}
                data = {'user': user}

                response = requests.post(url, headers=headers, files=files, data=data)
                response.raise_for_status()
                return response.json()
        except Exception as e:
            return {"error": str(e)}

    # ========== 参数验证和错误处理 ==========

    def validate_response(self, response: Dict) -> bool:
        """
        验证API响应

        Args:
            response: API响应
        """
        if "error" in response:
            print(f"API错误: {response['error']}")
            return False
        return True

    def get_usage_info(self, response: Dict) -> Dict:
        """
        获取使用信息

        Args:
            response: API响应
        """
        metadata = response.get("metadata", {})
        usage = metadata.get("usage", {})

        return {
            "total_tokens": usage.get("total_tokens", 0),
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0)
        }

# 使用示例
if __name__ == "__main__":
    client = DifyAPIClient(api_key="your_api_key_here")

    # 聊天
    response = client.chat(
        app_id="app_id_here",
        query="你好!"
    )

    if client.validate_response(response):
        print(f"回复: {response.get('answer', '')}")
        usage = client.get_usage_info(response)
        print(f"使用情况: {usage}")

    # 流式聊天
    print("\n流式回复:")
    for chunk in client.chat_stream(
        app_id="app_id_here",
        query="介绍一下Python"
    ):
        print(chunk, end="", flush=True)
    print()

    # 工作流
    response = client.run_workflow(
        app_id="app_id_here",
        inputs={"input_text": "测试输入"}
    )

    if client.validate_response(response):
        outputs = response.get("data", {}).get("outputs", {})
        print(f"工作流输出: {outputs}")

2.2 SDK 集成

代码示例 - Python SDK 封装

Python
"""
Dify Python SDK
"""

from typing import Dict, List, Optional, Generator, Any
import requests
import json

class DifySDK:
    """Dify SDK"""

    def __init__(self, api_key: str, base_url: str = "https://api.dify.ai/v1"):
        """
        初始化SDK

        Args:
            api_key: API密钥
            base_url: API基础URL
        """
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        })

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()

    # ========== 装饰器 ==========

    def handle_errors(func):
        """错误处理装饰器"""
        def wrapper(self, *args, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
            try:
                return func(self, *args, **kwargs)
            except requests.exceptions.RequestException as e:
                return {"error": str(e)}
            except json.JSONDecodeError as e:
                return {"error": f"JSON解析错误: {str(e)}"}
            except Exception as e:
                return {"error": f"未知错误: {str(e)}"}
        return wrapper

    # ========== 应用信息 ==========
    # 注意:Dify Service API使用App-specific API Key,
    # 每个Key对应一个应用,不存在list_apps接口

    @handle_errors
    def get_app_info(self) -> Dict:
        """获取当前应用基本信息"""
        url = f"{self.base_url}/info"
        response = self.session.get(url)
        response.raise_for_status()
        return response.json()

    @handle_errors
    def get_app_parameters(self) -> Dict:
        """获取当前应用参数信息"""
        url = f"{self.base_url}/parameters"
        response = self.session.get(url)
        response.raise_for_status()
        return response.json()

    # ========== 聊天 ==========

    @handle_errors
    def chat(self, app_id: str, query: str,
             conversation_id: Optional[str] = None,
             user: str = "default_user",
             inputs: Dict = None) -> Dict:
        """聊天"""
        url = f"{self.base_url}/chat-messages"
        payload = {
            "inputs": inputs or {},
            "query": query,
            "response_mode": "blocking",
            "conversation_id": conversation_id,
            "user": user
        }
        response = self.session.post(url, json=payload)
        response.raise_for_status()
        return response.json()

    def chat_stream(self, app_id: str, query: str,
                   conversation_id: Optional[str] = None,
                   user: str = "default_user",
                   inputs: Dict = None) -> Generator[str, None, None]:
        """流式聊天"""
        url = f"{self.base_url}/chat-messages"
        payload = {
            "inputs": inputs or {},
            "query": query,
            "response_mode": "streaming",
            "conversation_id": conversation_id,
            "user": user
        }

        with self.session.post(url, json=payload, stream=True) as response:
            response.raise_for_status()

            for line in response.iter_lines():
                if line:
                    line = line.decode('utf-8')
                    if line.startswith('data: '):
                        data = json.loads(line[6:])  # json.loads将JSON字符串转为Python对象
                        if data.get('event') == 'message':
                            yield data.get('answer', '')  # yield生成器:惰性产出值,节省内存
                        elif data.get('event') == 'message_end':
                            break

    # ========== 工作流 ==========

    @handle_errors
    def run_workflow(self, app_id: str, inputs: Dict,
                   user: str = "default_user") -> Dict:
        """运行工作流"""
        url = f"{self.base_url}/workflows/run"
        payload = {
            "inputs": inputs,
            "response_mode": "blocking",
            "user": user
        }
        response = self.session.post(url, json=payload)
        response.raise_for_status()
        return response.json()

    # ========== 批量操作 ==========

    def batch_chat(self, app_id: str, queries: List[str],
                  user: str = "default_user") -> List[Dict]:
        """批量聊天"""
        results = []
        for query in queries:
            result = self.chat(app_id, query, user=user)
            results.append(result)
        return results

    def batch_run_workflow(self, app_id: str, inputs_list: List[Dict],
                         user: str = "default_user") -> List[Dict]:
        """批量运行工作流"""
        results = []
        for inputs in inputs_list:
            result = self.run_workflow(app_id, inputs, user=user)
            results.append(result)
        return results

# 使用示例
if __name__ == "__main__":
    # 使用上下文管理器
    with DifySDK(api_key="your_api_key_here") as sdk:
        # 获取应用信息
        app_info = sdk.get_app_info()
        print(f"应用信息: {app_info}")

        # 获取应用参数
        params = sdk.get_app_parameters()
        print(f"应用参数: {params}")

        # 聊天
        response = sdk.chat(
            app_id="app_id_here",
            query="你好!"
        )
        print(f"回复: {response.get('answer', '')}")

        # 流式聊天
        print("\n流式回复:")
        for chunk in sdk.chat_stream(
            app_id="app_id_here",
            query="介绍一下Python"
        ):
            print(chunk, end="", flush=True)
        print()

        # 批量操作
        queries = ["你好", "介绍一下AI", "再见"]
        results = sdk.batch_chat("app_id_here", queries)
        print(f"\n批量结果: {len(results)} 个回复")

3. 监控管理

3.1 性能监控

代码示例 - 监控客户端

Python
import requests
import time
from typing import Dict, List
from datetime import datetime

class DifyMonitor:
    """Dify监控器"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}"
        }
        self.base_url = "https://api.dify.ai/v1"

    def get_app_metrics(self, app_id: str) -> Dict:
        """
        获取应用指标

        注意:Dify Service API不提供应用指标查询接口
        应用指标可在Dify Web控制台的"分析"页面查看
        以下通过调用应用API作为健康检查

        Args:
            app_id: 应用ID
        """
        url = f"{self.base_url}/parameters"

        try:  # try/except捕获异常
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def get_conversation_metrics(self, app_id: str,
                                start_time: datetime = None,
                                end_time: datetime = None) -> Dict:
        """
        获取对话指标

        注意:Dify Service API不提供对话统计指标接口
        可通过获取对话列表来计算指标

        Args:
            app_id: 应用ID
            start_time: 开始时间
            end_time: 结束时间
        """
        url = f"{self.base_url}/conversations"

        params = {}
        if start_time:
            params["start_time"] = start_time.isoformat()
        if end_time:
            params["end_time"] = end_time.isoformat()

        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def monitor_response_time(self, app_id: str, query: str) -> Dict:
        """
        监控响应时间

        Args:
            app_id: 应用ID
            query: 测试查询
        """
        url = f"{self.base_url}/chat-messages"

        start_time = time.time()

        try:
            response = requests.post(
                url,
                headers=self.headers,
                json={
                    "inputs": {},
                    "query": query,
                    "response_mode": "blocking",
                    "user": "monitor"
                }
            )
            end_time = time.time()

            response_time = end_time - start_time

            return {
                "response_time": response_time,
                "status": "success" if response.status_code == 200 else "error",
                "status_code": response.status_code
            }

        except requests.exceptions.RequestException as e:
            end_time = time.time()
            return {
                "response_time": end_time - start_time,
                "status": "error",
                "error": str(e)
            }

    def get_token_usage(self, app_id: str,
                       start_time: datetime = None,
                       end_time: datetime = None) -> Dict:
        """
        获取Token使用情况

        注意:Dify Service API不提供 Token 用量统计接口
        Token用量可在Dify Web控制台的"分析"页面查看
        代码中可通过每次API调用的响应中的metadata.usage字段来累计

        Args:
            app_id: 应用ID
            start_time: 开始时间
            end_time: 结束时间
        """
        # 由于没有专门的Token统计API,可通过获取对话列表间接估算
        url = f"{self.base_url}/conversations"

        params = {}
        if start_time:
            params["start_time"] = start_time.isoformat()
        if end_time:
            params["end_time"] = end_time.isoformat()

        try:
            response = requests.get(url, headers=self.headers, params=params)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": str(e)}

    def generate_report(self, app_id: str) -> Dict:
        """
        生成监控报告

        Args:
            app_id: 应用ID
        """
        # 获取各种指标
        metrics = self.get_app_metrics(app_id)
        token_usage = self.get_token_usage(app_id)

        # 测试响应时间
        response_time = self.monitor_response_time(app_id, "测试查询")

        return {
            "app_id": app_id,
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
            "token_usage": token_usage,
            "response_time": response_time
        }

# 使用示例
if __name__ == "__main__":
    monitor = DifyMonitor(api_key="your_api_key_here")

    # 获取应用指标
    metrics = monitor.get_app_metrics(app_id="app_id_here")
    print(f"应用指标: {metrics}")

    # 监控响应时间
    response_time = monitor.monitor_response_time(
        app_id="app_id_here",
        query="你好"
    )
    print(f"响应时间: {response_time}")

    # 获取Token使用情况
    from datetime import datetime, timedelta
    end_time = datetime.now()
    start_time = end_time - timedelta(days=7)

    token_usage = monitor.get_token_usage(
        app_id="app_id_here",
        start_time=start_time,
        end_time=end_time
    )
    print(f"Token使用: {token_usage}")

    # 生成报告
    report = monitor.generate_report(app_id="app_id_here")
    print(f"监控报告: {report}")

4. 练习题

基础练习

  1. 部署应用
  2. Docker 部署
  3. 测试应用
  4. 验证功能

进阶练习

  1. 生产环境部署
  2. Kubernetes 部署
  3. 配置负载均衡
  4. 实现自动扩缩容

5. 最佳实践

✅ 推荐做法

  1. 充分测试
  2. 测试所有功能
  3. 压力测试
  4. 安全测试

  5. 监控告警

  6. 设置监控指标
  7. 配置告警规则
  8. 及时响应问题

❌ 避免做法

  1. 忽视安全
  2. 实施认证授权
  3. 加密敏感数据
  4. 定期安全审计

6. 常见问题

Q1: 如何实现灰度发布

A: 实现方法: - 使用多个版本的应用 - 通过负载均衡分配流量 - 逐步增加新版本流量 - 监控关键指标

Q2: 如何处理 API 限流

A: 处理方法: - 实现请求队列 - 使用指数退避 - 缓存常用请求 - 优化请求频率

7. 总结

本章深入介绍了部署与发布的核心内容,包括:

  1. 部署方式:云端部署、本地部署
  2. API 集成:完整 API 客户端、 SDK 封装
  3. 监控管理:性能监控、日志管理

通过本章的学习,你应该能够熟练部署和管理 Dify 应用了。

8. 企业级高可用部署案例

8.1 架构设计

Text Only
┌─────────────────────────────────────────────────────────────────┐
│                        负载均衡层                                │
│                    (Nginx / ALB / CloudFlare)                   │
└─────────────────────────────────────────────────────────────────┘
                ┌───────────────┼───────────────┐
                ▼               ▼               ▼
        ┌───────────┐   ┌───────────┐   ┌───────────┐
        │  Dify API │   │  Dify API │   │  Dify API │
        │  (Pod 1)  │   │  (Pod 2)  │   │  (Pod 3)  │
        └───────────┘   └───────────┘   └───────────┘
                │               │               │
                └───────────────┼───────────────┘
        ┌───────────────────────┼───────────────────────┐
        │                       │                       │
        ▼                       ▼                       ▼
┌───────────────┐       ┌───────────────┐       ┌───────────────┐
│   PostgreSQL  │       │    Redis      │       │   Vector DB   │
│   (Primary +  │       │   (Cluster)   │       │  (Qdrant/     │
│   Replica)    │       │               │       │   Weaviate)   │
└───────────────┘       └───────────────┘       └───────────────┘

8.2 高可用配置

YAML
# dify-ha-deployment.yaml
# Dify高可用部署配置

apiVersion: apps/v1
kind: Deployment
metadata:
  name: dify-api
  labels:
    app: dify-api
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: dify-api
  template:
    metadata:
      labels:
        app: dify-api
    spec:
      # 反亲和性:确保Pod分散在不同节点
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - dify-api
              topologyKey: kubernetes.io/hostname
      containers:
      - name: dify-api
        image: langgenius/dify-api:latest
        ports:
        - containerPort: 5001
        env:
        - name: DB_USERNAME
          valueFrom:
            secretKeyRef:
              name: dify-db-secret
              key: username
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef:
              name: dify-db-secret
              key: password
        - name: DB_HOST
          value: "postgres-primary.dify.svc.cluster.local"
        - name: REDIS_HOST
          value: "redis-cluster.dify.svc.cluster.local"
        - name: CELERY_BROKER_URL
          value: "redis://redis-cluster.dify.svc.cluster.local:6379/1"
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 5001
          initialDelaySeconds: 60
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 5001
          initialDelaySeconds: 30
          periodSeconds: 5
        volumeMounts:
        - name: app-storage
          mountPath: /app/storage
      volumes:
      - name: app-storage
        persistentVolumeClaim:
          claimName: dify-storage-pvc

---
# PostgreSQL高可用配置(使用PostgreSQL Operator或 Patroni)
apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-ha-config
data:
  postgresql.conf: |
    listen_addresses = '*'
    max_connections = 200
    shared_buffers = 256MB
    wal_level = replica
    max_wal_senders = 3
    synchronous_commit = on

8.3 自动扩缩容配置

YAML
# dify-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: dify-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: dify-api
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: dify_active_conversations
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
      - type: Pods
        value: 4
        periodSeconds: 15
      selectPolicy: Max

8.4 监控与告警

YAML
# prometheus-rules.yaml
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: dify-alerts
  labels:
    release: prometheus
spec:
  groups:
  - name: dify-alerts
    rules:
    - alert: DifyHighErrorRate
      expr: |
        sum(rate(http_requests_total{job="dify-api",status=~"5.."}[5m]))
        / sum(rate(http_requests_total{job="dify-api"}[5m])) > 0.05
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "Dify API高错误率"
        description: "Dify API 5xx错误率超过5%"

    - alert: DifyHighLatency
      expr: |
        histogram_quantile(0.95, sum(rate(http_request_duration_seconds_bucket{job="dify-api"}[5m])) by (le)) > 5
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Dify API高延迟"
        description: "Dify API P95延迟超过5秒"

    - alert: DifyLLMTokenUsageHigh
      expr: |
        sum(rate(dify_llm_tokens_total[1h])) > 1000000
      for: 10m
      labels:
        severity: warning
      annotations:
        summary: "LLM Token使用量高"
        description: "每小时Token使用量超过100万"

8.5 备份与恢复

Python
#!/usr/bin/env python3
"""
Dify企业级备份脚本
支持:数据库备份、知识库备份、配置备份
"""

import os
import subprocess
import boto3
from datetime import datetime
from typing import Dict, List
import json

class DifyBackupManager:
    """Dify备份管理器"""

    def __init__(self, config: Dict):
        self.config = config
        self.s3_client = boto3.client(
            's3',
            endpoint_url=config.get('s3_endpoint'),
            aws_access_key_id=config['aws_access_key'],
            aws_secret_access_key=config['aws_secret_key'],
        )
        self.backup_bucket = config['backup_bucket']

    def backup_postgresql(self) -> str:
        """备份PostgreSQL数据库"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"/tmp/dify_db_{timestamp}.sql"

        # 使用pg_dump备份
        cmd = [
            "pg_dump",
            "-h", self.config['db_host'],
            "-U", self.config['db_user'],
            "-d", self.config['db_name'],
            "-F", "c",  # 自定义格式,支持并行恢复
            "-f", backup_file
        ]

        env = os.environ.copy()
        env['PGPASSWORD'] = self.config['db_password']

        result = subprocess.run(cmd, env=env, capture_output=True)

        if result.returncode == 0:
            print(f"数据库备份成功: {backup_file}")
            return backup_file
        else:
            raise Exception(f"数据库备份失败: {result.stderr}")

    def backup_vector_db(self) -> str:
        """备份向量数据库"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"/tmp/dify_vectors_{timestamp}.json"

        # 使用Qdrant/Weaviate的导出API
        # 这里以Qdrant为例
        import requests

        qdrant_url = self.config['qdrant_url']
        collections = requests.get(f"{qdrant_url}/collections").json()

        all_data = {}
        for collection in collections['result']['collections']:
            name = collection['name']
            # 导出collection数据
            points = requests.post(
                f"{qdrant_url}/collections/{name}/points/scroll",
                json={"limit": 10000}
            ).json()
            all_data[name] = points['result']

        with open(backup_file, 'w') as f:
            json.dump(all_data, f)

        print(f"向量库备份成功: {backup_file}")
        return backup_file

    def backup_storage(self) -> str:
        """备份文件存储"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"/tmp/dify_storage_{timestamp}.tar.gz"

        storage_path = self.config['storage_path']

        cmd = [
            "tar", "-czf", backup_file,
            "-C", os.path.dirname(storage_path),
            os.path.basename(storage_path)
        ]

        result = subprocess.run(cmd, capture_output=True)

        if result.returncode == 0:
            print(f"存储备份成功: {backup_file}")
            return backup_file
        else:
            raise Exception(f"存储备份失败: {result.stderr}")

    def upload_to_s3(self, file_path: str, backup_type: str) -> str:
        """上传备份到S3"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        object_key = f"dify-backups/{backup_type}/{os.path.basename(file_path)}"

        self.s3_client.upload_file(file_path, self.backup_bucket, object_key)

        print(f"上传成功: s3://{self.backup_bucket}/{object_key}")
        return object_key

    def full_backup(self) -> Dict[str, str]:
        """执行完整备份"""
        results = {}

        try:
            # 1. 数据库备份
            db_backup = self.backup_postgresql()
            results['database'] = self.upload_to_s3(db_backup, 'database')
            os.remove(db_backup)

            # 2. 向量库备份
            vector_backup = self.backup_vector_db()
            results['vector_db'] = self.upload_to_s3(vector_backup, 'vector_db')
            os.remove(vector_backup)

            # 3. 存储备份
            storage_backup = self.backup_storage()
            results['storage'] = self.upload_to_s3(storage_backup, 'storage')
            os.remove(storage_backup)

            # 保存备份清单
            manifest = {
                'timestamp': datetime.now().isoformat(),
                'backups': results,
                'version': '1.0.0'
            }
            manifest_key = f"dify-backups/manifests/manifest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            self.s3_client.put_object(
                Bucket=self.backup_bucket,
                Key=manifest_key,
                Body=json.dumps(manifest)
            )
            results['manifest'] = manifest_key

            print("完整备份成功!")
            return results

        except Exception as e:
            print(f"备份失败: {str(e)}")
            raise

    def restore(self, manifest_key: str) -> bool:
        """从备份恢复"""
        try:
            # 下载清单
            manifest_obj = self.s3_client.get_object(
                Bucket=self.backup_bucket,
                Key=manifest_key
            )
            manifest = json.loads(manifest_obj['Body'].read())

            print(f"开始恢复,备份时间: {manifest['timestamp']}")

            # 恢复数据库
            if 'database' in manifest['backups']:
                db_key = manifest['backups']['database']
                db_file = f"/tmp/restore_{os.path.basename(db_key)}"
                self.s3_client.download_file(
                    self.backup_bucket, db_key, db_file
                )
                # 执行恢复
                cmd = [
                    "pg_restore",
                    "-h", self.config['db_host'],
                    "-U", self.config['db_user'],
                    "-d", self.config['db_name'],
                    "-c",  # 清理现有数据
                    db_file
                ]
                subprocess.run(cmd, env={'PGPASSWORD': self.config['db_password']})
                os.remove(db_file)
                print("数据库恢复完成")

            print("恢复完成!")
            return True

        except Exception as e:
            print(f"恢复失败: {str(e)}")
            return False

# 使用示例
if __name__ == "__main__":
    config = {
        'db_host': 'postgres.dify.svc.cluster.local',
        'db_user': 'postgres',
        'db_password': 'your_password',
        'db_name': 'dify',
        'qdrant_url': 'http://qdrant.dify.svc.cluster.local:6333',
        'storage_path': '/data/dify/storage',
        's3_endpoint': 'https://s3.amazonaws.com',
        'aws_access_key': 'your_access_key',
        'aws_secret_key': 'your_secret_key',
        'backup_bucket': 'dify-backups'
    }

    backup_manager = DifyBackupManager(config)
    results = backup_manager.full_backup()
    print(f"备份结果: {results}")

9. 下一步

继续学习07-实战项目,通过完整项目巩固所学知识。


最后更新日期: 2026-03-26 适用版本: Dify 实战教程 v2026

⚠️ 核验说明(2026-03-26):本页已纳入 2026-03-26 全站统一复核批次。若文中涉及外部模型、API、版本号、价格或第三方产品名称,请以官方文档和实际运行环境为准。