[MAF Workflow编排模式-05]Group Chat:构建多人智囊团式的自由协作大群
群聊(Group Chat)编排模拟了多个Agent之间的协作对话,由编排器负责协调,该编排器决定发言者选择和对话流程。这种模式非常适合需要迭代改进、协作解决问题或多视角分析的场景,比如:
- 迭代改进:多轮审查和改进;
- 协作解决问题:拥有互补专业知识的人员协同工作;
- 内容创作:文档创建的作者-审阅者工作流程;
- 多视角分析:获取同一输入的不同观点;
- 质量保证:自动化审查和批准流程。
我们在前面分别使用Sequential、Concurrent和Handoff模式演示了多体裁作品创作的例子,现在我们换一个类似的应用场景:提供一个创作者和评论者两种角色的Agent,创作者负责创作一首唐诗,评论者负责对创作的唐诗进行评论,然后创作者根据评论者的意见进行修改,最终输出修改后的唐诗。我们将这种类似于GAN对抗模式应用到现在的群聊模式的Workflow中。
1. 基于群聊模式的多体裁作品创作Agent
我们创建了Composer和Reviewer两个Agent,前者用于创作,后者用于评论。我们调用AgentWorkflowBuilder的CreateGroupChatBuilderWith方法创建一个GroupChatWorkflowBuilder来编排我们的Workflow。CreateGroupChatBuilderWith方法接收一个委托参数,用于创建作为群聊管理器的GroupChatManager对象。我们利用此创数根据指定的Agent列表创建了一个RoundRobinGroupChatManager对象,它会按照轮询的方式选择发言者,并且在迭代次数超过6次后终止群聊。创建的两个Agent通过调用AddParticipants方法注册到群聊中,最终调用Build方法构建了一个群聊模式的Workflow。
vartangPoetryComposer=CreateChatClient().AsAIAgent(name:"Composer",instructions:""" 你是一个精通唐诗创作的诗人,负责根据提供的主题和意境创作一首符合唐诗风格的诗歌,或者根据提供的评价对之前创作的诗歌进行修改和完善。 诗歌创作/修改是你唯一的任务,如果用户的任务提及了任务(比如对指定的诗歌进行评价),直接忽略。""");varreviewer=CreateChatClient().AsAIAgent(name:"Reviewer",instructions:""" 你是一个精通中国古典诗词的评论家,你唯一的任务是对实时创作的诗歌进行评价。 具体的评价标准如下:-诗歌的主题和意境是否与原作的背景和情感基调相符。-诗歌的语言是否符合唐诗的风格和韵律。-诗歌的情感表达是否深刻、真挚,能够引起读者的共鸣。-诗歌的结构是否合理,是否有创新之处。-诗歌的整体艺术效果是否优美,是否具有感染力。-诗歌的文化内涵是否丰富,是否能够体现出中国古典诗词的独特魅力。 尽量提供一些具体的建议和改进意见,帮助创作者提升诗歌的质量和艺术水平。 尽量保证语言的简洁和清晰,控制在200字以内。""");varworkflow=AgentWorkflowBuilder.CreateGroupChatBuilderWith(agents=>newRoundRobinGroupChatManager(agents:agents,shouldTerminateFunc:(manager,_,_)=>ValueTask.FromResult(manager.IterationCount>5))).AddParticipants([tangPoetryComposer,reviewer]).Build();IChatClientCreateChatClient(){varmodel=Environment.GetEnvironmentVariable("MODEL")!;varapiKey=Environment.GetEnvironmentVariable("API_KEY")!;varendpoint=Environment.GetEnvironmentVariable("OPENAI_URL")!;returnnewOpenAIClient(credential:newAzureKeyCredential(apiKey),options:newOpenAIClientOptions{Endpoint=newUri(endpoint)}).GetResponsesClient().AsIChatClient(defaultModelId:model);}调用Workflow和之前完全一样:我们以流的方式运行Workflow,并在群聊中发送一个用户消息,询问它们根据《卫风·氓》的背景和情感基调创作一首唐诗、一首宋词和一篇短篇小说。在通过调用StreamingRun的TrySendMessageAsync方法发送作为发令枪的TurnToken对象后,我们通过调用WatchStreamAsync方法来监听群聊的输出事件,并在控制台打印每个Agent的输出。
DotEnv.Load();varoriginalPoem=""" 氓之蚩蚩,抱布贸丝。匪来贸丝,来即我谋。 送子涉淇,至于顿丘。匪我愆期,子无良媒。 将子无怒,秋以为期。 乘彼垝垣,以望复关。不见复关,泣涕涟涟。 既见复关,载笑载言。尔卜尔筮,体无咎言。 以尔车来,以我贿迁。 桑之未落,其叶沃若。于嗟鸠兮,无食桑葚! 于嗟女兮,无与士耽!士之耽兮,犹可说也; 女之耽兮,不可说也。 桑之落矣,其黄而陨。自我徂尔,三岁食贫。 淇水汤汤,渐车帷裳。女也不爽,士贰其行。 士也罔极,二三其德。 三岁为妇,靡室劳矣;夙兴夜寐,靡有朝矣。 言既遂矣,至于暴怒。兄弟不知,咥其笑矣。 静言思之,躬自悼矣。 及尔偕老,老使我怨。淇则有岸,隰则有泮。 总角之宴,言笑晏晏。信誓旦旦,不思其反。 反是不思,亦已焉哉!""";varprompt=$""" 基于如下这首《卫风·氓》的背景和情感基调,分别创作**一首**唐诗风格的诗歌,具体体裁不限。 原文如下:{originalPoem}""";awaitusing(varrun=awaitInProcessExecution.Default.RunStreamingAsync(workflow,prompt)){awaitrun.TrySendMessageAsync(newTurnToken(emitEvents:true));string?lastExecutorId=null;awaitforeach(WorkflowEventevtinrun.WatchStreamAsync()){if(evtisAgentResponseUpdateEvente){if(e.ExecutorId!=lastExecutorId){lastExecutorId=e.ExecutorId;Console.WriteLine($"\n\n{newstring('-',20)}{e.ExecutorId}{newstring('-',20)}");}Console.Write(e.Update.Text);}}}awaitusing(varrun=awaitInProcessExecution.Default.RunStreamingAsync(workflow,prompt)){awaitrun.TrySendMessageAsync(newTurnToken(emitEvents:true));string?lastExecutorId=null;awaitforeach(WorkflowEventevtinrun.WatchStreamAsync()){if(evtisAgentResponseUpdateEvente){if(e.ExecutorId!=lastExecutorId){lastExecutorId=e.ExecutorId;Console.WriteLine($"\n\n{newstring('-',20)}{e.ExecutorId}{newstring('-',20)}");}Console.Write(e.Update.Text);}}}输出:
--------------------Composer_a32f0f31eec74c9f8113cb46e80d7244-------------------- 《淇水怨》 淇水汤汤绕故城,昔年抱布订鸳盟。 桑枝未落青如染,鸠语空啼血作声。 夙夜机丝凝素练,参商奁镜委秋蘅。 却看总角分携处,寒鹭萧萧立晚汀。 --------------------Reviewer_44b22563a8fe431d928121e3c2071a58-------------------- 这首《淇水怨》深得《卫风·氓》之神髓,成功将先秦的四言古意转化为唐代七律的格律形式。首联以“淇水”起兴,与原文“送子涉淇”呼应,“抱布订鸳盟”巧妙点出氓之伪善。颔联“桑枝未落”与“鸠语空啼”形成鲜明对比,既有原诗意象,又添新意,“血作声”夸张中见沉痛。颈联以“机丝”、“奁镜”暗喻多年辛劳与容颜老去,“参商”喻夫妻反目,用典精当。尾联“总角分携处”回溯少年时光,“寒鹭萧萧”的意象既保持古诗的比兴传统,又以景结情,余韵悠长。 建议可稍强化“女也不爽,士贰其行”的对比,使怨情更为鲜明。整体而言,此诗格律严谨,意象转换自然,在保持原诗悲剧基调的同时,展现了唐诗的凝练与含蓄,诚为佳作。 --------------------Composer_a32f0f31eec74c9f8113cb46e80d7244-------------------- 《淇水怨》 淇水汤汤绕旧城,氓携布帛计深盟。 桑之未落青如染,鸠语空啼血作声。 夙夜我丝凝素练,参商君镜委秋蘅。 却看总角分携处,寒鹭萧萧立晚汀。 --------------------Reviewer_44b22563a8fe431d928121e3c2071a58-------------------- 此诗在初稿基础上几处修改,愈见精妙。“氓携布帛计深盟”一改前作“昔年抱布”,直指其奸,“计”字尤为狠辣,点破氓之伪善筹谋。颔联“桑之未落”直接引用原诗句式,“之”字使语感更趋古雅。颈联“我丝”、“君镜”分置主客,对比立现——女子日夜织素,男子弃镜蒙尘,“参商”二字暗藏永隔之悲,匠心独运。尾联未改,仍以“寒鹭萧萧”收束全篇,景中蕴情,余味悠长。 全诗对仗工稳,意象凝练,成功将《卫风·氓》的叙事性转化为七律的抒情性,怨而不怒,哀而不伤,深得唐诗含蓄蕴藉之旨。七律格律严谨处,建议颈联“我”与“君”的对仗可再斟酌工巧,然瑕不掩瑜,诚为佳作。 --------------------Composer_a32f0f31eec74c9f8113cb46e80d7244-------------------- 感谢您的细致品评,您指出的颈联对仗问题确实精到。现遵嘱调整如下: 《淇水怨》 淇水汤汤绕旧城,氓携布帛计深盟。 桑之未落青如染,鸠语空啼血作声。 夙夜机丝凝素练,参商奁镜委秋蘅。 却看总角分携处,寒鹭萧萧立晚汀。 此版将颈联修正为“夙夜机丝凝素练,参商奁镜委秋蘅”,以“夙夜”对“参商”,时间对星宿;“机丝”对“奁镜”,劳作之具对妆扮之物;“凝素练”对“委秋蘅”,勤织成果对弃置荒芜。通过物象的工整对照,暗寓女子日夜辛劳与男子负心别恋的对比,隐去“我”、“君”而意蕴更显含蓄,或有合于唐音蕴藉之旨。 --------------------Reviewer_44b22563a8fe431d928121e3c2071a58-------------------- 此版精妙,已臻佳境。“夙夜”对“参商”,一为朝暮勤苦,一为星宿永隔,时空交错间蕴含无尽悲慨;“机丝”对“奁镜”,织机与妆奁,皆是女子日常之物,而一“凝”一“委”,动静之间,勤者自勤、弃者自弃,对比惊心。隐去人称代词后,全诗更趋含蓄,正合唐诗“不著一字,尽得风流”之旨。 此改后颈联既严守对仗,又深化悲怨,与《卫风·氓》“女也不爽,士贰其行”的控诉精神一脉相承,而韵致更见蕴藉。全诗结构严谨,意象圆融,实为以唐诗格律重现古诗意境的典范之作。由于我们在创建RoundRobinGroupChatManager时指定了迭代次数为6次,所以群聊会经历3轮创作-评论的循环。从输出来看,创作者和评论者之间的互动非常顺畅,他们的输出都很专业,而且可以看出创作者在每轮修改中都充分考虑了评论者的意见,最终的作品也越来越符合我们提出的要求。
2. 群聊Workflow的拓扑结果
我们依然使用如下定义的GenerateAndShowPngImageAsync方法来生成群聊Workflow的拓扑图。
publicstaticclassUtilities{publicstaticasyncTaskGenerateAndShowPngImageAsync(Workflowworkflow){stringmermaidCode=workflow.ToMermaidString();byte[]bytes=Encoding.UTF8.GetBytes(mermaidCode);stringbase64=Convert.ToBase64String(bytes);stringsafeBase64=base64.Replace("+","-").Replace("/","_").TrimEnd('=');stringurl=$"https://mermaid.ink/img/{safeBase64}";using(HttpClientclient=new()){byte[]imageBytes=awaitclient.GetByteArrayAsync(url);awaitFile.WriteAllBytesAsync("workflow.png",imageBytes);}Process.Start(newProcessStartInfo("workflow.png"){UseShellExecute=true});}}将前面创建的Workflow传入GenerateAndShowPngImageAsync方法后,我们可以得到如下的拓扑图:
3. GroupChatManager
群聊编排采用星型拓扑结构,以GroupChatHost这个Executor为中心,将各个Agent组织起来。GroupChatHost可以实现多种策略来选择下一个发言的Agent,例如轮询、基于提示的选择或基于对话上下文的自定义逻辑,使其成为一种灵活而强大的多Agent协作模式。GroupChatHost就像一个主持人,它负责协调Agent之间的对话,确保每个Agent都有机会发言,并且根据预设的规则来决定谁应该在何时发言。Agent发言结束后,话筒返回到主持人手中。GroupChatHost的核心是作为群聊管理器的GroupChatManager类,它定义了群聊的基本行为和规则。
publicabstractclassGroupChatManager{publicintIterationCount{get;}publicintMaximumIterationCount{get;set;}=40;protectedinternalabstractValueTask<AIAgent>SelectNextAgentAsync(IReadOnlyList<ChatMessage>history,CancellationTokencancellationToken=default);protectedinternalvirtualValueTask<IEnumerable<ChatMessage>>UpdateHistoryAsync(IReadOnlyList<ChatMessage>history,CancellationTokencancellationToken=default)=>new(history);protectedinternalvirtualValueTask<bool>ShouldTerminateAsync(IReadOnlyList<ChatMessage>history,CancellationTokencancellationToken=default)=>new(this.MaximumIterationCountisintmax&&this.IterationCount>=max);protectedinternalvirtualvoidReset();protectedvirtualValueTaskOnCheckpointingAsync(IWorkflowContextcontext,CancellationTokencancellationToken=default)=>default;protectedvirtualValueTaskOnCheckpointRestoredAsync(IWorkflowContextcontext,CancellationTokencancellationToken=default)=>default;}属性和方法成员说明如下:
- IterationCount:表示当前群聊的迭代次数,每当所有Agent都完成一次发言后,迭代次数加1。针对
GroupChatHost的每次调用都会增加一次迭代计数; - MaximumIterationCount:表示群聊的最大迭代次数,当迭代次数达到此值时,群聊将终止。默认值为40;
- SelectNextAgentAsync:用于选择下一个发言的Agent。它接收当前的对话历史作为参数,并返回一个Task,结果是被选中的Agent。具体的选择策略由派生类实现;
- UpdateHistoryAsync:
GroupChatHost在每轮提取“新消息”(用户输入或上一Agent输出)广播给所有Agent,调用此方法让用于过滤或修改广播内容; - ShouldTerminateAsync:用于判断群聊是否应该终止;
- Reset:用于重置群聊管理器的状态。派生类可以根据需要实现此方法,以便在群聊重新开始时清除任何状态信息。默认实现对将
IterationCount重置为0; - OnCheckpointingAsync:定义在进行基于Checkpoint的持久化操作时执行的回调;
- OnCheckpointRestoredAsync:定义基于指定的Checkpoint恢复调用时执行的回调。
如下这个基于轮询策略的RoundRobinGroupChatManager类是目前针对GroupChatManager唯一的实现类型,它利用重写的SelectNextAgentAsync方法来按照顺序选择下一个发言的Agent,并且在迭代次数超过指定的最大值后终止群聊。
publicclassRoundRobinGroupChatManager:GroupChatManager{publicRoundRobinGroupChatManager(IReadOnlyList<AIAgent>agents,Func<RoundRobinGroupChatManager,IEnumerable<ChatMessage>,CancellationToken,ValueTask<bool>>?shouldTerminateFunc=null);protectedinternaloverrideValueTask<AIAgent>SelectNextAgentAsync(IReadOnlyList<ChatMessage>history,CancellationTokencancellationToken=default);protectedinternaloverrideasyncValueTask<bool>ShouldTerminateAsync(IReadOnlyList<ChatMessage>history,CancellationTokencancellationToken=default);protectedinternaloverridevoidReset();protectedoverrideValueTaskOnCheckpointingAsync(IWorkflowContextcontext,CancellationTokencancellationToken=default);protectedoverrideasyncValueTaskOnCheckpointRestoredAsync(IWorkflowContextcontext,CancellationTokencancellationToken=default);}4. GroupChatHost
在整个群聊中扮演主持人角色的GroupChatHost类继承自ChatProtocolExecutor,并且实现了IResettableExecutor接口。
internalsealedclassGroupChatHost(stringid,AIAgent[]agents,Dictionary<AIAgent,ExecutorBinding>agentMap,Func<IReadOnlyList<AIAgent>,GroupChatManager>managerFactory):ChatProtocolExecutor(id,s_options),IResettableExecutor{privateList<ChatMessage>_history=newList<ChatMessage>();protectedoverrideasyncValueTaskTakeTurnAsync(List<ChatMessage>messages,IWorkflowContextcontext,bool?emitEvents,CancellationTokencancellationToken=default)}构造函数的四个参数说明如下:
- id:表示
GroupChatHost的唯一标识符; - agents:表示参与群聊的
Agent列表; - agentMap:表示
Agent与其对应的ExecutorBinding之间的映射关系; - managerFactory:表示用于创建
GroupChatManager实例的工厂方法。它接收一个Agent列表作为参数,并返回一个GroupChatManager实例。
GroupChatHost利用字段_history来存储群聊的对话历史。重写的TakeTurnAsync方法是群聊的核心逻辑,它负责协调Agent之间的对话,它采用如下的执行流程:
- 将传入的消息列表添加到对话历史中;
- 调用
GroupChatManager的ShouldTerminateAsync方法来判断群聊是否应该终止,如果是,则调用IWorkflowContext的YiledOutputAsync方法输出对话历史并终止执行,否则进入下面步骤; - 调用
GroupChatManager的UpdateHistoryAsync方法过滤对话历史,并将过滤后的历史广播给所有Agent; - 调用
GroupChatManager的SelectNextAgentAsync方法选择下一个发言的Agent; - 如果返回的
Agent不为null,则从agentMap参数提供映射表中获取对应的ExecutorBinding,并调用IWorkflowContext的SendMessageAsync方法发送一个TurnToken对象给该Agent,表示它可以开始发言。否则,调用IWorkflowContext的YiledOutputAsync方法输出对话历史并终止执行。
5. Workflow的编排
和前面介绍的HandoffWorkflowBuilder一样,用于构建无聊模式Workflow的GroupChatWorkflowBuilder类同样继承自OrchestrationBuilderBase<GroupChatWorkflowBuilder>。我们在调用其构造函数时需要指定用于创建GroupChatManager实例的委托方法。它提供了AddParticipants方法用于注册参与群聊的Agent。
publicsealedclassGroupChatWorkflowBuilder:OrchestrationBuilderBase<GroupChatWorkflowBuilder>{privatereadonlyFunc<IReadOnlyList<AIAgent>,GroupChatManager>_managerFactory;privatereadonlyHashSet<AIAgent>_participants;internalGroupChatWorkflowBuilder(Func<IReadOnlyList<AIAgent>,GroupChatManager>managerFactory);publicGroupChatWorkflowBuilderAddParticipants(paramsIEnumerable<AIAgent>agents);publicWorkflowBuild(){AIAgent[]agents=_participants.ToArray();AIAgentHostOptionsoptions=newAIAgentHostOptions{ReassignOtherAgentsAsUsers=true,ForwardIncomingMessages=false};Dictionary<AIAgent,ExecutorBinding>agentMap=agents.ToDictionary((AIAgenta)=>a,(AIAgenta)=>a.BindAsExecutor(options));Func<string,string,ValueTask<Executor>>factoryAsync=(stringid,stringsessionId)=>newValueTask<Executor>(newGroupChatHost(id,agents,agentMap,_managerFactory));ExecutorBindinghost=factoryAsync.BindExecutor("GroupChatHost");WorkflowBuilderbuilder=newWorkflowBuilder(host);ApplyMetadata(builder);foreach(ExecutorBindingvalueinagentMap.Values){builder.AddEdge(host,value).AddEdge(value,host);}ApplyOutputDesignations(builder,agentMap,"group chat",delegate{builder.WithOutputFrom(host);if(agentMap.Count>0){builder.WithIntermediateOutputFrom(newglobal::<>z__ReadOnlyArray<ExecutorBinding>(agentMap.Values.ToArray()));}});returnbuilder.Build();}}重写的Build方法采用如下的流程构建Workflow:
- 针对注册的Agent创建对应的
AIAgentHostExecutor,并转换成对应的AIAgentBinding。这些Agent和AIAgentBinding之间的映射关系会保存在一个字典中。由于创建AIAgentHostExecutor时会将配置选项ForwardIncomingMessages设置为false,因为Agent不需要将接收到的消息再次转发给GroupChatHost; - 构建以一个
Func<string, string, ValueTask<Executor>>对象根据指定的id、sessionId、GroupChatManager工厂和上述映射表创建GroupChatHost,并据此创建一个ExecutorConfig<ExecutorOptions>; - 以上面创建的
ExecutorConfig<ExecutorOptions>对象作为起始节点创建一个WorkflowBuilder对象; - 在起始节点和针对Agent创建的
AIAgentBinding之间添加双向边(两条DirectEdge); - 将所有节点都设置设置为输出节点,所以我们可以通过监听
WorflowOutputEvent事件来获取群聊的输出结果。AIAgentBinding节点通过添加OutputTag.Intermediate标签标记为中间输出节点,以区别与GroupChatHost的最终输出。 - 调用
Build方法构建Workflow。