DurableTask Core Source Code Study
- 1: Core Programming Model
- 1.1: Orchestration
- 1.1.1: OrchestrationInstance
- 1.1.2: OrchestrationContext
- 1.1.3: OrchestrationState
- 1.2: Activity
- 1.2.1: TaskActivity
- 1.2.2: TaskContext
- 1.3: OrchestrationExecution
- 1.3.1: OrchestrationExecutionContext
- 1.3.2: OrchestrationExecutionCursor
- 1.4: TaskOrchestration
- 1.4.1: TaskOrchestration
- 1.4.2: TaskOrchestrationContext
- 1.5: OrchestrationState
- 1.5.1: OrchestrationState
- 1.5.2: OrchestrationStateQuery
- 1.5.3: OrchestrationStateQueryFilter
- 1.5.4: OrchestrationStateInstanceFilter
- 1.5.5: OrchestrationStateNameVersionFilter
- 1.6: Entity
- 1.6.1: TaskEntity
- 1.6.2: EntityId
- 1.7: History
- 1.7.1: History Overview
- 1.7.2: HistoryEvent
- 1.7.3: ExecutionStartedEvent
- 1.7.4: OrchestratorStartedEvent
- 2: DurableTask Dispatcher Source Code Study
- 3: DurableTask Orchestration Source Code Study
1 - Core Programming Model
1.1 - Orchestration
1.1.1 - OrchestrationInstance
src\DurableTask.Core\OrchestrationInstance.cs
OrchestrationInstance defines two properties:
- InstanceId
- ExecutionId
[DataContract]
public class OrchestrationInstance : IExtensibleDataObject
{
/// <summary>
/// The instance id, assigned as unique to the orchestration
/// </summary>
[DataMember]
public string InstanceId { get; set; }
/// <summary>
/// The execution id, unique to the execution of this instance
/// </summary>
[DataMember]
public string ExecutionId { get; set; }
To support versioning, the plan is to extend OrchestrationInstance with an InstanceVersion field of type string:
/// <summary>
/// The version of this orchestration instance
/// </summary>
[DataMember]
public string InstanceVersion { get; set; }
1.1.2 - OrchestrationContext
src\DurableTask.Core\OrchestrationContext.cs
Context for an orchestration containing the instance, replay status, orchestration methods and proxy methods
Class definition
OrchestrationContext is an abstract class:
public abstract class OrchestrationContext
{
}
The implementing class is TaskOrchestrationContext.
OrchestrationContext defines the following public methods (virtual or abstract):
- CreateClient()
- CreateClientV2()
- CreateRetryableClient()
- ScheduleWithRetry()
- ScheduleTask()
- CreateTimer()
- CreateSubOrchestrationInstance()
- CreateSubOrchestrationInstanceWithRetry()
- SendEvent()
- ContinueAsNew()
ScheduleTask()
/// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
public abstract Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters);
So the concept of a version already exists here.
ScheduleWithRetry()
public virtual Task<T> ScheduleWithRetry<T>(Type taskActivityType, RetryOptions retryOptions,
params object[] parameters)
{
return ScheduleWithRetry<T>(NameVersionHelper.GetDefaultName(taskActivityType),
NameVersionHelper.GetDefaultVersion(taskActivityType),
retryOptions, parameters);
}
NameVersionHelper.GetDefaultVersion(taskActivityType) currently has no real implementation; it is hard-coded to return an empty string:
public static string GetDefaultVersion(object obj)
{
return string.Empty;
}
It then calls the overload that takes a version parameter:
public virtual Task<T> ScheduleWithRetry<T>(string name, string version, RetryOptions retryOptions,
params object[] parameters)
{
Task<T> RetryCall() => ScheduleTask<T>(name, version, parameters);
var retryInterceptor = new RetryInterceptor<T>(this, retryOptions, RetryCall);
return retryInterceptor.Invoke();
}
So the call still ends up in ScheduleTask(), wrapped by the RetryInterceptor.
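The wrapping pattern above (a retry helper that repeatedly invokes a delegate which in turn calls ScheduleTask) can be sketched in isolation. The following is a minimal sketch, not the real RetryInterceptor: SimpleRetryOptions and SimpleRetry are hypothetical names, and the plain Task.Delay is an assumption made for illustration. The real interceptor receives the OrchestrationContext, presumably so that it can wait in a replay-safe way.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for RetryOptions; only two settings are modeled.
public sealed class SimpleRetryOptions
{
    public int MaxNumberOfAttempts { get; set; } = 3;
    public TimeSpan FirstRetryInterval { get; set; } = TimeSpan.FromMilliseconds(10);
}

// Hypothetical helper illustrating the "wrap a delegate in a retry loop" idea.
public static class SimpleRetry
{
    // Invokes retryCall until it succeeds or the attempt budget is used up.
    public static async Task<T> InvokeAsync<T>(SimpleRetryOptions options, Func<Task<T>> retryCall)
    {
        if (options.MaxNumberOfAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(options), "At least one attempt is required.");
        }

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await retryCall();
            }
            catch (Exception) when (attempt < options.MaxNumberOfAttempts)
            {
                // Assumption: a plain delay, which is NOT replay-safe inside a real orchestration.
                await Task.Delay(options.FirstRetryInterval);
            }
            // On the last attempt the exception filter is false, so the exception propagates.
        }
    }
}
```

In this sketch the delegate plays the role of RetryCall() in the excerpt: the retry helper knows nothing about names or versions, it only re-invokes whatever the delegate does.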
CreateTimer()
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state);
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken);
CreateSubOrchestrationInstance()
public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, object input)
{
return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
NameVersionHelper.GetDefaultVersion(orchestrationType), input);
}
public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, string instanceId, object input)
{
return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
NameVersionHelper.GetDefaultVersion(orchestrationType), instanceId, input);
}
public abstract Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input);
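This method also takes a version parameter, but the Type-based overloads above always pass the empty default version, so it is effectively unused.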
SendEvent()
public abstract void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData);
Once OrchestrationInstance gains a version field, this method carries the concept of a version as well.
ContinueAsNew()
public abstract void ContinueAsNew(object input);
There is no concept of a version here; the implementation should preferably reuse the version of the current instance (if one was specified).
CreateClient()
Create a proxy client class to schedule remote TaskActivities via a strongly typed interface.
public virtual T CreateClient<T>() where T : class
{
return CreateClient<T>(false);
}
public virtual T CreateClient<T>(bool useFullyQualifiedMethodNames) where T : class
{
return CreateClient<T>(() => new ScheduleProxy(this, useFullyQualifiedMethodNames));
}
private static T CreateClient<T>(Func<IInterceptor> createScheduleProxy) where T : class
{
if (!typeof(T).IsInterface && !typeof(T).IsClass)
{
throw new InvalidOperationException($"{nameof(T)} must be an interface or class.");
}
IInterceptor scheduleProxy = createScheduleProxy();
if (typeof(T).IsClass)
{
if (typeof(T).IsSealed)
{
throw new InvalidOperationException("Class cannot be sealed.");
}
return ProxyGenerator.CreateClassProxy<T>(scheduleProxy);
}
return ProxyGenerator.CreateInterfaceProxyWithoutTarget<T>(scheduleProxy);
}
There is no concept of a version here.
1.1.3 - OrchestrationState
src\DurableTask.Core\OrchestrationState.cs
OrchestrationState defines the following properties:
- CompletedTime
- CompressedSize
- CreatedTime
- Input
- LastUpdatedTime
- Name
- OrchestrationInstance: contains InstanceId and ExecutionId
- Output
- ParentInstance
- Size
- Status
- Tags
- Version: a string; worth checking whether it can be reused.
- Generation
- ScheduledStartTime
- FailureDetails
1.2 - Activity
1.2.1 - TaskActivity
src\DurableTask.Core\TaskActivity.cs
TaskActivity defines the following methods:
- Run()
- RunAsync()
Run() method
public abstract string Run(TaskContext context, string input);
In AsyncTaskActivity, Run() is blocked and never executes:
/// <summary>
/// Synchronous execute method, blocked for AsyncTaskActivity
/// </summary>
/// <returns>string.Empty</returns>
public override string Run(TaskContext context, string input)
{
// will never run
return string.Empty;
}
RunAsync() method
public virtual Task<string> RunAsync(TaskContext context, string input)
{
return Task.FromResult(Run(context, input));
}
AsyncTaskActivity overrides it as follows:
public override async Task<string> RunAsync(TaskContext context, string input)
{
TInput parameter = default(TInput);
var jArray = Utils.ConvertToJArray(input);
int parameterCount = jArray.Count;
if (parameterCount > 1)
{
throw new TaskFailureException(
"TaskActivity implementation cannot be invoked due to more than expected input parameters. Signature mismatch.");
}
if (parameterCount == 1)
{
JToken jToken = jArray[0];
if (jToken is JValue jValue)
{
parameter = jValue.ToObject<TInput>();
}
else
{
string serializedValue = jToken.ToString();
parameter = DataConverter.Deserialize<TInput>(serializedValue);
}
}
TResult result;
try
{
result = await ExecuteAsync(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new TaskFailureException(e.Message, e, details)
.WithFailureDetails(failureDetails);
}
string serializedResult = DataConverter.Serialize(result);
return serializedResult;
}
}
ExecuteAsync() is an abstract method:
protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);
GetStatus() method
public abstract string GetStatus();
1.2.2 - TaskContext
src\DurableTask.Core\TaskContext.cs
TaskContext defines the following properties:
- OrchestrationInstance: contains InstanceId and ExecutionId
- ErrorPropagationMode
1.3 - OrchestrationExecution
1.3.1 - OrchestrationExecutionContext
src\DurableTask.Core\OrchestrationExecutionContext.cs
Context associated with the orchestration being executed.
[DataContract]
public class OrchestrationExecutionContext
{
}
OrchestrationTags
This class defines a single property, OrchestrationTags:
/// <summary>
/// Gets the orchestration tags
/// </summary>
[DataMember]
public IDictionary<string, string> OrchestrationTags { get; internal set; }
1.3.2 - OrchestrationExecutionCursor
src\DurableTask.Core\OrchestrationExecutionCursor.cs
Context associated with the orchestration being executed.
Class definition:
internal class OrchestrationExecutionCursor
{
}
Constructor:
public OrchestrationExecutionCursor(
OrchestrationRuntimeState state,
TaskOrchestration orchestration,
TaskOrchestrationExecutor executor,
IEnumerable<OrchestratorAction> latestDecisions)
{
RuntimeState = state;
TaskOrchestration = orchestration;
OrchestrationExecutor = executor;
LatestDecisions = latestDecisions;
}
Properties:
public OrchestrationRuntimeState RuntimeState { get; }
public TaskOrchestration TaskOrchestration { get; }
public TaskOrchestrationExecutor OrchestrationExecutor { get; }
public IEnumerable<OrchestratorAction> LatestDecisions { get; set; }
It is just a value object.
Open question: in what sense is this a cursor?
1.4 - TaskOrchestration
1.4.1 - TaskOrchestration
src\DurableTask.Core\TaskOrchestration.cs
TaskOrchestration.cs defines three methods:
- Execute()
- RaiseEvent()
- GetStatus()
Execute() method
public abstract Task<string> Execute(OrchestrationContext context, string input);
The generic subclass implements it as:
public override async Task<string> Execute(OrchestrationContext context, string input)
{
var parameter = DataConverter.Deserialize<TInput>(input);
TResult result;
try
{
result = await RunTask(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new OrchestrationFailureException(e.Message, details)
{
FailureDetails = failureDetails,
};
}
return DataConverter.Serialize(result);
}
RunTask() is an abstract method:
public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);
The default DataConverter is JSON:
public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
{
/// <summary>
/// Creates a new TaskOrchestration with the default DataConverter
/// </summary>
protected TaskOrchestration()
{
DataConverter = JsonDataConverter.Default;
}
/// <summary>
/// The DataConverter to use for input and output serialization/deserialization
/// </summary>
public DataConverter DataConverter { get; protected set; }
RaiseEvent() method
public abstract void RaiseEvent(OrchestrationContext context, string name, string input);
The generic subclass implements it as:
public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
var parameter = DataConverter.Deserialize<TEvent>(input);
OnEvent(context, name, parameter);
}
OnEvent() has an empty default implementation:
public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
{
// do nothing
}
GetStatus() method
public abstract string GetStatus();
Implementations
Apart from samples and tests, this project contains no implementing classes.
samples\Correlation.Samples\HelloOrchestrator.cs contains one of the simplest implementations:
[KnownType(typeof(Hello))]
internal class HelloOrchestrator : TaskOrchestration<string, string>
{
public override async Task<string> RunTask(OrchestrationContext context, string input)
{
// await contextBase.ScheduleTask<string>(typeof(Hello), "world");
// if you pass an empty string it throws an error
return await context.ScheduleTask<string>(typeof(Hello), "world");
}
}
internal class Hello : TaskActivity<string, string>
{
protected override string Execute(TaskContext context, string input)
{
if (string.IsNullOrEmpty(input))
{
throw new ArgumentNullException(nameof(input));
}
Console.WriteLine($"Activity: Hello {input}");
return $"Hello, {input}!";
}
}
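Note: this implementation is the closest to the definition in the Dapr workflow Java SDK, and it is also the most suitable place to add a getVersion() method that returns the current worker's version. But why does the quickstart use a static method instead?
TBD: ask Chris.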
1.4.2 - TaskOrchestrationContext
src\DurableTask.Core\TaskOrchestrationContext.cs
Class definition
TaskOrchestrationContext inherits from OrchestrationContext:
internal class TaskOrchestrationContext : OrchestrationContext
{
}
1.5 - OrchestrationState
1.5.1 - OrchestrationState
src\DurableTask.Core\OrchestrationState.cs
Represents the state of an orchestration
Class definition
OrchestrationState is a DataContract:
[DataContract]
public class OrchestrationState : IExtensibleDataObject
{
}
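It defines the following DataMembers:
| Name | Type | Description |
|---|---|---|
| Name | string | Name of the orchestration |
| Input | string | Serialized input of the orchestration |
| Output | string | Serialized output of the orchestration |
| OrchestrationInstance | OrchestrationInstance | The orchestration instance this state represents |
| OrchestrationStatus | OrchestrationStatus | Current orchestration status |
| Status | string | Current orchestration status in string form |
| ParentInstance | ParentInstance | The parent instance, if this orchestration has one |
| Version | string | Orchestration version |
| Tags | IDictionary<string, string> | Dictionary of tags and string values associated with this orchestration |
| Generation | int | Generation of the orchestration. Reused instanceIds increment this value. |
| CreatedTime | DateTime | Creation time of the orchestration |
| ScheduledStartTime | DateTime | Time at which the orchestration starts |
| CompletedTime | DateTime | Completion time of the orchestration |
| LastUpdatedTime | DateTime | Last updated time of the orchestration |
| Size | long | Size of the raw (uncompressed) serialized runtime state |
| CompressedSize | long | Size of the compressed serialized runtime state |
| FailureDetails | FailureDetails | Gets or sets the failure details associated with the orchestration. |
OrchestrationState defines a Version field. In addition, its OrchestrationInstance field would also carry a version once the InstanceVersion field proposed in 1.1.1 is added.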
1.5.2 - OrchestrationStateQuery
src\DurableTask.Core\OrchestrationStateQuery.cs
Query class that can be used to filter results from the Orchestration instance store.
Note: instance methods are not thread safe.
Class definition
public class OrchestrationStateQuery {
}
Constructor
public OrchestrationStateQuery()
{
FilterMap = new Dictionary<Type, OrchestrationStateQueryFilter>();
}
FilterMap
public IDictionary<Type, OrchestrationStateQueryFilter> FilterMap { get; private set; }
GetFilters()
Returns the query's primary filter together with the collection of secondary filters:
public Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>> GetFilters()
{
ICollection<OrchestrationStateQueryFilter> filters = FilterMap.Values;
if (filters.Count == 0)
{
return null;
}
var secondaryFilters = new List<OrchestrationStateQueryFilter>();
OrchestrationStateQueryFilter primaryFilter = filters.First();
int primaryFilterPrecedence = SafeGetFilterPrecedence(primaryFilter);
if (filters.Count > 1)
{
foreach (OrchestrationStateQueryFilter filter in filters)
{
int newPrecedence = SafeGetFilterPrecedence(filter);
if (newPrecedence > primaryFilterPrecedence)
{
secondaryFilters.Add(primaryFilter);
primaryFilter = filter;
primaryFilterPrecedence = newPrecedence;
}
else
{
secondaryFilters.Add(filter);
}
}
}
return new Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>>(
primaryFilter, secondaryFilters);
}
1.5.3 - OrchestrationStateQueryFilter
src\DurableTask.Core\OrchestrationStateQueryFilter.cs
Abstract class for an orchestration state query filter
Class definition
OrchestrationStateQueryFilter is an abstract class:
public abstract class OrchestrationStateQueryFilter
{
}
It is an empty abstract class.
Its implementations are:
- OrchestrationStateInstanceFilter
- OrchestrationStateNameVersionFilter: this one can be used for version filtering
1.5.4 - OrchestrationStateInstanceFilter
src\DurableTask.Core\OrchestrationStateInstanceFilter.cs
Filter for Orchestration instance filter
Class definition
OrchestrationStateInstanceFilter extends OrchestrationStateQueryFilter:
public class OrchestrationStateInstanceFilter : OrchestrationStateQueryFilter
{
Constructor
// Creates a new instance of OrchestrationStateInstanceFilter with default settings
public OrchestrationStateInstanceFilter()
{
// default is exact match
StartsWith = false;
}
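It defines just two properties, InstanceId and ExecutionId, as the filter criteria, plus StartsWith, which selects the match type:
// Gets or sets the InstanceId for the filter
public string InstanceId { get; set; }
// Gets or sets the ExecutionId for the filter
public string ExecutionId { get; set; }
// Gets or sets the match type of the filter: starts-with (true) or exact match (false)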
public bool StartsWith { get; set; }
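How a backend applies these three properties is not shown in this file, so the following is only a sketch under assumptions: InstanceFilterSketch and InstanceFilterMatcher are hypothetical names, and the choices of ordinal comparison and of always matching ExecutionId exactly are assumptions, not behavior confirmed by the source.

```csharp
using System;

// Hypothetical mirror of the three properties of OrchestrationStateInstanceFilter.
public sealed class InstanceFilterSketch
{
    public string InstanceId { get; set; }
    public string ExecutionId { get; set; }
    public bool StartsWith { get; set; } // false means exact match, as in the constructor default
}

public static class InstanceFilterMatcher
{
    // Returns true when the given ids satisfy the filter.
    public static bool Matches(InstanceFilterSketch filter, string instanceId, string executionId)
    {
        string id = instanceId ?? string.Empty;

        if (filter.InstanceId != null)
        {
            bool idMatches = filter.StartsWith
                ? id.StartsWith(filter.InstanceId, StringComparison.Ordinal)
                : string.Equals(id, filter.InstanceId, StringComparison.Ordinal);

            if (!idMatches)
            {
                return false;
            }
        }

        // Assumption: ExecutionId, when set, is always compared exactly.
        return filter.ExecutionId == null
            || string.Equals(executionId, filter.ExecutionId, StringComparison.Ordinal);
    }
}
```

With StartsWith set to true, a filter on "order-" would match "order-42" in this sketch, while the default exact match would not.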
1.5.5 - OrchestrationStateNameVersionFilter
src\DurableTask.Core\OrchestrationStateNameVersionFilter.cs
Filter for Orchestration Name and Version
Class definition
OrchestrationStateNameVersionFilter extends OrchestrationStateQueryFilter:
public class OrchestrationStateNameVersionFilter : OrchestrationStateQueryFilter
{
}
It defines just two properties, Name and Version, as the filter criteria:
public string Name { get; set; }
public string Version { get; set; }
1.6 - Entity
1.6.1 - TaskEntity
Abstract base class for entities
src\DurableTask.Core\Entities\TaskEntity.cs
TaskEntity defines a single method:
- ExecuteOperationBatchAsync()
ExecuteOperationBatchAsync() method
public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);
EntityBatchRequest class
A request for execution of a batch of operations on an entity.
- string InstanceId
- string EntityState
- List<OperationRequest> Operations
OperationRequest class
Properties:
- string Operation
- Guid Id
- string Input
1.6.2 - EntityId
A unique identifier for an entity, consisting of entity name and entity key.
src\DurableTask.Core\Entities\EntityId.cs
EntityId defines the following properties:
- string Name
- string Key
1.7 - History
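1.7.1 - History Overview
Introduction
The following introduction comes from README.md.
Durable Task Framework History Events
The following are some of the common history events that make up the state of an orchestration. You can easily see these events in the history tables of the Azure Storage and MSSQL storage backends for DTFx. Understanding these events is useful when working with DTFx code, debugging issues, or building diagnostic tools that read the history directly (such as the Durable Functions Monitor project).
| Event Type | Description |
|---|---|
| OrchestratorStarted | The orchestrator function is starting a new execution. You will see many of these events in the history, one for each time the orchestrator resumes after awaiting. Note that this does not mean the orchestrator is starting for the first time; the first execution is indicated by the ExecutionStarted history event (see below). The timestamp of this event is used to populate the CurrentDateTimeUtc property. |
| ExecutionStarted | The orchestration has started its first execution. This event contains the orchestrator name, the input, and the scheduled time of the orchestrator (which may be earlier than the preceding OrchestratorStarted event in the history). This is always the second event in an orchestration's history. |
| TaskScheduled | The orchestrator scheduled an activity task. This event includes the activity name, the input, and a sequential EventId that can be used to correlate the TaskScheduled event with the corresponding TaskCompleted or TaskFailed event. Note that multiple Task*** events may be generated if an activity task is retried. |
| TaskCompleted | A scheduled task activity completed successfully. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| TaskFailed | A scheduled task activity ended with a failure. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| SubOrchestrationInstanceCreated | The orchestrator has scheduled a sub-orchestrator. This event contains the name, instance ID, and input of the scheduled orchestrator, plus a sequential event ID that can be used to correlate the SubOrchestrationInstanceCreated event with a subsequent SubOrchestrationInstanceCompleted or SubOrchestrationInstanceFailed history event. The timestamp refers to the time at which the sub-orchestrator was scheduled, which will be earlier than the time it starts executing. Note that multiple SubOrchestrationInstance*** events may be generated if an activity task is retried. |
| SubOrchestrationInstanceCompleted | A scheduled sub-orchestrator completed successfully. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| SubOrchestrationInstanceFailed | A scheduled sub-orchestrator completed with a failure. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| TimerCreated | The orchestrator scheduled a durable timer. The FireAt property contains the date at which the timer will fire. |
| TimerFired | A previously scheduled durable timer has fired. The TimerId field matches the EventId field of the corresponding TimerCreated event. |
| EventRaised | The orchestration (or an entity, in the case of Durable Entities) received an external event. This record contains the event name, the payload, and the timestamp of when the event was sent (which should be the same as or earlier than the time the history event was actually persisted). |
| EventSent | The orchestration (or entity) sent a one-way message to another orchestration (or entity). |
| ExecutionCompleted | The orchestration has completed. This event includes the output of the orchestration and does not distinguish between success and failure. |
| ExecutionTerminated | The orchestration was forcefully terminated by an API call. The timestamp of this event indicates when the termination was scheduled, not necessarily when it actually happened. |
| OrchestratorCompleted | The orchestrator function has awaited and committed any side effects. You will see many of these events in the history, one for each time the orchestrator awaits. Note that this does not mean the orchestrator has completed (completion is indicated by ExecutionCompleted or ExecutionTerminated). |
| GenericEvent | A generic history event with a Data field but no specific meaning. This history event is not commonly used. In some cases it is used to trigger a fresh replay of an idle orchestration, for example after the orchestration has been rewound. |
| HistoryStateEvent | A history event that contains a snapshot of the orchestration history. Most modern backend types do not use this event type. |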
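The EventId / TaskScheduledId correlation described for the task events can be sketched as follows. This is a simplified model, not the DTFx types: SketchEventType, SketchHistoryEvent and HistoryCorrelator are hypothetical names, and only the three task-related event types are modeled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, reduced set of event types for illustration.
public enum SketchEventType { TaskScheduled, TaskCompleted, TaskFailed }

// Hypothetical, flattened history event carrying only the fields used for correlation.
public sealed class SketchHistoryEvent
{
    public SketchEventType EventType { get; set; }
    public int EventId { get; set; }          // meaningful on TaskScheduled
    public int TaskScheduledId { get; set; }  // meaningful on TaskCompleted / TaskFailed
    public string Name { get; set; }          // activity name, set on TaskScheduled
}

public static class HistoryCorrelator
{
    // Returns the names of scheduled activities that have neither completed nor failed yet.
    public static List<string> PendingTasks(IEnumerable<SketchHistoryEvent> history)
    {
        var pending = new Dictionary<int, string>();

        foreach (SketchHistoryEvent e in history)
        {
            switch (e.EventType)
            {
                case SketchEventType.TaskScheduled:
                    pending[e.EventId] = e.Name;
                    break;
                case SketchEventType.TaskCompleted:
                case SketchEventType.TaskFailed:
                    // TaskScheduledId points back at the EventId of the scheduling event.
                    pending.Remove(e.TaskScheduledId);
                    break;
            }
        }

        return pending.OrderBy(p => p.Key).Select(p => p.Value).ToList();
    }
}
```

A diagnostic tool that reads the history table directly could use the same pairing to spot activities that were scheduled but never reported a result.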
1.7.2 - HistoryEvent
Properties:
- int EventId
- EventType EventType
- bool IsPlayed
- DateTime Timestamp
- ExtensionDataObject ExtensionData
This class is also the parent class of the other events.
1.7.3 - ExecutionStartedEvent
Properties:
- string EventId
- string Input
- EventType EventType
- ParentInstance ParentInstance
- string Name
- string Version: could be reused
- IDictionary<string, string> Tags
- string Correlation
- DistributedTraceContext ParentTraceContext
- DateTime ScheduledStartTime
- int Generation
1.7.4 - OrchestratorStartedEvent
Properties:
- string EventId
- EventType EventType
2 - DurableTask Dispatcher Source Code Study
2.1 - TaskOrchestrationDispatcher Source Code Study
Dispatcher for orchestrations to handle processing and renewing, completion of orchestration events
src\DurableTask.Core\TaskOrchestrationDispatcher.cs
Class definition
It implements no interface and has no base class:
public class TaskOrchestrationDispatcher{}
Class fields
static readonly Task CompletedTask = Task.FromResult(0);
readonly INameVersionObjectManager<TaskOrchestration> objectManager;
readonly IOrchestrationService orchestrationService;
readonly WorkItemDispatcher<TaskOrchestrationWorkItem> dispatcher;
readonly DispatchMiddlewarePipeline dispatchPipeline;
readonly LogHelper logHelper;
ErrorPropagationMode errorPropagationMode;
readonly NonBlockingCountdownLock concurrentSessionLock;
readonly IEntityOrchestrationService? entityOrchestrationService;
readonly EntityBackendProperties? entityBackendProperties;
readonly TaskOrchestrationEntityParameters? entityParameters;
The main functionality lives in objectManager, orchestrationService, dispatcher and dispatchPipeline.
Constructor
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
ErrorPropagationMode errorPropagationMode)
{
// these fields are all passed in
this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager));
this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService));
this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline));
this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper));
this.errorPropagationMode = errorPropagationMode;
// cast orchestrationService to IEntityOrchestrationService (null if not supported)
this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService;
this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties;
this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties);
// initialize the dispatcher
this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
this.OnProcessWorkItemSessionAsync)
{
// initialize the dispatcher's properties
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
LogHelper = logHelper,
};
// To avoid starvation, we only allow half of all concurrently execution orchestrations to
// leverage extended sessions.
var maxConcurrentSessions = (int)Math.Ceiling(this.dispatcher.MaxConcurrentWorkItems / 2.0);
this.concurrentSessionLock = new NonBlockingCountdownLock(maxConcurrentSessions);
}
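The last two statements of the constructor cap extended sessions at half of the concurrent work items and guard that budget with a lock whose Acquire() returns a bool instead of blocking. The implementation of NonBlockingCountdownLock is not shown here, so the following is only a sketch of that idea under assumptions: CountdownLockSketch and its HalfOf helper are hypothetical names.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of a non-blocking countdown lock: a fixed number of slots,
// where Acquire() fails immediately instead of waiting when no slot is free.
public sealed class CountdownLockSketch
{
    private int available;

    public CountdownLockSketch(int capacity)
    {
        if (capacity < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(capacity));
        }

        this.available = capacity;
    }

    // Takes a slot and returns true if one is free; otherwise returns false right away.
    public bool Acquire()
    {
        while (true)
        {
            int current = Volatile.Read(ref this.available);
            if (current <= 0)
            {
                return false;
            }

            // Retry if another thread changed the counter in the meantime.
            if (Interlocked.CompareExchange(ref this.available, current - 1, current) == current)
            {
                return true;
            }
        }
    }

    // Returns a previously acquired slot.
    public void Release() => Interlocked.Increment(ref this.available);

    // Same arithmetic as the constructor above: half of the limit, rounded up.
    public static int HalfOf(int maxConcurrentWorkItems) =>
        (int)Math.Ceiling(maxConcurrentWorkItems / 2.0);
}
```

With a limit of 3 concurrent work items, HalfOf yields 2 slots, so a third Acquire() fails until one holder calls Release().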
StartAsync() method
Starts the dispatcher to start getting and processing orchestration events
public async Task StartAsync()
{
await this.dispatcher.StartAsync();
}
OnFetchWorkItemAsync() method
Method to get the next work item to process within supplied timeout
protected Task<TaskOrchestrationWorkItem> OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
if (this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true)
{
// only orchestrations should be served by this dispatcher, so we call
// the method which returns work items for orchestrations only.
Console.WriteLine("OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true");
Console.WriteLine("OnFetchWorkItemAsync: this.entityOrchestrationService=" + this.entityOrchestrationService?.GetType().FullName);
return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
else
{
// both entities and orchestrations are served by this dispatcher,
// so we call the method that may return work items for either.
return this.orchestrationService.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
}
With the Console.WriteLine calls above added for debugging, the following output is printed:
OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true
OnFetchWorkItemAsync: this.entityOrchestrationService=Microsoft.Azure.WebJobs.Extensions.DurableTask.AzureStorageDurabilityProvider
OnProcessWorkItemSessionAsync() method
async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
{
// DTFx history replay expects that ExecutionStarted comes before other events.
// If this is not already the case, due to a race-condition, we re-order the
// messages to enforce this expectation.
EnsureExecutionStartedIsFirst(workItem.NewMessages);
try
{
if (workItem.Session == null)
{
// Legacy behavior
await this.OnProcessWorkItemAsync(workItem);
return;
}
var isExtendedSession = false;
CorrelationTraceClient.Propagate(
() =>
{
// Check if it is extended session.
// TODO: Remove this code - it looks incorrect and dangerous
isExtendedSession = this.concurrentSessionLock.Acquire();
this.concurrentSessionLock.Release();
workItem.IsExtendedSession = isExtendedSession;
});
var processCount = 0;
try
{
while (true)
{
// If the provider provided work items, execute them.
if (workItem.NewMessages?.Count > 0)
{
bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
if (isCompletedOrInterrupted)
{
break;
}
processCount++;
}
// Fetches beyond the first require getting an extended session lock, used to prevent starvation.
if (processCount > 0 && !isExtendedSession)
{
isExtendedSession = this.concurrentSessionLock.Acquire();
if (!isExtendedSession)
{
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
break;
}
}
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session.");
Stopwatch timer = Stopwatch.StartNew();
// Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
// until either new messages are available or until a provider-specific timeout has expired.
workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
if (workItem.NewMessages == null)
{
break;
}
TraceHelper.Trace(
TraceEventType.Verbose,
"OnProcessWorkItemSession-EndFetch",
$"Fetched {workItem.NewMessages.Count} new message(s) after {timer.ElapsedMilliseconds} ms from existing session.");
workItem.OrchestrationRuntimeState.NewEvents.Clear();
}
}
finally
{
if (isExtendedSession)
{
TraceHelper.Trace(
TraceEventType.Verbose,
"OnProcessWorkItemSession-Release",
$"Releasing extended session after {processCount} batch(es).");
this.concurrentSessionLock.Release();
}
}
}
catch (SessionAbortedException e)
{
// Either the orchestration or the orchestration service explicitly abandoned the session.
OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
this.logHelper.OrchestrationAborted(instance, e.Message);
TraceHelper.TraceInstance(TraceEventType.Warning, "TaskOrchestrationDispatcher-ExecutionAborted", instance, "{0}", e.Message);
await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
}
}
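The method starts by calling EnsureExecutionStartedIsFirst(), whose body is not shown in these notes. Going only by the comment ("we re-order the messages to enforce this expectation"), the reordering could look roughly like the sketch below. SketchMessage and MessageOrdering are hypothetical names, and moving the first ExecutionStarted message to the front while leaving the rest in order is an assumption about the behavior, not a copy of the real code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified message: only the event type name and a label are modeled.
public sealed class SketchMessage
{
    public string EventType { get; set; }
    public string Label { get; set; }
}

public static class MessageOrdering
{
    // Moves the first ExecutionStarted message (if any) to index 0,
    // keeping the relative order of all other messages.
    public static void MoveExecutionStartedFirst(IList<SketchMessage> messages)
    {
        if (messages == null || messages.Count < 2)
        {
            return;
        }

        int index = -1;
        for (int i = 0; i < messages.Count; i++)
        {
            if (messages[i].EventType == "ExecutionStarted")
            {
                index = i;
                break;
            }
        }

        if (index <= 0)
        {
            // Either there is no such message or it is already first.
            return;
        }

        SketchMessage started = messages[index];
        messages.RemoveAt(index);
        messages.Insert(0, started);
    }
}
```

This kind of fix-up matters because history replay expects ExecutionStarted before other events, and a race between message deliveries can put, for example, an EventRaised message ahead of it.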
OnProcessWorkItemAsync() method
protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
var messagesToSend = new List<TaskMessage>();
var timerMessages = new List<TaskMessage>();
var orchestratorMessages = new List<TaskMessage>();
var isCompleted = false;
var continuedAsNew = false;
var isInterrupted = false;
// correlation
CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext);
ExecutionStartedEvent? continueAsNewExecutionStarted = null;
TaskMessage? continuedAsNewMessage = null;
IList<HistoryEvent>? carryOverEvents = null;
string? carryOverStatus = null;
workItem.OrchestrationRuntimeState.LogHelper = this.logHelper;
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState;
// Distributed tracing support: each orchestration execution is a trace activity
// that derives from an established parent trace context. It is expected that some
// listener will receive these events and publish them to a distributed trace logger.
ExecutionStartedEvent startEvent =
runtimeState.ExecutionStartedEvent ??
workItem.NewMessages.Select(msg => msg.Event).OfType<ExecutionStartedEvent>().FirstOrDefault();
Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent);
OrchestrationState? instanceState = null;
Task? renewTask = null;
using var renewCancellationTokenSource = new CancellationTokenSource();
if (workItem.LockedUntilUtc < DateTime.MaxValue)
{
// start a task to run RenewUntil
renewTask = Task.Factory.StartNew(
() => RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskOrchestrationDispatcher), renewCancellationTokenSource.Token),
renewCancellationTokenSource.Token);
}
try
{
// Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper))
{
// TODO : mark an orchestration as faulted if there is data corruption
this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
TraceHelper.TraceSession(
TraceEventType.Error,
"TaskOrchestrationDispatcher-DeletedOrchestration",
runtimeState.OrchestrationInstance?.InstanceId!,
"Received work-item for an invalid orchestration");
isCompleted = true;
traceActivity?.Dispose();
}
else
{
do
{
continuedAsNew = false;
continuedAsNewMessage = null;
this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
TraceHelper.TraceInstance(
TraceEventType.Verbose,
"TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin",
runtimeState.OrchestrationInstance!,
"Executing user orchestration: {0}",
JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));
if (workItem.Cursor == null)
{
workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
}
else
{
await this.ResumeOrchestrationAsync(workItem);
}
IReadOnlyList<OrchestratorAction> decisions = workItem.Cursor.LatestDecisions.ToList();
this.logHelper.OrchestrationExecuted(
runtimeState.OrchestrationInstance!,
runtimeState.Name,
decisions);
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-ExecuteUserOrchestration-End",
runtimeState.OrchestrationInstance!,
"Executed user orchestration. Received {0} orchestrator actions: {1}",
decisions.Count,
string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType)));
// TODO: Exception handling for invalid decisions, which is increasingly likely
// when custom middleware is involved (e.g. out-of-process scenarios).
foreach (OrchestratorAction decision in decisions)
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-ProcessOrchestratorAction",
runtimeState.OrchestrationInstance!,
"Processing orchestrator action of type {0}",
decision.OrchestratorActionType);
switch (decision.OrchestratorActionType)
{
case OrchestratorActionType.ScheduleOrchestrator:
var scheduleTaskAction = (ScheduleTaskOrchestratorAction)decision;
var message = this.ProcessScheduleTaskDecision(
scheduleTaskAction,
runtimeState,
this.IncludeParameters,
traceActivity);
messagesToSend.Add(message);
break;
case OrchestratorActionType.CreateTimer:
var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision;
timerMessages.Add(this.ProcessCreateTimerDecision(
timerOrchestratorAction,
runtimeState,
isInternal: false));
break;
case OrchestratorActionType.CreateSubOrchestration:
var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision;
orchestratorMessages.Add(
this.ProcessCreateSubOrchestrationInstanceDecision(
createSubOrchestrationAction,
runtimeState,
this.IncludeParameters,
traceActivity));
break;
case OrchestratorActionType.SendEvent:
var sendEventAction = (SendEventOrchestratorAction)decision;
orchestratorMessages.Add(
this.ProcessSendEventDecision(sendEventAction, runtimeState));
break;
case OrchestratorActionType.OrchestrationComplete:
OrchestrationCompleteOrchestratorAction completeDecision = (OrchestrationCompleteOrchestratorAction)decision;
TaskMessage? workflowInstanceCompletedMessage =
this.ProcessWorkflowCompletedTaskDecision(completeDecision, runtimeState, this.IncludeDetails, out continuedAsNew);
if (workflowInstanceCompletedMessage != null)
{
// Send complete message to parent workflow or to itself to start a new execution
// Store the event so we can rebuild the state
carryOverEvents = null;
if (continuedAsNew)
{
continuedAsNewMessage = workflowInstanceCompletedMessage;
continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent;
if (completeDecision.CarryoverEvents.Any())
{
carryOverEvents = completeDecision.CarryoverEvents.ToList();
completeDecision.CarryoverEvents.Clear();
}
}
else
{
orchestratorMessages.Add(workflowInstanceCompletedMessage);
}
}
isCompleted = !continuedAsNew;
break;
default:
throw TraceHelper.TraceExceptionInstance(
TraceEventType.Error,
"TaskOrchestrationDispatcher-UnsupportedDecisionType",
runtimeState.OrchestrationInstance!,
new NotSupportedException($"Decision type '{decision.OrchestratorActionType}' not supported"));
}
// Underlying orchestration service provider may have a limit of messages per call, to avoid the situation
// we keep on asking the provider if message count is ok and stop processing new decisions if not.
//
// We also put in a fake timer to force next orchestration task for remaining messages
int totalMessages = messagesToSend.Count + orchestratorMessages.Count + timerMessages.Count;
if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-MaxMessageCountReached",
runtimeState.OrchestrationInstance!,
"MaxMessageCount reached. Adding timer to process remaining events in next attempt.");
if (isCompleted || continuedAsNew)
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted",
runtimeState.OrchestrationInstance!,
"Orchestration already completed. Skip adding timer for splitting messages.");
break;
}
var dummyTimer = new CreateTimerOrchestratorAction
{
Id = FrameworkConstants.FakeTimerIdToSplitDecision,
FireAt = DateTime.UtcNow
};
timerMessages.Add(this.ProcessCreateTimerDecision(
dummyTimer,
runtimeState,
isInternal: true));
isInterrupted = true;
break;
}
}
// correlation
CorrelationTraceClient.Propagate(() =>
{
if (runtimeState.ExecutionStartedEvent != null)
runtimeState.ExecutionStartedEvent.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});
// finish up processing of the work item
if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
{
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
}
if (isCompleted)
{
TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state");
if (runtimeState.ExecutionStartedEvent != null)
{
instanceState = Utils.BuildOrchestrationState(runtimeState);
}
}
else
{
if (continuedAsNew)
{
TraceHelper.TraceSession(
TraceEventType.Information,
"TaskOrchestrationDispatcher-UpdatingStateForContinuation",
workItem.InstanceId,
"Updating state for continuation");
// correlation
CorrelationTraceClient.Propagate(() =>
{
continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});
// Copy the distributed trace context, if any
continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent);
runtimeState = new OrchestrationRuntimeState();
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(continueAsNewExecutionStarted!);
runtimeState.Status = workItem.OrchestrationRuntimeState.Status ?? carryOverStatus;
carryOverStatus = workItem.OrchestrationRuntimeState.Status;
if (carryOverEvents != null)
{
foreach (var historyEvent in carryOverEvents)
{
runtimeState.AddEvent(historyEvent);
}
}
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
workItem.OrchestrationRuntimeState = runtimeState;
workItem.Cursor = null;
}
instanceState = Utils.BuildOrchestrationState(runtimeState);
}
} while (continuedAsNew);
}
}
finally
{
if (renewTask != null)
{
try
{
renewCancellationTokenSource.Cancel();
await renewTask;
}
catch (ObjectDisposedException)
{
// ignore
}
catch (OperationCanceledException)
{
// ignore
}
}
}
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
// some backends expect the original runtime state object
workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState;
}
runtimeState.Status = runtimeState.Status ?? carryOverStatus;
if (instanceState != null)
{
instanceState.Status = runtimeState.Status;
}
await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
workItem,
runtimeState,
continuedAsNew ? null : messagesToSend,
orchestratorMessages,
continuedAsNew ? null : timerMessages,
continuedAsNewMessage,
instanceState);
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
workItem.OrchestrationRuntimeState = runtimeState;
}
return isCompleted || continuedAsNew || isInterrupted;
}
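The message-count check near the end of the decision loop can be hard to follow inside such a long method. The following is a minimal sketch of just that idea, assuming decisions are plain strings and the provider limit is a simple predicate. `DecisionBatchSketch`, `Process` and the `"fake-timer"` marker are hypothetical names for illustration, not library APIs; in the listing above, the real marker is a `CreateTimerOrchestratorAction` whose id is `FrameworkConstants.FakeTimerIdToSplitDecision`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical miniature of the "split decisions with a fake timer" step.
public static class DecisionBatchSketch
{
    // Stand-in for the dummy timer message that forces another work item.
    public const string FakeTimer = "fake-timer";

    // Turns decisions into messages one by one. As soon as the provider reports
    // that the message count is exceeded, a fake timer is appended and processing
    // stops. Returns true when the batch was interrupted this way.
    public static bool Process(
        IReadOnlyList<string> decisions,
        Func<int, bool> isMaxMessageCountExceeded,
        List<string> messages)
    {
        foreach (string decision in decisions)
        {
            messages.Add(decision);
            if (isMaxMessageCountExceeded(messages.Count))
            {
                messages.Add(FakeTimer);
                return true;
            }
        }

        return false;
    }
}
```

In the real method the interrupted flag is also part of the return value (`isCompleted || continuedAsNew || isInterrupted`).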
StopAsync() method
Stops the dispatcher to stop getting and processing orchestration events
public async Task StopAsync(bool forced)
{
await this.dispatcher.StopAsync(forced);
}
NonBlockingCountdownLock
A non-blocking countdown lock: Acquire() returns false immediately instead of waiting when no slot is available.
internal class NonBlockingCountdownLock
{
int available;
public NonBlockingCountdownLock(int available)
{
if (available <= 0)
{
throw new ArgumentOutOfRangeException(nameof(available));
}
this.available = available;
this.Capacity = available;
}
public int Capacity { get; }
public bool Acquire()
{
if (this.available <= 0)
{
return false;
}
if (Interlocked.Decrement(ref this.available) >= 0)
{
return true;
}
// the counter went negative - fix it
Interlocked.Increment(ref this.available);
return false;
}
public void Release()
{
Interlocked.Increment(ref this.available);
}
}
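To see the Acquire()/Release() contract in isolation, here is a small sketch. Because the original class is internal, `CountdownLockSketch` is a hypothetical public copy of the logic listed above, and `CountdownLockDemo.TryAcquireMany` is an illustrative helper that does not exist in the library.

```csharp
using System;
using System.Threading;

// Hypothetical public copy of the listing above so the sketch compiles on its own.
public class CountdownLockSketch
{
    int available;

    public CountdownLockSketch(int available)
    {
        if (available <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(available));
        }

        this.available = available;
        this.Capacity = available;
    }

    public int Capacity { get; }

    public bool Acquire()
    {
        // Fast path: nothing left, do not touch the counter.
        if (this.available <= 0)
        {
            return false;
        }

        if (Interlocked.Decrement(ref this.available) >= 0)
        {
            return true;
        }

        // Another thread took the last slot first: undo the decrement.
        Interlocked.Increment(ref this.available);
        return false;
    }

    public void Release()
    {
        Interlocked.Increment(ref this.available);
    }
}

public static class CountdownLockDemo
{
    // Tries to take `attempts` slots and returns how many attempts succeeded.
    public static int TryAcquireMany(CountdownLockSketch gate, int attempts)
    {
        int granted = 0;
        for (int i = 0; i < attempts; i++)
        {
            if (gate.Acquire())
            {
                granted++;
            }
        }

        return granted;
    }
}
```

With a capacity of 2, three attempts grant two slots. A further Acquire() fails until Release() is called.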
2.2 - WorkItemDispatcher source code study
Dispatcher class for fetching and processing work items of the supplied type
src\DurableTask.Core\WorkItemDispatcher.cs
Class definition
public class WorkItemDispatcher<T> : IDisposable
Class fields
const int DefaultMaxConcurrentWorkItems = 20;
const int DefaultDispatcherCount = 1;
const int BackOffIntervalOnInvalidOperationSecs = 10;
const int CountDownToZeroDelay = 5;
// ReSharper disable once StaticMemberInGenericType
static readonly TimeSpan DefaultReceiveTimeout = TimeSpan.FromSeconds(30);
readonly string id;
readonly string name;
readonly object thisLock = new object();
readonly SemaphoreSlim initializationLock = new SemaphoreSlim(1, 1);
volatile int concurrentWorkItemCount;
volatile int countDownToZeroDelay;
volatile int delayOverrideSecs;
volatile int activeFetchers;
bool isStarted;
SemaphoreSlim concurrencyLock;
CancellationTokenSource shutdownCancellationTokenSource;
Method definitions (delegates)
readonly Func<T, string> workItemIdentifier;
Func<TimeSpan, CancellationToken, Task<T>> FetchWorkItem { get; }
Func<T, Task> ProcessWorkItem { get; }
/// <summary>
/// Method to execute for safely releasing a work item
/// </summary>
public Func<T, Task> SafeReleaseWorkItem;
/// <summary>
/// Method to execute for aborting a work item
/// </summary>
public Func<T, Task> AbortWorkItem;
/// <summary>
/// Method to get a delay to wait after a fetch exception
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnFetchException = (exception) => 0;
/// <summary>
/// Method to get a delay to wait after a process exception
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnProcessException = (exception) => 0;
Constructor
public WorkItemDispatcher(
string name,
Func<T, string> workItemIdentifier,
Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
Func<T, Task> processWorkItem)
{
this.name = name;
this.id = Guid.NewGuid().ToString("N");
this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
this.FetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
this.ProcessWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
}
Compare with the initialization code in TaskOrchestrationDispatcher:
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
ErrorPropagationMode errorPropagationMode)
{
......
// Initialize the dispatcher
this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
this.OnProcessWorkItemSessionAsync)
{
// Initialize the dispatcher's fields and properties
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
LogHelper = logHelper,
};
......
}
Func<T, string> workItemIdentifier is implemented as:
item => item == null ? string.Empty : item.InstanceId,
fetchWorkItem is implemented by TaskOrchestrationDispatcher.OnFetchWorkItemAsync, and processWorkItem by TaskOrchestrationDispatcher.OnProcessWorkItemSessionAsync.
DispatcherCount is set from orchestrationService.TaskOrchestrationDispatcherCount,
and MaxConcurrentWorkItems from orchestrationService.MaxConcurrentTaskOrchestrationWorkItems.
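The wiring pattern, where a generic dispatcher knows nothing about its work items and receives an identifier, a fetch and a process delegate from its owner, can be reduced to a few lines. This is a sketch under stated assumptions: `MiniDispatcher<T>`, `MiniOwner` and `RunOnceAsync` are hypothetical names, the work items are plain strings, and an empty fetch is represented by null.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical miniature of a delegate-driven dispatcher; not the library's class.
public class MiniDispatcher<T> where T : class
{
    readonly Func<T, string> workItemIdentifier;
    readonly Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem;
    readonly Func<T, Task> processWorkItem;

    public MiniDispatcher(
        Func<T, string> workItemIdentifier,
        Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
        Func<T, Task> processWorkItem)
    {
        this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
        this.fetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
        this.processWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
    }

    // Runs one fetch/process cycle. Returns the id of the processed item,
    // or null when the fetch produced nothing.
    public async Task<string> RunOnceAsync(CancellationToken token)
    {
        T item = await this.fetchWorkItem(TimeSpan.FromSeconds(1), token);
        if (item == null)
        {
            return null;
        }

        await this.processWorkItem(item);
        return this.workItemIdentifier(item);
    }
}

// Hypothetical owner that passes its own methods as the delegates.
public class MiniOwner
{
    readonly Queue<string> pending = new Queue<string>();

    public MiniOwner(params string[] items)
    {
        foreach (string item in items)
        {
            this.pending.Enqueue(item);
        }

        this.Dispatcher = new MiniDispatcher<string>(
            item => item ?? string.Empty,
            this.OnFetchAsync,
            this.OnProcessAsync);
    }

    public List<string> Processed { get; } = new List<string>();

    public MiniDispatcher<string> Dispatcher { get; }

    Task<string> OnFetchAsync(TimeSpan timeout, CancellationToken token)
    {
        return Task.FromResult(this.pending.Count > 0 ? this.pending.Dequeue() : null);
    }

    Task OnProcessAsync(string item)
    {
        this.Processed.Add(item);
        return Task.CompletedTask;
    }
}
```

The dispatcher stays reusable for any work item type because all type-specific behavior lives in the owner's three delegates.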
StartAsync() method
Starts the work item dispatcher:
public async Task StartAsync()
{
if (!this.isStarted)
{
await this.initializationLock.WaitAsync();
try
{
if (this.isStarted)
{
throw TraceHelper.TraceException(TraceEventType.Error, "WorkItemDispatcherStart-AlreadyStarted", new InvalidOperationException($"WorkItemDispatcher '{this.name}' has already started"));
}
this.concurrencyLock?.Dispose();
this.concurrencyLock = new SemaphoreSlim(this.MaxConcurrentWorkItems);
this.shutdownCancellationTokenSource?.Dispose();
this.shutdownCancellationTokenSource = new CancellationTokenSource();
this.isStarted = true;
TraceHelper.Trace(TraceEventType.Information, "WorkItemDispatcherStart", $"WorkItemDispatcher('{this.name}') starting. Id {this.id}.");
for (var i = 0; i < this.DispatcherCount; i++)
{
string dispatcherId = i.ToString();
// Create the context
var context = new WorkItemDispatcherContext(this.name, this.id, dispatcherId);
this.LogHelper.DispatcherStarting(context);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
Task.Run(() => this.DispatchAsync(context));
#pragma warning restore 4014
}
}
finally
{
this.initializationLock.Release();
}
}
}
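The start logic above is a double-checked pattern built on an async-friendly lock. The sketch below isolates it, assuming the real work (creating the semaphore, spawning dispatch loops) is replaced by a counter. `StartOnceSketch` and `StartCount` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical reduction of the StartAsync() pattern shown above.
public class StartOnceSketch
{
    readonly SemaphoreSlim initializationLock = new SemaphoreSlim(1, 1);
    bool isStarted;
    int startCount;

    // Number of times the guarded start logic actually ran.
    public int StartCount => this.startCount;

    public async Task StartAsync()
    {
        // First check without the lock: an already started instance returns silently.
        if (!this.isStarted)
        {
            await this.initializationLock.WaitAsync();
            try
            {
                // Second check under the lock: only a caller that lost a race gets here.
                if (this.isStarted)
                {
                    throw new InvalidOperationException("already started");
                }

                this.isStarted = true;
                this.startCount++;
            }
            finally
            {
                this.initializationLock.Release();
            }
        }
    }
}
```

As written, a second sequential call is a no-op. The exception is thrown only when two callers pass the outer check at the same time and one of them takes the lock second.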
DispatchAsync() method
async Task DispatchAsync(WorkItemDispatcherContext context)
{
string dispatcherId = context.DispatcherId;
bool logThrottle = true;
while (this.isStarted)
{
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
{
if (logThrottle)
{
// This can happen frequently under heavy load.
// To avoid log spam, we log just once until we can proceed.
this.LogHelper.FetchingThrottled(
context,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-MaxOperations",
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
logThrottle = false;
}
continue;
}
logThrottle = true;
var delaySecs = 0;
T workItem = default(T);
try
{
Interlocked.Increment(ref this.activeFetchers);
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-StartFetch",
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
Stopwatch timer = Stopwatch.StartNew();
// Fetching the work item starts here
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
if (!IsNull(workItem))
{
string workItemId = this.workItemIdentifier(workItem);
this.LogHelper.FetchWorkItemCompleted(
context,
workItemId,
timer.Elapsed,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
}
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-EndFetch",
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
}
catch (TimeoutException)
{
delaySecs = 0;
}
catch (TaskCanceledException exception)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-TaskCanceledException",
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
catch (Exception exception)
{
if (!this.isStarted)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-HarmlessException",
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
}
else
{
this.LogHelper.FetchWorkItemFailure(context, exception);
// TODO : dump full node context here
TraceHelper.TraceException(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-Exception",
exception,
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
}
finally
{
Interlocked.Decrement(ref this.activeFetchers);
}
// Then schedule the work item
var scheduledWorkItem = false;
if (!IsNull(workItem))
{
if (!this.isStarted)
{
if (this.SafeReleaseWorkItem != null)
{
await this.SafeReleaseWorkItem(workItem);
}
}
else
{
Interlocked.Increment(ref this.concurrentWorkItemCount);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
// Start processing the work item
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
#pragma warning restore 4014
scheduledWorkItem = true;
}
}
delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
if (delaySecs > 0)
{
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
}
if (!scheduledWorkItem)
{
this.concurrencyLock.Release();
}
}
this.LogHelper.DispatcherStopped(context);
}
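The FetchWorkItem and ProcessWorkItem delegates used here are both supplied through the constructor (in practice by TaskOrchestrationDispatcher). ProcessWorkItemAsync() is the dispatcher's own method, which wraps the ProcessWorkItem delegate.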
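The core of the loop is the concurrency gate: a slot is taken before each fetch, and it is only handed back by the dispatch loop when no work item was scheduled. The following sketch shows that throttling idea alone, under simplifying assumptions: the items come from an in-memory sequence, and the slot is released when the handler finishes. `ThrottledLoopSketch` and `RunAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical reduction of the semaphore-gated dispatch loop.
public static class ThrottledLoopSketch
{
    // Processes every item with at most maxConcurrent handlers in flight
    // and returns the highest concurrency that was observed.
    public static async Task<int> RunAsync(
        IEnumerable<int> items,
        int maxConcurrent,
        Func<int, Task> process)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);
        var running = new List<Task>();
        var peakLock = new object();
        int inFlight = 0;
        int peak = 0;

        foreach (int item in items)
        {
            // The loop stalls here once maxConcurrent handlers are running.
            await gate.WaitAsync();

            // Fire and forget from the loop's point of view, as in DispatchAsync().
            running.Add(Task.Run(async () =>
            {
                int now = Interlocked.Increment(ref inFlight);
                lock (peakLock)
                {
                    peak = Math.Max(peak, now);
                }

                try
                {
                    await process(item);
                }
                finally
                {
                    Interlocked.Decrement(ref inFlight);
                    gate.Release();
                }
            }));
        }

        await Task.WhenAll(running);
        return peak;
    }
}
```

Because the counter is incremented only after a slot is taken and decremented before the slot is released, the observed peak cannot exceed maxConcurrent.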
3 - DurableTask Orchestration source code study
3.1 - IOrchestrationService
Orchestration Service interface for performing task hub management operations and handling orchestrations and work items’ state
The code is located in the DurableTask repository at src\DurableTask.Core\IOrchestrationService.cs
Interface definition
IOrchestrationService is an interface:
public interface IOrchestrationService
{}
Method definitions
Start
/// <summary>
/// Starts the service initializing the required resources
/// </summary>
Task StartAsync();
Stop
/// <summary>
/// Stops the orchestration service gracefully
/// </summary>
Task StopAsync();
/// <summary>
/// Stops the orchestration service with optional forced flag
/// </summary>
Task StopAsync(bool isForced);
Create
/// <summary>
/// Deletes and Creates the necessary resources for the orchestration service and the instance store
/// </summary>
Task CreateAsync();
/// <summary>
/// Deletes and Creates the necessary resources for the orchestration service and optionally the instance store
/// </summary>
Task CreateAsync(bool recreateInstanceStore);
/// <summary>
/// Creates the necessary resources for the orchestration service and the instance store
/// </summary>
Task CreateIfNotExistsAsync();
Delete
/// <summary>
/// Deletes the resources for the orchestration service and the instance store
/// </summary>
Task DeleteAsync();
/// <summary>
/// Deletes the resources for the orchestration service and optionally the instance store
/// </summary>
Task DeleteAsync(bool deleteInstanceStore);
LockNextTaskOrchestrationWorkItem
Waits for the next orchestration work item and returns it. This method is the key one.
/// <summary>
/// Wait for the next orchestration work item and return the orchestration work item
/// </summary>
Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken);
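To make the receiveTimeout and cancellationToken parameters concrete, here is a sketch of how a provider could satisfy such a contract with an in-memory queue. It assumes that "no work arrived within the timeout" is reported as null, which matches the IsNull(workItem) check in DispatchAsync(), but the behavior of real providers may differ. `InMemoryWorkQueueSketch<T>` and `LockNextAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a provider's work item source.
public class InMemoryWorkQueueSketch<T> where T : class
{
    readonly ConcurrentQueue<T> items = new ConcurrentQueue<T>();
    readonly SemaphoreSlim signal = new SemaphoreSlim(0);

    public void Enqueue(T item)
    {
        this.items.Enqueue(item);
        // One permit per queued item wakes exactly one waiting fetcher.
        this.signal.Release();
    }

    // Waits up to receiveTimeout for an item. Returns null on timeout (assumption)
    // and throws OperationCanceledException when the token is cancelled.
    public async Task<T> LockNextAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
    {
        if (!await this.signal.WaitAsync(receiveTimeout, cancellationToken))
        {
            return null;
        }

        this.items.TryDequeue(out T item);
        return item;
    }
}
```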
LockNextTaskActivity
/// <summary>
/// Wait for and lock the next task activity to be processed
/// </summary>
Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken);
3.2 - IOrchestrationServiceClient
Interface to allow creation of new task orchestrations and query their status.
The code is located in the DurableTask repository at src\DurableTask.Core\IOrchestrationServiceClient.cs
Interface definition
IOrchestrationServiceClient is an interface:
public interface IOrchestrationServiceClient
{}
Method definitions
CreateTaskOrchestration
/// <summary>
/// Creates a new orchestration
/// </summary>
/// <param name="creationMessage">Orchestration creation message</param>
/// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store.</exception>
/// <returns></returns>
Task CreateTaskOrchestrationAsync(TaskMessage creationMessage);
/// <summary>
/// Creates a new orchestration and specifies a subset of states which should be de duplicated on in the client side
/// </summary>
/// <param name="creationMessage">Orchestration creation message</param>
/// <param name="dedupeStatuses">States of previous orchestration executions to be considered while de-duping new orchestrations on the client</param>
/// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store and it has a status specified in dedupeStatuses.</exception>
/// <returns></returns>
Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses);
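The second overload creates a new orchestration and specifies the subset of statuses to de-duplicate against on the client side: it throws OrchestrationAlreadyExistsException only when an existing instance with the same instance id has one of the statuses listed in dedupeStatuses.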
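The de-duplication rule described in the doc comments can be sketched with an in-memory dictionary. Everything here is a hypothetical stand-in: `SketchStatus` replaces OrchestrationStatus, InvalidOperationException replaces OrchestrationAlreadyExistsException, and treating a null dedupeStatuses as "any existing instance is a duplicate" is an assumption modeled on the first overload, not confirmed provider behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for OrchestrationStatus.
public enum SketchStatus
{
    Running,
    Completed,
    Failed,
    Terminated
}

// Hypothetical in-memory illustration of create-with-dedupe semantics.
public class CreateWithDedupeSketch
{
    readonly Dictionary<string, SketchStatus> instances = new Dictionary<string, SketchStatus>();

    // Test hook: simulates the instance moving to another status.
    public void SetStatus(string instanceId, SketchStatus status)
    {
        this.instances[instanceId] = status;
    }

    // Creates the instance unless one with the same id exists in a status that
    // counts as a duplicate. A null dedupeStatuses means every status counts (assumption).
    public void Create(string instanceId, SketchStatus[] dedupeStatuses)
    {
        if (this.instances.TryGetValue(instanceId, out SketchStatus existing)
            && (dedupeStatuses == null || dedupeStatuses.Contains(existing)))
        {
            throw new InvalidOperationException(
                $"Instance '{instanceId}' already exists with status {existing}");
        }

        this.instances[instanceId] = SketchStatus.Running;
    }
}
```

With dedupeStatuses set to Running only, an id can be reused once the earlier instance has completed, while a running instance still blocks creation.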
SendTaskOrchestrationMessage
Sends a new message to an orchestration.
TBD: it is not yet clear to me what this method does.
/// <summary>
/// Send a new message for an orchestration
/// </summary>
/// <param name="message">Message to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageAsync(TaskMessage message);
/// <summary>
/// Send a new set of messages for an orchestration
/// </summary>
/// <param name="messages">Messages to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages);
WaitForOrchestration
/// <summary>
/// Wait for an orchestration to reach any terminal state within the given timeout
/// </summary>
/// <param name="instanceId">Instance id of the orchestration</param>
/// <param name="executionId">Execution id of the orchestration</param>
/// <param name="timeout">Maximum amount of time to wait</param>
/// <param name="cancellationToken">Task cancellation token</param>
Task<OrchestrationState> WaitForOrchestrationAsync(
string instanceId,
string executionId,
TimeSpan timeout,
CancellationToken cancellationToken);
ForceTerminateTaskOrchestration
/// <summary>
/// Forcefully terminate the specified orchestration instance
/// </summary>
/// <param name="instanceId">Instance to terminate</param>
/// <param name="reason">Reason for termination</param>
Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason);
GetOrchestrationState
/// <summary>
/// Get a list of orchestration states from the instance storage for the most current execution (generation) of the specified instance.
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="allExecutions">True if method should fetch all executions of the instance, false if the method should only fetch the most recent execution</param>
/// <returns>List of OrchestrationState objects that represents the list of orchestrations in the instance store</returns>
Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions);
/// <summary>
/// Get a list of orchestration states from the instance storage for the specified execution (generation) of the specified instance.
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="executionId">Execution id</param>
/// <returns>The OrchestrationState of the specified instanceId or null if not found</returns>
Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId);
GetOrchestrationHistory
/// <summary>
/// Get a string dump of the execution history of the specified orchestration instance specified execution (generation) of the specified instance
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="executionId">Execution id</param>
/// <returns>String with formatted JSON representing the execution history</returns>
Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId);
PurgeOrchestrationHistory
Purges orchestration instance state and history older than the specified threshold time. Also purges the blob storage.
/// <summary>
/// Purges orchestration instance state and history for orchestrations older than the specified threshold time.
/// Also purges the blob storage.
/// </summary>
/// <param name="thresholdDateTimeUtc">Threshold date time in UTC</param>
/// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
3.3 - IOrchestrationServiceInstanceStore
Instance Store provider interface to allow storage and lookup for orchestration state and event history
The code is located in the DurableTask repository at src\DurableTask.Core\IOrchestrationServiceInstanceStore.cs
Interface definition
IOrchestrationServiceInstanceStore is an interface:
public interface IOrchestrationServiceInstanceStore
{}
Method definitions
Lifecycle-related methods
/// <summary>
/// Gets the maximum length a history entry can be so it can be truncated if necessary
/// </summary>
/// <returns>The maximum length</returns>
int MaxHistoryEntryLength { get; }
/// <summary>
/// Runs initialization to prepare the instance store for use
/// </summary>
/// <param name="recreate">Flag to indicate whether the store should be recreated.</param>
Task InitializeStoreAsync(bool recreate);
/// <summary>
/// Deletes instances instance store
/// </summary>
Task DeleteStoreAsync();
Entity-related methods
/// <summary>
/// Writes a list of history events to instance store
/// </summary>
/// <param name="entities">List of history events to write</param>
Task<object> WriteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);
/// <summary>
/// Get a list of state events from instance store
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="executionId">The execution id to return state for</param>
/// <returns>The matching orchestration state or null if not found</returns>
Task<IEnumerable<OrchestrationStateInstanceEntity>> GetEntitiesAsync(string instanceId, string executionId);
/// <summary>
/// Deletes a list of history events from instance store
/// </summary>
/// <param name="entities">List of history events to delete</param>
Task<object> DeleteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);
GetOrchestrationState
Gets the list of orchestration states for the specified instance.
/// <summary>
/// Gets a list of orchestration states for a given instance
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="allInstances">Flag indication whether to get all history execution ids or just the most recent</param>
/// <returns>List of matching orchestration states</returns>
Task<IEnumerable<OrchestrationStateInstanceEntity>> GetOrchestrationStateAsync(string instanceId, bool allInstances);
/// <summary>
/// Gets the orchestration state for a given instance and execution id
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="executionId">The execution id to return state for</param>
/// <returns>The matching orchestration state or null if not found</returns>
Task<OrchestrationStateInstanceEntity> GetOrchestrationStateAsync(string instanceId, string executionId);
OrchestrationStateInstanceEntity is defined as follows:
public class OrchestrationStateInstanceEntity : InstanceEntityBase
{
/// <summary>
/// The orchestration state for this instance entity
/// </summary>
public OrchestrationState State;
}
OrchestrationHistory-related methods
/// <summary>
/// Gets the list of history events for a given instance and execution id
/// </summary>
/// <param name="instanceId">The instance id to return history for</param>
/// <param name="executionId">The execution id to return history for</param>
/// <returns>List of history events</returns>
Task<IEnumerable<OrchestrationWorkItemInstanceEntity>> GetOrchestrationHistoryEventsAsync(string instanceId, string executionId);
/// <summary>
/// Purges history from storage for given time range
/// </summary>
/// <param name="thresholdDateTimeUtc">The datetime in UTC to use as the threshold for purging history</param>
/// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
/// <returns>The number of history events purged.</returns>
Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
JumpStartEntities-related methods
/// <summary>
/// Writes a list of jump start events to instance store
/// </summary>
/// <param name="entities">List of jump start events to write</param>
Task<object> WriteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);
/// <summary>
/// Deletes a list of jump start events from instance store
/// </summary>
/// <param name="entities">List of jump start events to delete</param>
Task<object> DeleteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);
/// <summary>
/// Get a list of jump start events from instance store
/// </summary>
/// <returns>List of jump start events</returns>
Task<IEnumerable<OrchestrationJumpStartInstanceEntity>> GetJumpStartEntitiesAsync(int top);