06 - 部署与发布¶
应用部署、API集成、监控管理
📖 章节概述¶
本章将深入介绍如何部署和发布Dify应用,包括云端部署、本地部署、API集成、监控管理、性能优化以及安全配置。通过详细的代码示例和实践指导,帮助读者掌握应用部署和发布的核心技能。
🎯 学习目标¶
完成本章后,你将能够:
- 深入理解不同的部署方式和适用场景
- 掌握Docker和Kubernetes部署方法
- 熟练集成和使用Dify API
- 理解应用监控和日志管理
- 能够优化应用性能和安全性
- 掌握版本管理和灰度发布技巧
1. 部署方式¶
1.1 云端部署¶
1.1.1 Docker部署¶
技术原理: Docker容器化部署可以确保应用在不同环境中的一致性,简化部署流程。
代码示例 - Dockerfile:
Docker
# Dify本地部署推荐使用官方docker-compose方式
# 以下为基于Dify API的自定义应用的Dockerfile示例
FROM python:3.11-slim # FROM指定基础镜像
# 设置工作目录
WORKDIR /app # WORKDIR设置工作目录
# 安装系统依赖
RUN apt-get update && apt-get install -y \ # RUN在构建时执行命令
gcc \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt . # COPY将文件复制到镜像中
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 5001 # EXPOSE声明容器监听的端口
# 启动命令
CMD ["gunicorn", "--bind", "0.0.0.0:5001", "--workers", "4", "app:app"] # CMD容器启动时执行的默认命令
代码示例 - docker-compose.yml:
YAML
# docker-compose.yml
# Dify官方推荐使用官方docker-compose部署
# 参考: https://github.com/langgenius/dify/blob/main/docker/docker-compose.yaml
# 以下为基于Dify API的自定义应用的docker-compose示例
# 注意: Docker Compose V2 不再需要 version 字段
services: # services定义各个服务容器
# 自定义应用服务
app:
build: .
ports:
- "5001:5001"
environment:
- DIFY_API_KEY=your-dify-api-key
- DIFY_BASE_URL=https://api.dify.ai/v1
restart: unless-stopped
# 如需本地部署Dify,请使用官方docker-compose文件
# git clone https://github.com/langgenius/dify.git
# cd dify/docker
# cp .env.example .env
# docker compose up -d
代码示例 - 部署脚本:
Python
#!/usr/bin/env python3
"""
Dify应用部署脚本
"""
import os
import subprocess
import requests
from typing import Dict, Optional
class DifyDeployer:
"""Dify部署器"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.dify.ai/v1"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def deploy_to_cloud(self, app_id: str,
environment: str = "production") -> Dict:
"""
部署到云端
注意:Dify不提供通过API部署应用的接口
应用发布需在Dify Web界面完成:应用 -> 发布
云端版本(cloud.dify.ai)创建并发布应用后即可通过API访问
"""
print(f"请在Dify Web界面发布应用 {app_id} 到 {environment} 环境")
return {"message": "请在Dify Web界面发布应用"}
def get_deployment_status(self, app_id: str) -> Dict:
"""
获取部署状态
注意:Dify不提供部署状态查询API
可通过调用应用API验证应用是否正常运行
"""
url = f"{self.base_url}/parameters"
try:
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return {"status": "running", "details": response.json()}
except requests.exceptions.RequestException as e:
return {"status": "error", "error": str(e)}
def check_app_health(self, app_id: str) -> Dict:
"""检查应用健康状态"""
# 通过发送测试请求检查应用是否正常
url = f"{self.base_url}/parameters"
try:
response = requests.get(url, headers=self.headers)
return {"healthy": response.status_code == 200}
except requests.exceptions.RequestException:
return {"healthy": False}
class DockerDeployer:
"""Docker部署器"""
def __init__(self, project_name: str):
self.project_name = project_name
def build_image(self, dockerfile_path: str = "Dockerfile") -> bool:
"""
构建Docker镜像
Args:
dockerfile_path: Dockerfile路径
"""
try:
cmd = [
"docker", "build",
"-t", f"{self.project_name}:latest",
"-f", dockerfile_path,
"."
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"镜像构建成功: {self.project_name}:latest")
return True
else:
print(f"镜像构建失败: {result.stderr}")
return False
except Exception as e:
print(f"构建镜像时出错: {str(e)}")
return False
def run_container(self, port: int = 5001,
env_vars: Dict = None) -> bool:
"""
运行Docker容器
Args:
port: 端口号(Dify API默认端口5001)
env_vars: 环境变量
"""
try:
cmd = ["docker", "run", "-d", "-p", f"{port}:5001"]
# 添加环境变量
if env_vars:
for key, value in env_vars.items():
cmd.extend(["-e", f"{key}={value}"])
cmd.append(f"{self.project_name}:latest")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
container_id = result.stdout.strip()
print(f"容器运行成功: {container_id}")
print(f"访问地址: http://localhost:{port}")
return True
else:
print(f"容器运行失败: {result.stderr}")
return False
except Exception as e:
print(f"运行容器时出错: {str(e)}")
return False
def deploy_with_compose(self, compose_file: str = "docker-compose.yml") -> bool:
"""
使用docker-compose部署
Args:
compose_file: docker-compose文件路径
"""
try:
# 停止现有容器
subprocess.run(
["docker", "compose", "-f", compose_file, "down"],
capture_output=True
)
# 构建并启动
cmd = ["docker", "compose", "-f", compose_file, "up", "-d", "--build"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("部署成功!")
return True
else:
print(f"部署失败: {result.stderr}")
return False
except Exception as e:
print(f"部署时出错: {str(e)}")
return False
def get_container_logs(self, container_name: Optional[str] = None) -> str:
"""
获取容器日志
Args:
container_name: 容器名称
"""
try:
if container_name:
cmd = ["docker", "logs", container_name]
else:
cmd = ["docker", "compose", "logs"]
result = subprocess.run(cmd, capture_output=True, text=True)
return result.stdout
except Exception as e:
return f"获取日志时出错: {str(e)}"
def stop_container(self, container_name: Optional[str] = None) -> bool:
"""
停止容器
Args:
container_name: 容器名称
"""
try:
if container_name:
cmd = ["docker", "stop", container_name]
else:
cmd = ["docker", "compose", "stop"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("容器停止成功")
return True
else:
print(f"容器停止失败: {result.stderr}")
return False
except Exception as e:
print(f"停止容器时出错: {str(e)}")
return False
# 使用示例
if __name__ == "__main__":
# 部署到云端
cloud_deployer = DifyDeployer(api_key="your_api_key_here")
result = cloud_deployer.deploy_to_cloud(app_id="app_id_here")
print(f"云端部署结果: {result}")
# Docker部署
docker_deployer = DockerDeployer(project_name="my-dify-app")
# 构建镜像
docker_deployer.build_image()
# 运行容器
env_vars = {
"DATABASE_URL": "postgresql://user:pass@db:5432/dify",
"REDIS_URL": "redis://redis:6379/0",
"SECRET_KEY": "your-secret-key"
}
docker_deployer.run_container(port=5000, env_vars=env_vars)
# 使用docker-compose部署
docker_deployer.deploy_with_compose()
1.1.2 Kubernetes部署¶
技术原理: Kubernetes提供容器编排、自动扩缩容、负载均衡等功能,适合大规模生产环境。
代码示例 - Kubernetes配置:
YAML
# deployment.yaml
apiVersion: apps/v1 # apiVersion指定K8s API版本
kind: Deployment # kind指定资源类型
metadata:
name: dify-web
labels:
app: dify-web
spec: # spec定义资源的期望状态
replicas: 3
selector:
matchLabels:
app: dify-web
template:
metadata:
labels:
app: dify-web
spec:
containers:
- name: dify-web
image: my-dify-app:latest
ports:
- containerPort: 5000
env:
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: dify-secrets
key: database-url
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: dify-secrets
key: redis-url
- name: SECRET_KEY
valueFrom:
secretKeyRef:
name: dify-secrets
key: secret-key
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 5000
initialDelaySeconds: 10
periodSeconds: 5
--- # YAML文档分隔符
# service.yaml
apiVersion: v1
kind: Service
metadata:
name: dify-web-service
spec:
selector:
app: dify-web
ports:
- protocol: TCP
port: 80
targetPort: 5000
type: LoadBalancer
---
# hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: dify-web-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: dify-web
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
代码示例 - Kubernetes部署脚本:
Python
#!/usr/bin/env python3
"""
Kubernetes部署脚本
"""
import subprocess
from typing import Dict, List
class K8sDeployer:
"""Kubernetes部署器"""
def __init__(self, namespace: str = "default"):
self.namespace = namespace
def apply_manifest(self, manifest_path: str) -> bool:
"""
应用Kubernetes配置
Args:
manifest_path: 配置文件路径
"""
try:
cmd = [
"kubectl", "apply",
"-f", manifest_path,
"-n", self.namespace
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"配置应用成功: {manifest_path}")
return True
else:
print(f"配置应用失败: {result.stderr}")
return False
except Exception as e:
print(f"应用配置时出错: {str(e)}")
return False
def get_pods(self) -> List[Dict]:
"""获取Pod列表"""
try:
cmd = [
"kubectl", "get", "pods",
"-n", self.namespace,
"-o", "json"
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
import json
data = json.loads(result.stdout)
return data.get("items", [])
else:
return []
except Exception as e:
print(f"获取Pod列表时出错: {str(e)}")
return []
def get_pod_logs(self, pod_name: str,
tail: int = 100) -> str:
"""
获取Pod日志
Args:
pod_name: Pod名称
tail: 日志行数
"""
try:
cmd = [
"kubectl", "logs",
pod_name,
"-n", self.namespace,
"--tail", str(tail)
]
result = subprocess.run(cmd, capture_output=True, text=True)
return result.stdout
except Exception as e:
return f"获取日志时出错: {str(e)}"
def scale_deployment(self, deployment_name: str,
replicas: int) -> bool:
"""
扩缩容Deployment
Args:
deployment_name: Deployment名称
replicas: 副本数量
"""
try:
cmd = [
"kubectl", "scale",
"deployment", deployment_name,
f"--replicas={replicas}",
"-n", self.namespace
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"扩缩容成功: {deployment_name} -> {replicas}")
return True
else:
print(f"扩缩容失败: {result.stderr}")
return False
except Exception as e:
print(f"扩缩容时出错: {str(e)}")
return False
def rollout_restart(self, deployment_name: str) -> bool:
"""
重启Deployment
Args:
deployment_name: Deployment名称
"""
try:
cmd = [
"kubectl", "rollout",
"restart", "deployment", deployment_name,
"-n", self.namespace
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"重启成功: {deployment_name}")
return True
else:
print(f"重启失败: {result.stderr}")
return False
except Exception as e:
print(f"重启时出错: {str(e)}")
return False
def get_deployment_status(self, deployment_name: str) -> Dict:
"""
获取Deployment状态
Args:
deployment_name: Deployment名称
"""
try:
cmd = [
"kubectl", "get", "deployment", deployment_name,
"-n", self.namespace,
"-o", "json"
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
import json
data = json.loads(result.stdout)
status = data.get("status", {})
return {
"ready_replicas": status.get("readyReplicas", 0),
"available_replicas": status.get("availableReplicas", 0),
"updated_replicas": status.get("updatedReplicas", 0),
"replicas": status.get("replicas", 0)
}
else:
return {}
except Exception as e:
print(f"获取状态时出错: {str(e)}")
return {}
# 使用示例
if __name__ == "__main__":
deployer = K8sDeployer(namespace="dify")
# 应用配置
deployer.apply_manifest("deployment.yaml")
deployer.apply_manifest("service.yaml")
deployer.apply_manifest("hpa.yaml")
# 获取Pod列表
pods = deployer.get_pods()
print(f"当前Pod数量: {len(pods)}")
# 获取部署状态
status = deployer.get_deployment_status("dify-web")
print(f"部署状态: {status}")
# 扩缩容
deployer.scale_deployment("dify-web", replicas=5)
# 重启
deployer.rollout_restart("dify-web")
1.2 本地部署¶
技术原理: 本地部署适用于数据敏感、需要完全控制、或测试开发场景。
代码示例 - 本地部署脚本:
Python
#!/usr/bin/env python3
"""
本地部署脚本
"""
import os
import subprocess
import shutil
from typing import Dict, Optional
class LocalDeployer:
"""本地部署器"""
def __init__(self, install_dir: str = "~/dify"):
self.install_dir = os.path.expanduser(install_dir)
def clone_repository(self, repo_url: str = "https://github.com/langgenius/dify.git",
branch: str = "main") -> bool:
"""
克隆代码仓库
Args:
repo_url: 仓库URL
branch: 分支名称
"""
try:
# 创建目录
os.makedirs(self.install_dir, exist_ok=True)
# 克隆仓库
cmd = [
"git", "clone",
"-b", branch,
repo_url,
self.install_dir
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print(f"代码克隆成功: {self.install_dir}")
return True
else:
print(f"代码克隆失败: {result.stderr}")
return False
except Exception as e:
print(f"克隆代码时出错: {str(e)}")
return False
def install_dependencies(self) -> bool:
"""安装依赖"""
try:
# 安装Python依赖
requirements_file = os.path.join(self.install_dir, "requirements.txt")
if os.path.exists(requirements_file):
cmd = ["pip", "install", "-r", requirements_file]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"依赖安装失败: {result.stderr}")
return False
print("依赖安装成功")
return True
except Exception as e:
print(f"安装依赖时出错: {str(e)}")
return False
def configure_environment(self, config: Dict) -> bool:
"""
配置环境
Args:
config: 配置字典
"""
try:
# 创建.env文件
env_file = os.path.join(self.install_dir, ".env")
with open(env_file, 'w') as f:
for key, value in config.items():
f.write(f"{key}={value}\n")
print(f"环境配置成功: {env_file}")
return True
except Exception as e:
print(f"配置环境时出错: {str(e)}")
return False
def initialize_database(self) -> bool:
"""
初始化数据库
注意:Dify使用Flask框架,数据库迁移使用flask db命令
本地部署推荐使用docker-compose方式,自动处理数据库初始化
"""
try:
# Dify使用Flask和Alembic进行数据库迁移
cmd = ["flask", "db", "upgrade"]
result = subprocess.run(cmd, cwd=self.install_dir, capture_output=True, text=True)
if result.returncode != 0:
print(f"数据库初始化失败: {result.stderr}")
return False
print("数据库初始化成功")
return True
except Exception as e:
print(f"初始化数据库时出错: {str(e)}")
return False
def start_services(self) -> bool:
"""
启动服务
注意:Dify本地部署推荐使用docker-compose方式
API服务默认端口为5001
"""
try:
# 使用docker-compose启动服务
cmd = ["docker", "compose", "up", "-d"]
result = subprocess.run(
cmd,
cwd=os.path.join(self.install_dir, "docker"),
capture_output=True,
text=True
)
if result.returncode == 0:
print("服务启动成功")
print("访问地址: http://localhost")
print("API地址: http://localhost:5001")
return True
else:
print(f"服务启动失败: {result.stderr}")
return False
except Exception as e:
print(f"启动服务时出错: {str(e)}")
return False
def stop_services(self) -> bool:
"""停止服务"""
try:
cmd = ["docker", "compose", "down"]
subprocess.run(
cmd,
cwd=os.path.join(self.install_dir, "docker"),
capture_output=True
)
print("服务停止成功")
return True
except Exception as e:
print(f"停止服务时出错: {str(e)}")
return False
def check_status(self) -> Dict:
"""检查服务状态"""
try:
# 检查端口是否监听
cmd = "netstat -tuln | grep 5001"
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
is_running = "LISTEN" in result.stdout
return {
"running": is_running,
"install_dir": self.install_dir
}
except Exception as e:
return {
"running": False,
"error": str(e)
}
# 使用示例
if __name__ == "__main__":
deployer = LocalDeployer(install_dir="~/dify-local")
# 克隆代码
deployer.clone_repository()
# 安装依赖
deployer.install_dependencies()
# 配置环境
config = {
"DATABASE_URL": "postgresql://dify:dify@localhost:5432/dify",
"REDIS_URL": "redis://localhost:6379/0",
"SECRET_KEY": "your-secret-key-here",
"OPENAI_API_KEY": "your-openai-api-key"
}
deployer.configure_environment(config)
# 初始化数据库
deployer.initialize_database()
# 启动服务
deployer.start_services()
# 检查状态
status = deployer.check_status()
print(f"服务状态: {status}")
2. API集成¶
2.1 API调用¶
代码示例 - 完整的API客户端:
Python
import requests
import json
from typing import Dict, List, Optional, Generator
import time
class DifyAPIClient:
"""Dify API客户端"""
def __init__(self, api_key: str, base_url: str = "https://api.dify.ai/v1"):
"""
初始化API客户端
Args:
api_key: API密钥
base_url: API基础URL
"""
self.api_key = api_key
self.base_url = base_url
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# ========== 聊天API ==========
def chat(self, app_id: str, query: str,
conversation_id: Optional[str] = None,
user: str = "default_user",
inputs: Dict = None) -> Dict:
"""
发送聊天消息
Args:
app_id: 应用ID
query: 用户消息
conversation_id: 会话ID
user: 用户ID
inputs: 额外输入
"""
url = f"{self.base_url}/chat-messages"
payload = {
"inputs": inputs or {},
"query": query,
"response_mode": "blocking",
"conversation_id": conversation_id,
"user": user
}
try:
response = requests.post(url, headers=self.headers, json=payload)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def chat_stream(self, app_id: str, query: str,
conversation_id: Optional[str] = None,
user: str = "default_user",
inputs: Dict = None) -> Generator[str, None, None]:
"""
流式聊天
Args:
app_id: 应用ID
query: 用户消息
conversation_id: 会话ID
user: 用户ID
inputs: 额外输入
"""
url = f"{self.base_url}/chat-messages"
payload = {
"inputs": inputs or {},
"query": query,
"response_mode": "streaming",
"conversation_id": conversation_id,
"user": user
}
try:
response = requests.post(
url,
headers=self.headers,
json=payload,
stream=True
)
response.raise_for_status()
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data = json.loads(line[6:]) # 切片操作:[start:end:step]提取子序列
if data.get('event') == 'message':
yield data.get('answer', '')
elif data.get('event') == 'message_end':
break
except requests.exceptions.RequestException as e:
yield f"Error: {str(e)}"
# ========== 文本补全API ==========
def completion(self, app_id: str,
user: str = "default_user",
inputs: Dict = None) -> Dict:
"""
文本补全(用于Completion类型应用)
Args:
app_id: 应用ID
user: 用户ID
inputs: 输入参数字典,键值对应应用定义的变量
"""
url = f"{self.base_url}/completion-messages"
payload = {
"inputs": inputs or {},
"response_mode": "blocking",
"user": user
}
try:
response = requests.post(url, headers=self.headers, json=payload)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def completion_stream(self, app_id: str,
user: str = "default_user",
inputs: Dict = None) -> Generator[str, None, None]:
"""
流式文本补全
Args:
app_id: 应用ID
user: 用户ID
inputs: 输入参数字典
"""
url = f"{self.base_url}/completion-messages"
payload = {
"inputs": inputs or {},
"response_mode": "streaming",
"user": user
}
try:
response = requests.post(
url,
headers=self.headers,
json=payload,
stream=True
)
response.raise_for_status()
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data = json.loads(line[6:])
if data.get('event') == 'message':
yield data.get('answer', '')
elif data.get('event') == 'message_end':
break
except requests.exceptions.RequestException as e:
yield f"Error: {str(e)}"
# ========== 工作流API ==========
def run_workflow(self, app_id: str, inputs: Dict,
user: str = "default_user") -> Dict:
"""
运行工作流
Args:
app_id: 应用ID
inputs: 输入参数
user: 用户ID
"""
url = f"{self.base_url}/workflows/run"
payload = {
"inputs": inputs,
"response_mode": "blocking",
"user": user
}
try:
response = requests.post(url, headers=self.headers, json=payload)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
# ========== 对话历史API ==========
def get_conversation_history(self, conversation_id: str,
limit: int = 20) -> Dict:
"""
获取对话历史
Args:
conversation_id: 会话ID
limit: 返回数量限制
"""
url = f"{self.base_url}/messages"
params = {
"conversation_id": conversation_id,
"limit": limit
}
try:
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def delete_conversation(self, conversation_id: str,
user: str = "default_user") -> Dict:
"""
删除对话
Args:
conversation_id: 会话ID
user: 用户标识(必填)
"""
url = f"{self.base_url}/conversations/{conversation_id}"
try:
response = requests.delete(
url, headers=self.headers,
json={"user": user}
)
response.raise_for_status()
return {"message": "删除成功"}
except requests.exceptions.RequestException as e:
return {"error": str(e)}
# ========== 文件上传API ==========
def upload_file(self, file_path: str, user: str = "default_user") -> Dict:
"""
上传文件
Args:
file_path: 文件路径
user: 用户ID
"""
url = f"{self.base_url}/files/upload"
headers = {
"Authorization": f"Bearer {self.api_key}"
}
try:
with open(file_path, 'rb') as f: # with自动管理资源,确保文件正确关闭
files = {'file': f}
data = {'user': user}
response = requests.post(url, headers=headers, files=files, data=data)
response.raise_for_status()
return response.json()
except Exception as e:
return {"error": str(e)}
# ========== 参数验证和错误处理 ==========
def validate_response(self, response: Dict) -> bool:
"""
验证API响应
Args:
response: API响应
"""
if "error" in response:
print(f"API错误: {response['error']}")
return False
return True
def get_usage_info(self, response: Dict) -> Dict:
"""
获取使用信息
Args:
response: API响应
"""
metadata = response.get("metadata", {})
usage = metadata.get("usage", {})
return {
"total_tokens": usage.get("total_tokens", 0),
"prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0)
}
# 使用示例
if __name__ == "__main__":
client = DifyAPIClient(api_key="your_api_key_here")
# 聊天
response = client.chat(
app_id="app_id_here",
query="你好!"
)
if client.validate_response(response):
print(f"回复: {response.get('answer', '')}")
usage = client.get_usage_info(response)
print(f"使用情况: {usage}")
# 流式聊天
print("\n流式回复:")
for chunk in client.chat_stream(
app_id="app_id_here",
query="介绍一下Python"
):
print(chunk, end="", flush=True)
print()
# 工作流
response = client.run_workflow(
app_id="app_id_here",
inputs={"input_text": "测试输入"}
)
if client.validate_response(response):
outputs = response.get("data", {}).get("outputs", {})
print(f"工作流输出: {outputs}")
2.2 SDK集成¶
代码示例 - Python SDK封装:
Python
"""
Dify Python SDK
"""
from typing import Dict, List, Optional, Generator, Any
import requests
import json
class DifySDK:
"""Dify SDK"""
def __init__(self, api_key: str, base_url: str = "https://api.dify.ai/v1"):
"""
初始化SDK
Args:
api_key: API密钥
base_url: API基础URL
"""
self.api_key = api_key
self.base_url = base_url
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
})
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()
# ========== 装饰器 ==========
def handle_errors(func):
"""错误处理装饰器"""
def wrapper(self, *args, **kwargs): # *args接收任意位置参数;**kwargs接收任意关键字参数
try:
return func(self, *args, **kwargs)
except requests.exceptions.RequestException as e:
return {"error": str(e)}
except json.JSONDecodeError as e:
return {"error": f"JSON解析错误: {str(e)}"}
except Exception as e:
return {"error": f"未知错误: {str(e)}"}
return wrapper
# ========== 应用信息 ==========
# 注意:Dify Service API使用App-specific API Key,
# 每个Key对应一个应用,不存在list_apps接口
@handle_errors
def get_app_info(self) -> Dict:
"""获取当前应用基本信息"""
url = f"{self.base_url}/info"
response = self.session.get(url)
response.raise_for_status()
return response.json()
@handle_errors
def get_app_parameters(self) -> Dict:
"""获取当前应用参数信息"""
url = f"{self.base_url}/parameters"
response = self.session.get(url)
response.raise_for_status()
return response.json()
# ========== 聊天 ==========
@handle_errors
def chat(self, app_id: str, query: str,
conversation_id: Optional[str] = None,
user: str = "default_user",
inputs: Dict = None) -> Dict:
"""聊天"""
url = f"{self.base_url}/chat-messages"
payload = {
"inputs": inputs or {},
"query": query,
"response_mode": "blocking",
"conversation_id": conversation_id,
"user": user
}
response = self.session.post(url, json=payload)
response.raise_for_status()
return response.json()
def chat_stream(self, app_id: str, query: str,
conversation_id: Optional[str] = None,
user: str = "default_user",
inputs: Dict = None) -> Generator[str, None, None]:
"""流式聊天"""
url = f"{self.base_url}/chat-messages"
payload = {
"inputs": inputs or {},
"query": query,
"response_mode": "streaming",
"conversation_id": conversation_id,
"user": user
}
with self.session.post(url, json=payload, stream=True) as response:
response.raise_for_status()
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data = json.loads(line[6:]) # json.loads将JSON字符串转为Python对象
if data.get('event') == 'message':
yield data.get('answer', '') # yield生成器:惰性产出值,节省内存
elif data.get('event') == 'message_end':
break
# ========== 工作流 ==========
@handle_errors
def run_workflow(self, app_id: str, inputs: Dict,
user: str = "default_user") -> Dict:
"""运行工作流"""
url = f"{self.base_url}/workflows/run"
payload = {
"inputs": inputs,
"response_mode": "blocking",
"user": user
}
response = self.session.post(url, json=payload)
response.raise_for_status()
return response.json()
# ========== 批量操作 ==========
def batch_chat(self, app_id: str, queries: List[str],
user: str = "default_user") -> List[Dict]:
"""批量聊天"""
results = []
for query in queries:
result = self.chat(app_id, query, user=user)
results.append(result)
return results
def batch_run_workflow(self, app_id: str, inputs_list: List[Dict],
user: str = "default_user") -> List[Dict]:
"""批量运行工作流"""
results = []
for inputs in inputs_list:
result = self.run_workflow(app_id, inputs, user=user)
results.append(result)
return results
# 使用示例
if __name__ == "__main__":
# 使用上下文管理器
with DifySDK(api_key="your_api_key_here") as sdk:
# 获取应用信息
app_info = sdk.get_app_info()
print(f"应用信息: {app_info}")
# 获取应用参数
params = sdk.get_app_parameters()
print(f"应用参数: {params}")
# 聊天
response = sdk.chat(
app_id="app_id_here",
query="你好!"
)
print(f"回复: {response.get('answer', '')}")
# 流式聊天
print("\n流式回复:")
for chunk in sdk.chat_stream(
app_id="app_id_here",
query="介绍一下Python"
):
print(chunk, end="", flush=True)
print()
# 批量操作
queries = ["你好", "介绍一下AI", "再见"]
results = sdk.batch_chat("app_id_here", queries)
print(f"\n批量结果: {len(results)} 个回复")
3. 监控管理¶
3.1 性能监控¶
代码示例 - 监控客户端:
Python
import requests
import time
from typing import Dict, List
from datetime import datetime
class DifyMonitor:
"""Dify监控器"""
def __init__(self, api_key: str):
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}"
}
self.base_url = "https://api.dify.ai/v1"
def get_app_metrics(self, app_id: str) -> Dict:
"""
获取应用指标
注意:Dify Service API不提供应用指标查询接口
应用指标可在Dify Web控制台的"分析"页面查看
以下通过调用应用API作为健康检查
Args:
app_id: 应用ID
"""
url = f"{self.base_url}/parameters"
try: # try/except捕获异常
response = requests.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def get_conversation_metrics(self, app_id: str,
start_time: datetime = None,
end_time: datetime = None) -> Dict:
"""
获取对话指标
注意:Dify Service API不提供对话统计指标接口
可通过获取对话列表来计算指标
Args:
app_id: 应用ID
start_time: 开始时间
end_time: 结束时间
"""
url = f"{self.base_url}/conversations"
params = {}
if start_time:
params["start_time"] = start_time.isoformat()
if end_time:
params["end_time"] = end_time.isoformat()
try:
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def monitor_response_time(self, app_id: str, query: str) -> Dict:
"""
监控响应时间
Args:
app_id: 应用ID
query: 测试查询
"""
url = f"{self.base_url}/chat-messages"
start_time = time.time()
try:
response = requests.post(
url,
headers=self.headers,
json={
"inputs": {},
"query": query,
"response_mode": "blocking",
"user": "monitor"
}
)
end_time = time.time()
response_time = end_time - start_time
return {
"response_time": response_time,
"status": "success" if response.status_code == 200 else "error",
"status_code": response.status_code
}
except requests.exceptions.RequestException as e:
end_time = time.time()
return {
"response_time": end_time - start_time,
"status": "error",
"error": str(e)
}
def get_token_usage(self, app_id: str,
start_time: datetime = None,
end_time: datetime = None) -> Dict:
"""
获取Token使用情况
注意:Dify Service API不提供 Token 用量统计接口
Token用量可在Dify Web控制台的"分析"页面查看
代码中可通过每次API调用的响应中的metadata.usage字段来累计
Args:
app_id: 应用ID
start_time: 开始时间
end_time: 结束时间
"""
# 由于没有专门的Token统计API,可通过获取对话列表间接估算
url = f"{self.base_url}/conversations"
params = {}
if start_time:
params["start_time"] = start_time.isoformat()
if end_time:
params["end_time"] = end_time.isoformat()
try:
response = requests.get(url, headers=self.headers, params=params)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def generate_report(self, app_id: str) -> Dict:
"""
生成监控报告
Args:
app_id: 应用ID
"""
# 获取各种指标
metrics = self.get_app_metrics(app_id)
token_usage = self.get_token_usage(app_id)
# 测试响应时间
response_time = self.monitor_response_time(app_id, "测试查询")
return {
"app_id": app_id,
"timestamp": datetime.now().isoformat(),
"metrics": metrics,
"token_usage": token_usage,
"response_time": response_time
}
# 使用示例
if __name__ == "__main__":
monitor = DifyMonitor(api_key="your_api_key_here")
# 获取应用指标
metrics = monitor.get_app_metrics(app_id="app_id_here")
print(f"应用指标: {metrics}")
# 监控响应时间
response_time = monitor.monitor_response_time(
app_id="app_id_here",
query="你好"
)
print(f"响应时间: {response_time}")
# 获取Token使用情况
from datetime import datetime, timedelta
end_time = datetime.now()
start_time = end_time - timedelta(days=7)
token_usage = monitor.get_token_usage(
app_id="app_id_here",
start_time=start_time,
end_time=end_time
)
print(f"Token使用: {token_usage}")
# 生成报告
report = monitor.generate_report(app_id="app_id_here")
print(f"监控报告: {report}")
4. 练习题¶
基础练习¶
- 部署应用
- Docker部署
- 测试应用
- 验证功能
进阶练习¶
- 生产环境部署
- Kubernetes部署
- 配置负载均衡
- 实现自动扩缩容
5. 最佳实践¶
✅ 推荐做法¶
- 充分测试
- 测试所有功能
- 压力测试
-
安全测试
-
监控告警
- 设置监控指标
- 配置告警规则
- 及时响应问题
❌ 避免做法¶
- 忽视安全
- 实施认证授权
- 加密敏感数据
- 定期安全审计
6. 常见问题¶
Q1: 如何实现灰度发布?¶
A: 实现方法: - 使用多个版本的应用 - 通过负载均衡分配流量 - 逐步增加新版本流量 - 监控关键指标
Q2: 如何处理API限流?¶
A: 处理方法: - 实现请求队列 - 使用指数退避 - 缓存常用请求 - 优化请求频率
7. 总结¶
本章深入介绍了部署与发布的核心内容,包括:
- 部署方式:云端部署、本地部署
- API集成:完整API客户端、SDK封装
- 监控管理:性能监控、日志管理
通过本章的学习,你应该能够熟练部署和管理Dify应用了。
8. 下一步¶
继续学习07-实战项目,通过完整项目巩固所学知识。
最后更新日期:2026-02-12 适用版本:Dify实战教程 v2026