[MAF工作流框架揭秘-10]基于Open-Telemetry的调用链跟踪

可观测行已经成为部署于生产环境的应用必需具备的能力。而提到可观测行,Open-Telemetry无疑是目前最流行的开源工具之一。基于Open-Telemetry的调用链跟踪和性能监控被应用到MAF的方方面面,在如下的两篇文章中,我详细介绍了基于ChatClient和Agent的调用链跟踪和性能监控:

  • OpenTelemetryChatClient-实现链路跟踪和性能监控
  • 基于Agent的调用链跟踪和性能监控

就调用链跟踪来说,起始Wokflow更有必要。原因很简单,Worflow往往具有复杂的网络拓扑结构,具体的执行路径由消息路由决定。每次调用都可以因为输入、当前外部状态以及LLM的输出的不同而走不同的路径。基于Open-Telemetry的调用链跟踪可以帮助我们清晰地看到每次调用的具体执行路径,进而分析和优化Workflow的设计是否合理,并在出错的时候快速定位问题所在的环节。

1. 跟踪Workflow的执行路径

在正是介绍Workflow针对Open-Telemetry的调用链跟踪的设计和实现原理之前,我们先来看一个简单的示例程序,来看看Workflow的调用链跟踪在实际中的表现。为了能够收集跟踪数据并可视化呈现调用的完整链条,我们本机执行如下的命令安装了Jaeger:

dockerrun-d--namejaeger\-p16686:16686\-p4317:4317\jaegertracing/all-in-one:latest

在如下的演示程序中,我们构建了一个简单的Workflow,包含4个节点,分别是FooBarBazQux,其中Foo节点的输出同时作为BarBaz节点的输入,而Qux节点则需要等待BarBaz节点都完成后才能执行。我们在Workflow构建的时候调用了WithOpenTelemetry方法来启用Open-Telemetry的调用链跟踪功能,并且提供了一个ActivitySource对象来收集跟踪数据。在每个节点的实现中,我们模拟了一个随机的执行时间来模拟操作的耗时。

usingMicrosoft.Agents.AI.Workflows;usingOpenTelemetry;usingOpenTelemetry.Metrics;usingOpenTelemetry.Resources;usingOpenTelemetry.Trace;usingSystem.Diagnostics;varserviceName="maf.workflow";varservceVersion="1.0.0";varrandom=newRandom();varfoo=CreateExecutor("Foo");varbar=CreateExecutor("Bar");varbaz=CreateExecutor("Baz");varqux=CreateExecutor("Qux");using(Sdk.CreateTracerProviderBuilder().SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName,serviceVersion:servceVersion)).AddSource(serviceName).AddConsoleExporter().AddOtlpExporter(options=>{options.Endpoint=newUri("http://localhost:4317");options.Protocol=OpenTelemetry.Exporter.OtlpExportProtocol.Grpc;}).Build()){while(true){awaitusingvarrun=awaitInProcessExecution.Default.RunStreamingAsync(BuildWorkflow(),"start");awaitrun.RunToCompletionAsync();}}ExecutorBindingCreateExecutor(stringid)=>newFunc<string,ValueTask<string>>(asyncinput=>{awaitTask.Delay(random.Next(100,500));returnid;}).BindAsExecutor(id);WorkflowBuildWorkflow(){returnnewWorkflowBuilder(foo).AddFanOutEdge(foo,[bar,baz]).AddFanInBarrierEdge([bar,baz],qux).WithOutputFrom(qux).WithOpenTelemetry(options=>options.EnableSensitiveData=true,newActivitySource(serviceName)).Build();}

FooBarBazQux四个节点构成的Workflow具有如下的结构:

Jaeger呈现的调用具有与之匹配的结构,我们可以清晰地看到每次调用的具体执行路径:

从外到内,调用链记录下如下描述执行操作的Span:

  • workflow.session: 代表一个可以涵盖多次调用的Workflow Session,一个Session绑定一个StreamingRun或者Run对象;
  • workflow_invoke: 代表在Session内针对Workflow的一次调用;
  • executor.process: 代表Workflow中一个Executor节点的一次执行;
  • message_send: 代表节点将消息发送给下一个节点;
  • edge_group.process: 代表针对始于某个节点的多条边的处理;

除了执行Workflow的调用链跟踪,Workflow的构建过程同样被记录在另一个调用链中。

![Alternative Text][1782049741740]

虽然这个调用只有一个单一的Span,但是这个Span利用一个名为workflow.definition的Tag记录了Workflow的定义信息,这个信息可能帮助我们确定构建的Workflow是否符合预期。在我们这个例子中,构建的Workflow被描述为如下这个JSON:

{"executors":{"Foo":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Foo"},"Bar":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Bar"},"Baz":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Baz"},"Qux":{"executorType":{"assemblyName":"Microsoft.Agents.AI.Workflows, Version=1.10.0.0, Culture=neutral, PublicKeyToken=f300afd708cefcd3","typeName":"Microsoft.Agents.AI.Workflows.FunctionExecutor`2[[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e],[System.String, System.Private.CoreLib, Version=10.0.0.0, Culture=neutral, PublicKeyToken=7cec85d7bea7798e]]"},"executorId":"Qux"}},"edges":{"Foo":[{"$type":1,"hasAssigner":false,"kind":1,"connection":{"sourceIds":["Foo"],"sinkIds":["Bar","Baz"]}}],"Bar":[{"$type":2,"kind":2,"connection":{"sourceIds":["Bar","Baz"],"sinkIds":["Qux"]}}],"Baz":[{"$type":2,"kind":2,"connection":{"sourceIds":["Bar","Baz"],"sinkIds":["Qux"]}}]},"requestPorts":[],"startExecutorId":"Foo","outputExecutorIds":{"Qux":[]}}

2. 将Agent和LLM调用的操作纳入调用链跟踪

调用链的完整性决定了其价值。如果Workflow具有一个或者多个基于AIAgent的节点,那么将Agent和LLM调用的操作纳入调用链跟踪是非常有必要的。我们可以按照开篇提到的两篇文章中介绍的方式启用Agent和LLM调用的操作的调用链跟踪。在如下这个演示程序中,我们创建的Workflow只有一个基于AIAgent的节点。在构建这个AIAgent管道时,我们调用了UseOpenTelemetry方法来启用基于Agent调用的调用链跟踪。内部的ChatClient管道同样调用了UseOpenTelemetry方法来启用基于LLM调用的调用链跟踪。

usingAzure;usingMicrosoft.Agents.AI;usingMicrosoft.Agents.AI.Workflows;usingMicrosoft.Extensions.AI;usingOpenAI;usingOpenTelemetry;usingOpenTelemetry.Metrics;usingOpenTelemetry.Resources;usingOpenTelemetry.Trace;usingSystem.ComponentModel;usingSystem.Diagnostics;dotenv.net.DotEnv.Load();varserviceName="maf.workflow";varservceVersion="1.0.0";varendpoint=Environment.GetEnvironmentVariable("OPENAI_URL")!;varmodel=Environment.GetEnvironmentVariable("MODEL")!;varapiKey=Environment.GetEnvironmentVariable("API_KEY")!;varagent=newOpenAIClient(credential:newAzureKeyCredential(apiKey),options:newOpenAIClientOptions{Endpoint=newUri(endpoint)}).GetChatClient(model:model).AsIChatClient().AsBuilder().UseOpenTelemetry(sourceName:serviceName).Build().AsAIAgent(name:"MyAgent",tools:[AIFunctionFactory.Create(GetWeather,nameof(GetWeather))]).AsBuilder().Use(inner=>newOpenTelemetryAgent(inner,serviceName,true)).Build();using(Sdk.CreateTracerProviderBuilder().SetResourceBuilder(ResourceBuilder.CreateDefault().AddService(serviceName,serviceVersion:servceVersion)).AddSource(serviceName).AddOtlpExporter(options=>{options.Endpoint=newUri("http://localhost:4317");options.Protocol=OpenTelemetry.Exporter.OtlpExportProtocol.Grpc;}).Build()){while(true){varworkflow=newWorkflowBuilder(agent).WithOutputFrom(agent).WithOpenTelemetry(options=>options.EnableSensitiveData=true,newActivitySource(serviceName)).Build();awaitusingvarrun=awaitInProcessExecution.Default.RunAsync(workflow,"根据目前苏州天气,提供着装建议。");foreach(varresultinrun.NewEvents.OfType<WorkflowOutputEvent>())Console.Write(result.Data);awaitTask.Delay(5000);}}[Description("获取指定城市天气信息")]staticstringGetWeather([Description("城市名称")]stringcity)=>$"{city}目前天气晴,气温25摄氏度。";

在Jaeger中,我们同样可以清晰地看到Agent、工具和LLM调用的操作被纳入了调用链跟踪:

3. WorkflowTelemetryContext

基于Open-Telemetry调用链跟踪在.NET是通过ActivityActivitySourceActivityListener这三个核心类型实现的,代表每个跟踪操作的Span对应一个Activity对象。我们前面利用Jaeger展示了Workflow构建和调用的执行路径,涉及的每个操作对应的Activity都是通过WorkflowTelemetryContext这个类来创建的。

internalsealedclassWorkflowTelemetryContext{publicActivity?StartWorkflowBuildActivity();publicActivity?StartWorkflowSessionActivity();publicActivity?StartWorkflowRunActivity();publicActivity?StartExecutorProcessActivity(stringexecutorId,string?executorType,stringmessageType,object?message);publicActivity?StartEdgeGroupProcessActivity();publicActivity?StartMessageSendActivity(stringsourceId,string?targetId,object?message);}

WorkflowTelemetryContext用于创建Activity的方法说明如下:

  • StartWorkflowBuildActivity: 代表Workflow的构建操作,名称为workflow.build
  • StartWorkflowSessionActivity: 代表Workflow Session的开始,名称为workflow.session
  • StartWorkflowRunActivity: 代表Workflow Run的开始,名称为workflow.invoke
  • StartExecutorProcessActivity: 代表Workflow中一个Executor节点的执行操作,名称为executor.process
  • StartEdgeGroupProcessActivity: 代表针对始于某个节点的多条边的处理操作,名称为edge_group.process
  • StartMessageSendActivity: 代表节点将消息发送给下一个节点的操作,名称为message.send

对于Tracing这种对性能影响极大的高频操作,一般都需要一个显式的开关。对于Worflow来说,只有WorkflowTelemetryContextIsEnabled属性为true,我们才会创建对应的Activity来记录调用链跟踪数据。静态属性Disabled提供了一个默认了一个IsEnabledfalseWorkflowTelemetryContext实例。只有在启用的情况下,WorkflowTelemetryContext才会创建一个真正的ActivitySource来创建Activity对象。如果没有显式指定ActivitySource,那么WorkflowTelemetryContext会默认创建一个命名为Microsoft.Agents.AI.WorkflowsActivitySource

internalsealedclassWorkflowTelemetryContext{publicboolIsEnabled{get;}publicstaticWorkflowTelemetryContextDisabled{get;}=newWorkflowTelemetryContext();publicWorkflowTelemetryOptionsOptions{get;}publicActivitySourceActivitySource{get;}publicWorkflowTelemetryContext(WorkflowTelemetryOptionsoptions,ActivitySource?activitySource=null);}

4. 跟踪实现

WorkflowBuilder定义了如下这个名为_telemetryContext的字段来持有WorkflowTelemetryContext对象,其默认值来源于WorkflowTelemetryContext的静态属性Disabled。它同时提供了一个SetTelemetryContext方法来设置这个字段的值。WithOpenTelemetry扩展方法会创建一个新的WorkflowTelemetryContext实例,并通过调用SetTelemetryContext方法将其设置到WorkflowBuilder中。

publicclassWorkflowBuilder{privateWorkflowTelemetryContext_telemetryContext=WorkflowTelemetryContext.Disabled;internalvoidSetTelemetryContext(WorkflowTelemetryContextcontext)=>_telemetryContext=context;}publicstaticWorkflowBuilderWithOpenTelemetry(thisWorkflowBuilderbuilder,Action<WorkflowTelemetryOptions>?configure=null,ActivitySource?activitySource=null){WorkflowTelemetryOptionsworkflowTelemetryOptions=newWorkflowTelemetryOptions();configure?.Invoke(workflowTelemetryOptions);WorkflowTelemetryContexttelemetryContext=newWorkflowTelemetryContext(workflowTelemetryOptions,activitySource);builder.SetTelemetryContext(telemetryContext);returnbuilder;}

WithOpenTelemetry扩展方法除了提供一个可选的ActivitySource参数用来指定构建WorkflowTelemetryContext实例时使用的ActivitySource之外,还提供了一个可选的configure参数用来配置WorkflowTelemetryOptions对象。

publicsealedclassWorkflowTelemetryOptions{publicboolEnableSensitiveData{get;set;}publicboolDisableWorkflowBuild{get;set;}publicboolDisableWorkflowRun{get;set;}publicboolDisableExecutorProcess{get;set;}publicboolDisableEdgeGroupProcess{get;set;}publicboolDisableMessageSend{get;set;}}

WorkflowTelemetryOptions提供的配置选项说明如下:

  • EnableSensitiveData: 是否在调用链跟踪中包含敏感数据,比如消息内容等,默认值为false;
  • DisableWorkflowBuild: 是否禁用Workflow构建操作的调用链跟踪,默认值为false;
  • DisableWorkflowRun: 是否禁用Workflow调用操作的调用链跟踪,默认值为false;
  • DisableExecutorProcess: 是否禁用Workflow中Executor节点执行操作的调用链跟踪,默认值为false;
  • DisableEdgeGroupProcess: 是否禁用针对始于某个节点的多条边的处理操作的调用链跟踪,默认值为false;
  • DisableMessageSend: 是否禁用节点将消息发送给下一个节点的操作的调用链跟踪,默认值为false。

当我们调用WorkflowBuilderBuild方法创建Workflow对象时,内部会调用WorkflowTelemetryContextStartWorkflowBuildActivity方法来创建一个代表Workflow构建操作的Activity对象,并设置以Workflow定义为核心的Tag。于此同时,WorkflowBuilder会将持有的WorkflowTelemetryContext对象传递给Workflow的构造函数,并由Workflow来持有这个对象。Workflow在执行过程中会根据需要调用WorkflowTelemetryContext的方法来创建Activity对象,从而实现基于Open-Telemetry的调用链跟踪。

publicclassWorkflow{internalWorkflowTelemetryContextTelemetryContext{get;}}