Wexflow REST API深度解析:如何通过API管理所有工作流
Wexflow REST API深度解析:如何通过API管理所有工作流
【免费下载链接】wexflowWorkflow Automation Engine项目地址: https://gitcode.com/gh_mirrors/we/wexflow
Wexflow是一个功能强大的工作流自动化引擎,其REST API提供了完整的程序化访问能力,让开发者能够通过代码全面管理所有工作流。本文将深入解析Wexflow REST API的核心功能和使用方法,帮助您快速掌握通过API自动化管理工作流的技巧。🚀
Wexflow REST API概述
Wexflow的REST API是一个基于HTTP的完整接口,运行在默认端口8000上,为开发者提供了对工作流引擎的全面控制能力。通过API,您可以实现工作流的创建、启动、监控、暂停、恢复和删除等所有操作,完全无需手动操作Web界面。
Wexflow的API设计遵循RESTful原则,使用标准的HTTP方法(GET、POST、PUT、DELETE)和JSON格式进行数据交换。所有API请求都需要通过JWT令牌进行身份验证,确保系统安全。
核心API端点详解
🔐 身份验证API
Wexflow使用基于JWT的认证机制。首先需要通过登录API获取访问令牌:
POST /api/v1/login Content-Type: application/json { "username": "admin", "password": "wexflow2018", "stayConnected": false }成功登录后,服务器会返回一个JWT令牌,后续所有API请求都需要在Authorization头中携带此令牌。
📊 工作流管理API
1. 获取工作流列表
GET /api/v1/search?s=关键词&page=1&pageSize=10 Authorization: Bearer <token>这个API支持分页和关键词搜索,返回所有工作流的详细信息,包括状态、创建时间、最后修改时间等。
2. 获取特定工作流详情
GET /api/v1/workflow?w=工作流ID Authorization: Bearer <token>3. 启动工作流
POST /api/v1/start?w=工作流ID Authorization: Bearer <token>启动工作流时可以传递变量:
POST /api/v1/start-vars Content-Type: application/json Authorization: Bearer <token> { "workflowId": 1, "variables": { "inputFile": "/path/to/file.txt", "outputFormat": "json" } }4. 停止工作流
POST /api/v1/stop?w=工作流ID&i=实例ID Authorization: Bearer <token>5. 暂停与恢复工作流
POST /api/v1/suspend?w=工作流ID&i=实例ID POST /api/v1/resume?w=工作流ID&i=实例ID📈 监控与统计API
1. 获取状态统计
GET /api/v1/status-count Authorization: Bearer <token>返回各种状态的工作流数量统计,包括运行中、已完成、失败、挂起等状态。
2. 获取执行历史
GET /api/v1/search-history-entries-by-page-order-by Authorization: Bearer <token>支持按时间范围、状态等条件筛选历史执行记录。
3. 实时状态更新(Server-Sent Events)
GET /api/v1/sse/{workflowId}/{jobId}Wexflow支持SSE(Server-Sent Events),允许客户端实时接收工作流状态更新,无需轮询。
🛠️ 工作流设计API
1. 获取工作流XML定义
GET /api/v1/xml/{工作流ID} Authorization: Bearer <token>返回工作流的XML定义,这是Wexflow工作流的原生格式。
2. 保存工作流
POST /api/v1/save-workflow Content-Type: application/json Authorization: Bearer <token> { "xml": "<Workflow>...</Workflow>" }3. 验证工作流XML
POST /api/v1/is-xml-workflow-valid Content-Type: application/json Authorization: Bearer <token> { "xml": "<Workflow>...</Workflow>" }👥 用户与权限管理API
1. 用户管理
GET /api/v1/search-users POST /api/v1/insert-user PUT /api/v1/update-user DELETE /api/v1/delete-user2. 工作流权限分配
GET /api/v1/get-user-workflows?userId=用户ID POST /api/v1/save-user-workflows Content-Type: application/json { "userId": "用户ID", "workflowIds": [1, 2, 3] }实际应用场景示例
场景1:自动化部署流水线
假设您需要自动化部署流程,可以通过Wexflow API实现:
import requests # 1. 登录获取令牌 response = requests.post( "http://localhost:8000/api/v1/login", json={"username": "admin", "password": "wexflow2018"} ) token = response.json()["access_token"] headers = {"Authorization": f"Bearer {token}"} # 2. 启动部署工作流 deploy_response = requests.post( "http://localhost:8000/api/v1/start-vars", headers=headers, json={ "workflowId": 101, "variables": { "sourcePath": "/build/artifact", "targetServer": "production", "backupEnabled": "true" } } ) # 3. 监控执行状态 status_response = requests.get( "http://localhost:8000/api/v1/status-count", headers=headers )场景2:批量数据处理
对于需要处理大量文件的场景:
// 批量启动图片处理工作流 async function batchProcessImages(imageFiles) { const token = await login(); for (const imageFile of imageFiles) { await startWorkflow(42, { inputImage: imageFile.path, outputFormat: 'webp', quality: 85 }); // 等待处理完成 await waitForCompletion(42); } }场景3:系统监控与告警
#!/bin/bash # 监控工作流状态并发送告警 TOKEN=$(curl -s -X POST http://localhost:8000/api/v1/login \ -H "Content-Type: application/json" \ -d '{"username":"admin","password":"wexflow2018"}' | jq -r '.access_token') # 获取失败的工作流 FAILED_COUNT=$(curl -s -H "Authorization: Bearer $TOKEN" \ http://localhost:8000/api/v1/status-count | jq '.failed') if [ "$FAILED_COUNT" -gt 0 ]; then # 发送告警 curl -X POST http://localhost:8000/api/v1/notify \ -H "Authorization: Bearer $TOKEN" \ -H "Content-Type: application/json" \ -d '{ "message": "有工作流执行失败,请立即检查!", "type": "error" }' fiAPI最佳实践与性能优化
1. 连接管理
- 使用连接池减少连接建立开销
- 合理设置超时时间
- 实现重试机制处理网络波动
2. 认证优化
- 缓存JWT令牌,避免频繁登录
- 实现令牌自动刷新机制
- 使用环境变量存储敏感信息
3. 错误处理
def call_wexflow_api(endpoint, data=None): try: response = requests.post( f"http://localhost:8000/api/v1/{endpoint}", headers=headers, json=data, timeout=30 ) response.raise_for_status() return response.json() except requests.exceptions.Timeout: logger.error("API请求超时") return None except requests.exceptions.RequestException as e: logger.error(f"API请求失败: {e}") return None4. 批量操作优化
对于需要批量启动多个工作流的场景,建议:
- 使用异步调用避免阻塞
- 控制并发数量,避免系统过载
- 实现队列机制处理大量请求
安全注意事项
1. 访问控制
- 为不同用户分配最小必要权限
- 定期审计API访问日志
- 实现IP白名单限制
2. 数据传输安全
- 在生产环境启用HTTPS
- 使用强密码策略
- 定期轮换JWT密钥
3. 输入验证
// 验证工作流ID function validateWorkflowId(id) { if (!Number.isInteger(id) || id <= 0) { throw new Error('无效的工作流ID'); } return true; } // 验证变量参数 function validateVariables(vars) { if (typeof vars !== 'object' || vars === null) { throw new Error('变量必须是对象类型'); } return true; }故障排查与调试
常见问题解决方案
认证失败
- 检查用户名密码是否正确
- 验证JWT令牌是否过期
- 确认用户有相应权限
工作流启动失败
- 检查工作流ID是否存在
- 验证输入变量格式
- 查看服务器日志获取详细错误信息
性能问题
- 监控API响应时间
- 优化数据库查询
- 考虑使用缓存机制
调试工具推荐
- 使用curl测试API
# 测试登录API curl -X POST http://localhost:8000/api/v1/login \ -H "Content-Type: application/json" \ -d '{"username":"admin","password":"wexflow2018"}' # 测试工作流列表API curl -H "Authorization: Bearer <token>" \ http://localhost:8000/api/v1/search- 使用Postman或Insomnia
- 创建API测试集合
- 设置环境变量
- 自动化测试脚本
高级功能与扩展
1. 自定义Webhook集成
Wexflow API可以轻松集成到现有系统中:
class WexflowWebhookHandler: def __init__(self, api_url, token): self.api_url = api_url self.headers = {"Authorization": f"Bearer {token}"} def handle_webhook(self, event_type, data): """处理外部系统的Webhook事件""" if event_type == "file_uploaded": # 启动文件处理工作流 self.start_workflow(201, { "filePath": data["file_path"], "processType": data["process_type"] }) elif event_type == "data_updated": # 触发数据同步工作流 self.start_workflow(202, { "source": data["source"], "target": data["target"] })2. 工作流编排
通过API可以实现复杂的工作流编排:
// 编排多个工作流执行 async function orchestrateWorkflows() { // 步骤1:数据提取 const extractJobId = await startWorkflow(101, { source: "database", query: "SELECT * FROM data" }); // 步骤2:数据处理(等待步骤1完成) await waitForWorkflowCompletion(101, extractJobId); const processJobId = await startWorkflow(102, { inputFile: "/output/extracted.csv", processRules: "clean,transform,validate" }); // 步骤3:结果导出 await waitForWorkflowCompletion(102, processJobId); await startWorkflow(103, { processedFile: "/output/processed.csv", exportFormat: "json" }); }3. 监控仪表板集成
将Wexflow监控数据集成到现有仪表板:
// 实时监控面板 class WexflowDashboard { private statusCountInterval: NodeJS.Timeout; startRealTimeMonitoring() { // 每30秒更新一次状态 this.statusCountInterval = setInterval(async () => { const status = await this.getStatusCount(); this.updateDashboard(status); }, 30000); } async getStatusCount() { const response = await fetch( `${this.apiBaseUrl}/status-count`, { headers: this.headers } ); return response.json(); } }总结
Wexflow REST API提供了强大而灵活的工作流管理能力,通过标准化的HTTP接口,您可以轻松地将工作流自动化集成到任何系统中。无论是简单的文件处理任务,还是复杂的业务流程编排,Wexflow API都能满足您的需求。
关键要点总结:
- 全面覆盖:API覆盖了工作流生命周期的所有操作
- 易于集成:基于REST标准,支持各种编程语言
- 实时监控:支持SSE实时状态更新
- 安全可靠:基于JWT的身份验证和细粒度权限控制
- 扩展性强:支持自定义变量和Webhook集成
通过合理利用Wexflow REST API,您可以构建出高效、可靠的工作流自动化系统,大幅提升业务处理效率。无论是开发人员还是系统管理员,掌握Wexflow API都将为您的工作带来极大的便利。
立即开始使用Wexflow API,让您的工作流管理更加智能高效!💪
【免费下载链接】wexflowWorkflow Automation Engine项目地址: https://gitcode.com/gh_mirrors/we/wexflow
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考