基于CAMEL 的Workforce 实现多智能体协同工作系统

文章目录

    • 一、workforce 简介
      • 1.架构设计
      • 2.通信机制
    • 二、workforce 工作流程图示例
      • 1.用户角色
      • 2.工作流程
    • 三、workforce 中重要函数说明
      • 1.`__init__`函数
      • 2.`add_single_agent_worker` 函数
      • 3.`add_role_playing_worker` 函数
      • 4.`add_workforce` 函数
    • 四、基于workforce实现多智能体协调
      • (1)创建一个 Workforce 实例
      • (2)worker定义
      • (3)添加到Workforce 的工作节点
      • (4)创建一个任务
      • (5)启动 Workforce 的任务处理流程
      • 完整示例代码
    • 五、遇到问题

一、workforce 简介

Workforce是CAMEL框架中的一个多智能体协同工作系统。它以一种简洁的方式让多个智能体协作完成任务,类似于一个高效的团队合作系统。

1.架构设计

Workforce采用层级架构设计。一个workforce可以包含多个工作节点(worker nodes),每个工作节点可以包含一个或多个智能体作为工作者。工作节点由workforce内部的**协调智能体(coordinator agent)**管理,协调智能体根据工作节点的描述及其工具集来分配任务。

2.通信机制

Workforce内部的通信基于任务通道(task channel)。Workforce初始化时会创建一个所有节点共享的通道。任务会被发布到这个通道中,每个工作节点会监听通道,接受分配给它的任务并解决。当任务完成后,工作节点会将结果发布回通道,结果会作为其他任务的"依赖项"保留在通道中,供所有工作节点共享。

通过这种机制,workforce能够以协作和高效的方式解决任务。

二、workforce 工作流程图示例

如图,展示了如何通过以下智能体协作完成一个请求,即“创建一个产品的登录页面”

在这里插入图片描述

1.用户角色

  • Root Node (Manager):作为系统的管理者,负责接收任务并协调任务的分解和分配。

  • Coordinator Agent (协调智能体)Task Manager Agent (任务管理智能体):管理任务分解、依赖关系、分发任务,以及监控任务完成情况。

  • Leaf Nodes (Workers):执行任务的实际智能体,分别承担不同的角色(如“内容撰写者”和“代码撰写者”)。

2.工作流程

(a) 用户需求的接收

用户发出任务请求(例如“创建一个登录页面”,图中 1)。

Coordinator Agent 接收需求,作为入口点。

(b) 任务分解与定义

Coordinator Agent 通过任务分解策略,将请求拆分为多个子任务(如任务 A.1 和 A.2),并定义:

这些任务被送到 Task Manager Agent 进行分发(图中 2 和 3)。

© 任务的分配

Task Manager Agent 将任务分发到 Channel(图中 4),这是一个任务管理中枢。

任务按角色需求分配到 Leaf Nodes (Workers),包括:

(d) Leaf Nodes 执行任务

内容撰写者 (Content Writer)

代码撰写者 (Code Writer)

(e) 结果整合与返回

Coordinator Agent 汇总所有任务结果(如 A.1 和 A.2 的结果)。

将完整的任务结果返回给用户(图中 18)。

三、workforce 中重要函数说明

1.__init__函数

    def __init__(self,description: str,children: Optional[List[BaseNode]] = None,coordinator_agent_kwargs: Optional[Dict] = None,task_agent_kwargs: Optional[Dict] = None,new_worker_agent_kwargs: Optional[Dict] = None,) -> None:super().__init__(description)self._child_listening_tasks: Deque[asyncio.Task] = deque()self._children = children or []self.new_worker_agent_kwargs = new_worker_agent_kwargscoord_agent_sys_msg = BaseMessage.make_assistant_message(role_name="Workforce Manager",content="You are coordinating a group of workers. A worker can be ""a group of agents or a single agent. Each worker is ""created to solve a specific kind of task. Your job ""includes assigning tasks to a existing worker, creating ""a new worker for a task, etc.",)self.coordinator_agent = ChatAgent(coord_agent_sys_msg, **(coordinator_agent_kwargs or {}))task_sys_msg = BaseMessage.make_assistant_message(role_name="Task Planner",content="You are going to compose and decompose tasks.",)self.task_agent = ChatAgent(task_sys_msg, **(task_agent_kwargs or {}))# If there is one, will set by the workforce class wrapping thisself._task: Optional[Task] = Noneself._pending_tasks: Deque[Task] = deque()

其中coordinator_agent的本质为ChatAgent,role_name=“Workforce Manager”,对应的提示词为:

"You are coordinating a group of workers. A worker can be ""a group of agents or a single agent. Each worker is ""created to solve a specific kind of task. Your job ""includes assigning tasks to a existing worker, creating ""a new worker for a task, etc."

task_agentt的本质也是ChatAgent,role_name=“Task Planner”,对应的提示词为:

"You are going to compose and decompose tasks."

2.add_single_agent_worker 函数

    @check_if_running(False)def add_single_agent_worker(self, description: str, worker: ChatAgent) -> Workforce:r"""Add a worker node to the workforce that uses a single agent.Args:description (str): Description of the worker node.worker (ChatAgent): The agent to be added.Returns:Workforce: The workforce node itself."""worker_node = SingleAgentWorker(description, worker)self._children.append(worker_node)return self

调用add_single_agent_worker时会添加单个工作节点worker_node.

3.add_role_playing_worker 函数

    def add_role_playing_worker(self,description: str,assistant_role_name: str,user_role_name: str,assistant_agent_kwargs: Optional[Dict] = None,user_agent_kwargs: Optional[Dict] = None,chat_turn_limit: int = 3,) -> Workforce:r"""Add a worker node to the workforce that uses `RolePlaying` system.Args:description (str): Description of the node.assistant_role_name (str): The role name of the assistant agent.user_role_name (str): The role name of the user agent.assistant_agent_kwargs (Optional[Dict], optional): The keywordarguments to initialize the assistant agent in the roleplaying, like the model name, etc. Defaults to `None`.user_agent_kwargs (Optional[Dict], optional): The keyword argumentsto initialize the user agent in the role playing, like themodel name, etc. Defaults to `None`.chat_turn_limit (int, optional): The maximum number of chat turnsin the role playing. Defaults to 3.Returns:Workforce: The workforce node itself."""worker_node = RolePlayingWorker(description,assistant_role_name,user_role_name,assistant_agent_kwargs,user_agent_kwargs,chat_turn_limit,)self._children.append(worker_node)return self

调用add_role_playing_worker时会添加RolePlayingWorker的工作节点.

4.add_workforce 函数

	@check_if_running(False)def add_workforce(self, workforce: Workforce) -> Workforce:r"""Add a workforce node to the workforce.Args:workforce (Workforce): The workforce node to be added.Returns:Workforce: The workforce node itself."""self._children.append(workforce)return self

调用add_workforce时会添加workforce类型的工作节点.

四、基于workforce实现多智能体协调

关键步骤为

(1)创建一个 Workforce 实例

workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})

(2)worker定义

planner_agent = ChatAgent(system_message="""你是一个专业的旅行规划师。你的职责是:1. 根据景点分布规划合理的游览顺序2. 为每天安排适量的景点和活动3. 考虑用餐、休息等时间4. 注意不同季节的特点请确保行程安排合理且具有可行性。""",model=model,output_language='中文')

(3)添加到Workforce 的工作节点

workforce.add_single_agent_worker("负责制定详细行程规划",worker=planner_agent
)

(4)创建一个任务

from camel.tasks import Task# 创建一个用于测试的任务
task = Task(content="规划一个3天的巴黎旅行计划。",id="0",  # id可以是任何标记字符串
)

(5)启动 Workforce 的任务处理流程

task = workforce.process_task(task)

完整示例代码

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import BaseMessage
from camel.societies.workforce import Workforce
from camel.toolkits import SearchToolkit
from camel.tasks import Task
from camel.toolkits import FunctionTool
from camel.configs import SiliconFlowConfig  # 关键from dotenv import load_dotenv
import osload_dotenv()api_key = os.getenv('Siliconflow_API_KEY')model = ModelFactory.create(model_platform=ModelPlatformType.SILICONFLOW,model_type="Qwen/Qwen2.5-72B-Instruct",model_config_dict=SiliconFlowConfig(temperature=0.2).as_dict(),api_key=api_key
)# 创建一个 Workforce 实例
workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})search_tool = FunctionTool(SearchToolkit().search_duckduckgo)search_agent = ChatAgent(system_message="""你是一个专业的旅游信息搜索助手。你的职责是:1. 搜索目的地的主要景点信息2. 搜索当地特色美食信息3. 搜索交通和住宿相关信息请确保信息的准确性和实用性。""",model=model,tools=[search_tool],output_language='中文')planner_agent = ChatAgent(system_message="""你是一个专业的旅行规划师。你的职责是:1. 根据景点分布规划合理的游览顺序2. 为每天安排适量的景点和活动3. 考虑用餐、休息等时间4. 注意不同季节的特点请确保行程安排合理且具有可行性。""",model=model,output_language='中文')reviewer_agent = ChatAgent(system_message="""你是一个经验丰富的旅行爱好者。你的职责是:1. 从游客角度评估行程的合理性2. 指出可能的问题和改进建议3. 补充实用的旅行小贴士4. 评估行程的性价比请基于实际旅行经验给出中肯的建议。""",model=model,output_language='中文'
)# 添加工作节点
workforce.add_single_agent_worker("负责搜索目的地相关信息",worker=search_agent
).add_single_agent_worker("负责制定详细行程规划",worker=planner_agent
).add_single_agent_worker("负责从游客角度评估行程",worker=reviewer_agent
)from camel.tasks import Task# 创建一个用于测试的任务
task = Task(content="规划一个3天的巴黎旅行计划。",id="0",  # id可以是任何标记字符串
)task = workforce.process_task(task)print(task.result)

五、遇到问题

关键报错代码

If you have a specific format in mind or a particular context for this number, please let me know!
Traceback (most recent call last):File "/home/allyoung/camel_course/03-2-33-Workforce.py", line 84, in <module>task = workforce.process_task(task)File "/home/allyoung/camel/camel/societies/workforce/utils.py", line 69, in wrapperreturn func(self, *args, **kwargs)File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 153, in process_taskasyncio.run(self.start())File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/runners.py", line 44, in runreturn loop.run_until_complete(main)File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/base_events.py", line 641, in run_until_completereturn future.result()File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 469, in startawait self._listen_to_channel()File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 437, in _listen_to_channelawait self._post_ready_tasks()File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 402, in _post_ready_tasksassignee_id = self._find_assignee(task=ready_task)File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 288, in _find_assigneeresult_dict = json.loads(response.msg.content)File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/__init__.py", line 346, in loadsreturn _default_decoder.decode(s)File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 337, in decodeobj, end = self.raw_decode(s, idx=_w(s, 0).end())File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 355, in raw_decoderaise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/74155.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

每日一题力扣2974.最小数字游戏c++

2974. 最小数字游戏 - 力扣&#xff08;LeetCode&#xff09; class Solution { public:vector<int> numberGame(vector<int>& nums) {vector<int> arr(nums.size());sort(nums.begin(),nums.end());for(size_t i0;i<nums.size();i2){arr[i]nums[i1]…

对接马来西亚、印度、韩国、越南等全球金融数据示例

Python对接StockTV全球金融数据API的封装实现及使用教程&#xff1a; import requests import websockets import asyncio from typing import Dict, List, Optional, Union from datetime import datetimeclass StockTVClient:"""StockTV全球金融数据API客户端…

Adobe After Effects 操作

Adobe After Effects &#xff08;AE&#xff09;可以实现将多个元素进行合成&#xff0c;实现特殊效果。AE的项目文件是aep&#xff0c;可以将素材、层、效果等一切信息&#xff0c;保存在这个项目文件中。 AE的原理&#xff0c;和PS的原理非常类似。 操作界面 操作界面如…

【React】基于自定义Hook提取公共逻辑

目录 自定义Hook自定义Hook 1自定义Hook 2使用 注意事项 自定义Hook 作用&#xff1a;提取封装一些公共的处理逻辑 玩法&#xff1a;创建一个函数&#xff0c;名字需要是 useXxx &#xff0c;后期就可以在组件中调用这个方法&#xff01; 自定义Hook 1 页面加载的时候修改浏…

AUTOSAR与arxml的文档解析

如下是文档脑图 一、文档概述 该文档是 AUTOSAR 经典平台的应用接口用户指南&#xff0c;主要解释 **Al Table&#xff08;应用接口表&#xff09;** 的结构、方法论及相关技术细节&#xff0c;帮助开发者理解如何通过标准化接口实现软件组件的互操作性。 关键内容 目的&#…

油候插件、idea、VsCode插件推荐(自用)

开发软件&#xff1a; 之前的文章&#xff1a; 开发必装最实用工具软件与网站 推荐一下我使用的开发工具 目前在用的 油候插件 AC-baidu-重定向优化百度搜狗谷歌必应搜索_favicon_双列 让查询变成多列&#xff0c;而且可以流式翻页 Github 增强 - 高速下载 github下载 TimerHo…

阿里云平台服务器操作以及发布静态项目

目录&#xff1a; 1、云服务器介绍2、云服务器界面3、发布静态项目1、启动nginx2、ngixn访问3、外网访问测试4、拷贝静态资源到nginx目录下并重启nginx 1、云服务器介绍 2、云服务器界面 实例详情&#xff1a;里面主要显示云服务的内外网地址以及一些启动/停止的操作。监控&…

Spring Cache 实战指南

redis中常见的问题 前言 在本文中&#xff0c;我们将探讨 Redis 在缓存中的应用&#xff0c;并解决一些常见的缓存问题。为了简化理解&#xff0c;本文中的一些配置是直接写死的&#xff0c;实际项目中建议将这些配置写入配置文件&#xff0c;并通过配置文件读取。 一、为什…

区块链开发技术公司:引领数字经济的创新力量

在数字化浪潮席卷全球的今天&#xff0c;区块链技术作为新兴技术的代表&#xff0c;正以其独特的去中心化、不可篡改和透明性等特点&#xff0c;深刻改变着各行各业的发展格局。区块链开发技术公司&#xff0c;作为这一领域的先锋和推动者&#xff0c;正不断研发创新&#xff0…

EJS缓存解决多页面相同闪动问题

基于 EJS 的模板引擎特性及其缓存机制&#xff0c;以下是关于缓存相同模块的详细解答&#xff1a; 一、EJS 缓存机制的核心能力 模板编译缓存 EJS 默认会将编译后的模板函数缓存在内存中&#xff0c;当相同模板文件被多次渲染时&#xff0c;会直接复用已编译的模板函数&#x…

多条件排序(C# and Lua)

C# 升序排序 OrderBy 按升序对序列的元素进行排序 ThenBy 按升序对序列中的元素执行后续排序 降序排序 OrderByDescending 按降序对序列的元素排序 ThenByDescending 按降序对序列中的元素执行后续排序 public class Fruit {public int id;public string name;publi…

React19源码系列之Hooks(useId)

useId的介绍 https://zh-hans.react.dev/reference/react/useId useId 是 React 18 引入的一个新 Hook&#xff0c;主要用于生成全局唯一的 ID。在开发中&#xff0c;我们经常需要为元素&#xff08;如表单元素、模态框等&#xff09;生成唯一 ID&#xff0c;以便在 JavaScri…

经典面试题:C/C++中static关键字的三大核心作用与实战应用

一、修饰局部变量&#xff1a;改变生命周期&#xff0c;保留跨调用状态 核心作用&#xff1a; ​延长生命周期&#xff1a;将局部变量从栈区移至静态存储区&#xff08;数据段或BSS段&#xff09;&#xff0c;生命周期与程序一致​保留状态&#xff1a;变量在函数多次调用间保…

Redisson 分布式锁原理

加锁原理 # 如果锁不存在 if (redis.call(exists, KEYS[1]) 0) then# hash结构,锁名称为key,线程唯一标识为itemKey&#xff0c;itemValue为一个计数器。支持相同客户端线程可重入,每次加锁计数器1.redis.call(hincrby, KEYS[1], ARGV[2], 1);# 设置过期时间redis.call(pexpi…

【数据结构】栈与队列:基础 + 竞赛高频算法实操(含代码实现)

什么是栈&#xff1f;什么是队列&#xff1f; 什么是先进后出&#xff1f;什么是先进先出&#xff1f; 了解基础之后&#xff0c;又如何用来写算法题&#xff1f; 带着这些疑问&#xff0c;让我带领你&#xff0c;走进栈与队列的世界 栈与队列 栈&#xff1a; 1、栈的基本…

单元化架构在字节跳动的落地实践

资料来源&#xff1a;火山引擎-开发者社区 什么是单元化 单元化的核心理念是将业务按照某种维度划分成一个个单元&#xff0c; 理想情况下每个单元内部都是完成所有业务操作的自包含集合&#xff0c;能独立处理业务流程&#xff0c;各个单元均有其中一部分数据&#xff0c;所有…

基于Python的垃圾短信分类

垃圾短信分类 1 垃圾短信分类问题介绍 1.1 垃圾短信 随着移动互联科技的高速发展&#xff0c;信息技术在不断改变着我们的生活&#xff0c;让我们的生活更方便&#xff0c;其中移动通信技术己经在我们生活起到至关重要的作用&#xff0c;与我们每个人人息息相关。短信作为移…

leetcode1971.寻找图中是否存在路径

初尝并查集&#xff0c;直接套用模板 class Solution { private:vector<int> father;void init() {for(int i0;i<father.size();i)father[i]i;}int find(int v) {return vfather[v]?v:father[v]find(father[v]);//路径压缩}bool isSame(int u,int v){ufind(u);vfind…

QAI AppBuilder 快速上手(7):目标检测应用实例

YOLOv8_det是YOLO 系列目标检测模型&#xff0c;专为高效、准确地检测图像中的物体而设计。该模型通过引入新的功能和改进点&#xff0c;如因式分解卷积&#xff08;factorized convolutions&#xff09;和批量归一化&#xff08;batch normalization&#xff09;&#xff0c;在…

景联文科技:以高质量数据标注推动人工智能领域创新与发展

在当今这个由数据驱动的时代&#xff0c;高质量的数据标注对于推动机器学习、自然语言处理&#xff08;NLP&#xff09;、计算机视觉等领域的发展具有不可替代的重要性。数据标注过程涉及对原始数据进行加工&#xff0c;通过标注特定对象的特征来生成能够被机器学习模型识别和使…