azure-functions-durable-extension source code study
- 1: client
- 1.1: client initial
- 1.2: client start new instance
- 1.2.1: Call stack
- 1.2.2: FunctionsDurableTaskClient
- 2: webjobs
- 2.1: common
- 2.1.1: IOrchestrationService
- 2.1.2: DurabilityProvider
- 2.2: azure storage
- 3: worker
- 3.1: worker initial
- 3.1.1: Obsolete initialization
- 3.1.2: Call stack
- 3.1.3: GrpcWorkerClient
- 3.1.4: GrpcWorker
- 3.1.5: FunctionRpcClient
- 3.2: client run orchestrator
- 3.2.1: Call stack
- 3.3: client run activity
- 3.3.1: Call stack
- 3.4: worker register orchestrator
- 3.4.1: Call stack
- 4: trigger
1 - client
1.1 - client initial
1.2 - client start new instance
1.2.1 - Call stack
Call stack overview
MyDurableFunction2.dll!Company.Function.HelloOrchestration.HttpStart(Microsoft.Azure.Functions.Worker.Http.HttpRequestData req, Microsoft.DurableTask.Client.DurableTaskClient client, Microsoft.Azure.Functions.Worker.FunctionContext executionContext) Line 49 (c:/Users/sky/work/code/durabletask-fork2/MyDurableFunction2/HelloOrchestration.cs:49)
MyDurableFunction2.dll!MyDurableFunction2.DirectFunctionExecutor.ExecuteAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 40 (GeneratedFunctionExecutor.g.cs:40)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.Pipeline.FunctionExecutionMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 20 (FunctionExecutionMiddleware.cs:20)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseFunctionExecutionMiddleware.AnonymousMethod__1_2(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 57 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:57)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 13 (OutputBindingsMiddleware.cs:13)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseOutputBindingsMiddleware.AnonymousMethod__3(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 84 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:84)
Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.dll!Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 48 (FunctionsHttpProxyingMiddleware.cs:48)
[Resuming Async Method] (Unknown Source:0)
System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<System.Threading.Tasks.VoidTaskResult>.AsyncStateMachineBox<Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.<Invoke>d__4>.ExecutionContextCallback(object s) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) (Unknown Source:0)
System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<System.Threading.Tasks.VoidTaskResult>.AsyncStateMachineBox<Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.<Invoke>d__4>.MoveNext(System.Threading.Thread threadPoolThread) (Unknown Source:0)
System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<System.Threading.Tasks.VoidTaskResult>.AsyncStateMachineBox<Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.<Invoke>d__4>.MoveNext() (Unknown Source:0)
System.Private.CoreLib.dll!System.Runtime.CompilerServices.TaskAwaiter.OutputWaitEtwEvents.AnonymousMethod__12_0(System.Action innerContinuation, System.Threading.Tasks.Task innerTask) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Action action, bool allowInlining) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.Task.RunContinuations(object continuationObject) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.Task<System.__Canon>.TrySetResult(System.__Canon result) (Unknown Source:0)
System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<System.__Canon>.SetExistingTaskResult(System.Threading.Tasks.Task<System.__Canon> task, System.__Canon result) (Unknown Source:0)
[Completed] Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.dll!Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.DefaultHttpCoordinator.SetFunctionContextAsync(string invocationId, Microsoft.Azure.Functions.Worker.FunctionContext context) Line 37 (DefaultHttpCoordinator.cs:37)
System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<System.__Canon>.AsyncStateMachineBox<Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.DefaultHttpCoordinator.<SetFunctionContextAsync>d__3>.ExecutionContextCallback(object s) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) (Unknown Source:0)
Azure Functions client SDK code
GrpcWorkerClientFactory
GrpcWorkerClientFactory, in the Azure/azure-functions-dotnet-worker project:
public async Task StartAsync(CancellationToken token)
{
if (_running)
{
throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
}
_running = true;
var eventStream = _grpcClient.EventStream(cancellationToken: token);
await SendStartStreamMessageAsync(eventStream.RequestStream);
_ = StartWriterAsync(eventStream.RequestStream);
_ = StartReaderAsync(eventStream.ResponseStream);
}
StartReaderAsync is implemented as follows:
private async Task StartReaderAsync(IAsyncStreamReader<StreamingMessage> responseStream)
{
while (await responseStream.MoveNext())
{
await _processor!.ProcessMessageAsync(responseStream.Current);
}
}
GrpcWorker
Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
{
// Dispatch and return.
Task.Run(() => ProcessRequestCoreAsync(message));
return Task.CompletedTask;
}
From this point on the work continues asynchronously:
private async Task ProcessRequestCoreAsync(StreamingMessage request)
{
StreamingMessage responseMessage = new StreamingMessage
{
RequestId = request.RequestId
};
switch (request.ContentCase)
{
case MsgType.InvocationRequest:
responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
break;
case MsgType.WorkerInitRequest:
responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
break;
case MsgType.WorkerStatusRequest:
responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
break;
case MsgType.FunctionsMetadataRequest:
responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
break;
case MsgType.WorkerTerminate:
WorkerTerminateRequestHandler(request.WorkerTerminate);
break;
case MsgType.FunctionLoadRequest:
responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
break;
case MsgType.FunctionEnvironmentReloadRequest:
responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
break;
case MsgType.InvocationCancel:
InvocationCancelRequestHandler(request.InvocationCancel);
break;
default:
// TODO: Trace failure here.
return;
}
await _workerClient!.SendMessageAsync(responseMessage);
}
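The "dispatch and return" shape of ProcessMessageAsync can be illustrated with a minimal sketch. It is not the worker's code: the integer message ids, the delays, and the DispatchSketch, HandleAsync and RunAsync names are all assumptions made for illustration. The point it tries to show is that the reader loop does not wait for a slow handler before picking up the next message.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DispatchSketch
{
    // Simulated handler (hypothetical): message 1 is slow, the others finish quickly.
    static async Task HandleAsync(int id, ConcurrentQueue<int> completed)
    {
        await Task.Delay(id == 1 ? 500 : 10);
        completed.Enqueue(id);
    }

    // Same shape as ProcessMessageAsync above: start the work on the
    // thread pool and return an already-completed task to the caller.
    static Task ProcessMessageAsync(int id, ConcurrentQueue<int> completed, List<Task> started)
    {
        started.Add(Task.Run(() => HandleAsync(id, completed)));
        return Task.CompletedTask;
    }

    // Returns the message ids in the order their handlers finished.
    public static async Task<int[]> RunAsync()
    {
        var completed = new ConcurrentQueue<int>();
        var started = new List<Task>();
        foreach (int id in new[] { 1, 2, 3 })
        {
            // The loop moves on to 2 and 3 without waiting for message 1.
            await ProcessMessageAsync(id, completed, started);
        }
        await Task.WhenAll(started); // only so the demo can observe the result
        return completed.ToArray();
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(",", await RunAsync()));
    }
}
```

With these delays the slow message 1 would be expected to finish last, even though it was read first. The trade-off of this shape is that exceptions from the dispatched task are not observed by the reader loop, so the handler has to report failures itself.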
InvocationHandler
public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
{
using CancellationTokenSource cancellationTokenSource = new();
FunctionContext? context = null;
InvocationResponse response = new()
{
InvocationId = request.InvocationId,
Result = new StatusResult()
};
if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
{
var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
response.Result.Status = StatusResult.Types.Status.Failure;
response.Result.Exception = exception.ToRpcException();
return response;
}
try
{
var invocation = new GrpcFunctionInvocation(request);
IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
invocationFeatures.Set<FunctionInvocation>(invocation);
invocationFeatures.Set<IExecutionRetryFeature>(invocation);
context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));
if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
{
invocationFeatures.Set<IInputConversionFeature>(conversion!);
}
await _application.InvokeFunctionAsync(context);
var serializer = _workerOptions.Serializer!;
var functionBindings = context.GetBindings();
foreach (var binding in functionBindings.OutputBindingData)
{
var parameterBinding = new ParameterBinding
{
Name = binding.Key
};
if (binding.Value is not null)
{
parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
}
response.OutputData.Add(parameterBinding);
}
if (functionBindings.InvocationResult is not null)
{
TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
response.ReturnValue = returnVal;
}
response.Result.Status = StatusResult.Types.Status.Success;
}
catch (Exception ex)
{
response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
response.Result.Status = StatusResult.Types.Status.Failure;
if (ex.InnerException is TaskCanceledException or OperationCanceledException)
{
response.Result.Status = StatusResult.Types.Status.Cancelled;
}
}
finally
{
_inflightInvocations.TryRemove(request.InvocationId, out var cts);
if (context is IAsyncDisposable asyncContext)
{
await asyncContext.DisposeAsync();
}
(context as IDisposable)?.Dispose();
}
return response;
}
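The in-flight bookkeeping in InvokeAsync (register a CancellationTokenSource by invocation id, remove it in finally) can be sketched in isolation. This is a simplified illustration under assumptions: InflightTracker, its string status values, and the Cancel method are hypothetical names, not the worker's API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class InflightTracker
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _inflight = new();

    public int Count => _inflight.Count;

    // Runs the work under a token registered by invocation id and reports a status string.
    public async Task<string> InvokeAsync(string invocationId, Func<CancellationToken, Task> work)
    {
        using var cts = new CancellationTokenSource();
        if (!_inflight.TryAdd(invocationId, cts))
        {
            return "Failure"; // the same id is already being tracked
        }
        try
        {
            await work(cts.Token);
            return "Success";
        }
        catch (OperationCanceledException)
        {
            return "Cancelled";
        }
        catch (Exception)
        {
            return "Failure";
        }
        finally
        {
            // Always stop tracking, whatever the outcome was.
            _inflight.TryRemove(invocationId, out _);
        }
    }

    // Cancels a running invocation; returns false when the id is unknown.
    public bool Cancel(string invocationId)
    {
        if (_inflight.TryGetValue(invocationId, out var cts))
        {
            cts.Cancel();
            return true;
        }
        return false;
    }
}

public static class InflightDemo
{
    public static async Task Main()
    {
        var tracker = new InflightTracker();
        Task<string> running = tracker.InvokeAsync("abc", t => Task.Delay(Timeout.Infinite, t));
        Console.WriteLine(tracker.Cancel("abc")); // True
        Console.WriteLine(await running);         // Cancelled
        Console.WriteLine(tracker.Count);         // 0
    }
}
```

Unlike the handler above, this sketch catches OperationCanceledException directly instead of inspecting InnerException; that is a simplification chosen for the example.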
FunctionsApplication
public async Task InvokeFunctionAsync(FunctionContext context)
{
var scope = new FunctionInvocationScope(context.FunctionDefinition.Name, context.InvocationId);
using var logScope = _logger.BeginScope(scope);
using Activity? invokeActivity = _functionActivitySourceFactory.StartInvoke(context);
try
{
await _functionExecutionDelegate(context);
}
catch (Exception ex)
{
invokeActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
Log.InvocationError(_logger, context.FunctionDefinition.Name, context.InvocationId, ex);
throw;
}
}
FunctionsHttpProxyingMiddleware
FunctionsHttpProxyingMiddleware, in the Azure/azure-functions-dotnet-worker project:
public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
{
// Only use the coordinator for HttpTriggers
if (!_isHttpTrigger.GetOrAdd(context.FunctionId, static (_, c) => IsHttpTriggerFunction(c), context))
{
await next(context);
return;
}
var invocationId = context.InvocationId;
// this call will block until the ASP.NET middleware pipeline has signaled that it's ready to run the function
var httpContext = await _coordinator.SetFunctionContextAsync(invocationId, context);
AddHttpContextToFunctionContext(context, httpContext);
// Register additional context features
context.Features.Set<IFromBodyConversionFeature>(FromBodyConverstionFeature.Instance);
await next(context);
var invocationResult = context.GetInvocationResult();
......
}
OutputBindingsMiddleware
OutputBindingsMiddleware, in the Azure/azure-functions-dotnet-worker project:
public static async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
{
await next(context);
AddOutputBindings(context);
}
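Both middlewares follow the same "do something, call next, do something else" shape, and the call stack in 1.2.1 shows them nested around the function executor. A minimal sketch of such a nested pipeline follows; StepDelegate, Use and Build are hypothetical names, and a list of strings stands in for FunctionContext.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for FunctionExecutionDelegate; the log plays the role of the context.
public delegate Task StepDelegate(List<string> log);

public static class PipelineSketch
{
    // Wraps "next" so the middleware can run code before and after it.
    public static StepDelegate Use(StepDelegate next, string name) =>
        async log =>
        {
            log.Add(name + ":before");
            await next(log);
            log.Add(name + ":after");
        };

    public static StepDelegate Build()
    {
        // Innermost step stands in for the generated function executor.
        StepDelegate pipeline = log => { log.Add("function"); return Task.CompletedTask; };
        // Wrap from the inside out: the last wrapper added becomes the outermost step.
        pipeline = Use(pipeline, "outputBindings");
        pipeline = Use(pipeline, "httpProxying");
        return pipeline;
    }

    public static async Task Main()
    {
        var log = new List<string>();
        await Build()(log);
        Console.WriteLine(string.Join(" > ", log));
        // httpProxying:before > outputBindings:before > function > outputBindings:after > httpProxying:after
    }
}
```

This matches what OutputBindingsMiddleware does: it awaits next(context) first and only then adds the output bindings, so its work runs on the way back out of the pipeline.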
Calling next(context) continues down the pipeline:
Work.Sdk.Generator.GeneratedFunctionExecutor
The call is handled in GeneratedFunctionExecutor.g.cs:
public async ValueTask ExecuteAsync(FunctionContext context)
{
var inputBindingFeature = context.Features.Get<IFunctionInputBindingFeature>();
var inputBindingResult = await inputBindingFeature.BindFunctionInputAsync(context);
var inputArguments = inputBindingResult.Values;
if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.RunOrchestrator", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
}
else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.SayHello", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = global::Company.Function.HelloOrchestration.SayHello((string)inputArguments[0], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[1]);
}
else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.HttpStart", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.HttpStart((global::Microsoft.Azure.Functions.Worker.Http.HttpRequestData)inputArguments[0], (global::Microsoft.DurableTask.Client.DurableTaskClient)inputArguments[1], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[2]);
}
}
The generated code checks the value of context.FunctionDefinition.EntryPoint and calls the matching function:
| Value of context.FunctionDefinition.EntryPoint | Function | Function source code |
|---|---|---|
| "Company.Function.HelloOrchestration.RunOrchestrator" | HelloOrchestration.RunOrchestrator() | [Function(nameof(HelloOrchestration))] |
| "Company.Function.HelloOrchestration.SayHello" | HelloOrchestration.SayHello() | [Function(nameof(SayHello))] |
| "Company.Function.HelloOrchestration.HttpStart" | HelloOrchestration.HttpStart() | [Function("HelloOrchestration_HttpStart")] |
In this case the Company.Function.HelloOrchestration.HttpStart() method is called:
Customer Code
HelloOrchestration
Taking the quickstart HelloOrchestration.cs as an example:
public static async Task<HttpResponseData> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(HelloOrchestration));
......
}
To support versioning, the TaskName here has to carry an additional version parameter, so the code is updated to:
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
new TaskName(nameof(HelloOrchestration), "1.2.0"));
The caller of the client can choose here whether to pass a version for the new instance.
Azure Functions Durable Extension code
FunctionsDurableTaskClient
The ScheduleNewOrchestrationInstanceAsync() implementation in FunctionsDurableTaskClient:
private readonly DurableTaskClient inner;
public override Task<string> ScheduleNewOrchestrationInstanceAsync(
TaskName orchestratorName,
object? input = null,
StartOrchestrationOptions? options = null,
CancellationToken cancellation = default)
{
return this.inner.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, options, cancellation);
}
This forwards to ScheduleNewOrchestrationInstanceAsync() on the inner client. inner is declared as DurableTaskClient; the class that actually implements it is GrpcDurableTaskClient.
GrpcDurableTaskClient
The ScheduleNewOrchestrationInstanceAsync() implementation in GrpcDurableTaskClient:
readonly TaskHubSidecarServiceClient sidecarClient;
public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
TaskName orchestratorName,
object? input = null,
StartOrchestrationOptions? options = null,
CancellationToken cancellation = default)
{
Check.NotEntity(this.options.EnableEntitySupport, options?.InstanceId);
var request = new P.CreateInstanceRequest
{
// orchestratorName.Name is "HelloOrchestration" here
Name = orchestratorName.Name,
// the version used to be null; it is now changed to pass orchestratorName.Version
Version = orchestratorName.Version,
InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"),
// input is null
Input = this.DataConverter.Serialize(input),
};
DateTimeOffset? startAt = options?.StartAt;
this.logger.SchedulingOrchestration(
request.InstanceId,
orchestratorName,
sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
startAt.GetValueOrDefault(DateTimeOffset.UtcNow));
if (startAt.HasValue)
{
// Convert timestamps to UTC if not already UTC
request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
}
P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
request, cancellationToken: cancellation);
return result.InstanceId;
}
The main point is that the version field has to be passed along the whole path.
Finally, StartInstanceAsync() on sidecarClient sends the gRPC request and returns the result.
durabletask-dotnet code
TaskHubSidecarServiceClient
Microsoft.DurableTask.Protobuf.TaskHubSidecarService.TaskHubSidecarServiceClient
sidecarClient is declared as TaskHubSidecarServiceClient, which is gRPC client code generated from the protobuf file:
public virtual grpc::AsyncUnaryCall<global::Microsoft.DurableTask.Protobuf.CreateInstanceResponse> StartInstanceAsync(global::Microsoft.DurableTask.Protobuf.CreateInstanceRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return StartInstanceAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
protobuf definition
The StartInstance() method is defined in eng\proto\protos\orchestrator_service.proto in the durabletask-dotnet project:
service TaskHubSidecarService {
......
// Starts a new orchestration instance.
rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse);
......
}
message CreateInstanceRequest {
string instanceId = 1;
string name = 2;
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
google.protobuf.Timestamp scheduledStartTimestamp = 5;
OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6;
}
message CreateInstanceResponse {
string instanceId = 1;
string version = 2;
}
What follows is the gRPC server-side implementation.
azure-functions-durable-extension
LocalGrpcListener
In src\WebJobs.Extensions.DurableTask\LocalGrpcListener.cs in the azure-functions-durable-extension repository.
public async override Task<P.CreateInstanceResponse> StartInstance(P.CreateInstanceRequest request, ServerCallContext context)
{
try
{
string instanceId = await this.GetClient(context).StartNewAsync(
request.Name, request.InstanceId, request.Version, Raw(request.Input));
return new P.CreateInstanceResponse
{
InstanceId = instanceId,
};
}
catch (InvalidOperationException)
{
throw new RpcException(new Status(StatusCode.AlreadyExists, $"An Orchestration instance with the ID {request.InstanceId} already exists."));
}
}
this.GetClient(context) returns an IDurableClient; the actual implementation is DurableClient.
DurableClient
In src\WebJobs.Extensions.DurableTask\ContextImplementations\DurableClient.cs in the azure-functions-durable-extension repository.
async Task<string> IDurableOrchestrationClient.StartNewAsync<T>(string orchestratorFunctionName, string instanceId, string instanceVersion, T input)
{
......
OrchestrationStatus[] dedupeStatuses = this.GetStatusesNotToOverride();
Task<OrchestrationInstance> createTask = this.client.CreateOrchestrationInstanceAsync(
orchestratorFunctionName, instanceVersion, instanceId, input, null, dedupeStatuses);
this.traceHelper.FunctionScheduled(
this.TaskHubName,
orchestratorFunctionName,
instanceId,
reason: "NewInstance",
functionType: FunctionType.Orchestrator,
isReplay: false);
OrchestrationInstance instance = await createTask;
return instance.InstanceId;
}
versioning: StartNewAsync() needs an additional string instanceVersion parameter to carry the version.
The client here is of type TaskHubClient.
durabletask
TaskHubClient
In src\DurableTask.Core\TaskHubClient.cs in the durabletask repository:
public Task<OrchestrationInstance> CreateOrchestrationInstanceAsync(string name, string version, object input)
{
return this.InternalCreateOrchestrationInstanceWithRaisedEventAsync(
name,
version,
orchestrationInstanceId: null,
input,
orchestrationTags: null,
dedupeStatuses: null,
eventName: null,
eventData: null);
}
The implementation of InternalCreateOrchestrationInstanceWithRaisedEventAsync():
async Task<OrchestrationInstance> InternalCreateOrchestrationInstanceWithRaisedEventAsync(
string orchestrationName,
string orchestrationVersion,
string orchestrationInstanceId,
object orchestrationInput,
IDictionary<string, string> orchestrationTags,
OrchestrationStatus[] dedupeStatuses,
string eventName,
object eventData,
DateTime? startAt = null)
{
TraceContextBase requestTraceContext = null;
// correlation
CorrelationTraceClient.Propagate(()=> { requestTraceContext = CreateOrExtractRequestTraceContext(eventName); });
if (string.IsNullOrWhiteSpace(orchestrationInstanceId))
{
orchestrationInstanceId = Guid.NewGuid().ToString("N");
}
var orchestrationInstance = new OrchestrationInstance
{
InstanceId = orchestrationInstanceId,
ExecutionId = Guid.NewGuid().ToString("N"),
};
string serializedOrchestrationData = this.defaultConverter.SerializeInternal(orchestrationInput);
var startedEvent = new ExecutionStartedEvent(-1, serializedOrchestrationData)
{
Tags = orchestrationTags,
Name = orchestrationName,
Version = orchestrationVersion,
OrchestrationInstance = orchestrationInstance,
ScheduledStartTime = startAt
};
var startMessage = new TaskMessage
{
OrchestrationInstance = orchestrationInstance,
Event = startedEvent
};
this.logHelper.SchedulingOrchestration(startedEvent);
using Activity newActivity = TraceHelper.StartActivityForNewOrchestration(startedEvent);
CorrelationTraceClient.Propagate(() => CreateAndTrackDependencyTelemetry(requestTraceContext));
try
{
// Raised events and create orchestration calls use different methods so get handled separately
await this.ServiceClient.CreateTaskOrchestrationAsync(startMessage, dedupeStatuses);
}
catch (Exception e)
{
TraceHelper.AddErrorDetailsToSpan(newActivity, e);
throw;
}
......
return orchestrationInstance;
}
The version field of ExecutionStartedEvent is set to the version passed in earlier.
Finally the call reaches:
// Raised events and create orchestration calls use different methods so get handled separately
await this.ServiceClient.CreateTaskOrchestrationAsync(startMessage, dedupeStatuses);
this.ServiceClient is of type IOrchestrationServiceClient; the actual implementation is AzureStorageDurabilityProvider.
azure-functions-durable-extension
AzureStorageDurabilityProvider
In src\WebJobs.Extensions.DurableTask\AzureStorageDurabilityProvider.cs in the azure-functions-durable-extension repository.
internal class AzureStorageDurabilityProvider : DurabilityProvider {}
AzureStorageDurabilityProvider inherits from DurabilityProvider, which is in src\WebJobs.Extensions.DurableTask\DurabilityProvider.cs.
DurabilityProvider implements CreateTaskOrchestrationAsync() as:
public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
{
return this.GetOrchestrationServiceClient().CreateTaskOrchestrationAsync(creationMessage);
}
this.GetOrchestrationServiceClient() returns an IOrchestrationServiceClient; the actual implementation is AzureStorageOrchestrationService.
AzureStorageOrchestrationService
In src\DurableTask.AzureStorage\AzureStorageOrchestrationService.cs in the durabletask repository.
public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
{
return this.CreateTaskOrchestrationAsync(creationMessage, null);
}
public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
{
ExecutionStartedEvent executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;
if (executionStartedEvent == null)
{
throw new ArgumentException($"Only {nameof(EventType.ExecutionStarted)} messages are supported.", nameof(creationMessage));
}
// Client operations will auto-create the task hub if it doesn't already exist.
await this.EnsureTaskHubAsync();
InstanceStatus existingInstance = await this.trackingStore.FetchInstanceStatusAsync(
creationMessage.OrchestrationInstance.InstanceId);
if (existingInstance?.State != null && dedupeStatuses != null && dedupeStatuses.Contains(existingInstance.State.OrchestrationStatus))
{
// An instance in this state already exists.
if (this.settings.ThrowExceptionOnInvalidDedupeStatus)
{
throw new OrchestrationAlreadyExistsException($"An Orchestration instance with the status {existingInstance.State.OrchestrationStatus} already exists.");
}
return;
}
if (executionStartedEvent.Generation == null)
{
if (existingInstance != null)
{
executionStartedEvent.Generation = existingInstance.State.Generation + 1;
}
else
{
executionStartedEvent.Generation = 0;
}
}
ControlQueue controlQueue = await this.GetControlQueueAsync(creationMessage.OrchestrationInstance.InstanceId);
MessageData startMessage = await this.SendTaskOrchestrationMessageInternalAsync(
EmptySourceInstance,
controlQueue,
creationMessage);
string inputPayloadOverride = null;
if (startMessage.CompressedBlobName != null)
{
// The input of the orchestration is changed to be a URL to a compressed blob, which
// is the input queue message. When fetching the orchestration instance status, that
// blob will be downloaded, decompressed, and the ExecutionStartedEvent.Input value
// will be returned as the input value.
inputPayloadOverride = this.messageManager.GetBlobUrl(startMessage.CompressedBlobName);
}
await this.trackingStore.SetNewExecutionAsync(
executionStartedEvent,
existingInstance?.ETag,
inputPayloadOverride);
}
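The dedupe check and the generation counter in CreateTaskOrchestrationAsync() boil down to a small decision, sketched below under simplifying assumptions. The Status enum, the Existing record and the Decide method are hypothetical stand-ins, an InvalidOperationException replaces OrchestrationAlreadyExistsException, and the case where the event already carries a generation is left out.

```csharp
#nullable enable
using System;
using System.Linq;

public enum Status { Pending, Running, Completed, Failed, Terminated }

// Hypothetical, simplified view of an existing instance record.
public sealed record Existing(Status Status, int Generation);

public static class CreateDecisionSketch
{
    // Returns the generation for the new execution, or null when creation is skipped.
    public static int? Decide(Existing? existing, Status[]? dedupeStatuses, bool throwOnDedupe)
    {
        if (existing != null && dedupeStatuses != null && dedupeStatuses.Contains(existing.Status))
        {
            // An instance in one of the "do not override" states already exists.
            if (throwOnDedupe)
            {
                throw new InvalidOperationException(
                    $"An instance with the status {existing.Status} already exists.");
            }
            return null; // silently skip the creation
        }
        // A reused instance id bumps the generation; a fresh one starts at 0.
        return existing != null ? existing.Generation + 1 : 0;
    }

    public static void Main()
    {
        var dedupe = new[] { Status.Pending, Status.Running };
        Console.WriteLine(Decide(null, dedupe, true));                                     // 0
        Console.WriteLine(Decide(new Existing(Status.Completed, 2), dedupe, true));        // 3
        Console.WriteLine(Decide(new Existing(Status.Running, 2), dedupe, false) is null); // True
    }
}
```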
Open question
Besides sending the ExecutionStartedEvent through CreateTaskOrchestrationAsync(), TaskHubClient's InternalCreateOrchestrationInstanceWithRaisedEventAsync() also has to send an eventRaisedEvent through SendTaskOrchestrationMessageAsync() when eventData is present:
if (eventData != null)
{
string serializedEventData = this.defaultConverter.SerializeInternal(eventData);
var eventRaisedEvent = new EventRaisedEvent(-1, serializedEventData) { Name = eventName };
this.logHelper.RaisingEvent(orchestrationInstance, eventRaisedEvent);
var eventMessage = new TaskMessage
{
OrchestrationInstance = new OrchestrationInstance
{
InstanceId = orchestrationInstanceId,
// to ensure that the event gets raised on the running
// orchestration instance, null the execution id
// so that it will find out which execution
// it should use for processing
ExecutionId = null
},
Event = eventRaisedEvent,
};
await this.ServiceClient.SendTaskOrchestrationMessageAsync(eventMessage);
}
Both ExecutionStartedEvent and EventRaisedEvent have to be wrapped in a TaskMessage.
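To make the two-message shape concrete, here is a minimal sketch with simplified stand-in types. HistoryEvent, ExecutionStarted, EventRaised, Instance, Message and Build are hypothetical names that only mirror the roles of the DurableTask.Core types quoted above, and the sketch assumes the start message goes out before the raised event.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Simplified stand-ins for the history events and the message envelope.
public abstract record HistoryEvent(int EventId);
public sealed record ExecutionStarted(string Name, string? Version, string? Input) : HistoryEvent(-1);
public sealed record EventRaised(string Name, string? Input) : HistoryEvent(-1);
public sealed record Instance(string InstanceId, string? ExecutionId);
public sealed record Message(Instance Instance, HistoryEvent Event);

public static class StartWithEventSketch
{
    // Builds the messages that would be sent for "start, optionally with an event".
    public static List<Message> Build(
        string name, string? version, string instanceId, string? eventName, string? eventData)
    {
        var messages = new List<Message>();

        // The start message targets a concrete, freshly generated execution id.
        var instance = new Instance(instanceId, Guid.NewGuid().ToString("N"));
        messages.Add(new Message(instance, new ExecutionStarted(name, version, null)));

        if (eventData != null)
        {
            // ExecutionId stays null so the event reaches whichever execution is current.
            messages.Add(new Message(
                new Instance(instanceId, null),
                new EventRaised(eventName ?? string.Empty, eventData)));
        }
        return messages;
    }

    public static void Main()
    {
        var messages = Build("HelloOrchestration", "1.2.0", "abc", "Approve", "\"yes\"");
        Console.WriteLine(messages.Count); // 2
    }
}
```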
1.2.2 - FunctionsDurableTaskClient
The client call starts here:
public static async Task<HttpResponseData> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
FunctionContext executionContext)
{
......
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(HelloOrchestration));
......
}
client is declared as DurableTaskClient; the class that actually implements it is FunctionsDurableTaskClient. The code is in src\Worker.Extensions.DurableTask\FunctionsDurableTaskClient.cs in the azure-functions-durable-extension repository.
Class definition and constructor
internal sealed class FunctionsDurableTaskClient : DurableTaskClient
{
private readonly DurableTaskClient inner;
public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString)
: base(inner.Name)
{
this.inner = inner;
this.QueryString = queryString;
}
Debugging shows that the implementation behind inner is Microsoft.DurableTask.Client.Grpc.GrpcDurableTaskClient
ScheduleNewOrchestrationInstanceAsync()
public override Task<string> ScheduleNewOrchestrationInstanceAsync(
TaskName orchestratorName,
object? input = null,
StartOrchestrationOptions? options = null,
CancellationToken cancellation = default)
{
return this.inner.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, options, cancellation);
}
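The relationship between FunctionsDurableTaskClient and its inner client is a plain wrapper: same base type, every call forwarded. A minimal sketch of that shape follows. ClientBase, InnerClient, WrapperClient and ScheduleAsync are hypothetical stand-ins, not the real DurableTaskClient API, and the returned string is only there to make the forwarding visible.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Stand-in for the abstract DurableTaskClient base class.
public abstract class ClientBase
{
    protected ClientBase(string name) => Name = name;
    public string Name { get; }
    public abstract Task<string> ScheduleAsync(string orchestratorName, string? version = null);
}

// Stand-in for the gRPC-backed client that does the real work.
public sealed class InnerClient : ClientBase
{
    public InnerClient() : base("inner") { }
    public override Task<string> ScheduleAsync(string orchestratorName, string? version = null) =>
        Task.FromResult($"{orchestratorName}@{version ?? "none"}");
}

// Stand-in for the Functions-specific wrapper: reuses the inner name and forwards calls.
public sealed class WrapperClient : ClientBase
{
    private readonly ClientBase inner;

    public WrapperClient(ClientBase inner, string? queryString) : base(inner.Name)
    {
        this.inner = inner;
        this.QueryString = queryString;
    }

    public string? QueryString { get; }

    public override Task<string> ScheduleAsync(string orchestratorName, string? version = null) =>
        this.inner.ScheduleAsync(orchestratorName, version);
}

public static class WrapperDemo
{
    public static async Task Main()
    {
        ClientBase client = new WrapperClient(new InnerClient(), queryString: null);
        Console.WriteLine(await client.ScheduleAsync("HelloOrchestration", "1.2.0"));
        // HelloOrchestration@1.2.0
    }
}
```

Because the wrapper forwards arguments unchanged, a new field such as the version only needs handling in the inner client, as long as it travels inside a parameter the wrapper already passes through.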
TaskName
The TaskName code is in src\Abstractions\TaskName.cs in the durabletask-dotnet repository:
public readonly struct TaskName : IEquatable<TaskName>
2 - webjobs
2.1 - common
2.1.1 - IOrchestrationService
src\WebJobs.Extensions.DurableTask\DurabilityProvider.cs in the azure-functions-durable-extension repository. The class doc comment reads:
The backend storage provider that provides the actual durability of Durable Functions. This is functionally a superset of IOrchestrationService and IOrchestrationServiceClient. If the storage provider does not any of the Durable Functions specific operations, they can use this class directly with the expectation that only those interfaces will be implemented. All of the Durable Functions specific methods/operations are virtual and can be overwritten by creating a subclass.
Class definition
public class DurabilityProvider :
IOrchestrationService,
IOrchestrationServiceClient,
IOrchestrationServiceQueryClient,
IOrchestrationServicePurgeClient,
IEntityOrchestrationService
{}
2.1.2 - DurabilityProvider
src\WebJobs.Extensions.DurableTask\DurabilityProvider.cs in the azure-functions-durable-extension repository. The class doc comment reads:
The backend storage provider that provides the actual durability of Durable Functions. This is functionally a superset of IOrchestrationService and IOrchestrationServiceClient. If the storage provider does not any of the Durable Functions specific operations, they can use this class directly with the expectation that only those interfaces will be implemented. All of the Durable Functions specific methods/operations are virtual and can be overwritten by creating a subclass.
Class definition
This class implements a number of interfaces:
public class DurabilityProvider :
IOrchestrationService,
IOrchestrationServiceClient,
IOrchestrationServiceQueryClient,
IOrchestrationServicePurgeClient,
IEntityOrchestrationService
{}
Constructor
/// <summary>
/// Creates the default <see cref="DurabilityProvider"/>.
/// </summary>
/// <param name="storageProviderName">The name of the storage backend providing the durability.</param>
/// <param name="service">The internal <see cref="IOrchestrationService"/> that provides functionality
/// for this classes implementions of <see cref="IOrchestrationService"/>.</param>
/// <param name="serviceClient">The internal <see cref="IOrchestrationServiceClient"/> that provides functionality
/// for this classes implementions of <see cref="IOrchestrationServiceClient"/>.</param>
/// <param name="connectionName">The name of the app setting that stores connection details for the storage provider.</param>
public DurabilityProvider(string storageProviderName, IOrchestrationService service, IOrchestrationServiceClient serviceClient, string connectionName)
{
this.name = storageProviderName ?? throw new ArgumentNullException(nameof(storageProviderName));
this.innerService = service ?? throw new ArgumentNullException(nameof(service));
this.innerServiceClient = serviceClient ?? throw new ArgumentNullException(nameof(serviceClient));
this.entityOrchestrationService = service as IEntityOrchestrationService;
this.connectionName = connectionName ?? throw new ArgumentNullException(connectionName);
}
Note this assignment:
this.entityOrchestrationService = service as IEntityOrchestrationService;
Here service is declared as IOrchestrationService:
IOrchestrationService service
LockNextOrchestrationWorkItemAsync()
The implementation of this method is delegated to this.entityOrchestrationService:
/// <inheritdoc/>
Task<TaskOrchestrationWorkItem> IEntityOrchestrationService.LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
=> this.entityOrchestrationService.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
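Since the field is filled with an "as" cast, it is null whenever the injected service does not also implement the entity interface. The sketch below illustrates that behaviour with hypothetical stand-in types (IService, IEntityService, Provider). The explicit null check in LockNext() is an addition made for the example; the delegating method quoted above calls the field directly.

```csharp
#nullable enable
using System;

public interface IService { string Name { get; } }
public interface IEntityService { string LockNext(); }

// Implements only the base interface.
public sealed class PlainService : IService
{
    public string Name => "plain";
}

// Implements both interfaces, like a backend with entity support.
public sealed class EntityCapableService : IService, IEntityService
{
    public string Name => "entity-capable";
    public string LockNext() => "work item";
}

public sealed class Provider
{
    private readonly IEntityService? entityService;

    public Provider(IService service)
    {
        // "as" yields null instead of throwing when the cast is not possible.
        this.entityService = service as IEntityService;
    }

    public bool SupportsEntities => this.entityService != null;

    // Delegates to the entity-capable service, failing loudly when there is none.
    public string LockNext() =>
        this.entityService is { } s
            ? s.LockNext()
            : throw new NotSupportedException("The service does not implement IEntityService.");
}

public static class ProviderDemo
{
    public static void Main()
    {
        Console.WriteLine(new Provider(new PlainService()).SupportsEntities);   // False
        Console.WriteLine(new Provider(new EntityCapableService()).LockNext()); // work item
    }
}
```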
2.2 - azure storage
2.2.1 - AzureStorageDurabilityProvider
src\WebJobs.Extensions.DurableTask\AzureStorageDurabilityProvider.cs in the azure-functions-durable-extension repository. The class doc comment reads:
The Azure Storage implementation of additional methods not required by IOrchestrationService.
Class definition
internal class AzureStorageDurabilityProvider : DurabilityProvider {......}
DurabilityProvider is in src\WebJobs.Extensions.DurableTask\DurabilityProvider.cs
LockNextOrchestrationWorkItemAsync() method
This method is implemented in the base class DurabilityProvider; AzureStorageDurabilityProvider does not override it.
3 - worker
3.1 - worker initial
3.1.1 - Obsolete initialization
GrpcDurableTaskWorker
File src\Worker\Grpc\GrpcDurableTaskWorker.Processor.cs in the durabletask-dotnet repository:
public async Task ExecuteAsync(CancellationToken cancellation)
{
while (!cancellation.IsCancellationRequested)
{
try
{
AsyncServerStreamingCall<P.WorkItem> stream = await this.ConnectAsync(cancellation);
await this.ProcessWorkItemsAsync(stream, cancellation);
}
......
}
}
ConnectAsync() calls the GetWorkItems() method defined in the gRPC protobuf file:
async Task<AsyncServerStreamingCall<P.WorkItem>> ConnectAsync(CancellationToken cancellation)
{
await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation);
this.Logger.EstablishedWorkItemConnection();
Console.WriteLine("********GrpcDurableTaskWorker call GetWorkItems()********");
// Get the stream for receiving work-items
return this.sidecar!.GetWorkItems(new P.GetWorkItemsRequest(), cancellationToken: cancellation);
}
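This would have been the normal worker initialization flow, but it is now obsolete and no longer used.
Note: I am recording this to avoid confusion; I lost a lot of time here.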
3.1.2 - Call stack
Call stack overview
Add logging to src\DotNetWorker.Grpc\GrpcWorker.cs in the azure-functions-dotnet-worker repository:
public Task StartAsync(CancellationToken token)
{
Console.WriteLine(new System.Diagnostics.StackTrace(true));
_workerClient = _workerClientFactory.CreateClient(this);
Console.WriteLine("_workerClient is " + _workerClient.GetType().Name);
return _workerClient.StartAsync(token);
}
This yields the call stack for worker initialization at startup:
[2024-04-08T09:07:04.591Z] at Microsoft.Azure.Functions.Worker.GrpcWorker.StartAsync(CancellationToken token) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\GrpcWorker.cs:line 58
[2024-04-08T09:07:04.592Z] at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Core\WorkerHostedService.cs:line 25
[2024-04-08T09:07:04.592Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z] at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.593Z] at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.593Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z] at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.594Z] at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.594Z] at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z] at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.595Z] at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z] at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.596Z] at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z] at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.Run(IHost host)
[2024-04-08T09:07:04.596Z] at Program.<Main>$(String[] args) in C:\Users\sky\work\code\durabletask\MyDurableFunction1\Program.cs:line 13
......
[2024-04-08T09:07:04.598Z] _workerClient is GrpcWorkerClient
[2024-04-08T09:07:04.629Z] Worker process started and initialized.
Call stack
GrpcWorker
public Task StartAsync(CancellationToken token)
{
_workerClient = _workerClientFactory.CreateClient(this);
return _workerClient.StartAsync(token);
}
GrpcWorkerClient
GrpcWorkerClient, in the file src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:
public async Task StartAsync(CancellationToken token)
{
if (_running)
{
throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
}
_running = true;
var eventStream = _grpcClient.EventStream(cancellationToken: token);
await SendStartStreamMessageAsync(eventStream.RequestStream);
_ = StartWriterAsync(eventStream.RequestStream);
_ = StartReaderAsync(eventStream.ResponseStream);
}
3.1.3 - GrpcWorkerClient
GrpcWorkerClient, in the file src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs
Class definition
private class GrpcWorkerClient : IWorkerClient
{
private readonly FunctionRpcClient _grpcClient;
private readonly GrpcWorkerStartupOptions _startupOptions;
private readonly ChannelReader<StreamingMessage> _outputReader;
private readonly ChannelWriter<StreamingMessage> _outputWriter;
private bool _running;
private IMessageProcessor? _processor;
}
Constructor
public GrpcWorkerClient(GrpcHostChannel outputChannel, GrpcWorkerStartupOptions startupOptions, IMessageProcessor processor)
{
_startupOptions = startupOptions ?? throw new ArgumentNullException(nameof(startupOptions));
_processor = processor ?? throw new ArgumentNullException(nameof(processor));
// Initialize the reader and the writer; both come from outputChannel
_outputReader = outputChannel.Channel.Reader;
_outputWriter = outputChannel.Channel.Writer;
// Create _grpcClient, of type FunctionRpcClient
_grpcClient = CreateClient();
}
The CreateClient() method:
private FunctionRpcClient CreateClient()
{
#if NET5_0_OR_GREATER
GrpcChannel grpcChannel = GrpcChannel.ForAddress(_startupOptions.HostEndpoint!.AbsoluteUri, new GrpcChannelOptions()
{
MaxReceiveMessageSize = _startupOptions.GrpcMaxMessageLength,
MaxSendMessageSize = _startupOptions.GrpcMaxMessageLength,
Credentials = ChannelCredentials.Insecure
});
#else
var options = new ChannelOption[]
{
new ChannelOption(GrpcCore.ChannelOptions.MaxReceiveMessageLength, _startupOptions.GrpcMaxMessageLength),
new ChannelOption(GrpcCore.ChannelOptions.MaxSendMessageLength, _startupOptions.GrpcMaxMessageLength)
};
GrpcCore.Channel grpcChannel = new GrpcCore.Channel(_startupOptions.HostEndpoint!.Host, _startupOptions.HostEndpoint.Port, ChannelCredentials.Insecure, options);
#endif
return new FunctionRpcClient(grpcChannel);
}
}
Start process
StartAsync
public async Task StartAsync(CancellationToken token)
{
if (_running)
{
throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
}
_running = true;
var eventStream = _grpcClient.EventStream(cancellationToken: token);
await SendStartStreamMessageAsync(eventStream.RequestStream);
_ = StartWriterAsync(eventStream.RequestStream);
_ = StartReaderAsync(eventStream.ResponseStream);
}
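StartAsync() starts a writer and a reader without awaiting them, and the constructor takes both ends of outputChannel. The body of StartWriterAsync() is not shown in these notes, so the following is only a sketch of how such a writer loop could drain a System.Threading.Channels channel into a sink. DuplexSketch, RunWriterAsync, DemoAsync and the string payloads are hypothetical; the real code works with StreamingMessage and the gRPC request stream.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class DuplexSketch
{
    // Drains the outbound channel into `send`, the way a writer loop could
    // forward queued messages to the gRPC request stream.
    public static async Task RunWriterAsync(ChannelReader<string> outbound, Func<string, Task> send)
    {
        await foreach (string message in outbound.ReadAllAsync())
        {
            await send(message);
        }
    }

    public static async Task<List<string>> DemoAsync()
    {
        Channel<string> channel = Channel.CreateUnbounded<string>();
        var sent = new List<string>();

        // Start the writer loop first; it waits until messages are queued.
        Task writer = RunWriterAsync(channel.Reader, m => { sent.Add(m); return Task.CompletedTask; });

        // Producers (for example message handlers) only touch the channel writer.
        await channel.Writer.WriteAsync("WorkerInitResponse");
        await channel.Writer.WriteAsync("InvocationResponse");
        channel.Writer.Complete(); // lets ReadAllAsync finish

        await writer;
        return sent;
    }
}
```

The point of the channel is that any number of handlers can queue responses concurrently while a single loop owns the stream, which matters because a gRPC request stream does not allow overlapping writes.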
3.1.4 - GrpcWorker
GrpcWorker, in the file src\DotNetWorker.Grpc\GrpcWorker.cs of the azure-functions-dotnet-worker repository.
Message processing
ProcessMessageAsync
Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
{
// Dispatch and return.
Task.Run(() => ProcessRequestCoreAsync(message));
return Task.CompletedTask;
}
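The "Dispatch and return" comment describes a fire-and-forget hand-off: the returned task is already complete while the handler keeps running on the thread pool. A minimal sketch of that behavior follows; DispatchSketch, its Completed queue and the gate parameter are hypothetical and exist only to make the timing observable.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class DispatchSketch
{
    // Records which messages have finished processing.
    public static readonly ConcurrentQueue<string> Completed = new ConcurrentQueue<string>();

    // Same shape as ProcessMessageAsync above: queue the work, return at once.
    public static Task ProcessMessageAsync(string message, Task gate)
    {
        _ = Task.Run(async () =>
        {
            await gate;                  // stand-in for slow handler work
            Completed.Enqueue(message);
        });
        return Task.CompletedTask;
    }
}
```

Two consequences of this shape are worth keeping in mind when reading the handlers: messages may be processed concurrently and finish out of order, and an exception thrown inside the dispatched task is not observed by the caller.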
ProcessRequestCoreAsync
private async Task ProcessRequestCoreAsync(StreamingMessage request)
{
StreamingMessage responseMessage = new StreamingMessage
{
RequestId = request.RequestId
};
switch (request.ContentCase)
{
case MsgType.InvocationRequest:
responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
break;
case MsgType.WorkerInitRequest:
Console.WriteLine("GrpcWorker received WorkerInitRequest");
responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
break;
case MsgType.WorkerStatusRequest:
responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
break;
case MsgType.FunctionsMetadataRequest:
responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
break;
case MsgType.WorkerTerminate:
WorkerTerminateRequestHandler(request.WorkerTerminate);
break;
case MsgType.FunctionLoadRequest:
responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
break;
case MsgType.FunctionEnvironmentReloadRequest:
responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
break;
case MsgType.InvocationCancel:
InvocationCancelRequestHandler(request.InvocationCancel);
break;
default:
// TODO: Trace failure here.
return;
}
await _workerClient!.SendMessageAsync(responseMessage);
}
WorkerInitRequest
case MsgType.WorkerInitRequest:
responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
break;
Implementation of the WorkerInitRequestHandler() method:
internal static WorkerInitResponse WorkerInitRequestHandler(WorkerInitRequest request, WorkerOptions workerOptions)
{
var response = new WorkerInitResponse
{
Result = new StatusResult { Status = StatusResult.Types.Status.Success },
WorkerVersion = WorkerInformation.Instance.WorkerVersion,
WorkerMetadata = GetWorkerMetadata()
};
response.Capabilities.Add(GetWorkerCapabilities(workerOptions));
return response;
}
TBD: where does WorkerVersion come from?
InvocationRequest
case MsgType.InvocationRequest:
responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
break;
Implementation of the InvocationRequestHandlerAsync method:
internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
{
return _invocationHandler.InvokeAsync(request);
}
3.1.5 - FunctionRpcClient
The protobuf definition behind FunctionRpcClient, in the Azure/azure-functions-dotnet-worker repository.
protobuf
Path of the proto file:
protos\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto
FunctionRpc service
FunctionRpc.proto defines the FunctionRpc gRPC service:
option java_multiple_files = true;
option java_package = "com.microsoft.azure.functions.rpc.messages";
option java_outer_classname = "FunctionProto";
option csharp_namespace = "Microsoft.Azure.Functions.Worker.Grpc.Messages";
option go_package ="github.com/Azure/azure-functions-go-worker/internal/rpc";
package AzureFunctionsRpcMessages;
import "google/protobuf/duration.proto";
import "identity/ClaimsIdentityRpc.proto";
import "shared/NullableTypes.proto";
// Interface exported by the server.
service FunctionRpc {
}
The EventStream method
The service defines only a single method, EventStream:
rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
Both the request and the response are streams, and both carry StreamingMessage.
StreamingMessage
Apart from a request_id that identifies a message between host and worker, StreamingMessage has only a single oneof content field:
message StreamingMessage {
// Used to identify message between host and worker
string request_id = 1;
// Payload of the message
oneof content {
......
}
}
There are quite a few message types:
oneof content {
// Worker initiates stream
StartStream start_stream = 20;
// Host sends capabilities/init data to worker
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;
// MESSAGE NOT USED
// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;
// Host sends terminate message to worker.
// Worker terminates if it can, otherwise host terminates after a grace period
WorkerTerminate worker_terminate = 14;
// Host periodically sends status request to the worker
WorkerStatusRequest worker_status_request = 12;
WorkerStatusResponse worker_status_response = 13;
// On file change event, host sends notification to worker
FileChangeEventRequest file_change_event_request = 6;
// Worker requests a desired action (restart worker, reload function)
WorkerActionResponse worker_action_response = 7;
// Host sends required metadata to worker to load function
FunctionLoadRequest function_load_request = 8;
// Worker responds after loading with the load result
FunctionLoadResponse function_load_response = 9;
// Host requests a given invocation
InvocationRequest invocation_request = 4;
// Worker responds to a given invocation
InvocationResponse invocation_response = 5;
// Host sends cancel message to attempt to cancel an invocation.
// If an invocation is cancelled, host will receive an invocation response with status cancelled.
InvocationCancel invocation_cancel = 21;
// Worker logs a message back to the host
RpcLog rpc_log = 2;
FunctionEnvironmentReloadRequest function_environment_reload_request = 25;
FunctionEnvironmentReloadResponse function_environment_reload_response = 26;
// Ask the worker to close any open shared memory resources for a given invocation
CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27;
CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28;
// Worker indexing message types
FunctionsMetadataRequest functions_metadata_request = 29;
FunctionMetadataResponse function_metadata_response = 30;
// Host sends required metadata to worker to load functions
FunctionLoadRequestCollection function_load_request_collection = 31;
// Host gets the list of function load responses
FunctionLoadResponseCollection function_load_response_collection = 32;
// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;
// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;
}
StartStream
// Worker initiates stream
StartStream start_stream = 20;
// Process.Start required info
// connection details
// protocol type
// protocol version
// Worker sends the host information identifying itself
message StartStream {
// id of the worker
string worker_id = 2;
}
TBD: consider adding a version field here.
WorkerInitRequest / WorkerInitResponse
// Host sends capabilities/init data to worker
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;
// Host requests the worker to initialize itself
message WorkerInitRequest {
// version of the host sending init request
string host_version = 1;
// A map of host supported features/capabilities
map<string, string> capabilities = 2;
// inform worker of supported categories and their levels
// i.e. Worker = Verbose, Function.MyFunc = None
map<string, RpcLog.Level> log_categories = 3;
// Full path of worker.config.json location
string worker_directory = 4;
// base directory for function app
string function_app_directory = 5;
}
// Worker responds with the result of initializing itself
message WorkerInitResponse {
// PROPERTY NOT USED
// TODO: Remove from protobuf during next breaking change release
string worker_version = 1;
// A map of worker supported features/capabilities
map<string, string> capabilities = 2;
// Status of the response
StatusResult result = 3;
// Worker metadata captured for telemetry purposes
WorkerMetadata worker_metadata = 4;
}
WorkerHeartbeat
// MESSAGE NOT USED
// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}
WorkerTerminate
// Host sends terminate message to worker.
// Worker terminates if it can, otherwise host terminates after a grace period
WorkerTerminate worker_terminate = 14;
// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
google.protobuf.Duration grace_period = 1;
}
WorkerStatusRequest / WorkerStatusResponse
// Host periodically sends status request to the worker
WorkerStatusRequest worker_status_request = 12;
WorkerStatusResponse worker_status_response = 13;
// Used by the host to determine worker health
message WorkerStatusRequest {
}
// Worker responds with status message
// TODO: Add any worker relevant status to response
message WorkerStatusResponse {
}
InvocationRequest / InvocationResponse
// Host requests a given invocation
InvocationRequest invocation_request = 4;
// Worker responds to a given invocation
InvocationResponse invocation_response = 5;
// Host requests worker to invoke a Function
message InvocationRequest {
// Unique id for each invocation
string invocation_id = 1;
// Unique id for each Function
string function_id = 2;
// Input bindings (include trigger)
repeated ParameterBinding input_data = 3;
// binding metadata from trigger
map<string, TypedData> trigger_metadata = 4;
// Populates activityId, tracestate and tags from host
RpcTraceContext trace_context = 5;
// Current retry context
RetryContext retry_context = 6;
}
// Worker responds with status of Invocation
message InvocationResponse {
// Unique id for invocation
string invocation_id = 1;
// Output binding data
repeated ParameterBinding output_data = 2;
// data returned from Function (for $return and triggers with return support)
TypedData return_value = 4;
// Status of the invocation (success/failure/canceled)
StatusResult result = 3;
}
WorkerWarmupRequest / WorkerWarmupResponse
// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;
// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;
message WorkerWarmupRequest {
// Full path of worker.config.json location
string worker_directory = 1;
}
message WorkerWarmupResponse {
StatusResult result = 1;
}
3.2 - client run orchestrator
3.2.1 - Call stack
Call stack overview
MyDurableFunction1.dll!Company.Function.HelloOrchestration.RunOrchestrator(Microsoft.DurableTask.TaskOrchestrationContext context) Line 16 (c:\Users\sky\work\code\durabletask\MyDurableFunction1\HelloOrchestration.cs:16)
MyDurableFunction1.dll!MyDurableFunction1.DirectFunctionExecutor.ExecuteAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 32 (GeneratedFunctionExecutor.g.cs:32)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.Pipeline.FunctionExecutionMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 20 (FunctionExecutionMiddleware.cs:20)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseFunctionExecutionMiddleware.AnonymousMethod__1_2(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 57 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:57)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 13 (OutputBindingsMiddleware.cs:13)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseOutputBindingsMiddleware.AnonymousMethod__3(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 84 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:84)
Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.dll!Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 34 (FunctionsHttpProxyingMiddleware.cs:34)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.EnsureSynchronousExecution(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next, Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrationContext orchestrationContext) Line 72 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\FunctionsOrchestrator.cs:72)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.RunAsync(Microsoft.DurableTask.TaskOrchestrationContext context, object input) Line 51 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\FunctionsOrchestrator.cs:51)
Microsoft.DurableTask.Worker.dll!Microsoft.DurableTask.Worker.Shims.TaskOrchestrationShim.Execute(DurableTask.Core.OrchestrationContext innerContext, string rawInput) Line 52 (c:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\src\Worker\Core\Shims\TaskOrchestrationShim.cs:52)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ProcessEvent(DurableTask.Core.History.HistoryEvent historyEvent) Line 211 (TaskOrchestrationExecutor.cs:211)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ExecuteCore.__ProcessEvents|12_0(System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> events) Line 135 (TaskOrchestrationExecutor.cs:135)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ExecuteCore(System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> pastEvents, System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> newEvents) Line 143 (TaskOrchestrationExecutor.cs:143)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.Execute() Line 93 (TaskOrchestrationExecutor.cs:93)
Microsoft.DurableTask.Worker.Grpc.dll!Microsoft.DurableTask.Worker.Grpc.GrpcOrchestrationRunner.LoadAndRun(string encodedOrchestratorRequest, Microsoft.DurableTask.ITaskOrchestrator implementation, System.IServiceProvider services) Line 113 (c:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\src\Worker\Grpc\GrpcOrchestrationRunner.cs:113)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.RunOrchestrationAsync(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.BindingMetadata triggerBinding, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 59 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\DurableTaskFunctionsMiddleware.cs:59)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 22 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\DurableTaskFunctionsMiddleware.cs:22)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 77 (FunctionsApplication.cs:77)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.InvokeAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 88 (InvocationHandler.cs:88)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.InvocationRequestHandlerAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 122 (GrpcWorker.cs:122)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.ProcessRequestCoreAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.StreamingMessage request) Line 81 (GrpcWorker.cs:81)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.Microsoft.Azure.Functions.Worker.Grpc.IMessageProcessor.ProcessMessageAsync.AnonymousMethod__0() Line 66 (GrpcWorker.cs:66)
System.Private.CoreLib.dll!System.Threading.Tasks.Task<System.Threading.Tasks.Task>.InnerInvoke() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(System.Threading.Thread threadPoolThread, System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot, System.Threading.Thread threadPoolThread) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart() (Unknown Source:0)
[Native to Managed Transition] (Unknown Source:0)
Azure function dotnet worker
GrpcWorkerClientFactory
In the file src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs of the azure-functions-dotnet-worker repository:
private async Task StartReaderAsync(IAsyncStreamReader<StreamingMessage> responseStream)
{
while (await responseStream.MoveNext())
{
await _processor!.ProcessMessageAsync(responseStream.Current);
}
}
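This is the entry point for gRPC messages arriving at the worker. Note that the worker is the gRPC client here, not the server: it calls EventStream on the host and reads the host's messages from the response stream.
The _processor here is implemented by Microsoft.Azure.Functions.Worker.GrpcWorker.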
GrpcWorker
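In the file src\DotNetWorker.Grpc\GrpcWorker.cs of the azure-functions-dotnet-worker repository:
After receiving a gRPC message, the gRPC worker calls ProcessRequestCoreAsync() to handle it. Note that the call is dispatched asynchronously via Task.Run, so the reader loop is not blocked: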
Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
{
// Dispatch and return.
Task.Run(() => ProcessRequestCoreAsync(message));
return Task.CompletedTask;
}
Implementation of the ProcessRequestCoreAsync() method:
private async Task ProcessRequestCoreAsync(StreamingMessage request)
{
StreamingMessage responseMessage = new StreamingMessage
{
RequestId = request.RequestId
};
switch (request.ContentCase)
{
case MsgType.InvocationRequest:
// Execution reaches this branch; note that there are two InvocationRequest messages:
// the first is for HelloOrchestration_HttpStart, and only the second is for RunOrchestrator
responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
break;
case MsgType.WorkerInitRequest:
responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
break;
case MsgType.WorkerStatusRequest:
responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
break;
case MsgType.FunctionsMetadataRequest:
responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
break;
case MsgType.WorkerTerminate:
WorkerTerminateRequestHandler(request.WorkerTerminate);
break;
case MsgType.FunctionLoadRequest:
responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
break;
case MsgType.FunctionEnvironmentReloadRequest:
responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
break;
case MsgType.InvocationCancel:
InvocationCancelRequestHandler(request.InvocationCancel);
break;
default:
// TODO: Trace failure here.
return;
}
await _workerClient!.SendMessageAsync(responseMessage);
}
The InvocationRequestHandlerAsync() method
internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
{
return _invocationHandler.InvokeAsync(request);
}
Here _invocationHandler is declared as IInvocationHandler; the implementation is Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.
InvocationHandler
The code is in src\DotNetWorker.Grpc\Handlers\InvocationHandler.cs in the azure-functions-dotnet-worker repository:
public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
{
using CancellationTokenSource cancellationTokenSource = new();
FunctionContext? context = null;
InvocationResponse response = new()
{
InvocationId = request.InvocationId,
Result = new StatusResult()
};
if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
{
var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
response.Result.Status = StatusResult.Types.Status.Failure;
response.Result.Exception = exception.ToRpcException();
return response;
}
try
{
var invocation = new GrpcFunctionInvocation(request);
IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
invocationFeatures.Set<FunctionInvocation>(invocation);
invocationFeatures.Set<IExecutionRetryFeature>(invocation);
context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));
if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
{
invocationFeatures.Set<IInputConversionFeature>(conversion!);
}
// Execution continues into this call
await _application.InvokeFunctionAsync(context);
var serializer = _workerOptions.Serializer!;
var functionBindings = context.GetBindings();
foreach (var binding in functionBindings.OutputBindingData)
{
var parameterBinding = new ParameterBinding
{
Name = binding.Key
};
if (binding.Value is not null)
{
parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
}
response.OutputData.Add(parameterBinding);
}
if (functionBindings.InvocationResult is not null)
{
TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
response.ReturnValue = returnVal;
}
response.Result.Status = StatusResult.Types.Status.Success;
}
catch (Exception ex)
{
response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
response.Result.Status = StatusResult.Types.Status.Failure;
if (ex.InnerException is TaskCanceledException or OperationCanceledException)
{
response.Result.Status = StatusResult.Types.Status.Cancelled;
}
}
finally
{
_inflightInvocations.TryRemove(request.InvocationId, out var cts);
if (context is IAsyncDisposable asyncContext)
{
await asyncContext.DisposeAsync();
}
(context as IDisposable)?.Dispose();
}
return response;
}
_application is declared as IFunctionsApplication; the concrete implementation is Microsoft.Azure.Functions.Worker.FunctionsApplication.
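The in-flight bookkeeping in the InvokeAsync listing above (TryAdd before the call, TryRemove in the finally block) can be pictured with a small standalone sketch. The names InflightTracker, TryBegin, TryCancel and End are hypothetical and do not exist in the worker; only the ConcurrentDictionary plus CancellationTokenSource pattern is taken from the listing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper, not part of azure-functions-dotnet-worker.
public sealed class InflightTracker
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _inflight = new();

    // Returns false when the invocation id is already tracked,
    // which is the case InvokeAsync reports as a Failure.
    public bool TryBegin(string invocationId, CancellationTokenSource cts)
        => _inflight.TryAdd(invocationId, cts);

    // Requests cancellation for a tracked invocation, if there is one.
    public bool TryCancel(string invocationId)
    {
        if (_inflight.TryGetValue(invocationId, out var cts))
        {
            cts.Cancel();
            return true;
        }
        return false;
    }

    // Mirrors the finally block: stop tracking regardless of the outcome.
    public bool End(string invocationId) => _inflight.TryRemove(invocationId, out _);
}

public static class InflightDemo
{
    public static void Main()
    {
        var tracker = new InflightTracker();
        using var cts = new CancellationTokenSource();

        Console.WriteLine(tracker.TryBegin("inv-1", cts)); // True
        Console.WriteLine(tracker.TryBegin("inv-1", cts)); // False: duplicate id
        Console.WriteLine(tracker.TryCancel("inv-1"));     // True
        Console.WriteLine(cts.IsCancellationRequested);    // True
        Console.WriteLine(tracker.End("inv-1"));           // True
        Console.WriteLine(tracker.TryCancel("inv-1"));     // False: no longer tracked
    }
}
```

Keying the dictionary by invocation id is what lets a later cancellation request find the token source that belongs to a running invocation.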
FunctionsApplication
The code is in src\DotNetWorker.Core\FunctionsApplication.cs in the azure-functions-dotnet-worker repository:
public async Task InvokeFunctionAsync(FunctionContext context)
{
var scope = new FunctionInvocationScope(context.FunctionDefinition.Name, context.InvocationId);
using var logScope = _logger.BeginScope(scope);
using Activity? invokeActivity = _functionActivitySourceFactory.StartInvoke(context);
try
{
// Execution enters here
await _functionExecutionDelegate(context);
}
catch (Exception ex)
{
invokeActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
Log.InvocationError(_logger, context.FunctionDefinition.Name, context.InvocationId, ex);
throw;
}
}
_functionExecutionDelegate is a Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate, the delegate at the head of the middleware pipeline.
MiddlewareWorkerApplicationBuilderExtensions
The code is in src\DotNetWorker.Core\hosting\WorkerMiddlewareWorkerApplicationBuilderExtensions.cs in the azure-functions-dotnet-worker repository (the file name is the one shown in the call stack):
public static IFunctionsWorkerApplicationBuilder UseMiddleware<T>(this IFunctionsWorkerApplicationBuilder builder)
where T : class, IFunctionsWorkerMiddleware
{
builder.Services.AddSingleton<T>();
builder.Use(next =>
{
return context =>
{
var middleware = context.InstanceServices.GetRequiredService<T>();
return middleware.Invoke(context, next);
};
});
return builder;
}
The middleware here is DurableTaskFunctionsMiddleware.
From this point the call leaves azure-functions-dotnet-worker (Azure Functions code) and enters azure-functions-durable-extension (Durable Functions code).
Everything before this point is Azure Functions code and cannot be modified.
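How the builder.Use(next => context => ...) calls chain into one delegate can be illustrated with a minimal sketch. Context, ExecutionDelegate and PipelineBuilder are hypothetical stand-ins for FunctionContext, FunctionExecutionDelegate and the worker's builder. The assumption that the first registered component ends up outermost matches the order of frames in the call stack, but the worker's actual builder code is not shown in these notes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for FunctionContext.
public sealed class Context
{
    public List<string> Trace { get; } = new();
}

// Hypothetical stand-in for FunctionExecutionDelegate.
public delegate Task ExecutionDelegate(Context context);

public sealed class PipelineBuilder
{
    private readonly List<Func<ExecutionDelegate, ExecutionDelegate>> _components = new();

    public PipelineBuilder Use(Func<ExecutionDelegate, ExecutionDelegate> component)
    {
        _components.Add(component);
        return this;
    }

    // Wrap from the last registered component to the first, so that the
    // first registered middleware becomes the outermost one.
    public ExecutionDelegate Build()
    {
        ExecutionDelegate pipeline = _ => Task.CompletedTask;
        for (int i = _components.Count - 1; i >= 0; i--)
        {
            pipeline = _components[i](pipeline);
        }
        return pipeline;
    }
}

public static class PipelineDemo
{
    public static async Task<string> RunAsync()
    {
        ExecutionDelegate pipeline = new PipelineBuilder()
            .Use(next => async ctx =>
            {
                ctx.Trace.Add("durable:before");
                await next(ctx);
                ctx.Trace.Add("durable:after");
            })
            .Use(next => ctx =>
            {
                ctx.Trace.Add("function");
                return next(ctx);
            })
            .Build();

        var context = new Context();
        await pipeline(context);
        return string.Join(" -> ", context.Trace);
    }

    public static async Task Main()
    {
        // prints: durable:before -> function -> durable:after
        Console.WriteLine(await RunAsync());
    }
}
```

Because each component receives next and decides when to call it, a middleware such as DurableTaskFunctionsMiddleware can run code before and after the rest of the pipeline, or take over the call completely.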
Azure functions durable extension
DurableTaskFunctionsMiddleware
The code is in src\Worker.Extensions.DurableTask\DurableTaskFunctionsMiddleware.cs in the azure-functions-durable-extension repository:
public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next)
{
if (IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerBinding))
{
// Execution enters here
return RunOrchestrationAsync(functionContext, triggerBinding, next);
}
if (IsEntityTrigger(functionContext, out triggerBinding))
{
return RunEntityAsync(functionContext, triggerBinding, next);
}
return next(functionContext);
}
Implementation of RunOrchestrationAsync():
static async Task RunOrchestrationAsync(
FunctionContext context, BindingMetadata triggerBinding, FunctionExecutionDelegate next)
{
InputBindingData<object> triggerInputData = await context.BindInputAsync<object>(triggerBinding);
if (triggerInputData?.Value is not string encodedOrchestratorState)
{
throw new InvalidOperationException("Orchestration history state was either missing from the input or not a string value.");
}
FunctionsOrchestrator orchestrator = new(context, next, triggerInputData);
string orchestratorOutput = GrpcOrchestrationRunner.LoadAndRun(
encodedOrchestratorState, orchestrator, context.InstanceServices);
// Send the encoded orchestrator output as the return value seen by the functions host extension
context.GetInvocationResult().Value = orchestratorOutput;
}
durabletask-dotnet
GrpcOrchestrationRunner
Located at \src\Worker\Grpc\GrpcOrchestrationRunner.cs in the durabletask-dotnet repository:
public static string LoadAndRun(
string encodedOrchestratorRequest,
ITaskOrchestrator implementation,
IServiceProvider? services = null)
{
Check.NotNullOrEmpty(encodedOrchestratorRequest);
Check.NotNull(implementation);
P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode<P.OrchestratorRequest>(
encodedOrchestratorRequest);
List<HistoryEvent> pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList();
IEnumerable<HistoryEvent> newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent);
// Re-construct the orchestration state from the history.
// New events must be added using the AddEvent method.
OrchestrationRuntimeState runtimeState = new(pastEvents);
foreach (HistoryEvent newEvent in newEvents)
{
runtimeState.AddEvent(newEvent);
}
TaskName orchestratorName = new(runtimeState.Name);
ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
? new(new(p.Name), p.OrchestrationInstance.InstanceId)
: null;
DurableTaskShimFactory factory = services is null
? DurableTaskShimFactory.Default
: ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent);
TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
// Execution enters here
OrchestratorExecutionResult result = executor.Execute();
P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
request.InstanceId,
result.CustomStatus,
result.Actions);
byte[] responseBytes = response.ToByteArray();
return Convert.ToBase64String(responseBytes);
}
The P.OrchestratorRequest request here needs a careful check to see whether the version is set:
- request.InstanceId
- the version field of the ExecutionStartedEvent that arrives as a newEvent, and its OrchestrationInstance.InstanceVersion field
TaskOrchestrationExecutor
The class DurableTask.Core.TaskOrchestrationExecutor lives in the Azure/durabletask project:
public OrchestratorExecutionResult Execute()
{
return this.ExecuteCore(
pastEvents: this.orchestrationRuntimeState.PastEvents,
newEvents: this.orchestrationRuntimeState.NewEvents);
}
On the first execution PastEvents is empty and NewEvents contains two events:
- DurableTask.Core.History.OrchestratorStartedEvent
- DurableTask.Core.History.ExecutionStartedEvent
Implementation of ExecuteCore():
OrchestratorExecutionResult ExecuteCore(IEnumerable<HistoryEvent> pastEvents, IEnumerable<HistoryEvent> newEvents)
{
SynchronizationContext prevCtx = SynchronizationContext.Current;
try
{
SynchronizationContext syncCtx = new TaskOrchestrationSynchronizationContext(this.decisionScheduler);
SynchronizationContext.SetSynchronizationContext(syncCtx);
OrchestrationContext.IsOrchestratorThread = true;
try
{
void ProcessEvents(IEnumerable<HistoryEvent> events)
{
foreach (HistoryEvent historyEvent in events)
{
if (historyEvent.EventType == EventType.OrchestratorStarted)
{
var decisionStartedEvent = (OrchestratorStartedEvent)historyEvent;
this.context.CurrentUtcDateTime = decisionStartedEvent.Timestamp;
continue;
}
// Execution enters here
this.ProcessEvent(historyEvent);
historyEvent.IsPlayed = true;
}
}
// Replay the old history to rebuild the local state of the orchestration.
// TODO: Log a verbose message indicating that the replay has started (include event count?)
this.context.IsReplaying = true;
ProcessEvents(pastEvents);
// Play the newly arrived events to determine the next action to take.
// TODO: Log a verbose message indicating that new events are being processed (include event count?)
this.context.IsReplaying = false;
// The first invocation enters here, with IsReplaying set to false
ProcessEvents(newEvents);
// check if workflow is completed after this replay
// TODO: Create a setting that allows orchestrations to complete when the orchestrator
// function completes, even if there are open tasks.
if (!this.context.HasOpenTasks)
{
if (this.result!.IsCompleted)
{
if (this.result.IsFaulted)
{
Exception? exception = this.result.Exception?.InnerExceptions.FirstOrDefault();
Debug.Assert(exception != null);
if (Utils.IsExecutionAborting(exception!))
{
// Let this exception propagate out to be handled by the dispatcher
ExceptionDispatchInfo.Capture(exception).Throw();
}
this.context.FailOrchestration(exception);
}
else
{
this.context.CompleteOrchestration(this.result.Result);
}
}
// TODO: It is an error if result is not completed when all OpenTasks are done.
// Throw an exception in that case.
}
}
catch (NonDeterministicOrchestrationException exception)
{
this.context.FailOrchestration(exception);
}
return new OrchestratorExecutionResult
{
Actions = this.context.OrchestratorActions,
CustomStatus = this.taskOrchestration.GetStatus(),
};
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
OrchestrationContext.IsOrchestratorThread = false;
}
}
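The two-phase structure of ExecuteCore() (replay the past events with IsReplaying = true, then play the new events with IsReplaying = false) can be reduced to the following sketch. ReplaySketch and its string-based events are hypothetical simplifications; the real executor works on HistoryEvent objects and also runs the orchestrator code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, heavily simplified model of the two-phase loop in ExecuteCore.
public sealed class ReplaySketch
{
    public bool IsReplaying { get; private set; }
    public List<string> Log { get; } = new();

    public void Execute(IEnumerable<string> pastEvents, IEnumerable<string> newEvents)
    {
        // Phase 1: replay the old history to rebuild local state.
        IsReplaying = true;
        Process(pastEvents);

        // Phase 2: play the newly arrived events to decide the next action.
        IsReplaying = false;
        Process(newEvents);
    }

    private void Process(IEnumerable<string> events)
    {
        foreach (string e in events)
        {
            // In the real code OrchestratorStarted only updates the current
            // time and is not passed to ProcessEvent; the sketch skips it.
            if (e == "OrchestratorStarted")
            {
                continue;
            }
            Log.Add($"{e} (replaying={IsReplaying})");
        }
    }
}

public static class ReplayDemo
{
    public static void Main()
    {
        // First execution: no history, two new events.
        var first = new ReplaySketch();
        first.Execute(
            Array.Empty<string>(),
            new[] { "OrchestratorStarted", "ExecutionStarted" });
        // prints: ExecutionStarted (replaying=False)
        Console.WriteLine(string.Join("; ", first.Log));

        // A later execution: the earlier events are now history.
        var later = new ReplaySketch();
        later.Execute(
            new[] { "OrchestratorStarted", "ExecutionStarted", "TaskScheduled" },
            new[] { "OrchestratorStarted", "TaskCompleted" });
        // prints: ExecutionStarted (replaying=True); TaskScheduled (replaying=True); TaskCompleted (replaying=False)
        Console.WriteLine(string.Join("; ", later.Log));
    }
}
```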
The ProcessEvent() method:
void ProcessEvent(HistoryEvent historyEvent)
{
bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
if (this.context.IsSuspended && !overrideSuspension)
{
this.context.HandleEventWhileSuspended(historyEvent);
}
else
{
switch (historyEvent.EventType)
{
case EventType.ExecutionStarted:
// This branch is executed
var executionStartedEvent = (ExecutionStartedEvent)historyEvent;
this.result = this.taskOrchestration.Execute(this.context, executionStartedEvent.Input);
break;
case EventType.ExecutionTerminated:
this.context.HandleExecutionTerminatedEvent((ExecutionTerminatedEvent)historyEvent);
break;
case EventType.TaskScheduled:
this.context.HandleTaskScheduledEvent((TaskScheduledEvent)historyEvent);
break;
case EventType.TaskCompleted:
this.context.HandleTaskCompletedEvent((TaskCompletedEvent)historyEvent);
break;
case EventType.TaskFailed:
this.context.HandleTaskFailedEvent((TaskFailedEvent)historyEvent);
break;
case EventType.SubOrchestrationInstanceCreated:
this.context.HandleSubOrchestrationCreatedEvent((SubOrchestrationInstanceCreatedEvent)historyEvent);
break;
case EventType.SubOrchestrationInstanceCompleted:
this.context.HandleSubOrchestrationInstanceCompletedEvent(
(SubOrchestrationInstanceCompletedEvent)historyEvent);
break;
case EventType.SubOrchestrationInstanceFailed:
this.context.HandleSubOrchestrationInstanceFailedEvent((SubOrchestrationInstanceFailedEvent)historyEvent);
break;
case EventType.TimerCreated:
this.context.HandleTimerCreatedEvent((TimerCreatedEvent)historyEvent);
break;
case EventType.TimerFired:
this.context.HandleTimerFiredEvent((TimerFiredEvent)historyEvent);
break;
case EventType.EventSent:
this.context.HandleEventSentEvent((EventSentEvent)historyEvent);
break;
case EventType.EventRaised:
this.context.HandleEventRaisedEvent((EventRaisedEvent)historyEvent, this.skipCarryOverEvents, this.taskOrchestration);
break;
case EventType.ExecutionSuspended:
this.context.HandleExecutionSuspendedEvent((ExecutionSuspendedEvent)historyEvent);
break;
case EventType.ExecutionResumed:
this.context.HandleExecutionResumedEvent((ExecutionResumedEvent)historyEvent, ProcessEvent);
break;
}
}
}
versioning TODO: the version field of the ExecutionStartedEvent is currently empty here and needs to be populated later.
TaskOrchestrationShim
public override async Task<string?> Execute(OrchestrationContext innerContext, string rawInput)
{
Check.NotNull(innerContext);
JsonDataConverterShim converterShim = new(this.invocationContext.Options.DataConverter);
innerContext.MessageDataConverter = converterShim;
innerContext.ErrorDataConverter = converterShim;
object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
this.wrapperContext = new(innerContext, this.invocationContext, input);
try
{
object? output = await this.implementation.RunAsync(this.wrapperContext, input);
// Return the output (if any) as a serialized string.
return this.DataConverter.Serialize(output);
}
finally
{
// if user code crashed inside a critical section, or did not exit it, do that now
this.wrapperContext.ExitCriticalSectionIfNeeded();
}
}
versioning TODO: the OrchestrationInstance field of the OrchestrationContext here contains only InstanceId and ExecutionId; an InstanceVersion field needs to be added, with its value taken from the version field of the ExecutionStartedEvent.
wrapperContext is implemented by TaskOrchestrationContextWrapper.
versioning TODO: TaskOrchestrationContextWrapper needs an InstanceVersion field whose value is read from this.innerContext.OrchestrationInstance.InstanceVersion.
this.implementation is implemented by Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.
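The data flow described by the versioning TODOs (start event version, then the instance, then the wrapper context) might look roughly like the sketch below. Every type here is a hypothetical stand-in with a Sketch suffix; none of them is the real Durable Task type, and the final design may well differ.

```csharp
using System;

// All types below are hypothetical stand-ins for the types named in the TODOs.
public sealed class StartedEventSketch
{
    public string Version { get; set; } = "";
}

public sealed class InstanceSketch
{
    public string InstanceId { get; set; } = "";
    public string ExecutionId { get; set; } = "";

    // The field the TODO proposes to add.
    public string InstanceVersion { get; set; } = "";
}

public sealed class InnerContextSketch
{
    public InstanceSketch OrchestrationInstance { get; } = new();
}

public sealed class WrapperContextSketch
{
    private readonly InnerContextSketch _inner;

    public WrapperContextSketch(InnerContextSketch inner) => _inner = inner;

    public string InstanceId => _inner.OrchestrationInstance.InstanceId;

    // Read through to the inner context, as the second TODO suggests.
    public string InstanceVersion => _inner.OrchestrationInstance.InstanceVersion;
}

public static class VersioningDemo
{
    // Copies the version from the start event onto the instance, then wraps it.
    public static WrapperContextSketch Build(string instanceId, StartedEventSketch started)
    {
        var inner = new InnerContextSketch();
        inner.OrchestrationInstance.InstanceId = instanceId;
        inner.OrchestrationInstance.InstanceVersion = started.Version;
        return new WrapperContextSketch(inner);
    }

    public static void Main()
    {
        var ctx = Build("abc123", new StartedEventSketch { Version = "1.5.6" });
        // prints: abc123 @ 1.5.6
        Console.WriteLine($"{ctx.InstanceId} @ {ctx.InstanceVersion}");
    }
}
```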
FunctionsOrchestrator
public async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
{
// Set the function input to be the orchestration context wrapped in our own object so that we can
// intercept any of the calls and inject our own logic or tracking.
FunctionsOrchestrationContext wrapperContext = new(context, this.functionContext);
this.contextBinding.Value = wrapperContext;
this.inputContext.PrepareInput(input);
try
{
// This method will advance to the next middleware and throw if it detects an asynchronous execution.
await EnsureSynchronousExecution(this.functionContext, this.next, wrapperContext);
}
catch (Exception ex)
{
this.functionContext.GetLogger<FunctionsOrchestrator>().LogError(
ex,
"An error occurred while executing the orchestrator function '{FunctionName}'.",
this.functionContext.FunctionDefinition.Name);
throw;
}
// Set the raw function output as the orchestrator output
object? functionOutput = this.functionContext.GetInvocationResult().Value;
return functionOutput;
}
Implementation of EnsureSynchronousExecution:
private static async Task EnsureSynchronousExecution(
FunctionContext functionContext,
FunctionExecutionDelegate next,
FunctionsOrchestrationContext orchestrationContext)
{
Task orchestratorTask = next(functionContext);
if (!orchestratorTask.IsCompleted && !orchestrationContext.IsAccessed)
{
// If the middleware returns before the orchestrator function's context object was accessed and before
// it completes its execution, then we know that either some middleware component went async or that the
// orchestrator function did some illegal await as its very first action.
throw new InvalidOperationException(Constants.IllegalAwaitErrorMessage);
}
await orchestratorTask;
// This will throw if either the orchestrator performed an illegal await or if some middleware ahead of this
// one performed some illegal await.
orchestrationContext.ThrowIfIllegalAccess();
}
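The core of the check in EnsureSynchronousExecution is that a Task returned by an async method is already complete if the method never actually yielded. A minimal sketch of just that half of the check follows; SyncCheck is a hypothetical helper, and the IsAccessed part of the real condition is left out.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper illustrating only the Task.IsCompleted part of the check.
public static class SyncCheck
{
    // Returns true when the task is already complete at the moment
    // the call returns, i.e. the body never yielded to its caller.
    public static bool CompletedSynchronously(Func<Task> body)
    {
        Task task = body();
        return task.IsCompleted;
    }
}

public static class SyncCheckDemo
{
    public static void Main()
    {
        var pending = new TaskCompletionSource<bool>();

        // Awaiting an already completed task does not yield.
        bool ranToCompletion = SyncCheck.CompletedSynchronously(async () =>
        {
            await Task.CompletedTask;
        });

        // Awaiting an incomplete task returns control to the caller early.
        bool yielded = SyncCheck.CompletedSynchronously(async () =>
        {
            await pending.Task;
        });

        Console.WriteLine(ranToCompletion); // True
        Console.WriteLine(yielded);         // False

        pending.SetResult(true); // let the second body finish
    }
}
```

In the real method an incomplete task is acceptable once the orchestration context has been accessed, because awaiting a durable task is the expected way for an orchestrator to suspend.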
The next delegate is the one defined earlier:
return context =>
{
var middleware = context.InstanceServices.GetRequiredService<T>();
return middleware.Invoke(context, next);
};
Azure-functions-dotnet-worker
FunctionsHttpProxyingMiddleware
public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
{
// Only use the coordinator for HttpTriggers
if (!_isHttpTrigger.GetOrAdd(context.FunctionId, static (_, c) => IsHttpTriggerFunction(c), context))
{
await next(context);
return;
}
var invocationId = context.InvocationId;
// this call will block until the ASP.NET middleware pipeline has signaled that it's ready to run the function
var httpContext = await _coordinator.SetFunctionContextAsync(invocationId, context);
AddHttpContextToFunctionContext(context, httpContext);
// Register additional context features
context.Features.Set<IFromBodyConversionFeature>(FromBodyConverstionFeature.Instance);
await next(context);
var invocationResult = context.GetInvocationResult();
if (invocationResult?.Value is IActionResult actionResult)
{
ActionContext actionContext = new ActionContext(httpContext, httpContext.GetRouteData(), new ActionDescriptor());
await actionResult.ExecuteResultAsync(actionContext);
}
else if (invocationResult?.Value is AspNetCoreHttpResponseData)
{
// The AspNetCoreHttpResponseData implementation is
// simply a wrapper over the underlying HttpResponse and
// all APIs manipulate the request.
// There's no need to return this result as no additional
// processing is required.
invocationResult.Value = null;
}
// allows asp.net middleware to continue
_coordinator.CompleteFunctionInvocation(invocationId);
}
FunctionExecutionMiddleware
public Task Invoke(FunctionContext context)
{
return _functionExecutor.ExecuteAsync(context).AsTask();
}
Here _functionExecutor is implemented by MyDurableFunction1.DirectFunctionExecutor.
OutputBindingsMiddleware
public static async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
{
await next(context);
AddOutputBindings(context);
}
Work.Sdk.Generator.GeneratedFunctionExecutor
This invokes the generated code in GeneratedFunctionExecutor.g.cs:
public async ValueTask ExecuteAsync(FunctionContext context)
{
var inputBindingFeature = context.Features.Get<IFunctionInputBindingFeature>();
var inputBindingResult = await inputBindingFeature.BindFunctionInputAsync(context);
var inputArguments = inputBindingResult.Values;
if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.RunOrchestrator", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
}
else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.SayHello", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = global::Company.Function.HelloOrchestration.SayHello((string)inputArguments[0], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[1]);
}
else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.HttpStart", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.HttpStart((global::Microsoft.Azure.Functions.Worker.Http.HttpRequestData)inputArguments[0], (global::Microsoft.DurableTask.Client.DurableTaskClient)inputArguments[1], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[2]);
}
}
Depending on the value of context.FunctionDefinition.EntryPoint, the corresponding function is called:
| Value of context.FunctionDefinition.EntryPoint | Function | Function source code |
|---|---|---|
| "Company.Function.HelloOrchestration.RunOrchestrator" | HelloOrchestration.RunOrchestrator() | [Function(nameof(HelloOrchestration))] |
| "Company.Function.HelloOrchestration.SayHello" | HelloOrchestration.SayHello() | [Function(nameof(SayHello))] |
| "Company.Function.HelloOrchestration.HttpStart" | HelloOrchestration.HttpStart() | [Function("HelloOrchestration_HttpStart")] |
Of these, the HttpStart() function receives the HTTP request and then triggers the Schedule New Orchestration Instance operation.
The orchestration engine then starts the orchestration, and the RunOrchestrator() method is executed:
context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
Customer Code
HelloOrchestration function
Taking HelloOrchestration.cs from the quickstart as an example:
[Function(nameof(HelloOrchestration))]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger("1.5.6")] TaskOrchestrationContext context)
{
var instanceId = context.InstanceId;
var InstanceVersion = context.InstanceVersion;
......
}
3.3 - client run activity
3.3.1 - Call stack
Call stack overview
MyDurableFunction2.dll!Company.Function.HelloOrchestration.SayHello(string name, Microsoft.Azure.Functions.Worker.FunctionContext executionContext) Line 35 (c:/Users/sky/work/code/durabletask-fork2/MyDurableFunction2/HelloOrchestration.cs:35)
MyDurableFunction2.dll!MyDurableFunction2.DirectFunctionExecutor.ExecuteAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 36 (GeneratedFunctionExecutor.g.cs:36)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.Pipeline.FunctionExecutionMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 20 (FunctionExecutionMiddleware.cs:20)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseFunctionExecutionMiddleware.AnonymousMethod__1_2(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 57 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:57)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 13 (OutputBindingsMiddleware.cs:13)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseOutputBindingsMiddleware.AnonymousMethod__3(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 84 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:84)
Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.dll!Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 34 (FunctionsHttpProxyingMiddleware.cs:34)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 30 (c:/Users/sky/work/code/durabletask-fork2/azure-functions-durable-extension/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs:30)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 77 (FunctionsApplication.cs:77)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.InvokeAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 88 (InvocationHandler.cs:88)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.InvocationRequestHandlerAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 122 (GrpcWorker.cs:122)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.ProcessRequestCoreAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.StreamingMessage request) Line 81 (GrpcWorker.cs:81)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.Microsoft.Azure.Functions.Worker.Grpc.IMessageProcessor.ProcessMessageAsync.AnonymousMethod__0() Line 66 (GrpcWorker.cs:66)
System.Private.CoreLib.dll!System.Threading.Tasks.Task<System.Threading.Tasks.Task>.InnerInvoke() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(System.Threading.Thread threadPoolThread, System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot, System.Threading.Thread threadPoolThread) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart() (Unknown Source:0)
azure-functions-dotnet-worker
GrpcWorker
Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
{
// Dispatch and return.
Task.Run(() => ProcessRequestCoreAsync(message));
return Task.CompletedTask;
}
private async Task ProcessRequestCoreAsync(StreamingMessage request)
{
StreamingMessage responseMessage = new StreamingMessage
{
RequestId = request.RequestId
};
switch (request.ContentCase)
{
case MsgType.InvocationRequest:
// Execution reaches here
responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
break;
......
internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
{
return _invocationHandler.InvokeAsync(request);
}
_invocationHandler is implemented by InvocationHandler.
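The "Dispatch and return" pattern in ProcessMessageAsync above hands the work to the thread pool and returns a completed task immediately, so the message loop is not blocked by a long-running invocation. A minimal sketch, in which DispatchDemo.Dispatch is a hypothetical stand-in for ProcessMessageAsync:

```csharp
using System;
using System.Threading.Tasks;

public static class DispatchDemo
{
    // Hypothetical stand-in for ProcessMessageAsync:
    // schedule the work on the thread pool and return at once.
    public static Task Dispatch(Func<Task> work)
    {
        _ = Task.Run(work);
        return Task.CompletedTask;
    }

    public static async Task Main()
    {
        var release = new TaskCompletionSource<bool>();
        var done = new TaskCompletionSource<string>();

        Task returned = Dispatch(async () =>
        {
            await release.Task;        // the "handler" is still waiting
            done.SetResult("handled");
        });

        Console.WriteLine(returned.IsCompleted);  // True: the dispatcher did not wait
        Console.WriteLine(done.Task.IsCompleted); // False: the work has not finished

        release.SetResult(true);
        Console.WriteLine(await done.Task);       // handled
    }
}
```

The trade-off is that the caller never observes the outcome through the returned task, so the dispatched work has to report its own result, which ProcessRequestCoreAsync does by building a response message.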
InvocationHandler
public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
{
using CancellationTokenSource cancellationTokenSource = new();
FunctionContext? context = null;
InvocationResponse response = new()
{
InvocationId = request.InvocationId,
Result = new StatusResult()
};
if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
{
var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
response.Result.Status = StatusResult.Types.Status.Failure;
response.Result.Exception = exception.ToRpcException();
return response;
}
try
{
var invocation = new GrpcFunctionInvocation(request);
IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
invocationFeatures.Set<FunctionInvocation>(invocation);
invocationFeatures.Set<IExecutionRetryFeature>(invocation);
context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));
if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
{
invocationFeatures.Set<IInputConversionFeature>(conversion!);
}
// Execution reaches here
await _application.InvokeFunctionAsync(context);
var serializer = _workerOptions.Serializer!;
......
}
FunctionsApplication
public async Task InvokeFunctionAsync(FunctionContext context)
{
var scope = new FunctionInvocationScope(context.FunctionDefinition.Name, context.InvocationId);
using var logScope = _logger.BeginScope(scope);
using Activity? invokeActivity = _functionActivitySourceFactory.StartInvoke(context);
try
{
await _functionExecutionDelegate(context);
}
catch (Exception ex)
{
invokeActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
Log.InvocationError(_logger, context.FunctionDefinition.Name, context.InvocationId, ex);
throw;
}
}
DurableTaskFunctionsMiddleware
public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next)
{
if (IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerBinding))
{
// Orchestration calls take this branch
return RunOrchestrationAsync(functionContext, triggerBinding, next);
}
if (IsEntityTrigger(functionContext, out triggerBinding))
{
return RunEntityAsync(functionContext, triggerBinding, next);
}
// Activity calls take this branch
return next(functionContext);
}
This is where the orchestration call path and the activity call path diverge.
In return next(functionContext), next is a Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate.
This next is injected in MiddlewareWorkerApplicationBuilderExtensions as follows:
public static IFunctionsWorkerApplicationBuilder UseMiddleware<T>(this IFunctionsWorkerApplicationBuilder builder)
where T : class, IFunctionsWorkerMiddleware
{
builder.Services.AddSingleton<T>();
builder.Use(next =>
{
return context =>
{
var middleware = context.InstanceServices.GetRequiredService<T>();
return middleware.Invoke(context, next);
};
});
return builder;
}
FunctionsHttpProxyingMiddleware
public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
{
// Only use the coordinator for HttpTriggers
if (!_isHttpTrigger.GetOrAdd(context.FunctionId, static (_, c) => IsHttpTriggerFunction(c), context))
{
// Execution reaches here
await next(context);
return;
}
var invocationId = context.InvocationId;
// this call will block until the ASP.NET middleware pipeline has signaled that it's ready to run the function
var httpContext = await _coordinator.SetFunctionContextAsync(invocationId, context);
AddHttpContextToFunctionContext(context, httpContext);
// Register additional context features
context.Features.Set<IFromBodyConversionFeature>(FromBodyConverstionFeature.Instance);
await next(context);
var invocationResult = context.GetInvocationResult();
......
}
MiddlewareWorkerApplicationBuilderExtensions
builder.Use(next =>
{
return context =>
{
return OutputBindingsMiddleware.Invoke(context, next);
};
});
The pipeline component here is registered as an inline anonymous method.
FunctionExecutionMiddleware
public Task Invoke(FunctionContext context)
{
return _functionExecutor.ExecuteAsync(context).AsTask();
}
_functionExecutor is implemented by GeneratedFunctionExecutor.
GeneratedFunctionExecutor
GeneratedFunctionExecutor is generated code; its content is in GeneratedFunctionExecutor.g.cs:
public async ValueTask ExecuteAsync(FunctionContext context)
{
var inputBindingFeature = context.Features.Get<IFunctionInputBindingFeature>();
var inputBindingResult = await inputBindingFeature.BindFunctionInputAsync(context);
var inputArguments = inputBindingResult.Values;
if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.RunOrchestrator", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
}
else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.SayHello", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = global::Company.Function.HelloOrchestration.SayHello((string)inputArguments[0], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[1]);
}
else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.HttpStart", StringComparison.Ordinal))
{
context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.HttpStart((global::Microsoft.Azure.Functions.Worker.Http.HttpRequestData)inputArguments[0], (global::Microsoft.DurableTask.Client.DurableTaskClient)inputArguments[1], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[2]);
}
}
Execution then enters the SayHello method.
The concrete implementation of FunctionContext is Microsoft.Azure.Functions.Worker.DefaultFunctionContext.
User code
Activity implementation
[Function(nameof(SayHello))]
public static string SayHello([ActivityTrigger] string name, FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("SayHello");
logger.LogInformation("Saying hello to {name}.", name);
return $"Hello {name}!";
}
3.4 - worker register orchestrator
3.4.1 - Call stack
Call stack overview
In src\WebJobs.Extensions.DurableTask\DurableTaskExtension.cs in the azure-functions-durable-extension repository, add code that prints the stack trace:
internal void RegisterOrchestrator(FunctionName orchestratorFunction, RegisteredFunctionInfo orchestratorInfo)
{
Console.WriteLine("call RegisterOrchestrator!");
Console.WriteLine("Information of RegisterOrchestrator: \n" + Environment.StackTrace);
......
}
The resulting output:
at Microsoft.Azure.WebJobs.Extensions.DurableTask.DurableTaskExtension.RegisterOrchestrator(FunctionName orchestratorFunction, RegisteredFunctionInfo orchestratorInfo) in C:\Users\sky\work\code\durabletask-fork2\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\DurableTaskExtension.cs:line 1215
at Microsoft.Azure.WebJobs.Extensions.DurableTask.OrchestrationTriggerAttributeBindingProvider.TryCreateAsync(TriggerBindingProviderContext context) in C:\Users\sky\work\code\durabletask-fork2\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\Bindings\OrchestrationTriggerAttributeBindingProvider.cs:line 69
at Microsoft.Azure.WebJobs.Host.Triggers.CompositeTriggerBindingProvider.TryCreateAsync(TriggerBindingProviderContext context) in D:\a\_work\1\s\src\Microsoft.Azure.WebJobs.Host\Triggers\CompositeTriggerBindingProvider.cs:line 20
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
at Microsoft.Azure.WebJobs.Host.Triggers.CompositeTriggerBindingProvider.TryCreateAsync(TriggerBindingProviderContext context)
at Microsoft.Azure.WebJobs.Host.Indexers.FunctionIndexer.IndexMethodAsyncCore(MethodInfo method, IFunctionIndexCollector index, CancellationToken cancellationToken) in D:\a\_work\1\s\src\Microsoft.Azure.WebJobs.Host\Indexers\FunctionIndexer.cs:line 172
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
at Microsoft.Azure.WebJobs.Host.Indexers.FunctionIndexer.IndexMethodAsyncCore(MethodInfo method, IFunctionIndexCollector index, CancellationToken cancellationToken)
at Microsoft.Azure.WebJobs.Host.Indexers.FunctionIndexer.IndexMethodAsync(MethodInfo method, IFunctionIndexCollector index, CancellationToken cancellationToken) in D:\a\_work\1\s\src\Microsoft.Azure.WebJobs.Host\Indexers\FunctionIndexer.cs:line 149
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
at Microsoft.Azure.WebJobs.Host.Indexers.FunctionIndexer.IndexMethodAsync(MethodInfo method, IFunctionIndexCollector index, CancellationToken cancellationToken)
at Microsoft.Azure.WebJobs.Host.Indexers.FunctionIndexer.IndexTypeAsync(Type type, IFunctionIndexCollector index, CancellationToken cancellationToken) in D:\a\_work\1\s\src\Microsoft.Azure.WebJobs.Host\Indexers\FunctionIndexer.cs:line 81
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
at Microsoft.Azure.WebJobs.Host.Indexers.FunctionIndexer.IndexTypeAsync(Type type, IFunctionIndexCollector index, CancellationToken cancellationToken)
CompositeTriggerBindingProvider
at Microsoft.Azure.WebJobs.Host.Triggers.CompositeTriggerBindingProvider.TryCreateAsync(TriggerBindingProviderContext context) in D:\a\_work\1\s\src\Microsoft.Azure.WebJobs.Host\Triggers\CompositeTriggerBindingProvider.cs:line 20
OrchestrationTriggerAttributeBindingProvider
at Microsoft.Azure.WebJobs.Extensions.DurableTask.OrchestrationTriggerAttributeBindingProvider.TryCreateAsync(TriggerBindingProviderContext context) in C:\Users\sky\work\code\durabletask-fork2\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\Bindings\OrchestrationTriggerAttributeBindingProvider.cs:line 69
public Task<ITriggerBinding?> TryCreateAsync(TriggerBindingProviderContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
ParameterInfo parameter = context.Parameter;
OrchestrationTriggerAttribute? trigger = parameter.GetCustomAttribute<OrchestrationTriggerAttribute>(inherit: false);
if (trigger == null)
{
return Task.FromResult<ITriggerBinding?>(null);
}
// Priority for getting the name is [OrchestrationTrigger], [FunctionName], method name
string name = trigger.Orchestration;
if (string.IsNullOrEmpty(name))
{
MemberInfo method = context.Parameter.Member;
name = method.GetCustomAttribute<FunctionNameAttribute>()?.Name ?? method.Name;
}
var orchestratorName = new FunctionName(name);
if (name.StartsWith("@"))
{
throw new ArgumentException("Orchestration names must not start with @.");
}
// Execution reaches this point
this.config.RegisterOrchestrator(orchestratorName, null);
var binding = new OrchestrationTriggerBinding(this.config, parameter, orchestratorName, this.connectionName, this.platormInformation);
return Task.FromResult<ITriggerBinding?>(binding);
}
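The name-resolution priority described in the comment above ([OrchestrationTrigger], then [FunctionName], then the method name) can be tried out in isolation. The following is a minimal sketch, not the extension's code: DemoTriggerAttribute, DemoFunctionNameAttribute and NameResolutionDemo are hypothetical stand-ins so the example compiles without the WebJobs packages.

```csharp
#nullable enable
using System;
using System.Reflection;

// Hypothetical stand-in for OrchestrationTriggerAttribute (assumption, not the real type).
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class DemoTriggerAttribute : Attribute
{
    public string? Orchestration { get; set; }
}

// Hypothetical stand-in for FunctionNameAttribute (assumption, not the real type).
[AttributeUsage(AttributeTargets.Method)]
public sealed class DemoFunctionNameAttribute : Attribute
{
    public DemoFunctionNameAttribute(string name) => Name = name;

    public string Name { get; }
}

public static class NameResolutionDemo
{
    // Mirrors the priority in TryCreateAsync: trigger attribute value,
    // then the function-name attribute, then the plain method name.
    public static string ResolveName(ParameterInfo parameter)
    {
        DemoTriggerAttribute? trigger =
            parameter.GetCustomAttribute<DemoTriggerAttribute>(inherit: false);
        if (trigger == null)
        {
            throw new ArgumentException("Parameter has no trigger attribute.");
        }

        string? name = trigger.Orchestration;
        if (string.IsNullOrEmpty(name))
        {
            MemberInfo method = parameter.Member;
            name = method.GetCustomAttribute<DemoFunctionNameAttribute>()?.Name ?? method.Name;
        }

        if (name.StartsWith("@", StringComparison.Ordinal))
        {
            throw new ArgumentException("Orchestration names must not start with @.");
        }

        return name;
    }

    // Three sample functions, one per priority level.
    public static void A([DemoTrigger(Orchestration = "Explicit")] object ctx) { }

    [DemoFunctionName("FromFunctionName")]
    public static void B([DemoTrigger] object ctx) { }

    public static void C([DemoTrigger] object ctx) { }

    // Helper: resolve the name for the first parameter of the named sample method.
    public static string ResolveFor(string methodName) =>
        ResolveName(typeof(NameResolutionDemo).GetMethod(methodName)!.GetParameters()[0]);
}
```

Under these assumptions, ResolveFor("A") takes the name from the trigger attribute, ResolveFor("B") falls back to the function-name attribute, and ResolveFor("C") falls back to the method name itself.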
DurableTaskExtension
internal void RegisterOrchestrator(FunctionName orchestratorFunction, RegisteredFunctionInfo orchestratorInfo)
{
// print stack trace
var st = new StackTrace(true);
if (orchestratorInfo != null)
{
orchestratorInfo.IsDeregistered = false;
}
if (this.knownOrchestrators.TryAdd(orchestratorFunction, orchestratorInfo))
{
this.TraceHelper.ExtensionInformationalEvent(
this.Options.HubName,
instanceId: string.Empty,
functionName: orchestratorFunction.Name,
message: $"Registered orchestrator function named {orchestratorFunction}.",
writeToUserLogs: false);
}
else
{
this.knownOrchestrators[orchestratorFunction] = orchestratorInfo;
}
}
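RegisterOrchestrator follows a "try to add, otherwise overwrite" pattern: the informational event is only written on the first registration of a name, while later registrations silently replace the stored value. A minimal sketch of that pattern, assuming a plain ConcurrentDictionary keyed by string (RegistrationDemo and its members are hypothetical, and the real knownOrchestrators field may use a different key type or comparer):

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

public static class RegistrationDemo
{
    // Hypothetical registry; the value may be null, just as TryCreateAsync passes null above.
    private static readonly ConcurrentDictionary<string, string?> Known =
        new ConcurrentDictionary<string, string?>();

    // Returns true when the name was registered for the first time
    // (the point where the real code writes its trace event),
    // and false when an existing entry was overwritten.
    public static bool Register(string name, string? info)
    {
        if (Known.TryAdd(name, info))
        {
            return true;
        }

        // Already present: replace the stored info without logging again.
        Known[name] = info;
        return false;
    }

    // Looks up the info currently stored for a name, or null if none is stored.
    public static string? Lookup(string name) =>
        Known.TryGetValue(name, out string? info) ? info : null;
}
```

Calling Register("Hello", null) and then Register("Hello", "v2") returns true the first time and false the second time, and Lookup("Hello") afterwards yields "v2".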
4 - trigger
src\WebJobs.Extensions.DurableTask\TriggerAttributes
4.1 - OrchestrationTriggerAttribute
public sealed class OrchestrationTriggerAttribute : Attribute
{
/// <summary>
/// Gets or sets the name of the orchestrator function.
/// </summary>
/// <remarks>
/// If not specified, the function name is used as the name of the orchestration.
/// </remarks>
/// <value>
/// The name of the orchestrator function or <c>null</c> to use the function name.
/// </value>
#pragma warning disable CS0618 // Type or member is obsolete
[AutoResolve]
#pragma warning restore CS0618 // Type or member is obsolete
public string Orchestration { get; set; }
}
Orchestration property
Gets or sets the name of the orchestrator function.
If not specified, the function name is used as the name of the orchestration.
Value:
The name of the orchestrator function or null to use the function name.
This property is read by the TryCreateAsync() method in src\WebJobs.Extensions.DurableTask\Bindings\OrchestrationTriggerAttributeBindingProvider.cs:
public Task<ITriggerBinding?> TryCreateAsync(TriggerBindingProviderContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
ParameterInfo parameter = context.Parameter;
OrchestrationTriggerAttribute? trigger = parameter.GetCustomAttribute<OrchestrationTriggerAttribute>(inherit: false);
if (trigger == null)
{
return Task.FromResult<ITriggerBinding?>(null);
}
// Priority for getting the name is [OrchestrationTrigger], [FunctionName], method name
string name = trigger.Orchestration;
if (string.IsNullOrEmpty(name))
{
MemberInfo method = context.Parameter.Member;
name = method.GetCustomAttribute<FunctionNameAttribute>()?.Name ?? method.Name;
}
var orchestratorName = new FunctionName(name);
if (name.StartsWith("@"))
{
throw new ArgumentException("Orchestration names must not start with @.");
}
this.config.RegisterOrchestrator(orchestratorName, null);
var binding = new OrchestrationTriggerBinding(this.config, parameter, orchestratorName, this.connectionName, this.platormInformation);
return Task.FromResult<ITriggerBinding?>(binding);
}