1 - Core programming model

The core programming model behind Durable Task's durable storage backends

1.1 - Orchestration

Definition of Orchestration in the core programming model

1.1.1 - OrchestrationInstance

OrchestrationInstance in the core programming model

src\DurableTask.Core\OrchestrationInstance.cs

OrchestrationInstance defines two properties:

  • InstanceId
  • ExecutionId

    [DataContract]
    public class OrchestrationInstance : IExtensibleDataObject
    {
        /// <summary>
        /// The instance id, assigned as unique to the orchestration
        /// </summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>
        /// The execution id, unique to the execution of this instance
        /// </summary>
        [DataMember]
        public string ExecutionId { get; set; }

To support versioning, the plan is to modify OrchestrationInstance by adding an InstanceVersion field of type string:

        /// <summary>
        /// The version of this orchestration instance
        /// </summary>
        [DataMember]
        public string InstanceVersion { get; set; }
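
One concern with adding a field to a data contract is whether payloads persisted before the change still load. The sketch below illustrates this with a hypothetical stand-in class (OrchestrationInstanceSketch) and the BCL's DataContractJsonSerializer. Both are assumptions made to keep the example self-contained; they say nothing about which serializer a given backend actually uses.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Text;

// Hypothetical stand-in for OrchestrationInstance with the proposed field added.
[DataContract]
public class OrchestrationInstanceSketch
{
    [DataMember] public string InstanceId { get; set; }
    [DataMember] public string ExecutionId { get; set; }

    // Proposed addition. It is not marked IsRequired, so payloads written
    // before the field existed still deserialize (the value is then null).
    [DataMember] public string InstanceVersion { get; set; }
}

public static class InstanceVersionDemo
{
    // Deserializes a JSON payload with the data-contract JSON serializer.
    public static OrchestrationInstanceSketch FromJson(string json)
    {
        var serializer = new DataContractJsonSerializer(typeof(OrchestrationInstanceSketch));
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
        return (OrchestrationInstanceSketch)serializer.ReadObject(stream);
    }
}
```

Under these assumptions, a payload without InstanceVersion yields an instance whose InstanceVersion is null, so code that reads the field needs to treat null as "no version specified".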

1.1.2 - OrchestrationContext

OrchestrationContext in the core programming model

src\DurableTask.Core\OrchestrationContext.cs

Context for an orchestration containing the instance, replay status, orchestration methods and proxy methods.

Class definition

OrchestrationContext is an abstract class:

    public abstract class OrchestrationContext
    {
    }

The implementing class is TaskOrchestrationContext.

OrchestrationContext defines the following methods (public virtual or public abstract):

  • CreateClient()
  • CreateClientV2()
  • CreateRetryableClient()
  • ScheduleWithRetry()
  • ScheduleTask()
  • CreateTimer()
  • CreateSubOrchestrationInstance()
  • CreateSubOrchestrationInstanceWithRetry()
  • SendEvent()
  • ContinueAsNew()

ScheduleTask()

        /// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
        public abstract Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters);

The concept of a version already exists here.

ScheduleWithRetry()

        public virtual Task<T> ScheduleWithRetry<T>(Type taskActivityType, RetryOptions retryOptions,
            params object[] parameters)
        {
            return ScheduleWithRetry<T>(NameVersionHelper.GetDefaultName(taskActivityType),
                NameVersionHelper.GetDefaultVersion(taskActivityType),
                retryOptions, parameters);
        }

NameVersionHelper.GetDefaultVersion(taskActivityType) is not really implemented at present; it is hard-coded to return an empty string:

        public static string GetDefaultVersion(object obj)
        {
            return string.Empty;
        }

It then calls the overload that takes a version parameter:

        public virtual Task<T> ScheduleWithRetry<T>(string name, string version, RetryOptions retryOptions,
            params object[] parameters)
        {
            Task<T> RetryCall() => ScheduleTask<T>(name, version, parameters);
            var retryInterceptor = new RetryInterceptor<T>(this, retryOptions, RetryCall);
            return retryInterceptor.Invoke();
        }

So the call still ends up in ScheduleTask().
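
The pattern above (wrap the real call in a delegate, then let an interceptor decide how often to invoke it) can be sketched in isolation. RetrySketch below is a hypothetical helper, not the library's RetryInterceptor, and its parameters (maxAttempts, delay) are assumptions. It also waits with Task.Delay, which is only acceptable outside an orchestration; inside one, waiting would have to go through a durable timer.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Invokes 'call' up to maxAttempts times, waiting 'delay' after each failed
    // attempt. The exception of the last attempt propagates to the caller.
    public static async Task<T> InvokeAsync<T>(Func<Task<T>> call, int maxAttempts, TimeSpan delay)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await call();
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Swallow the failure and try again after the delay.
                await Task.Delay(delay);
            }
        }
    }
}
```

A caller would pass the scheduling call as the delegate, for example `RetrySketch.InvokeAsync(() => SomeCallAsync(), 3, TimeSpan.FromSeconds(1))`, where SomeCallAsync is a placeholder for the operation being retried.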

CreateTimer()

        public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state);

        public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken);

CreateSubOrchestrationInstance()

        public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, object input)
        {
            return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
                NameVersionHelper.GetDefaultVersion(orchestrationType), input);
        }

        public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, string instanceId, object input)
        {
            return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
                NameVersionHelper.GetDefaultVersion(orchestrationType), instanceId, input);
        }

        public abstract Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input);

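This method also declares a version parameter, but it is still effectively unused: the Type-based overloads always pass the empty string returned by GetDefaultVersion().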

SendEvent()

         public abstract void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData);

Once the version field has been added to OrchestrationInstance, this method carries a version as well.

ContinueAsNew()

        public abstract void ContinueAsNew(object input);

There is no concept of a version here. The implementation should preferably reuse the version of the current instance (if one was specified).

CreateClient()

Create a proxy client class to schedule remote TaskActivities via a strongly typed interface.

        public virtual T CreateClient<T>() where T : class
        {
            return CreateClient<T>(false);
        }

        public virtual T CreateClient<T>(bool useFullyQualifiedMethodNames) where T : class
        {
            return CreateClient<T>(() => new ScheduleProxy(this, useFullyQualifiedMethodNames));
        }

        private static T CreateClient<T>(Func<IInterceptor> createScheduleProxy) where T : class
        {
            if (!typeof(T).IsInterface && !typeof(T).IsClass)
            {
                throw new InvalidOperationException($"{nameof(T)} must be an interface or class.");
            }

            IInterceptor scheduleProxy = createScheduleProxy();

            if (typeof(T).IsClass)
            {
                if (typeof(T).IsSealed)
                {
                    throw new InvalidOperationException("Class cannot be sealed.");
                }

                return ProxyGenerator.CreateClassProxy<T>(scheduleProxy);
            }

            return ProxyGenerator.CreateInterfaceProxyWithoutTarget<T>(scheduleProxy);
        }

There is no concept of a version here.
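
To make the proxy idea concrete, here is a minimal sketch of turning a strongly typed interface call into a (method name, arguments) record. It swaps in System.Reflection.DispatchProxy from the BCL instead of the proxy generator used in the quoted code; IGreeter, RecordingProxy and ProxyDemo are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical activity interface used only for this sketch.
public interface IGreeter
{
    string Greet(string name);
}

// Records each call instead of executing it, much like a scheduling proxy
// would hand the method name and arguments to the orchestration context.
public class RecordingProxy : DispatchProxy
{
    public List<string> Scheduled { get; } = new List<string>();

    protected override object Invoke(MethodInfo targetMethod, object[] args)
    {
        string call = $"{targetMethod.Name}({string.Join(", ", args)})";
        Scheduled.Add(call);
        return call; // stand-in for the activity result
    }
}

public static class ProxyDemo
{
    // Creates an IGreeter whose calls are all routed through RecordingProxy.Invoke.
    public static IGreeter Create() => DispatchProxy.Create<IGreeter, RecordingProxy>();
}
```

Note that nothing in this call path carries a version: the only information available to the interceptor is the method and its arguments, which matches the observation above.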

1.1.3 - OrchestrationState

OrchestrationState in the core programming model

src\DurableTask.Core\OrchestrationState.cs

OrchestrationState defines the following properties:

  • CompletedTime
  • CompressedSize
  • CreatedTime
  • Input
  • LastUpdatedTime
  • Name
  • OrchestrationInstance: contains InstanceId and ExecutionId
  • Output
  • ParentInstance
  • Size
  • Status
  • Tags
  • Version: a string; check whether it can be reused.
  • Generation
  • ScheduledStartTime
  • FailureDetails

1.2 - Activity

Definition of Activity in the core programming model

1.2.1 - TaskActivity

TaskActivity in the core programming model

src\DurableTask.Core\TaskActivity.cs

TaskActivity defines the following methods:

  • Run()
  • RunAsync()

Run() method

        public abstract string Run(TaskContext context, string input);

In AsyncTaskActivity the synchronous method is blocked; the override never runs:

        /// <summary>
        /// Synchronous execute method, blocked for AsyncTaskActivity
        /// </summary>
        /// <returns>string.Empty</returns>
        public override string Run(TaskContext context, string input)
        {
            // will never run
            return string.Empty;
        }

RunAsync() method

        public virtual Task<string> RunAsync(TaskContext context, string input)
        {
            return Task.FromResult(Run(context, input));
        }

It is overridden as follows:

        public override async Task<string> RunAsync(TaskContext context, string input)
        {
            TInput parameter = default(TInput);

            var jArray = Utils.ConvertToJArray(input);

            int parameterCount = jArray.Count;
            if (parameterCount > 1)
            {
                throw new TaskFailureException(
                    "TaskActivity implementation cannot be invoked due to more than expected input parameters.  Signature mismatch.");
            }
            
            if (parameterCount == 1)
            {
                JToken jToken = jArray[0];
                if (jToken is JValue jValue)
                {
                    parameter = jValue.ToObject<TInput>();
                }
                else
                {
                    string serializedValue = jToken.ToString();
                    parameter = DataConverter.Deserialize<TInput>(serializedValue);
                }
            }

            TResult result;
            try
            {
                result = await ExecuteAsync(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new TaskFailureException(e.Message, e, details)
                    .WithFailureDetails(failureDetails);
            }

            string serializedResult = DataConverter.Serialize(result);
            return serializedResult;
        }
    }

ExecuteAsync() is an abstract method:

        protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);
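
The input handling in RunAsync() boils down to: treat the input as a JSON array of arguments, reject more than one, and deserialize the single element into TInput. The sketch below shows that shape with System.Text.Json rather than the JSON library used in the quoted code. ActivityInputSketch is a hypothetical helper, and rejecting non-array input is an assumption of the sketch, not a statement about what Utils.ConvertToJArray does.

```csharp
using System;
using System.Text.Json;

public static class ActivityInputSketch
{
    // Extracts the single activity parameter from a JSON array of arguments.
    // Returns default(TInput) for an empty array and rejects more than one argument.
    public static TInput ParseSingleParameter<TInput>(string input)
    {
        using JsonDocument document = JsonDocument.Parse(input);
        JsonElement root = document.RootElement;

        if (root.ValueKind != JsonValueKind.Array)
        {
            throw new ArgumentException("Expected a JSON array of parameters.", nameof(input));
        }

        int parameterCount = root.GetArrayLength();
        if (parameterCount > 1)
        {
            throw new InvalidOperationException("More parameters than expected. Signature mismatch.");
        }

        return parameterCount == 1 ? root[0].Deserialize<TInput>() : default;
    }
}
```

For example, `ActivityInputSketch.ParseSingleParameter<string>("[\"world\"]")` yields the string passed to the activity, while an input with two elements is rejected before any user code runs.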

GetStatus() method

        public abstract string GetStatus();

1.2.2 - TaskContext

TaskContext in the core programming model

src\DurableTask.Core\TaskContext.cs

TaskContext defines the following properties:

  • OrchestrationInstance: contains InstanceId and ExecutionId
  • ErrorPropagationMode

1.3 - OrchestrationExecution

Definition of OrchestrationExecution in the core programming model

1.3.1 - OrchestrationExecutionContext

OrchestrationExecutionContext

src\DurableTask.Core\OrchestrationExecutionContext.cs

Context associated with the orchestration being executed.

    [DataContract]
    public class OrchestrationExecutionContext
    {
    }

OrchestrationTags

The class defines a single property, OrchestrationTags:

        /// <summary>
        /// Gets the orchestration tags
        /// </summary>
        [DataMember]
        public IDictionary<string, string> OrchestrationTags { get; internal set; }

1.3.2 - OrchestrationExecutionCursor

OrchestrationExecutionCursor

src\DurableTask.Core\OrchestrationExecutionCursor.cs

Context associated with the orchestration being executed.

Class definition:

    internal class OrchestrationExecutionCursor
    {
    }

Constructor:

        public OrchestrationExecutionCursor(
            OrchestrationRuntimeState state,
            TaskOrchestration orchestration,
            TaskOrchestrationExecutor executor,
            IEnumerable<OrchestratorAction> latestDecisions)
        {
            RuntimeState = state;
            TaskOrchestration = orchestration;
            OrchestrationExecutor = executor;
            LatestDecisions = latestDecisions;
        }

Properties:

        public OrchestrationRuntimeState RuntimeState { get; }

        public TaskOrchestration TaskOrchestration { get; }

        public TaskOrchestrationExecutor OrchestrationExecutor { get; }

        public IEnumerable<OrchestratorAction> LatestDecisions { get; set; }

It is just a value object.

Open question: where does the "cursor" aspect actually show up?

1.4 - TaskOrchestration

Definition of the worker-side TaskOrchestration in the core programming model

1.4.1 - TaskOrchestration

TaskOrchestration in the core programming model

src\DurableTask.Core\TaskOrchestration.cs

TaskOrchestration.cs defines three methods:

  • Execute()
  • RaiseEvent()
  • GetStatus()

Execute() method

        public abstract Task<string> Execute(OrchestrationContext context, string input);

The method is implemented as:

        public override async Task<string> Execute(OrchestrationContext context, string input)
        {
            var parameter = DataConverter.Deserialize<TInput>(input);
            TResult result;
            try
            {
                result = await RunTask(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new OrchestrationFailureException(e.Message, details)
                {
                    FailureDetails = failureDetails,
                };
            }

            return DataConverter.Serialize(result);
        }

RunTask() is an abstract method:

        public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);

The default DataConverter is JSON:

    public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
    {
        /// <summary>
        /// Creates a new TaskOrchestration with the default DataConverter
        /// </summary>
        protected TaskOrchestration()
        {
            DataConverter = JsonDataConverter.Default;
        }

        /// <summary>
        /// The DataConverter to use for input and output serialization/deserialization
        /// </summary>
        public DataConverter DataConverter { get; protected set; }
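
The division of labour described here (strings at the boundary, typed values inside, JSON in between) can be sketched without the framework. TypedOrchestrationSketch and HelloSketch are hypothetical miniatures; they use System.Text.Json as the converter, which is an assumption of the sketch, and they leave out the context parameter and the error handling shown in Execute() above.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical miniature of the typed base class: string in, string out at the
// boundary, typed values inside.
public abstract class TypedOrchestrationSketch<TResult, TInput>
{
    // Deserializes the input, runs the typed logic, and serializes the result.
    public async Task<string> Execute(string input)
    {
        TInput parameter = JsonSerializer.Deserialize<TInput>(input);
        TResult result = await RunTask(parameter);
        return JsonSerializer.Serialize(result);
    }

    protected abstract Task<TResult> RunTask(TInput input);
}

// A subclass only has to supply the typed logic.
public sealed class HelloSketch : TypedOrchestrationSketch<string, string>
{
    protected override Task<string> RunTask(string input) => Task.FromResult("Hello " + input);
}
```

The design choice worth noting is that the subclass never sees serialized text: swapping the converter changes the wire format without touching RunTask().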

RaiseEvent() method

        public abstract void RaiseEvent(OrchestrationContext context, string name, string input);

The method is implemented as:

        public override void RaiseEvent(OrchestrationContext context, string name, string input)
        {
            var parameter = DataConverter.Deserialize<TEvent>(input);
            OnEvent(context, name, parameter);
        }

OnEvent() is an empty implementation:

        public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
        {
            // do nothing
        }

GetStatus() method

        public abstract string GetStatus();

Implementations

Apart from samples and tests, this project contains no implementing classes.

samples\Correlation.Samples\HelloOrchestrator.cs contains one of the simplest implementations:

    [KnownType(typeof(Hello))]
    internal class HelloOrchestrator : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
          //  await contextBase.ScheduleTask<string>(typeof(Hello), "world");
          //   if you pass an empty string it throws an error
            return await context.ScheduleTask<string>(typeof(Hello), "world");
        }
    }

    internal class Hello : TaskActivity<string, string>
    {
        protected override string Execute(TaskContext context, string input)
        {
            if (string.IsNullOrEmpty(input))
            {
                throw new ArgumentNullException(nameof(input));
            }

            Console.WriteLine($"Activity: Hello {input}");
            return $"Hello, {input}!";
        }
    }

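Note: this implementation is the closest to the definition in the Dapr workflow Java SDK, and it is also the most suitable place to add a getVersion() method that returns the current worker's version. But why does the quickstart use static methods?

TBD: ask Chris.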

1.4.2 - TaskOrchestrationContext

TaskOrchestrationContext

src\DurableTask.Core\TaskOrchestrationContext.cs

Class definition

TaskOrchestrationContext inherits from OrchestrationContext:

    internal class TaskOrchestrationContext : OrchestrationContext
    {
    }

1.5 - OrchestrationState

Definition of OrchestrationState in the core programming model

1.5.1 - OrchestrationState

Definition of OrchestrationState in the core programming model

src\DurableTask.Core\OrchestrationState.cs

Represents the state of an orchestration

Class definition

OrchestrationState is a DataContract:

    [DataContract]
    public class OrchestrationState : IExtensibleDataObject
    {
    }

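It defines the following DataMembers:

| Name | Type | Description |
|---|---|---|
| Name | string | Name of the orchestration |
| Input | string | Serialized input of the orchestration |
| Output | string | Serialized output of the orchestration |
| OrchestrationInstance | OrchestrationInstance | The orchestration instance this state represents |
| OrchestrationStatus | OrchestrationStatus | Current orchestration status |
| Status | string | Current orchestration status in string form |
| ParentInstance | ParentInstance | The parent instance, if this orchestration has one |
| Version | string | Version of the orchestration |
| Tags | IDictionary<string, string> | Dictionary of tags and string values associated with this orchestration |
| Generation | int | Generation of the orchestration; reusing an instanceId increments this value |
| CreatedTime | DateTime | Creation time of the orchestration |
| ScheduledStartTime | DateTime | Time at which the orchestration is scheduled to start |
| CompletedTime | DateTime | Completion time of the orchestration |
| LastUpdatedTime | DateTime | Last-updated time of the orchestration |
| Size | long | Size of the raw (uncompressed) serialized runtime state |
| CompressedSize | long | Size of the compressed serialized runtime state |
| FailureDetails | FailureDetails | Failure details associated with the orchestration |

OrchestrationState defines a Version field. In addition, its OrchestrationInstance field also carries a version, through the InstanceVersion field proposed in 1.1.1.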

1.5.2 - OrchestrationStateQuery

OrchestrationStateQuery

src\DurableTask.Core\OrchestrationStateQuery.cs

Query class that can be used to filter results from the Orchestration instance store.

Note: instance methods are not thread-safe.

Class definition

    public class OrchestrationStateQuery
    {
    }

Constructor

        public OrchestrationStateQuery()
        {
            FilterMap = new Dictionary<Type, OrchestrationStateQueryFilter>();
        }

FilterMap

        public IDictionary<Type, OrchestrationStateQueryFilter> FilterMap { get; private set; }

GetFilters()

Returns the filters of the query as a tuple of (primary filter, collection of secondary filters):

        public Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>> GetFilters()
        {
            ICollection<OrchestrationStateQueryFilter> filters = FilterMap.Values;
            if (filters.Count == 0)
            {
                return null;
            }

            var secondaryFilters = new List<OrchestrationStateQueryFilter>();

            OrchestrationStateQueryFilter primaryFilter = filters.First();
            int primaryFilterPrecedence = SafeGetFilterPrecedence(primaryFilter);

            if (filters.Count > 1)
            {
                foreach (OrchestrationStateQueryFilter filter in filters)
                {
                    int newPrecedence = SafeGetFilterPrecedence(filter);
                    if (newPrecedence > primaryFilterPrecedence)
                    {
                        secondaryFilters.Add(primaryFilter);

                        primaryFilter = filter;
                        primaryFilterPrecedence = newPrecedence;
                    }
                    else
                    {
                        secondaryFilters.Add(filter);
                    }
                }
            }

            return new Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>>(
                primaryFilter, secondaryFilters);
        }
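
The intent of GetFilters() appears to be: pick the filter with the highest precedence as the primary one and treat the rest as secondary. A minimal generic sketch of that idea follows. FilterSplitSketch is a hypothetical helper and the precedence function is supplied by the caller, since the values returned by SafeGetFilterPrecedence() are not shown here. Unlike the loop as quoted, which also revisits the first filter, the sketch places each filter in exactly one of the two results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FilterSplitSketch
{
    // Splits 'filters' into the one with the highest precedence (primary) and
    // all others (secondary). On ties, the earliest filter stays primary.
    public static (T Primary, List<T> Secondary) Split<T>(IReadOnlyList<T> filters, Func<T, int> precedence)
    {
        if (filters.Count == 0)
        {
            throw new ArgumentException("At least one filter is required.", nameof(filters));
        }

        int primaryIndex = 0;
        for (int i = 1; i < filters.Count; i++)
        {
            if (precedence(filters[i]) > precedence(filters[primaryIndex]))
            {
                primaryIndex = i;
            }
        }

        List<T> secondary = filters.Where((_, i) => i != primaryIndex).ToList();
        return (filters[primaryIndex], secondary);
    }
}
```

A backend would typically use the primary filter to choose the most selective index or partition to query, then apply the secondary filters to the rows that come back.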

1.5.3 - OrchestrationStateQueryFilter

OrchestrationStateQueryFilter

src\DurableTask.Core\OrchestrationStateQueryFilter.cs

Abstract class for an orchestration state query filter

Class definition

OrchestrationStateQueryFilter is declared as:

    public abstract class OrchestrationStateQueryFilter
    {
    }

It is an empty abstract class.

Its implementations are:

  • OrchestrationStateInstanceFilter
  • OrchestrationStateNameVersionFilter: this one can be used for version filtering

1.5.4 - OrchestrationStateInstanceFilter

OrchestrationStateInstanceFilter

src\DurableTask.Core\OrchestrationStateInstanceFilter.cs

Filter for Orchestration instance filter

Class definition

OrchestrationStateInstanceFilter extends OrchestrationStateQueryFilter:

    public class OrchestrationStateInstanceFilter : OrchestrationStateQueryFilter
    {

Constructor

        // Creates a new instance of OrchestrationStateInstanceFilter with default settings
        public OrchestrationStateInstanceFilter()
        {
            // default is exact match
            StartsWith = false;
        }

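It defines only two properties, InstanceId and ExecutionId, which hold the values to match, plus StartsWith, which sets the match type of the filter:

        // Gets or sets the InstanceId for the filter
        public string InstanceId { get; set; }

        // Gets or sets the ExecutionId for the filter
        public string ExecutionId { get; set; }

        // Gets or sets the match type of the filter: starts-with (prefix) or exact match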
        public bool StartsWith { get; set; }
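
The two match types can be illustrated with a small predicate. InstanceFilterSketch.Matches is a hypothetical helper; how a particular storage backend evaluates the filter, including whether the comparison is case-sensitive, is an assumption here.

```csharp
using System;

public static class InstanceFilterSketch
{
    // Exact match when startsWith is false (the default in the filter's
    // constructor); prefix match when startsWith is true.
    public static bool Matches(string candidateInstanceId, string filterInstanceId, bool startsWith)
    {
        return startsWith
            ? candidateInstanceId.StartsWith(filterInstanceId, StringComparison.Ordinal)
            : string.Equals(candidateInstanceId, filterInstanceId, StringComparison.Ordinal);
    }
}
```

With a prefix match, a filter value such as "order-" would select every instance whose id begins with that text, which is convenient when instance ids encode a category.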

1.5.5 - OrchestrationStateNameVersionFilter

OrchestrationStateNameVersionFilter

src\DurableTask.Core\OrchestrationStateNameVersionFilter.cs

Filter for Orchestration Name and Version

Class definition

OrchestrationStateNameVersionFilter extends OrchestrationStateQueryFilter:

    public class OrchestrationStateNameVersionFilter : OrchestrationStateQueryFilter
    {
    }

It defines only two properties, Name and Version, which hold the values to match:

        public string Name { get; set; }

        public string Version { get; set; }

1.6 - Entity

Definition of Entity in the core programming model

1.6.1 - TaskEntity

TaskEntity in the core programming model

Abstract base class for entities

src\DurableTask.Core\Entities\TaskEntity.cs

TaskEntity defines a single method:

  • ExecuteOperationBatchAsync()

ExecuteOperationBatchAsync() method

        public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);

EntityBatchRequest class

A request for execution of a batch of operations on an entity.

  • string InstanceId
  • string EntityState
  • List<OperationRequest> Operations

OperationRequest class

It has the following properties:

  • string Operation
  • Guid Id
  • string Input

1.6.2 - EntityId

EntityId in the core programming model

A unique identifier for an entity, consisting of entity name and entity key.

src\DurableTask.Core\Entities\EntityId.cs

EntityId defines the following properties:

  • string Name
  • string Key

1.7 - History

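Definition of History events in the core programming model

1.7.1 - History overview

Overview of History events in the core programming model

Introduction

The following introduction comes from README.md

Durable Task Framework History Events

Below are some of the common history events that make up the state of an orchestration. You can easily view them in the history tables of DTFx's Azure Storage and MSSQL storage backends. Understanding these events is useful when working with DTFx code, debugging problems, or building diagnostic tools that read the history directly (such as the Durable Functions Monitor project).

| Event Type | Description |
|---|---|
| OrchestratorStarted | The orchestrator function is starting a new execution. You will see many of these in a history, one for each time the orchestrator resumes from an await state. Note that this does not mean the orchestrator is starting for the first time; the first execution is denoted by the ExecutionStarted history event (see below). The timestamp of this event is used to populate the CurrentDateTimeUtc property. |
| ExecutionStarted | The orchestration has started executing for the first time. This event contains the orchestrator name, its input, and the scheduled time of the orchestrator (which may be earlier than the preceding OrchestratorStarted event in the history). It is always the second event in an orchestration history. |
| TaskScheduled | The orchestrator scheduled an activity task. This event includes the activity name, its input, and a sequential EventId that can be used to correlate the TaskScheduled event with the corresponding TaskCompleted or TaskFailed event. Note that multiple Task*** events may be generated if an activity task is retried. |
| TaskCompleted | A scheduled activity task completed successfully. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| TaskFailed | A scheduled activity task ended in failure. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| SubOrchestrationInstanceCreated | The orchestrator has scheduled a sub-orchestrator. This event contains the name of the scheduled orchestrator, the instance ID, the input, and a sequential event ID that can be used to correlate the SubOrchestrationInstanceCreated event with a subsequent SubOrchestrationInstanceCompleted or SubOrchestrationInstanceFailed history event. The timestamp refers to when the sub-orchestrator was scheduled, which is earlier than when it starts executing. Note that multiple SubOrchestrationInstance*** events may be generated if the sub-orchestration is retried. |
| SubOrchestrationInstanceCompleted | A scheduled sub-orchestrator completed successfully. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| SubOrchestrationInstanceFailed | A scheduled sub-orchestrator completed with a failure. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| TimerCreated | The orchestrator scheduled a durable timer. The FireAt property contains the date at which the timer will fire. |
| TimerFired | A previously scheduled durable timer has fired. The TimerId field matches the EventId field of the corresponding TimerCreated event. |
| EventRaised | The orchestration (or an entity, in Durable Entities) received an external event. The record contains the event name, the payload, and the timestamp at which the event was sent (which should be the same as or earlier than the time the history event was actually persisted). |
| EventSent | The orchestration (or entity) sent a one-way message to another orchestration (or entity). |
| ExecutionCompleted | The orchestration has completed. This event includes the output of the orchestration and does not distinguish between success and failure. |
| ExecutionTerminated | The orchestration was forcibly terminated by an API call. The timestamp of this event represents when the termination was scheduled, not necessarily when it actually happened. |
| OrchestratorCompleted | The orchestrator function has awaited and committed any side effects. You will see many of these in a history, one for each time the orchestrator awaits. Note that this does not mean the orchestrator has finished (completion is denoted by ExecutionCompleted or ExecutionTerminated). |
| GenericEvent | A generic history event with a Data field but no specific meaning. It is not commonly used. In some cases it is used to trigger a fresh replay of an idle orchestration, for example after an orchestration rewind. |
| HistoryStateEvent | A history event that contains a snapshot of the orchestration history. Most modern backend types do not use this event type. |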
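
The correlation rule described for TaskScheduled, TaskCompleted and TaskFailed (TaskScheduledId points back at the EventId of the scheduling event) is what a diagnostic tool would use to find activities that are still outstanding. The sketch below works on a hypothetical flattened record, HistoryRow; the real history events are separate classes, so the shape of the input is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flattened history record used only for this sketch.
public record HistoryRow(string EventType, int EventId, int? TaskScheduledId = null);

public static class HistoryCorrelationSketch
{
    // Returns the EventIds of TaskScheduled events that have no matching
    // TaskCompleted or TaskFailed event yet.
    public static List<int> PendingTasks(IEnumerable<HistoryRow> history)
    {
        List<HistoryRow> rows = history.ToList();

        // Collect the ids of all scheduling events that already have an outcome.
        var finished = new HashSet<int>(
            rows.Where(r => r.EventType is "TaskCompleted" or "TaskFailed")
                .Where(r => r.TaskScheduledId.HasValue)
                .Select(r => r.TaskScheduledId.Value));

        return rows.Where(r => r.EventType == "TaskScheduled" && !finished.Contains(r.EventId))
                   .Select(r => r.EventId)
                   .ToList();
    }
}
```

The same join, keyed on the creation event's EventId, would apply to SubOrchestrationInstanceCreated and to TimerCreated with their respective completion events.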

1.7.2 - HistoryEvent

HistoryEvent in the core programming model

It has the following properties:

  • int EventId
  • EventType EventType
  • bool IsPlayed
  • DateTime Timestamp
  • ExtensionDataObject ExtensionData

This class is also the parent class of the other events.

1.7.3 - ExecutionStartedEvent

ExecutionStartedEvent in the core programming model

It has the following properties:

  • int EventId (inherited from HistoryEvent)
  • string Input
  • EventType EventType
  • ParentInstance ParentInstance
  • string Name
  • string Version: can be reused
  • IDictionary<string, string> Tags
  • string Correlation
  • DistributedTraceContext ParentTraceContext
  • DateTime ScheduledStartTime
  • int Generation

1.7.4 - OrchestratorStartedEvent

OrchestratorStartedEvent in the core programming model

It has the following properties:

  • int EventId (inherited from HistoryEvent)
  • EventType EventType

2 - DurableTask Dispatcher source code study

Studying the source code of the durabletask Dispatcher

2.1 - TaskOrchestrationDispatcher source code study

Studying the source code of the durabletask TaskOrchestrationDispatcher

Dispatcher for orchestrations to handle processing and renewing, completion of orchestration events

src\DurableTask.Core\TaskOrchestrationDispatcher.cs

Class definition

It implements no interface and has no base class:

    public class TaskOrchestrationDispatcher{}

Fields

        static readonly Task CompletedTask = Task.FromResult(0);

        readonly INameVersionObjectManager<TaskOrchestration> objectManager;
        readonly IOrchestrationService orchestrationService;
        readonly WorkItemDispatcher<TaskOrchestrationWorkItem> dispatcher;
        readonly DispatchMiddlewarePipeline dispatchPipeline;
        readonly LogHelper logHelper;
        ErrorPropagationMode errorPropagationMode;
        readonly NonBlockingCountdownLock concurrentSessionLock;
        readonly IEntityOrchestrationService? entityOrchestrationService;
        readonly EntityBackendProperties? entityBackendProperties;
        readonly TaskOrchestrationEntityParameters? entityParameters;

The main functionality lives in objectManager, orchestrationService, dispatcher and dispatchPipeline.

Constructor

        internal TaskOrchestrationDispatcher(
            IOrchestrationService orchestrationService,
            INameVersionObjectManager<TaskOrchestration> objectManager,
            DispatchMiddlewarePipeline dispatchPipeline,
            LogHelper logHelper,
            ErrorPropagationMode errorPropagationMode)
        {
            // These fields are all passed in by the caller
            this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager));
            this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService));
            this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline));
            this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper));
            this.errorPropagationMode = errorPropagationMode;
  
            // Try to treat orchestrationService as an IEntityOrchestrationService
            this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService;
            this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties;
            this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties);

            // Initialize the dispatcher
            this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
                "TaskOrchestrationDispatcher",
                item => item == null ? string.Empty : item.InstanceId,
                this.OnFetchWorkItemAsync,
                this.OnProcessWorkItemSessionAsync)
            {
                // Initialize the dispatcher's properties
                GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
                GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
                SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
                AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
                DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
                MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
                LogHelper = logHelper,
            };

            // To avoid starvation, we only allow half of all concurrently execution orchestrations to
            // leverage extended sessions.
            var maxConcurrentSessions = (int)Math.Ceiling(this.dispatcher.MaxConcurrentWorkItems / 2.0);
            this.concurrentSessionLock = new NonBlockingCountdownLock(maxConcurrentSessions);
        }
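
The last two statements of the constructor cap the number of extended sessions at half of the concurrent work items, rounded up. The small sketch below isolates that arithmetic; SessionLimitSketch is a hypothetical helper name.

```csharp
using System;

public static class SessionLimitSketch
{
    // Half of the concurrent work items, rounded up, may use extended sessions.
    // Dividing by 2.0 (not 2) avoids integer truncation before the ceiling is taken.
    public static int MaxConcurrentSessions(int maxConcurrentWorkItems)
        => (int)Math.Ceiling(maxConcurrentWorkItems / 2.0);
}
```

Rounding up matters at the low end: with a single concurrent work item, integer division would give 0 and no session could ever be extended, whereas the ceiling yields 1.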

StartAsync() method

Starts the dispatcher to start getting and processing orchestration events

        public async Task StartAsync()
        {
            await this.dispatcher.StartAsync();
        }

OnFetchWorkItemAsync() method

Method to get the next work item to process within supplied timeout

        protected Task<TaskOrchestrationWorkItem> OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
        {
            if (this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true)
            {
                // only orchestrations should be served by this dispatcher, so we call
                // the method which returns work items for orchestrations only.
                // Temporary debug output added while studying the code (not part of the original source).
                Console.WriteLine("OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true");
                Console.WriteLine("OnFetchWorkItemAsync: this.entityOrchestrationService=" + this.entityOrchestrationService?.GetType().FullName);
                return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
            }
            else
            {
                // both entities and orchestrations are served by this dispatcher,
                // so we call the method that may return work items for either.
                return this.orchestrationService.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
            }
        }

With the Console.WriteLine calls added above, the following output is printed:

OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true
OnFetchWorkItemAsync: this.entityOrchestrationService=Microsoft.Azure.WebJobs.Extensions.DurableTask.AzureStorageDurabilityProvider

OnProcessWorkItemSessionAsync() method

        async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
        {
            // DTFx history replay expects that ExecutionStarted comes before other events.
            // If this is not already the case, due to a race-condition, we re-order the
            // messages to enforce this expectation.
            EnsureExecutionStartedIsFirst(workItem.NewMessages);

            try
            {
                if (workItem.Session == null)
                {
                    // Legacy behavior
                    await this.OnProcessWorkItemAsync(workItem);
                    return;
                }

                var isExtendedSession = false;

                CorrelationTraceClient.Propagate(
                    () =>
                    {                
                        // Check if it is extended session.
                        // TODO: Remove this code - it looks incorrect and dangerous
                        isExtendedSession = this.concurrentSessionLock.Acquire();
                        this.concurrentSessionLock.Release();
                        workItem.IsExtendedSession = isExtendedSession;
                    });

                var processCount = 0;
                try
                {
                    while (true)
                    {
                        // If the provider provided work items, execute them.
                        if (workItem.NewMessages?.Count > 0)
                        {
                            bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
                            if (isCompletedOrInterrupted)
                            {
                                break;
                            }

                            processCount++;
                        }

                        // Fetches beyond the first require getting an extended session lock, used to prevent starvation.
                        if (processCount > 0 && !isExtendedSession)
                        {
                            isExtendedSession = this.concurrentSessionLock.Acquire();
                            if (!isExtendedSession)
                            {
                                TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
                                break;
                            }
                        }

                        TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session.");
                        Stopwatch timer = Stopwatch.StartNew();

                        // Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
                        // until either new messages are available or until a provider-specific timeout has expired.
                        workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
                        if (workItem.NewMessages == null)
                        {
                            break;
                        }

                        TraceHelper.Trace(
                            TraceEventType.Verbose,
                            "OnProcessWorkItemSession-EndFetch",
                            $"Fetched {workItem.NewMessages.Count} new message(s) after {timer.ElapsedMilliseconds} ms from existing session.");
                        workItem.OrchestrationRuntimeState.NewEvents.Clear();
                    }
                }
                finally
                {
                    if (isExtendedSession)
                    {
                        TraceHelper.Trace(
                            TraceEventType.Verbose,
                            "OnProcessWorkItemSession-Release",
                            $"Releasing extended session after {processCount} batch(es).");
                        this.concurrentSessionLock.Release();
                    }
                }
            }
            catch (SessionAbortedException e)
            {
                // Either the orchestration or the orchestration service explicitly abandoned the session.
                OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
                this.logHelper.OrchestrationAborted(instance, e.Message);
                TraceHelper.TraceInstance(TraceEventType.Warning, "TaskOrchestrationDispatcher-ExecutionAborted", instance, "{0}", e.Message);
                await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
            }
        }

OnProcessWorkItemAsync() method

        protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem)
        {
            var messagesToSend = new List<TaskMessage>();
            var timerMessages = new List<TaskMessage>();
            var orchestratorMessages = new List<TaskMessage>();
            var isCompleted = false;
            var continuedAsNew = false;
            var isInterrupted = false;
            
            // correlation
            CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext);

            ExecutionStartedEvent? continueAsNewExecutionStarted = null;
            TaskMessage? continuedAsNewMessage = null;
            IList<HistoryEvent>? carryOverEvents = null;
            string? carryOverStatus = null;

            workItem.OrchestrationRuntimeState.LogHelper = this.logHelper;
            OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;

            runtimeState.AddEvent(new OrchestratorStartedEvent(-1));

            OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState;

            // Distributed tracing support: each orchestration execution is a trace activity
            // that derives from an established parent trace context. It is expected that some
            // listener will receive these events and publish them to a distributed trace logger.
            ExecutionStartedEvent startEvent =
                runtimeState.ExecutionStartedEvent ??
                workItem.NewMessages.Select(msg => msg.Event).OfType<ExecutionStartedEvent>().FirstOrDefault();
            Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent);

            OrchestrationState? instanceState = null;

            Task? renewTask = null;
            using var renewCancellationTokenSource = new CancellationTokenSource();
            if (workItem.LockedUntilUtc < DateTime.MaxValue)
            {
                // start a task to run RenewUntil
                renewTask = Task.Factory.StartNew(
                    () => RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskOrchestrationDispatcher), renewCancellationTokenSource.Token),
                    renewCancellationTokenSource.Token);
            }

            try
            {
                // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
                if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper))
                {
                    // TODO : mark an orchestration as faulted if there is data corruption
                    this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
                    TraceHelper.TraceSession(
                        TraceEventType.Error,
                        "TaskOrchestrationDispatcher-DeletedOrchestration",
                        runtimeState.OrchestrationInstance?.InstanceId!,
                        "Received work-item for an invalid orchestration");
                    isCompleted = true;
                    traceActivity?.Dispose();
                }
                else
                {
                    do
                    {
                        continuedAsNew = false;
                        continuedAsNewMessage = null;


                        this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
                        TraceHelper.TraceInstance(
                            TraceEventType.Verbose,
                            "TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin",
                            runtimeState.OrchestrationInstance!,
                            "Executing user orchestration: {0}",
                            JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));

                        if (workItem.Cursor == null)
                        {
                            workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
                        }
                        else
                        {
                            await this.ResumeOrchestrationAsync(workItem);
                        }

                        IReadOnlyList<OrchestratorAction> decisions = workItem.Cursor.LatestDecisions.ToList();

                        this.logHelper.OrchestrationExecuted(
                            runtimeState.OrchestrationInstance!,
                            runtimeState.Name,
                            decisions);
                        TraceHelper.TraceInstance(
                            TraceEventType.Information,
                            "TaskOrchestrationDispatcher-ExecuteUserOrchestration-End",
                            runtimeState.OrchestrationInstance!,
                            "Executed user orchestration. Received {0} orchestrator actions: {1}",
                            decisions.Count,
                            string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType)));

                        // TODO: Exception handling for invalid decisions, which is increasingly likely
                        //       when custom middleware is involved (e.g. out-of-process scenarios).
                        foreach (OrchestratorAction decision in decisions)
                        {
                            TraceHelper.TraceInstance(
                                TraceEventType.Information,
                                "TaskOrchestrationDispatcher-ProcessOrchestratorAction",
                                runtimeState.OrchestrationInstance!,
                                "Processing orchestrator action of type {0}",
                                decision.OrchestratorActionType);
                            switch (decision.OrchestratorActionType)
                            {
                                case OrchestratorActionType.ScheduleOrchestrator:
                                    var scheduleTaskAction = (ScheduleTaskOrchestratorAction)decision;
                                    var message = this.ProcessScheduleTaskDecision(
                                        scheduleTaskAction,
                                        runtimeState,
                                        this.IncludeParameters,
                                        traceActivity);
                                    messagesToSend.Add(message);
                                    break;
                                case OrchestratorActionType.CreateTimer:
                                    var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision;
                                    timerMessages.Add(this.ProcessCreateTimerDecision(
                                        timerOrchestratorAction,
                                        runtimeState,
                                        isInternal: false));
                                    break;
                                case OrchestratorActionType.CreateSubOrchestration:
                                    var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision;
                                    orchestratorMessages.Add(
                                        this.ProcessCreateSubOrchestrationInstanceDecision(
                                            createSubOrchestrationAction,
                                            runtimeState,
                                            this.IncludeParameters,
                                            traceActivity));
                                    break;
                                case OrchestratorActionType.SendEvent:
                                    var sendEventAction = (SendEventOrchestratorAction)decision;
                                    orchestratorMessages.Add(
                                        this.ProcessSendEventDecision(sendEventAction, runtimeState));
                                    break;
                                case OrchestratorActionType.OrchestrationComplete:
                                    OrchestrationCompleteOrchestratorAction completeDecision = (OrchestrationCompleteOrchestratorAction)decision;
                                    TaskMessage? workflowInstanceCompletedMessage =
                                        this.ProcessWorkflowCompletedTaskDecision(completeDecision, runtimeState, this.IncludeDetails, out continuedAsNew);
                                    if (workflowInstanceCompletedMessage != null)
                                    {
                                        // Send complete message to parent workflow or to itself to start a new execution
                                        // Store the event so we can rebuild the state
                                        carryOverEvents = null;
                                        if (continuedAsNew)
                                        {
                                            continuedAsNewMessage = workflowInstanceCompletedMessage;
                                            continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent;
                                            if (completeDecision.CarryoverEvents.Any())
                                            {
                                                carryOverEvents = completeDecision.CarryoverEvents.ToList();
                                                completeDecision.CarryoverEvents.Clear();
                                            }
                                        }
                                        else
                                        {
                                            orchestratorMessages.Add(workflowInstanceCompletedMessage);
                                        }
                                    }

                                    isCompleted = !continuedAsNew;
                                    break;
                                default:
                                    throw TraceHelper.TraceExceptionInstance(
                                        TraceEventType.Error,
                                        "TaskOrchestrationDispatcher-UnsupportedDecisionType",
                                        runtimeState.OrchestrationInstance!,
                                        new NotSupportedException($"Decision type '{decision.OrchestratorActionType}' not supported"));
                            }

                            // Underlying orchestration service provider may have a limit of messages per call, to avoid the situation
                            // we keep on asking the provider if message count is ok and stop processing new decisions if not.
                            //
                            // We also put in a fake timer to force next orchestration task for remaining messages
                            int totalMessages = messagesToSend.Count + orchestratorMessages.Count + timerMessages.Count;
                            if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
                            {
                                TraceHelper.TraceInstance(
                                    TraceEventType.Information,
                                    "TaskOrchestrationDispatcher-MaxMessageCountReached",
                                    runtimeState.OrchestrationInstance!,
                                    "MaxMessageCount reached.  Adding timer to process remaining events in next attempt.");

                                if (isCompleted || continuedAsNew)
                                {
                                    TraceHelper.TraceInstance(
                                        TraceEventType.Information,
                                        "TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted",
                                        runtimeState.OrchestrationInstance!,
                                        "Orchestration already completed.  Skip adding timer for splitting messages.");
                                    break;
                                }

                                var dummyTimer = new CreateTimerOrchestratorAction
                                {
                                    Id = FrameworkConstants.FakeTimerIdToSplitDecision,
                                    FireAt = DateTime.UtcNow
                                };

                                timerMessages.Add(this.ProcessCreateTimerDecision(
                                    dummyTimer,
                                    runtimeState,
                                    isInternal: true));
                                isInterrupted = true;
                                break;
                            }
                        }

                        // correlation
                        CorrelationTraceClient.Propagate(() =>
                        {
                            if (runtimeState.ExecutionStartedEvent != null)
                                runtimeState.ExecutionStartedEvent.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
                        });


                        // finish up processing of the work item
                        if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
                        {
                            runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
                        }

                        if (isCompleted)
                        {
                            TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state");
                            if (runtimeState.ExecutionStartedEvent != null)
                            {
                                instanceState = Utils.BuildOrchestrationState(runtimeState);
                            }
                        }
                        else
                        {
                            if (continuedAsNew)
                            {
                                TraceHelper.TraceSession(
                                    TraceEventType.Information,
                                    "TaskOrchestrationDispatcher-UpdatingStateForContinuation",
                                    workItem.InstanceId,
                                    "Updating state for continuation");

                                // correlation
                                CorrelationTraceClient.Propagate(() =>
                                {
                                    continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
                                });

                                // Copy the distributed trace context, if any
                                continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent);

                                runtimeState = new OrchestrationRuntimeState();
                                runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
                                runtimeState.AddEvent(continueAsNewExecutionStarted!);
                                runtimeState.Status = workItem.OrchestrationRuntimeState.Status ?? carryOverStatus;
                                carryOverStatus = workItem.OrchestrationRuntimeState.Status;

                                if (carryOverEvents != null)
                                {
                                    foreach (var historyEvent in carryOverEvents)
                                    {
                                        runtimeState.AddEvent(historyEvent);
                                    }
                                }

                                runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
                                workItem.OrchestrationRuntimeState = runtimeState;

                                workItem.Cursor = null;
                            }

                            instanceState = Utils.BuildOrchestrationState(runtimeState);
                        }
                    } while (continuedAsNew);
                }
            }
            finally
            {
                if (renewTask != null)
                {
                    try
                    {

                        renewCancellationTokenSource.Cancel();
                        await renewTask;
                    }
                    catch (ObjectDisposedException)
                    {
                        // ignore
                    }
                    catch (OperationCanceledException)
                    {
                        // ignore
                    }
                }
            }

            if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
            {
                // some backends expect the original runtime state object
                workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState;
            }

            runtimeState.Status = runtimeState.Status ?? carryOverStatus;

            if (instanceState != null)
            {
                instanceState.Status = runtimeState.Status;
            }

            await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
                workItem,
                runtimeState,
                continuedAsNew ? null : messagesToSend,
                orchestratorMessages,
                continuedAsNew ? null : timerMessages,
                continuedAsNewMessage,
                instanceState);
            
            if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
            {
                workItem.OrchestrationRuntimeState = runtimeState;
            }

            return isCompleted || continuedAsNew || isInterrupted;
        }
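
The message-count check inside the decision loop above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the DurableTask implementation: each decision produces exactly one message, the limit test `messages.Count > maxMessages` stands in for the provider-specific IsMaxMessageCountExceeded, and the names DecisionSplitDemo, ProcessDecisions and FakeTimer are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class DecisionSplitDemo
{
    // Hypothetical marker standing in for the timer message created with
    // FrameworkConstants.FakeTimerIdToSplitDecision.
    public const string FakeTimer = "fake-timer";

    public static (List<string> Messages, bool IsInterrupted) ProcessDecisions(
        IEnumerable<string> decisions, int maxMessages)
    {
        var messages = new List<string>();
        foreach (string decision in decisions)
        {
            // In this sketch every decision yields one outgoing message.
            messages.Add(decision);

            // Assumed limit check; real providers decide this themselves.
            if (messages.Count > maxMessages)
            {
                // Add a timer that fires immediately so the remaining
                // decisions are picked up in the next attempt, then stop.
                messages.Add(FakeTimer);
                return (messages, true);
            }
        }

        return (messages, false);
    }

    public static void Main()
    {
        var (messages, isInterrupted) =
            ProcessDecisions(new[] { "a", "b", "c", "d", "e" }, 2);
        Console.WriteLine($"{string.Join(",", messages)} interrupted={isInterrupted}");
        // prints: a,b,c,fake-timer interrupted=True
    }
}
```

As in the original loop, the check runs after a decision has been processed, so the message that crosses the limit is still included in the batch.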

StopAsync() method

Stops the dispatcher to stop getting and processing orchestration events

        public async Task StopAsync(bool forced)
        {
            await this.dispatcher.StopAsync(forced);
        }

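NonBlockingCountdownLock

A non-blocking countdown lock implementation: Acquire() returns false immediately when no slot is available instead of waiting.

        internal class NonBlockingCountdownLock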
        {
            int available;

            public NonBlockingCountdownLock(int available)
            {
                if (available <= 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(available));
                }

                this.available = available;
                this.Capacity = available;
            }

            public int Capacity { get; }

            public bool Acquire()
            {
                if (this.available <= 0)
                {
                    return false;
                }

                if (Interlocked.Decrement(ref this.available) >= 0)
                {
                    return true;
                }

                // the counter went negative - fix it
                Interlocked.Increment(ref this.available);
                return false;
            }

            public void Release()
            {
                Interlocked.Increment(ref this.available);
            }
        }
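
To see how this lock behaves, the minimal sketch below copies the class into a small console program. The class is made public here so the example compiles on its own, and CountdownLockDemo is a hypothetical name that is not part of DurableTask.

```csharp
using System;
using System.Threading;

// Copy of the class shown above, made public for a standalone demo.
public class NonBlockingCountdownLock
{
    int available;

    public NonBlockingCountdownLock(int available)
    {
        if (available <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(available));
        }

        this.available = available;
        this.Capacity = available;
    }

    public int Capacity { get; }

    public bool Acquire()
    {
        if (this.available <= 0)
        {
            return false;
        }

        if (Interlocked.Decrement(ref this.available) >= 0)
        {
            return true;
        }

        // the counter went negative - fix it
        Interlocked.Increment(ref this.available);
        return false;
    }

    public void Release()
    {
        Interlocked.Increment(ref this.available);
    }
}

public static class CountdownLockDemo
{
    public static void Main()
    {
        var sessionLock = new NonBlockingCountdownLock(2);
        Console.WriteLine(sessionLock.Acquire()); // True  (1 slot left)
        Console.WriteLine(sessionLock.Acquire()); // True  (0 slots left)
        Console.WriteLine(sessionLock.Acquire()); // False (returns at once, no waiting)
        sessionLock.Release();                    // 1 slot is free again
        Console.WriteLine(sessionLock.Acquire()); // True
    }
}
```

The caller never blocks: a failed Acquire() is simply a signal to stop, which matches how OnProcessWorkItemSessionAsync breaks out of its loop when the lock cannot be acquired.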

2.2 - WorkItemDispatcher source code study

A study of the durabletask WorkItemDispatcher source code

Dispatcher class for fetching and processing work items of the supplied type

src\DurableTask.Core\WorkItemDispatcher.cs

Class definition

        public class WorkItemDispatcher<T> : IDisposable

Class fields

        const int DefaultMaxConcurrentWorkItems = 20;
        const int DefaultDispatcherCount = 1;

        const int BackOffIntervalOnInvalidOperationSecs = 10;
        const int CountDownToZeroDelay = 5;

        // ReSharper disable once StaticMemberInGenericType
        static readonly TimeSpan DefaultReceiveTimeout = TimeSpan.FromSeconds(30);
        readonly string id;
        readonly string name;
        readonly object thisLock = new object();
        readonly SemaphoreSlim initializationLock = new SemaphoreSlim(1, 1);

        volatile int concurrentWorkItemCount;
        volatile int countDownToZeroDelay;
        volatile int delayOverrideSecs;
        volatile int activeFetchers;
        bool isStarted;
        SemaphoreSlim concurrencyLock;
        CancellationTokenSource shutdownCancellationTokenSource;

Method definitions

        readonly Func<T, string> workItemIdentifier;

        Func<TimeSpan, CancellationToken, Task<T>> FetchWorkItem { get; }

        Func<T, Task> ProcessWorkItem { get; }
        
        /// <summary>
        /// Method to execute for safely releasing a work item
        /// </summary>
        public Func<T, Task> SafeReleaseWorkItem;

        /// <summary>
        /// Method to execute for aborting a work item
        /// </summary>
        public Func<T, Task> AbortWorkItem;

        /// <summary>
        /// Method to get a delay to wait after a fetch exception
        /// </summary>
        public Func<Exception, int> GetDelayInSecondsAfterOnFetchException = (exception) => 0;

        /// <summary>
        /// Method to get a delay to wait after a process exception
        /// </summary>
        public Func<Exception, int> GetDelayInSecondsAfterOnProcessException = (exception) => 0;

Constructor

        public WorkItemDispatcher(
            string name, 
            Func<T, string> workItemIdentifier,
            Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
            Func<T, Task> processWorkItem)
        {
            this.name = name;
            this.id = Guid.NewGuid().ToString("N");
            this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
            this.FetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
            this.ProcessWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
        }

Compare this with the initialization code in TaskOrchestrationDispatcher:

        internal TaskOrchestrationDispatcher(
            IOrchestrationService orchestrationService,
            INameVersionObjectManager<TaskOrchestration> objectManager,
            DispatchMiddlewarePipeline dispatchPipeline,
            LogHelper logHelper,
            ErrorPropagationMode errorPropagationMode)
        {
            ......
            // initialize the dispatcher
            this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
                "TaskOrchestrationDispatcher",
                item => item == null ? string.Empty : item.InstanceId,
                this.OnFetchWorkItemAsync,
                this.OnProcessWorkItemSessionAsync)
            {
                // initialize the dispatcher's fields
                GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
                GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
                SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
                AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
                DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
                MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
                LogHelper = logHelper,
            };

            ......
        }

The implementation of Func<T, string> workItemIdentifier is:

item => item == null ? string.Empty : item.InstanceId,

fetchWorkItem is implemented by TaskOrchestrationDispatcher.OnFetchWorkItemAsync, and processWorkItem is implemented by TaskOrchestrationDispatcher.OnProcessWorkItemSessionAsync.

DispatcherCount is set from orchestrationService.TaskOrchestrationDispatcherCount.

MaxConcurrentWorkItems is set from orchestrationService.MaxConcurrentTaskOrchestrationWorkItems.
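
The delegate wiring described here can be illustrated with a heavily simplified stand-in. MiniDispatcher<T>, RunUntilEmptyAsync and MiniDispatcherDemo are hypothetical names used only for this sketch; the real WorkItemDispatcher<T> adds concurrency limits, back-off and logging that are omitted here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for WorkItemDispatcher<T>.
public class MiniDispatcher<T> where T : class
{
    readonly Func<T, string> workItemIdentifier;
    readonly Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem;
    readonly Func<T, Task> processWorkItem;

    public MiniDispatcher(
        Func<T, string> workItemIdentifier,
        Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
        Func<T, Task> processWorkItem)
    {
        this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
        this.fetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
        this.processWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
    }

    // Fetches and processes items one by one until the fetch delegate returns null.
    public async Task<List<string>> RunUntilEmptyAsync(CancellationToken cancellationToken)
    {
        var processedIds = new List<string>();
        while (true)
        {
            T item = await this.fetchWorkItem(TimeSpan.FromSeconds(1), cancellationToken);
            if (item == null)
            {
                break;
            }

            await this.processWorkItem(item);
            processedIds.Add(this.workItemIdentifier(item));
        }

        return processedIds;
    }
}

public static class MiniDispatcherDemo
{
    public static async Task Main()
    {
        var queue = new Queue<string>(new[] { "instance-1", "instance-2" });

        var dispatcher = new MiniDispatcher<string>(
            item => item ?? string.Empty,                                   // workItemIdentifier
            (timeout, token) =>
                Task.FromResult(queue.Count > 0 ? queue.Dequeue() : null),  // fetchWorkItem
            item =>
            {
                Console.WriteLine($"processing {item}");                    // processWorkItem
                return Task.CompletedTask;
            });

        List<string> ids = await dispatcher.RunUntilEmptyAsync(CancellationToken.None);
        Console.WriteLine(string.Join(",", ids)); // instance-1,instance-2
    }
}
```

Passing the fetch and process steps as delegates keeps the dispatcher generic: the same loop could serve orchestration work items or any other work item type, which is the role the constructor arguments play in the code above.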

StartAsync() method

Starts the work item dispatcher:

        public async Task StartAsync()
        {
            if (!this.isStarted)
            {
                await this.initializationLock.WaitAsync();
                try
                {
                    if (this.isStarted)
                    {
                        throw TraceHelper.TraceException(TraceEventType.Error, "WorkItemDispatcherStart-AlreadyStarted", new InvalidOperationException($"WorkItemDispatcher '{this.name}' has already started"));
                    }

                    this.concurrencyLock?.Dispose();
                    this.concurrencyLock = new SemaphoreSlim(this.MaxConcurrentWorkItems);

                    this.shutdownCancellationTokenSource?.Dispose();
                    this.shutdownCancellationTokenSource = new CancellationTokenSource();

                    this.isStarted = true;

                    TraceHelper.Trace(TraceEventType.Information, "WorkItemDispatcherStart", $"WorkItemDispatcher('{this.name}') starting. Id {this.id}.");

                    for (var i = 0; i < this.DispatcherCount; i++)
                    {
                        string dispatcherId = i.ToString();
                        // create the context
                        var context = new WorkItemDispatcherContext(this.name, this.id, dispatcherId);
                        this.LogHelper.DispatcherStarting(context);

                        // We just want this to Run we intentionally don't wait
                        #pragma warning disable 4014
                        Task.Run(() => this.DispatchAsync(context));
                        #pragma warning restore 4014
                    }
                }
                finally
                {
                    this.initializationLock.Release();
                }
            }
        }

DispatchAsync() method

        async Task DispatchAsync(WorkItemDispatcherContext context)
        {
            string dispatcherId = context.DispatcherId;

            bool logThrottle = true;
            while (this.isStarted)
            {
                if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
                {
                    if (logThrottle)
                    {
                        // This can happen frequently under heavy load.
                        // To avoid log spam, we log just once until we can proceed.
                        this.LogHelper.FetchingThrottled(
                            context,
                            this.concurrentWorkItemCount,
                            this.MaxConcurrentWorkItems);
                        TraceHelper.Trace(
                            TraceEventType.Warning,
                            "WorkItemDispatcherDispatch-MaxOperations",
                            this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
                        
                        logThrottle = false;
                    }

                    continue;
                }

                logThrottle = true;

                var delaySecs = 0;
                T workItem = default(T);
                try
                {
                    Interlocked.Increment(ref this.activeFetchers);
                    this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
                    TraceHelper.Trace(
                        TraceEventType.Verbose, 
                        "WorkItemDispatcherDispatch-StartFetch",
                        this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));

                    Stopwatch timer = Stopwatch.StartNew();
                    // Fetching the work item starts here
                    workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);

                    if (!IsNull(workItem))
                    {
                        string workItemId = this.workItemIdentifier(workItem);
                        this.LogHelper.FetchWorkItemCompleted(
                            context,
                            workItemId,
                            timer.Elapsed,
                            this.concurrentWorkItemCount,
                            this.MaxConcurrentWorkItems);
                    }

                    TraceHelper.Trace(
                        TraceEventType.Verbose, 
                        "WorkItemDispatcherDispatch-EndFetch",
                        this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
                }
                catch (TimeoutException)
                {
                    delaySecs = 0;
                }
                catch (TaskCanceledException exception)
                {
                    TraceHelper.Trace(
                        TraceEventType.Information,
                        "WorkItemDispatcherDispatch-TaskCanceledException",
                        this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
                    delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
                }
                catch (Exception exception)
                {
                    if (!this.isStarted)
                    {
                        TraceHelper.Trace(
                            TraceEventType.Information, 
                            "WorkItemDispatcherDispatch-HarmlessException",
                            this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
                    }
                    else
                    {
                        this.LogHelper.FetchWorkItemFailure(context, exception);
                        // TODO : dump full node context here
                        TraceHelper.TraceException(
                            TraceEventType.Warning, 
                            "WorkItemDispatcherDispatch-Exception", 
                            exception,
                            this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
                        delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref this.activeFetchers);
                }

                // Then schedule the fetched work item
                var scheduledWorkItem = false;
                if (!IsNull(workItem))
                {
                    if (!this.isStarted)
                    {
                        if (this.SafeReleaseWorkItem != null)
                        {
                            await this.SafeReleaseWorkItem(workItem);
                        }
                    }
                    else
                    {
                        Interlocked.Increment(ref this.concurrentWorkItemCount);
                        // We just want this to Run we intentionally don't wait
                        #pragma warning disable 4014 
                        // Start processing the work item
                        Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
                        #pragma warning restore 4014

                        scheduledWorkItem = true;
                    }
                }

                delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
                if (delaySecs > 0)
                {
                    await Task.Delay(TimeSpan.FromSeconds(delaySecs));
                }

                if (!scheduledWorkItem)
                {
                    this.concurrencyLock.Release();
                }
            }

            this.LogHelper.DispatcherStopped(context);
        }

The FetchWorkItem() and ProcessWorkItemAsync() methods used here are both passed in through the constructor (in practice, TaskOrchestrationDispatcher supplies them).
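
The loop above follows a common pattern: take a concurrency slot, fetch an item through an injected delegate, then hand the item to a background task. Below is a minimal sketch of that pattern, not DurableTask code. `MiniDispatcher`, `fetch`, and `process` are hypothetical names, and the sketch assumes the slot is released when processing finishes, which the excerpt above does not show.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Demo: three queued items, at most two processed at a time.
var queue = new ConcurrentQueue<string>(new[] { "a", "b", "c" });
var done = new ConcurrentBag<string>();
using var cts = new CancellationTokenSource();

var dispatcher = new MiniDispatcher<string>(
    maxConcurrent: 2,
    fetch: async token =>
    {
        if (queue.TryDequeue(out string? next)) return next;
        await Task.Delay(10, token); // simulate an empty poll
        return null;
    },
    process: item => { done.Add(item); return Task.CompletedTask; });

Task loop = dispatcher.RunAsync(cts.Token);
await Task.Delay(200);
cts.Cancel();
await loop;
Console.WriteLine($"processed {done.Count} item(s)");

// Hypothetical miniature of the dispatch pattern; not DurableTask code.
public sealed class MiniDispatcher<T> where T : class
{
    private readonly SemaphoreSlim slots;                        // caps in-flight work items
    private readonly Func<CancellationToken, Task<T?>> fetch;    // injected, like FetchWorkItem
    private readonly Func<T, Task> process;                      // injected, like ProcessWorkItemAsync

    public MiniDispatcher(
        int maxConcurrent,
        Func<CancellationToken, Task<T?>> fetch,
        Func<T, Task> process)
    {
        this.slots = new SemaphoreSlim(maxConcurrent, maxConcurrent);
        this.fetch = fetch;
        this.process = process;
    }

    public async Task RunAsync(CancellationToken stop)
    {
        while (!stop.IsCancellationRequested)
        {
            // Throttle: try again if no slot frees up within the wait window.
            if (!await this.slots.WaitAsync(TimeSpan.FromMilliseconds(100)))
            {
                continue;
            }

            T? item = null;
            try
            {
                item = await this.fetch(stop);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested while fetching; treated as harmless.
            }

            if (item is null)
            {
                this.slots.Release(); // nothing was scheduled, so return the slot
                continue;
            }

            T work = item;
            // Fire and forget; the slot is released once processing ends.
            _ = Task.Run(async () =>
            {
                try { await this.process(work); }
                finally { this.slots.Release(); }
            });
        }
    }
}
```

Passing the fetch and process steps in as delegates keeps the loop generic, so the same loop can serve orchestration work items and activity work items.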

StopAsync() method

Stops the dispatcher so that it no longer fetches and processes orchestration events

        public async Task StopAsync(bool forced)
        {
            await this.dispatcher.StopAsync(forced);
        }

3 - DurableTask Orchestration Source Code Study

Studying the source code of DurableTask orchestration

3.1 - IOrchestrationService

Source code analysis of IOrchestrationService

Orchestration Service interface for performing task hub management operations and handling orchestrations and work items’ state

The code lives in the durabletask repository at src\DurableTask.Core\IOrchestrationService.cs

Interface definition

IOrchestrationService is an interface:

    public interface IOrchestrationService
    {}

Method definitions

Start

        /// <summary>
        /// Starts the service initializing the required resources
        /// </summary>
        Task StartAsync();

Stop

        /// <summary>
        /// Stops the orchestration service gracefully
        /// </summary>
        Task StopAsync();

        /// <summary>
        /// Stops the orchestration service with optional forced flag
        /// </summary>
        Task StopAsync(bool isForced);

Create

        /// <summary>
        /// Deletes and Creates the necessary resources for the orchestration service and the instance store
        /// </summary>
        Task CreateAsync();

        /// <summary>
        /// Deletes and Creates the necessary resources for the orchestration service and optionally the instance store
        /// </summary>
        Task CreateAsync(bool recreateInstanceStore);

        /// <summary>
        /// Creates the necessary resources for the orchestration service and the instance store
        /// </summary>
        Task CreateIfNotExistsAsync();

Delete

        /// <summary>
        /// Deletes the resources for the orchestration service and the instance store
        /// </summary>
        Task DeleteAsync();

        /// <summary>
        /// Deletes the resources for the orchestration service and optionally the instance store
        /// </summary>
        Task DeleteAsync(bool deleteInstanceStore);

LockNextTaskOrchestrationWorkItem

Waits for the next orchestration work item and returns it. This method is the key one.

        /// <summary>
        ///     Wait for the next orchestration work item and return the orchestration work item
        /// </summary>
        Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken);

LockNextTaskActivity

        /// <summary>
        ///    Wait for and lock the next task activity to be processed
        /// </summary>
        Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken);

3.2 - IOrchestrationServiceClient

Source code analysis of IOrchestrationServiceClient

Interface to allow creation of new task orchestrations and query their status.

The code lives in the durabletask repository at src\DurableTask.Core\IOrchestrationServiceClient.cs

Interface definition

IOrchestrationServiceClient is an interface:

    public interface IOrchestrationServiceClient
    {}

Method definitions

CreateTaskOrchestration

        /// <summary>
        /// Creates a new orchestration
        /// </summary>
        /// <param name="creationMessage">Orchestration creation message</param>
        /// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store.</exception>
        /// <returns></returns>
        Task CreateTaskOrchestrationAsync(TaskMessage creationMessage);

        /// <summary>
        /// Creates a new orchestration and specifies a subset of states which should be de duplicated on in the client side
        /// </summary>
        /// <param name="creationMessage">Orchestration creation message</param>
        /// <param name="dedupeStatuses">States of previous orchestration executions to be considered while de-duping new orchestrations on the client</param>
        /// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store and it has a status specified in dedupeStatuses.</exception>
        /// <returns></returns>
        Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses);

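The second overload creates a new orchestration and specifies the subset of statuses that should be de-duplicated on the client side.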
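
To make the de-duplication rule concrete, here is a minimal in-memory sketch. The `Status` enum, the `InMemoryClient` class, and its dictionary are hypothetical stand-ins. A real provider decides for itself how to consult the instance store, so treat this only as an illustration of the rule in the doc comment: creation fails when an instance with the same id exists and its status is listed in `dedupeStatuses`.

```csharp
using System;
using System.Collections.Generic;

var client = new InMemoryClient();
client.Create("order-1", dedupeStatuses: new[] { Status.Running });
client.SetStatus("order-1", Status.Completed);

// Completed is not listed in dedupeStatuses, so the id can be reused.
client.Create("order-1", dedupeStatuses: new[] { Status.Running });

try
{
    // The instance is Running again, and Running is listed in dedupeStatuses.
    client.Create("order-1", dedupeStatuses: new[] { Status.Running });
}
catch (InvalidOperationException e)
{
    Console.WriteLine(e.Message);
}

// Hypothetical status values for this sketch only.
public enum Status { Running, Completed, Failed }

// Hypothetical in-memory stand-in for a service client.
public sealed class InMemoryClient
{
    private readonly Dictionary<string, Status> instances = new();

    public void Create(string instanceId, Status[] dedupeStatuses)
    {
        // Reject only when an existing instance has one of the listed statuses.
        if (this.instances.TryGetValue(instanceId, out Status existing)
            && Array.IndexOf(dedupeStatuses, existing) >= 0)
        {
            throw new InvalidOperationException(
                $"Instance '{instanceId}' already exists with status {existing}.");
        }

        this.instances[instanceId] = Status.Running;
    }

    public void SetStatus(string instanceId, Status status) =>
        this.instances[instanceId] = status;
}
```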

SendTaskOrchestrationMessage

Sends a new message to an orchestration.

TBD: it is not yet clear to me what this method is used for.

        /// <summary>
        /// Send a new message for an orchestration
        /// </summary>
        /// <param name="message">Message to send</param>
        /// <returns></returns>
        Task SendTaskOrchestrationMessageAsync(TaskMessage message);

        /// <summary>
        /// Send a new set of messages for an orchestration
        /// </summary>
        /// <param name="messages">Messages to send</param>
        /// <returns></returns>
        Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages);

WaitForOrchestration

        /// <summary>
        /// Wait for an orchestration to reach any terminal state within the given timeout
        /// </summary>
        /// <param name="instanceId">Instance id of the orchestration</param>
        /// <param name="executionId">Execution id of the orchestration</param>
        /// <param name="timeout">Maximum amount of time to wait</param>
        /// <param name="cancellationToken">Task cancellation token</param>
        Task<OrchestrationState> WaitForOrchestrationAsync(
            string instanceId, 
            string executionId,
            TimeSpan timeout, 
            CancellationToken cancellationToken);
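
One plausible way to satisfy this contract is to poll the instance state until it is terminal or the timeout elapses. The sketch below is an assumption, not the approach of any particular provider. `Status`, `Waiter`, and the `getStatus` delegate are hypothetical, and returning null on timeout is a choice made here for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int polls = 0;
// Hypothetical state source: reports Completed on the third poll.
Func<Task<Status>> getStatus = () =>
    Task.FromResult(++polls >= 3 ? Status.Completed : Status.Running);

Status? result = await Waiter.WaitForTerminalAsync(
    getStatus, TimeSpan.FromSeconds(2), CancellationToken.None);
Console.WriteLine(result?.ToString() ?? "timed out");

// Hypothetical status values for this sketch only.
public enum Status { Running, Completed, Failed, Terminated }

public static class Waiter
{
    public static bool IsTerminal(Status status) => status != Status.Running;

    // Returns the terminal status, or null if the timeout elapses first.
    public static async Task<Status?> WaitForTerminalAsync(
        Func<Task<Status>> getStatus, TimeSpan timeout, CancellationToken cancellationToken)
    {
        // Link the caller's token with a timeout so either one ends the wait.
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutCts.CancelAfter(timeout);
        try
        {
            while (true)
            {
                Status current = await getStatus();
                if (IsTerminal(current))
                {
                    return current;
                }

                await Task.Delay(TimeSpan.FromMilliseconds(50), timeoutCts.Token);
            }
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            return null; // the timeout fired, not the caller's token
        }
    }
}
```

If the caller's own token is cancelled, the exception filter does not match and the cancellation propagates to the caller.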

ForceTerminateTaskOrchestration

        /// <summary>
        /// Forcefully terminate the specified orchestration instance
        /// </summary>
        /// <param name="instanceId">Instance to terminate</param>
        /// <param name="reason">Reason for termination</param>
        Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason);

GetOrchestrationState

        /// <summary>
        /// Get a list of orchestration states from the instance storage for the most current execution (generation) of the specified instance.
        /// </summary>
        /// <param name="instanceId">Instance id</param>
        /// <param name="allExecutions">True if method should fetch all executions of the instance, false if the method should only fetch the most recent execution</param>
        /// <returns>List of OrchestrationState objects that represents the list of orchestrations in the instance store</returns>
        Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions);

        /// <summary>
        /// Get a list of orchestration states from the instance storage for the specified execution (generation) of the specified instance.
        /// </summary>
        /// <param name="instanceId">Instance id</param>
        /// <param name="executionId">Execution id</param>
        /// <returns>The OrchestrationState of the specified instanceId or null if not found</returns>
        Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId);

GetOrchestrationHistory

        /// <summary>
        /// Get a string dump of the execution history of the specified orchestration instance specified execution (generation) of the specified instance
        /// </summary>
        /// <param name="instanceId">Instance id</param>
        /// <param name="executionId">Execution id</param>
        /// <returns>String with formatted JSON representing the execution history</returns>
        Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId);

PurgeOrchestrationHistory

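Purges orchestration instance state and history for orchestrations older than the specified threshold time. It also purges the blob storage.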

        /// <summary>
        /// Purges orchestration instance state and history for orchestrations older than the specified threshold time.
        /// Also purges the blob storage.
        /// </summary>
        /// <param name="thresholdDateTimeUtc">Threshold date time in UTC</param>
        /// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
        Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);

3.3 - IOrchestrationServiceInstanceStore

Source code analysis of IOrchestrationServiceInstanceStore

Instance Store provider interface to allow storage and lookup for orchestration state and event history

The code lives in the durabletask repository at src\DurableTask.Core\IOrchestrationServiceInstanceStore.cs

Interface definition

IOrchestrationServiceInstanceStore is an interface:

    public interface IOrchestrationServiceInstanceStore
    {}

Method definitions

Lifecycle-related methods

        /// <summary>
        /// Gets the maximum length a history entry can be so it can be truncated if necessary
        /// </summary>
        /// <returns>The maximum length</returns>
        int MaxHistoryEntryLength { get; }

        /// <summary>
        /// Runs initialization to prepare the instance store for use
        /// </summary>
        /// <param name="recreate">Flag to indicate whether the store should be recreated.</param>
        Task InitializeStoreAsync(bool recreate);

        /// <summary>
        /// Deletes instances instance store
        /// </summary>
        Task DeleteStoreAsync();
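
As the doc comment on MaxHistoryEntryLength suggests, a caller can use the limit to shorten an oversized entry before writing it. Here is a minimal sketch of that idea. The `HistoryText.Truncate` helper is hypothetical and is not part of DurableTask.

```csharp
using System;

Console.WriteLine(HistoryText.Truncate("abcdefghij", 4));

// Hypothetical helper for this sketch only.
public static class HistoryText
{
    // Returns the entry unchanged when it fits, otherwise its first maxLength characters.
    public static string Truncate(string entry, int maxLength)
    {
        if (maxLength < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxLength));
        }

        return entry.Length <= maxLength ? entry : entry.Substring(0, maxLength);
    }
}
```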

Entity-related methods

        /// <summary>
        /// Writes a list of history events to instance store
        /// </summary>
        /// <param name="entities">List of history events to write</param>
        Task<object> WriteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);

        /// <summary>
        /// Get a list of state events from instance store
        /// </summary>
        /// <param name="instanceId">The instance id to return state for</param>
        /// <param name="executionId">The execution id to return state for</param>
        /// <returns>The matching orchestration state or null if not found</returns>
        Task<IEnumerable<OrchestrationStateInstanceEntity>> GetEntitiesAsync(string instanceId, string executionId);

        /// <summary>
        /// Deletes a list of history events from instance store
        /// </summary>
        /// <param name="entities">List of history events to delete</param>
        Task<object> DeleteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);

GetOrchestrationState

Gets the list of orchestration states for the given instance.

        /// <summary>
        /// Gets a list of orchestration states for a given instance
        /// </summary>
        /// <param name="instanceId">The instance id to return state for</param>
        /// <param name="allInstances">Flag indication whether to get all history execution ids or just the most recent</param>
        /// <returns>List of matching orchestration states</returns>
        Task<IEnumerable<OrchestrationStateInstanceEntity>> GetOrchestrationStateAsync(string instanceId, bool allInstances);

        /// <summary>
        /// Gets the orchestration state for a given instance and execution id
        /// </summary>
        /// <param name="instanceId">The instance id to return state for</param>
        /// <param name="executionId">The execution id to return state for</param>
        /// <returns>The matching orchestration state or null if not found</returns>
        Task<OrchestrationStateInstanceEntity> GetOrchestrationStateAsync(string instanceId, string executionId);

OrchestrationStateInstanceEntity is defined as follows:

    public class OrchestrationStateInstanceEntity : InstanceEntityBase
    {
        /// <summary>
        /// The orchestration state for this instance entity
        /// </summary>
        public OrchestrationState State;
    }
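
The two GetOrchestrationStateAsync overloads differ in what they select: all executions versus only the most recent one, or a single execution identified by its id. Below is a minimal in-memory sketch of those lookups. `StateRow` and `InMemoryStateStore` are hypothetical, and ordering by creation time to find the most recent execution is an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var store = new InMemoryStateStore();
store.Add(new StateRow("i1", "e1", new DateTime(2024, 1, 1)));
store.Add(new StateRow("i1", "e2", new DateTime(2024, 1, 2)));

Console.WriteLine(store.GetStates("i1", allInstances: true).Count);
Console.WriteLine(store.GetStates("i1", allInstances: false).Single().ExecutionId);
Console.WriteLine(store.GetState("i1", "missing") is null);

// Hypothetical row type: one entry per execution of an instance.
public sealed record StateRow(string InstanceId, string ExecutionId, DateTime CreatedUtc);

// Hypothetical in-memory stand-in for an instance store.
public sealed class InMemoryStateStore
{
    private readonly List<StateRow> rows = new();

    public void Add(StateRow row) => this.rows.Add(row);

    // allInstances == true: every execution; false: only the newest one.
    public IReadOnlyList<StateRow> GetStates(string instanceId, bool allInstances)
    {
        List<StateRow> matches = this.rows
            .Where(r => r.InstanceId == instanceId)
            .OrderByDescending(r => r.CreatedUtc)
            .ToList();
        return allInstances ? matches : matches.Take(1).ToList();
    }

    // Returns the matching execution, or null if not found.
    public StateRow? GetState(string instanceId, string executionId) =>
        this.rows.FirstOrDefault(r => r.InstanceId == instanceId && r.ExecutionId == executionId);
}
```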

OrchestrationHistory-related methods

        /// <summary>
        /// Gets the list of history events for a given instance and execution id
        /// </summary>
        /// <param name="instanceId">The instance id to return history for</param>
        /// <param name="executionId">The execution id to return history for</param>
        /// <returns>List of history events</returns>
        Task<IEnumerable<OrchestrationWorkItemInstanceEntity>> GetOrchestrationHistoryEventsAsync(string instanceId, string executionId);

        /// <summary>
        /// Purges history from storage for given time range
        /// </summary>
        /// <param name="thresholdDateTimeUtc">The datetime in UTC to use as the threshold for purging history</param>
        /// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
        /// <returns>The number of history events purged.</returns>
        Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
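
The purge contract takes a UTC threshold plus a filter that selects which timestamp is compared against it, and it returns the number of entries removed. Here is a minimal in-memory sketch under stated assumptions. `TimeFilter`, `Record`, and `Purger` are hypothetical, and "older than" is read here as strictly less than the threshold.

```csharp
using System;
using System.Collections.Generic;

var now = new DateTime(2024, 1, 10, 0, 0, 0, DateTimeKind.Utc);
var store = new List<Record>
{
    new Record("old", now.AddDays(-9), now.AddDays(-8)),
    new Record("recent", now.AddDays(-2), now.AddDays(-1)),
};

int removed = Purger.Purge(store, now.AddDays(-5), TimeFilter.Completed);
Console.WriteLine($"removed {removed}, kept {store.Count}");

// Hypothetical filter: which timestamp is compared against the threshold.
public enum TimeFilter { Created, Completed }

// Hypothetical stored entry for this sketch only.
public sealed record Record(string InstanceId, DateTime CreatedUtc, DateTime CompletedUtc);

public static class Purger
{
    // Removes entries whose selected timestamp is before the threshold
    // and returns how many were removed.
    public static int Purge(List<Record> store, DateTime thresholdUtc, TimeFilter filter)
    {
        return store.RemoveAll(r =>
            (filter == TimeFilter.Created ? r.CreatedUtc : r.CompletedUtc) < thresholdUtc);
    }
}
```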

JumpStartEntities-related methods

        /// <summary>
        /// Writes a list of jump start events to instance store
        /// </summary>
        /// <param name="entities">List of jump start events to write</param>
        Task<object> WriteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);

        /// <summary>
        /// Deletes a list of jump start events from instance store
        /// </summary>
        /// <param name="entities">List of jump start events to delete</param>
        Task<object> DeleteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);

        /// <summary>
        /// Get a list of jump start events from instance store
        /// </summary>
        /// <returns>List of jump start events</returns>
        Task<IEnumerable<OrchestrationJumpStartInstanceEntity>> GetJumpStartEntitiesAsync(int top);