这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

client run activity

DurableTask client 运行 activity 的源码分析

1 - 调用堆栈

DurableTask worker 运行 activity 的调用堆栈

调用堆栈概况

MyDurableFunction2.dll!Company.Function.HelloOrchestration.SayHello(string name, Microsoft.Azure.Functions.Worker.FunctionContext executionContext) Line 35 (c:/Users/sky/work/code/durabletask-fork2/MyDurableFunction2/HelloOrchestration.cs:35)
MyDurableFunction2.dll!MyDurableFunction2.DirectFunctionExecutor.ExecuteAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 36 (GeneratedFunctionExecutor.g.cs:36)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.Pipeline.FunctionExecutionMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 20 (FunctionExecutionMiddleware.cs:20)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseFunctionExecutionMiddleware.AnonymousMethod__1_2(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 57 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:57)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 13 (OutputBindingsMiddleware.cs:13)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseOutputBindingsMiddleware.AnonymousMethod__3(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 84 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:84)
Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.dll!Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 34 (FunctionsHttpProxyingMiddleware.cs:34)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 30 (c:/Users/sky/work/code/durabletask-fork2/azure-functions-durable-extension/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs:30)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 77 (FunctionsApplication.cs:77)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.InvokeAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 88 (InvocationHandler.cs:88)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.InvocationRequestHandlerAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 122 (GrpcWorker.cs:122)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.ProcessRequestCoreAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.StreamingMessage request) Line 81 (GrpcWorker.cs:81)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.Microsoft.Azure.Functions.Worker.Grpc.IMessageProcessor.ProcessMessageAsync.AnonymousMethod__0() Line 66 (GrpcWorker.cs:66)
System.Private.CoreLib.dll!System.Threading.Tasks.Task<System.Threading.Tasks.Task>.InnerInvoke() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(System.Threading.Thread threadPoolThread, System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot, System.Threading.Thread threadPoolThread) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart() (Unknown Source:0)

azure-function-dotnet-worker

GrpcWorker

        Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
        {
            // Dispatch and return.
            Task.Run(() => ProcessRequestCoreAsync(message));

            return Task.CompletedTask;
        }
        private async Task ProcessRequestCoreAsync(StreamingMessage request)
        {
            StreamingMessage responseMessage = new StreamingMessage
            {
                RequestId = request.RequestId
            };

            switch (request.ContentCase)
            {
                case MsgType.InvocationRequest:
                    // 走到这里
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;
                ......
        internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
        {
            return _invocationHandler.InvokeAsync(request);
        }

_invocationHandler 的实现是 InvocationHandler

InvocationHandler

        public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
        {
            using CancellationTokenSource cancellationTokenSource = new();
            FunctionContext? context = null;
            InvocationResponse response = new()
            {
                InvocationId = request.InvocationId,
                Result = new StatusResult()
            };

            if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
            {
                var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
                response.Result.Status = StatusResult.Types.Status.Failure;
                response.Result.Exception = exception.ToRpcException();

                return response;
            }

            try
            {
                var invocation = new GrpcFunctionInvocation(request);

                IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
                invocationFeatures.Set<FunctionInvocation>(invocation);
                invocationFeatures.Set<IExecutionRetryFeature>(invocation);

                context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
                invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));

                if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
                {
                    invocationFeatures.Set<IInputConversionFeature>(conversion!);
                }

                // 走到这里
                await _application.InvokeFunctionAsync(context);

                var serializer = _workerOptions.Serializer!;
              ......
            }

FunctionsApplication

        public async Task InvokeFunctionAsync(FunctionContext context)
        {
            var scope = new FunctionInvocationScope(context.FunctionDefinition.Name, context.InvocationId);

            using var logScope = _logger.BeginScope(scope);
            using Activity? invokeActivity = _functionActivitySourceFactory.StartInvoke(context);

            try
            {
                await _functionExecutionDelegate(context);
            }
            catch (Exception ex)
            {
                invokeActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);

                Log.InvocationError(_logger, context.FunctionDefinition.Name, context.InvocationId, ex);

                throw;
            }
        }

DurableTaskFunctionsMiddleware

    public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next)
    {
        if (IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerBinding))
        {
            // call Orchestration 会走到这里
            return RunOrchestrationAsync(functionContext, triggerBinding, next);
        }

        if (IsEntityTrigger(functionContext, out triggerBinding))
        {
            return RunEntityAsync(functionContext, triggerBinding, next);
        }

        // call activity 会走到这里
        return next(functionContext);
    }

这里是 call Orchestration 和 call activity 分叉的地方。

return next(functionContext) 中的 next 是 Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate

这个 next 是在 MiddlewareWorkerApplicationBuilderExtensions 中通过这样的方式注入的:

        public static IFunctionsWorkerApplicationBuilder UseMiddleware<T>(this IFunctionsWorkerApplicationBuilder builder)
            where T : class, IFunctionsWorkerMiddleware
        {
            builder.Services.AddSingleton<T>();

            builder.Use(next =>
            {
                return context =>
                {
                    var middleware = context.InstanceServices.GetRequiredService<T>();

                    return middleware.Invoke(context, next);
                };
            });

            return builder;
        }

FunctionsHttpProxyingMiddleware

        public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
        {
           // Only use the coordinator for HttpTriggers
            if (!_isHttpTrigger.GetOrAdd(context.FunctionId, static (_, c) => IsHttpTriggerFunction(c), context))
            {
              // 代码走到这里
                await next(context);
                return;
            }

            var invocationId = context.InvocationId;

            // this call will block until the ASP.NET middleware pipeline has signaled that it's ready to run the function
            var httpContext = await _coordinator.SetFunctionContextAsync(invocationId, context);

            AddHttpContextToFunctionContext(context, httpContext);

            // Register additional context features
            context.Features.Set<IFromBodyConversionFeature>(FromBodyConverstionFeature.Instance);

            await next(context);

            var invocationResult = context.GetInvocationResult();
          ......
        }

MiddlewareWorkerApplicationBuilderExtensions

            builder.Use(next =>
            {
                return context =>
                {
                    return OutputBindingsMiddleware.Invoke(context, next);
                };
            });

这里有一段内部匿名方法。

FunctionExecutionMiddleware

        public Task Invoke(FunctionContext context)
        {
            return _functionExecutor.ExecuteAsync(context).AsTask();
        }

_functionExecutor 的实现是 GeneratedFunctionExecutor 。

GeneratedFunctionExecutor

GeneratedFunctionExecutor 是生成的代码,内容在 GeneratedFunctionExecutor.g.cs 文件中:

        public async ValueTask ExecuteAsync(FunctionContext context)
        {
            var inputBindingFeature = context.Features.Get<IFunctionInputBindingFeature>();
            var inputBindingResult = await inputBindingFeature.BindFunctionInputAsync(context);
            var inputArguments = inputBindingResult.Values;

            if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.RunOrchestrator", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.SayHello", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = global::Company.Function.HelloOrchestration.SayHello((string)inputArguments[0], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[1]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.HttpStart", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.HttpStart((global::Microsoft.Azure.Functions.Worker.Http.HttpRequestData)inputArguments[0], (global::Microsoft.DurableTask.Client.DurableTaskClient)inputArguments[1], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[2]);
            }
        }

这里就会进入 SayHello 方法。

FunctionContext, 实际实现是

Microsoft.Azure.Functions.Worker.DefaultFunctionContext

用户代码

activity 实现

    [Function(nameof(SayHello))]
    public static string SayHello([ActivityTrigger] string name, FunctionContext executionContext)
    {
      ILogger logger = executionContext.GetLogger("SayHello");
      logger.LogInformation("Saying hello to {name}.", name);
      return $"Hello {name}!";
    }