核心编程模型
- 1: Orchestration
- 1.1: OrchestrationInstance
- 1.2: OrchestrationContext
- 1.3: OrchestrationState
- 2: Activity
- 2.1: TaskActivity
- 2.2: TaskContext
- 3: OrchestrationExecution
- 4: TaskOrchestration
- 4.1: TaskOrchestration
- 4.2: TaskOrchestrationContext
- 5: OrchestrationState
- 5.1: OrchestrationState
- 5.2: OrchestrationStateQuery
- 5.3: OrchestrationStateQueryFilter
- 5.4: OrchestrationStateInstanceFilter
- 5.5: OrchestrationStateNameVersionFilter
- 6: Entity
- 6.1: TaskEntity
- 6.2: EntityId
- 7: History
- 7.1: History概述
- 7.2: HistoryEvent事件
- 7.3: ExecutionStartedEvent事件
- 7.4: OrchestratorStartedEvent事件
1 - Orchestration
1.1 - OrchestrationInstance
src\DurableTask.Core\OrchestrationInstance.cs
OrchestrationInstance 中定义了几个属性:
- InstanceId
- ExecutionId()
[DataContract]
public class OrchestrationInstance : IExtensibleDataObject
{
/// <summary>
/// The instance id, assigned as unique to the orchestration
/// </summary>
[DataMember]
public string InstanceId { get; set; }
/// <summary>
/// The execution id, unique to the execution of this instance
/// </summary>
[DataMember]
public string ExecutionId { get; set; }
为了支持 versioning,准备修改 OrchestrationInstance,增加一个 InstanceVersion 字段,类型为 string
/// <summary>
/// The version of this orchestration instance
/// </summary>
[DataMember]
public string InstanceVersion { get; set; }
1.2 - OrchestrationContext
src\DurableTask.Core\OrchestrationContext.cs
Context for an orchestration containing the instance, replay status, orchestration methods and proxy methods
包含实例、重放状态、协调方法和代理方法的协调上下文
类定义
OrchestrationContext 是一个 抽象类:
public abstract class OrchestrationContext
{
}
实现类有 TaskOrchestrationContext。
OrchestrationContext 定义有以下 (public virtual) 方法:
- CreateClient()
- CreateClientV2()
- CreateRetryableClient()
- ScheduleWithRetry()
- ScheduleTask()
- CreateTimer()
- CreateSubOrchestrationInstance()
- CreateSubOrchestrationInstanceWithRetry()
- SendEvent()
- ContinueAsNew()
ScheduleTask()
/// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
public abstract Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters);
这里是有 version 概念的。
ScheduleWithRetry()
public virtual Task<T> ScheduleWithRetry<T>(Type taskActivityType, RetryOptions retryOptions,
params object[] parameters)
{
return ScheduleWithRetry<T>(NameVersionHelper.GetDefaultName(taskActivityType),
NameVersionHelper.GetDefaultVersion(taskActivityType),
retryOptions, parameters);
}
NameVersionHelper.GetDefaultVersion(taskActivityType) 目前没有实现,只是写死了返回空字符串:
public static string GetDefaultVersion(object obj)
{
return string.Empty;
}
之后调用带 version 参数的重载方法:
public virtual Task<T> ScheduleWithRetry<T>(string name, string version, RetryOptions retryOptions,
params object[] parameters)
{
Task<T> RetryCall() => ScheduleTask<T>(name, version, parameters);
var retryInterceptor = new RetryInterceptor<T>(this, retryOptions, RetryCall);
return retryInterceptor.Invoke();
}
还是调用到 ScheduleTask() 方法上了。
CreateTimer()
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state);
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken);
CreateSubOrchestrationInstance()
public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, object input)
{
return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
NameVersionHelper.GetDefaultVersion(orchestrationType), input);
}
public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, string instanceId, object input)
{
return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
NameVersionHelper.GetDefaultVersion(orchestrationType), instanceId, input);
}
public abstract Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input);
这个方法也是定义有 version 参数的,只是依然是没有被使用。
SendEvent()
public abstract void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData);
在 OrchestrationInstance 增加了 version 参数之后,这个方法也有 version 的概念了。
ContinueAsNew()
public abstract void ContinueAsNew(object input);
没有 version 的概念,最好在实现中重用当前 instance 的 version(如果有指定)。
CreateClient
Create a proxy client class to schedule remote TaskActivities via a strongly typed interface.
创建一个代理客户端类,通过强类型接口调度远程 TaskActivities。
public virtual T CreateClient<T>() where T : class
{
return CreateClient<T>(false);
}
public virtual T CreateClient<T>(bool useFullyQualifiedMethodNames) where T : class
{
return CreateClient<T>(() => new ScheduleProxy(this, useFullyQualifiedMethodNames));
}
private static T CreateClient<T>(Func<IInterceptor> createScheduleProxy) where T : class
{
if (!typeof(T).IsInterface && !typeof(T).IsClass)
{
throw new InvalidOperationException($"{nameof(T)} must be an interface or class.");
}
IInterceptor scheduleProxy = createScheduleProxy();
if (typeof(T).IsClass)
{
if (typeof(T).IsSealed)
{
throw new InvalidOperationException("Class cannot be sealed.");
}
return ProxyGenerator.CreateClassProxy<T>(scheduleProxy);
}
return ProxyGenerator.CreateInterfaceProxyWithoutTarget<T>(scheduleProxy);
}
这里没有 version 的概念。
1.3 - OrchestrationState
src\DurableTask.Core\OrchestrationState.cs
OrchestrationState 中定义了几个属性:
- CompletedTime
- CompressedSize
- CreatedTime
- Input
- LastUpdatedTime
- Name
- OrchestrationInstance: 包含 InstanceId 和 ExecutionId
- Output
- ParentInstance
- Size
- Status
- Tags
- Version: string 格式,看能否复用。
- Generation
- ScheduledStartTime
- FailureDetails
2 - Activity
2.1 - TaskActivity
src\DurableTask.Core\TaskActivity.cs
TaskActivity 中定义了三个方法:
- Run()
- RunAsync()
Run() 方法
public abstract string Run(TaskContext context, string input);
blocked for AsyncTaskActivity:
/// <summary>
/// Synchronous execute method, blocked for AsyncTaskActivity
/// </summary>
/// <returns>string.Empty</returns>
public override string Run(TaskContext context, string input)
{
// will never run
return string.Empty;
}
RunAsync() 方法
public virtual Task<string> RunAsync(TaskContext context, string input)
{
return Task.FromResult(Run(context, input));
}
会被覆盖为:
public override async Task<string> RunAsync(TaskContext context, string input)
{
TInput parameter = default(TInput);
var jArray = Utils.ConvertToJArray(input);
int parameterCount = jArray.Count;
if (parameterCount > 1)
{
throw new TaskFailureException(
"TaskActivity implementation cannot be invoked due to more than expected input parameters. Signature mismatch.");
}
if (parameterCount == 1)
{
JToken jToken = jArray[0];
if (jToken is JValue jValue)
{
parameter = jValue.ToObject<TInput>();
}
else
{
string serializedValue = jToken.ToString();
parameter = DataConverter.Deserialize<TInput>(serializedValue);
}
}
TResult result;
try
{
result = await ExecuteAsync(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new TaskFailureException(e.Message, e, details)
.WithFailureDetails(failureDetails);
}
string serializedResult = DataConverter.Serialize(result);
return serializedResult;
}
}
ExecuteAsync() 是一个abstract 方法:
protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);
GetStatus() 方法
public abstract string GetStatus();
2.2 - TaskContext
src\DurableTask.Core\TaskContext.cs
TaskActivity 中定义了以下属性
- OrchestrationInstance: 包含 InstanceId 和 InstanceId
- ErrorPropagationMode
3 - OrchestrationExecution
3.1 - OrchestrationExecutionContext
src\DurableTask.Core\OrchestrationExecutionContext.cs
Context associated with the orchestration being executed.
与正在执行的协调相关的上下文。
[DataContract]
public class OrchestrationExecutionContext
{
}
OrchestrationTags()
这个类就定义了一个 OrchestrationTags 方法:
/// <summary>
/// Gets the orchestration tags
/// </summary>
[DataMember]
public IDictionary<string, string> OrchestrationTags { get; internal set; }
3.2 - OrchestrationExecutionCursor
src\DurableTask.Core\OrchestrationExecutionCursor.cs
Context associated with the orchestration being executed.
与正在执行的协调相关的上下文。
类定义:
internal class OrchestrationExecutionCursor
{
}
构造函数:
public OrchestrationExecutionCursor(
OrchestrationRuntimeState state,
TaskOrchestration orchestration,
TaskOrchestrationExecutor executor,
IEnumerable<OrchestratorAction> latestDecisions)
{
RuntimeState = state;
TaskOrchestration = orchestration;
OrchestrationExecutor = executor;
LatestDecisions = latestDecisions;
}
get/set 方法:
public OrchestrationRuntimeState RuntimeState { get; }
public TaskOrchestration TaskOrchestration { get; }
public TaskOrchestrationExecutor OrchestrationExecutor { get; }
public IEnumerable<OrchestratorAction> LatestDecisions { get; set; }
就是一个值对象。
Cursor 游标体现在哪里?
4 - TaskOrchestration
4.1 - TaskOrchestration
src\DurableTask.Core\TaskOrchestration.cs
TaskOrchestration.cs 中定义了三个方法:
- Execute()
- RaiseEvent()
- GetStatus()
Execute() 方法
public abstract Task<string> Execute(OrchestrationContext context, string input);
方法实现为:
public override async Task<string> Execute(OrchestrationContext context, string input)
{
var parameter = DataConverter.Deserialize<TInput>(input);
TResult result;
try
{
result = await RunTask(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new OrchestrationFailureException(e.Message, details)
{
FailureDetails = failureDetails,
};
}
return DataConverter.Serialize(result);
}
RunTask() 方法是个抽象方法。
public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);
默认的 DataConverter 是 json:
public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
{
/// <summary>
/// Creates a new TaskOrchestration with the default DataConverter
/// </summary>
protected TaskOrchestration()
{
DataConverter = JsonDataConverter.Default;
}
/// <summary>
/// The DataConverter to use for input and output serialization/deserialization
/// </summary>
public DataConverter DataConverter { get; protected set; }
RaiseEvent() 方法
public abstract void RaiseEvent(OrchestrationContext context, string name, string input);
方法实现为:
public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
var parameter = DataConverter.Deserialize<TEvent>(input);
OnEvent(context, name, parameter);
}
OnEvent() 是一个空实现。
public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
{
// do nothing
}
GetStatus() 方法
public abstract string GetStatus();
实现
在这个项目中除了 sample 和 test 外没有实现类。
在 samples\Correlation.Samples\HelloOrchestrator.cs 中有一个最简单的实现:
[KnownType(typeof(Hello))]
internal class HelloOrchestrator : TaskOrchestration<string, string>
{
public override async Task<string> RunTask(OrchestrationContext context, string input)
{
// await contextBase.ScheduleTask<string>(typeof(Hello), "world");
// if you pass an empty string it throws an error
return await context.ScheduleTask<string>(typeof(Hello), "world");
}
}
internal class Hello : TaskActivity<string, string>
{
protected override string Execute(TaskContext context, string input)
{
if (string.IsNullOrEmpty(input))
{
throw new ArgumentNullException(nameof(input));
}
Console.WriteLine($"Activity: Hello {input}");
return $"Hello, {input}!";
}
}
备注:这个实现和 Dapr workflow java sdk 中的定义最贴近,也最适合用来增加一个 getVersion() 方法来获取当前 worker 的版本,但是为什么 quickstart 中用的是静态方法?
TBD:请教一下 Chris。
4.2 - TaskOrchestrationContext
src\DurableTask.Core\TaskOrchestrationContext.cs
类定义
TaskOrchestrationContext 继承自 OrchestrationContext:
internal class TaskOrchestrationContext : OrchestrationContext
{
}
Execute() 方法
public abstract Task<string> Execute(OrchestrationContext context, string input);
方法实现为:
public override async Task<string> Execute(OrchestrationContext context, string input)
{
var parameter = DataConverter.Deserialize<TInput>(input);
TResult result;
try
{
result = await RunTask(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new OrchestrationFailureException(e.Message, details)
{
FailureDetails = failureDetails,
};
}
return DataConverter.Serialize(result);
}
RunTask() 方法是个抽象方法。
public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);
默认的 DataConverter 是 json:
public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
{
/// <summary>
/// Creates a new TaskOrchestration with the default DataConverter
/// </summary>
protected TaskOrchestration()
{
DataConverter = JsonDataConverter.Default;
}
/// <summary>
/// The DataConverter to use for input and output serialization/deserialization
/// </summary>
public DataConverter DataConverter { get; protected set; }
RaiseEvent() 方法
public abstract void RaiseEvent(OrchestrationContext context, string name, string input);
方法实现为:
public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
var parameter = DataConverter.Deserialize<TEvent>(input);
OnEvent(context, name, parameter);
}
OnEvent() 是一个空实现。
public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
{
// do nothing
}
GetStatus() 方法
public abstract string GetStatus();
实现
在这个项目中除了 sample 和 test 外没有实现类。
在 samples\Correlation.Samples\HelloOrchestrator.cs 中有一个最简单的实现:
[KnownType(typeof(Hello))]
internal class HelloOrchestrator : TaskOrchestration<string, string>
{
public override async Task<string> RunTask(OrchestrationContext context, string input)
{
// await contextBase.ScheduleTask<string>(typeof(Hello), "world");
// if you pass an empty string it throws an error
return await context.ScheduleTask<string>(typeof(Hello), "world");
}
}
internal class Hello : TaskActivity<string, string>
{
protected override string Execute(TaskContext context, string input)
{
if (string.IsNullOrEmpty(input))
{
throw new ArgumentNullException(nameof(input));
}
Console.WriteLine($"Activity: Hello {input}");
return $"Hello, {input}!";
}
}
备注:这个实现和 Dapr workflow java sdk 中的定义最贴近,也最适合用来增加一个 getVersion() 方法来获取当前 worker 的版本,但是为什么 quickstart 中用的是静态方法?
TBD:请教一下 Chris。
5 - OrchestrationState
5.1 - OrchestrationState
src\DurableTask.Core\OrchestrationState.cs
Represents the state of an orchestration
类定义
OrchestrationState 是一个 DataContract :
[DataContract]
public class OrchestrationState : IExtensibleDataObject
{
}
定义有如下 DataMember:
| name | 类型 | 描述 |
|---|---|---|
| Name | string | 协调的名称 |
| Input | string | 协调的序列化输入 |
| Output | string | 协调的序列化输出 |
| OrchestrationInstance | OrchestrationInstance | 该状态代表的协调实例 |
| OrchestrationStatus | OrchestrationStatus | 当前协调状态 |
| Status | string | 字符串格式的当前协调状态 |
| ParentInstance | ParentInstance | 父实例,如果这个协调有 |
| Version | string | 协调版本 |
| Tags | IDictionary<string, string> | 与该协调相关的标记和字符串值字典 |
| Generation | int | 协调的代。重复使用的 instanceIds 将递增该值。 |
| CreatedTime | DateTime | 协调的创建时间 |
| ScheduledStartTime | DateTime | 开始协调的时间 |
| CompletedTime | DateTime | 协调完成时间 |
| LastUpdatedTime | DateTime | 协调的最后更新时间 |
| Size | long | 原始(未压缩)序列化运行时状态的大小 |
| CompressedSize | long | 压缩序列化运行时状态的大小 |
| FailureDetails | FailureDetails | 获取或设置与协调相关的故障详细信息。 |
OrchestrationState有 Version 字段定义,另外 OrchestrationState 的 OrchestrationInstance 字段也带有 version。
5.2 - OrchestrationStateQuery
src\DurableTask.Core\OrchestrationStateQuery.cs
Query class that can be used to filter results from the Orchestration instance store.
可用于从协调实例存储中过滤结果的查询类。
注意: 实例方法不是线程安全的。
类定义
public class OrchestrationStateQuery {
}
构造函数
public OrchestrationStateQuery()
{
FilterMap = new Dictionary<Type, OrchestrationStateQueryFilter>();
}
FilterMap()
public IDictionary<Type, OrchestrationStateQueryFilter> FilterMap { get; private set; }
GetFilters()
获取查询的 primary_filter、collection_of(secondary_filters)
public Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>> GetFilters()
{
ICollection<OrchestrationStateQueryFilter> filters = FilterMap.Values;
if (filters.Count == 0)
{
return null;
}
var secondaryFilters = new List<OrchestrationStateQueryFilter>();
OrchestrationStateQueryFilter primaryFilter = filters.First();
int primaryFilterPrecedence = SafeGetFilterPrecedence(primaryFilter);
if (filters.Count > 1)
{
foreach (OrchestrationStateQueryFilter filter in filters)
{
int newPrecedence = SafeGetFilterPrecedence(filter);
if (newPrecedence > primaryFilterPrecedence)
{
secondaryFilters.Add(primaryFilter);
primaryFilter = filter;
primaryFilterPrecedence = newPrecedence;
}
else
{
secondaryFilters.Add(filter);
}
}
}
return new Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>>(
primaryFilter, secondaryFilters);
}
5.3 - OrchestrationStateQueryFilter
src\DurableTask.Core\OrchestrationStateQueryFilter.cs
Abstract class for an orchestration state query filter
协调状态查询过滤器的抽象类
类定义
OrchestrationStateInstanceFilter 实现了 OrchestrationStateQueryFilter
public abstract class OrchestrationStateQueryFilter
{
}
这是一个空的抽象类。
它的实现有:
- OrchestrationStateInstanceFilter
- OrchestrationStateNameVersionFilter: 这个可以用来做版本过滤
5.4 - OrchestrationStateInstanceFilter
src\DurableTask.Core\OrchestrationStateInstanceFilter.cs
Filter for Orchestration instance filter
协调实例过滤器
类定义
OrchestrationStateInstanceFilter 实现了 OrchestrationStateQueryFilter
public class OrchestrationStateInstanceFilter : OrchestrationStateQueryFilter
{
构造函数
// 使用默认设置创建 OrchestrationStateInstanceFilter 的新实例
public OrchestrationStateInstanceFilter()
{
// default is exact match
StartsWith = false;
}
只定义有两个方法用来存取 InstanceId / ExecutionId 作为过滤器的匹配方式,另外 StartsWith 设置筛选器的匹配类型:
// 获取或设置过滤器的 InstanceId
public string InstanceId { get; set; }
// 获取或设置过滤器的 ExecutionId
public string ExecutionId { get; set; }
// 获取或设置筛选器的匹配类型:以开始或精确匹配开始
public bool StartsWith { get; set; }
5.5 - OrchestrationStateNameVersionFilter
src\DurableTask.Core\OrchestrationStateInstanceFilter.cs
Filter for Orchestration Name and Version
协调名称和版本过滤器
类定义
OrchestrationStateNameVersionFilter 实现了 OrchestrationStateQueryFilter
public class OrchestrationStateNameVersionFilter : OrchestrationStateQueryFilter
{
}
构造函数
// 使用默认设置创建 OrchestrationStateInstanceFilter 的新实例
public OrchestrationStateInstanceFilter()
{
// default is exact match
StartsWith = false;
}
只定义有两个方法用来存取 InstanceId / ExecutionId 作为过滤器的匹配方式,另外 StartsWith 设置筛选器的匹配类型:
public string Name { get; set; }
public string Version { get; set; }
6 - Entity
6.1 - TaskEntity
Abstract base class for entities
src\DurableTask.Core\Entities\TaskEntity.cs
TaskActivity 中定义了三个方法:
- ExecuteOperationBatchAsync()
ExecuteOperationBatchAsync() 方法
public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);
EnztityBatchRequest 类
A request for execution of a batch of operations on an entity.
- string InstanceId
- string EntityState
List<OperationRequest> Operations
OperationRequest 类
包含属性:
- string Operation
- Guid Id
- string Input
6.2 - EntityId
A unique identifier for an entity, consisting of entity name and entity key.
src\DurableTask.Core\Entities\EntityId.cs
EntityId 中定义以下属性:
- string Name
- string Key
7 - History
7.1 - History概述
介绍
以下介绍来自 README.md
Durable Task Framework History Events
以下是构成协调状态的一些常见历史事件。您可以在 DTFx 的 Azure Storage 和 MSSQL 存储后端的历史记录表中轻松查看这些事件。在使用 DTFx 代码、调试问题或创建直接读取历史记录的诊断工具(如 Durable Functions Monitor 项目)时,了解这些事件非常有用。
| Event Type | Description |
|---|---|
OrchestratorStarted |
协调器函数正在开始新的_执行/execution_。您将在历史记录中看到许多此类事件–每次协调器从 “等待 “状态恢复时都会出现一个。请注意,这并不意味着协调器首次启动–首次执行由 “ExecutionStarted “历史事件表示(见下文)。该事件的 timestamp 时间戳用于填充 CurrentDateTimeUtc 属性。 |
ExecutionStarted |
协调已开始首次执行。该事件包含协调器名称、输入内容和协调器的_scheduled_时间(可能早于历史记录中前面的 OrchestratorStarted事件)。这总是协调历史中的第二个事件。 |
TaskScheduled |
协调器调度了一项活动任务。该事件包括活动名称、输入和一个连续的 “EventId”,可用于将 “TaskScheduled " 事件与相应的 “TaskCompleted “或 “TaskFailed “事件关联起来。请注意,如果一个活动任务被重试,可能会生成多个 Task*** 事件。 |
TaskCompleted |
调度的任务活动已成功完成。TaskScheduledId 字段将与相应 TaskScheduled 事件的 “EventId” 字段匹配。 |
TaskFailed |
计划的任务活动以失败告终。TaskScheduledId 字段将与相应 “TaskScheduled” 事件的 “EventId” 字段匹配。 |
SubOrchestrationInstanceCreated |
协调器已调度子协调器。该事件包含已调度协调器的名称、实例 ID、输入和有序事件 ID,可用于将 SubOrchestrationInstanceCreated 事件与后续的 SubOrchestrationInstanceCompleted 或 SubOrchestrationInstanceFailed 历史事件关联起来。时间戳指的是调度子协调器的时间,它将早于开始执行的时间。请注意,如果一个活动任务被重试,可能会产生多个 SubOrchestrationInstance*** 事件。 |
SubOrchestrationInstanceCompleted |
调度的子协调器已成功完成。TaskScheduledId “字段将与相应 “SubOrchestrationInstanceCreated “事件的 “EventId “字段匹配。 |
SubOrchestrationInstanceFailed |
计划的子协调器已完成,但出现故障。TaskScheduledId 字段将与相应 SubOrchestrationInstanceCreated 事件的 EventId 字段匹配。 |
TimerCreated |
协调器安排了一个持久定时器。FireAt “属性包含定时器启动的日期。 |
TimerFired |
先前安排的持久定时器已启动。TimerId 字段将与相应 TimeCreated 事件的 EventId 字段匹配。 |
EventRaised |
协调(或持久实体中的实体)收到外部事件。该记录包含事件名称、有效载荷和事件_发送_的时间戳(应与历史事件实际被持久化的时间相同或更早)。 |
EventSent |
协调(或entity)向另一个协调(或entity)发送了单向消息。 |
ExecutionCompleted |
协调已完成。该事件包括协调的输出,不区分成功或失败。 |
ExecutionTerminated |
协调被 API 调用强制终止。该事件的时间戳表示计划终止的时间,而不一定是实际终止的时间。 |
OrchestratorCompleted |
协调器函数已等待并提交了任何副作用。您将在历史记录中看到许多此类事件–协调器每次等待时都会出现一个。请注意,这并不意味着协调器已经完成(完成由 ExecutionCompleted 或 ExecutionTerminated 表示)。 |
GenericEvent |
通用历史事件,有一个 Data 字段,但没有特定含义。这种历史事件并不常用。在某些情况下,该事件用于触发空闲协调的全新重放,例如在协调重绕之后。 |
HistoryStateEvent |
包含协调历史快照的历史事件。大多数现代后端类型都不使用这种事件类型。 |
7.2 - HistoryEvent事件
包含属性:
- int EventId
- EventType EventType
- bool IsPlayed
- DateTime Timestamp
- ExtensionDataObject ExtensionData
这个类也是其他 event 的父类。
7.3 - ExecutionStartedEvent事件
包含属性:
- string EventId
- string Input
- EventType EventType
- ParentInstance ParentInstance
- string Name
- string Version:可以复用
- IDictionary<string, string> Tags
- string Correlation
- DistributedTraceContext ParentTraceContext
- DateTime ScheduledStartTime
- int Generation
7.4 - OrchestratorStartedEvent事件
包含属性:
- string EventId
- EventType EventType