1 - Orchestration

核心编程模型的 Orchestration 定义

1.1 - OrchestrationInstance

核心编程模型之 OrchestrationInstance

src\DurableTask.Core\OrchestrationInstance.cs

OrchestrationInstance 中定义了几个属性:

  • InstanceId
  • ExecutionId()
    [DataContract]
    public class OrchestrationInstance : IExtensibleDataObject
    {
        /// <summary>
        /// The instance id, assigned as unique to the orchestration
        /// </summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>
        /// The execution id, unique to the execution of this instance
        /// </summary>
        [DataMember]
        public string ExecutionId { get; set; }

为了支持 versioning,准备修改 OrchestrationInstance,增加一个 InstanceVersion 字段,类型为 string

        /// <summary>
        /// The version of this orchestration instance
        /// </summary>
        [DataMember]
        public string InstanceVersion { get; set; }

1.2 - OrchestrationContext

核心编程模型之 OrchestrationContext

src\DurableTask.Core\OrchestrationContext.cs

Context for an orchestration containing the instance, replay status, orchestration methods and proxy methods

包含实例、重放状态、协调方法和代理方法的协调上下文

类定义

OrchestrationContext 是一个 抽象类:

    public abstract class OrchestrationContext
    {
    }

实现类有 TaskOrchestrationContext。

OrchestrationContext 定义有以下 (public virtual) 方法:

  • CreateClient()
  • CreateClientV2()
  • CreateRetryableClient()
  • ScheduleWithRetry()
  • ScheduleTask()
  • CreateTimer()
  • CreateSubOrchestrationInstance()
  • CreateSubOrchestrationInstanceWithRetry()
  • SendEvent()
  • ContinueAsNew()

ScheduleTask()

        /// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
public abstract Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters);

这里是有 version 概念的。

ScheduleWithRetry()

        public virtual Task<T> ScheduleWithRetry<T>(Type taskActivityType, RetryOptions retryOptions,
            params object[] parameters)
        {
            return ScheduleWithRetry<T>(NameVersionHelper.GetDefaultName(taskActivityType),
                NameVersionHelper.GetDefaultVersion(taskActivityType),
                retryOptions, parameters);
        }

NameVersionHelper.GetDefaultVersion(taskActivityType) 目前没有实现,只是写死了返回空字符串:

        public static string GetDefaultVersion(object obj)
        {
            return string.Empty;
        }

之后调用带 version 参数的重载方法:

        public virtual Task<T> ScheduleWithRetry<T>(string name, string version, RetryOptions retryOptions,
            params object[] parameters)
        {
            Task<T> RetryCall() => ScheduleTask<T>(name, version, parameters);
            var retryInterceptor = new RetryInterceptor<T>(this, retryOptions, RetryCall);
            return retryInterceptor.Invoke();
        }

还是调用到 ScheduleTask() 方法上了。

CreateTimer()

        public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state);

        public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken);

CreateSubOrchestrationInstance()

        public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, object input)
        {
            return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
                NameVersionHelper.GetDefaultVersion(orchestrationType), input);
        }

        public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, string instanceId, object input)
        {
            return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
                NameVersionHelper.GetDefaultVersion(orchestrationType), instanceId, input);
        }

        public abstract Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input);

这个方法也是定义有 version 参数的,只是依然是没有被使用。

SendEvent()

         public abstract void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData);

在 OrchestrationInstance 增加了 version 参数之后,这个方法也有 version 的概念了。

ContinueAsNew()

public abstract void ContinueAsNew(object input);

没有 version 的概念,最好在实现中重用当前 instance 的 version(如果有指定)。

CreateClient

Create a proxy client class to schedule remote TaskActivities via a strongly typed interface.

创建一个代理客户端类,通过强类型接口调度远程 TaskActivities。

        public virtual T CreateClient<T>() where T : class
        {
            return CreateClient<T>(false);
        }

        public virtual T CreateClient<T>(bool useFullyQualifiedMethodNames) where T : class
        {
            return CreateClient<T>(() => new ScheduleProxy(this, useFullyQualifiedMethodNames));
        }

				private static T CreateClient<T>(Func<IInterceptor> createScheduleProxy) where T : class
        {
            if (!typeof(T).IsInterface && !typeof(T).IsClass)
            {
                throw new InvalidOperationException($"{nameof(T)} must be an interface or class.");
            }

            IInterceptor scheduleProxy = createScheduleProxy();

            if (typeof(T).IsClass)
            {
                if (typeof(T).IsSealed)
                {
                    throw new InvalidOperationException("Class cannot be sealed.");
                }

                return ProxyGenerator.CreateClassProxy<T>(scheduleProxy);
            }

            return ProxyGenerator.CreateInterfaceProxyWithoutTarget<T>(scheduleProxy);
        }

这里没有 version 的概念。

1.3 - OrchestrationState

核心编程模型之 OrchestrationState

src\DurableTask.Core\OrchestrationState.cs

OrchestrationState 中定义了几个属性:

  • CompletedTime
  • CompressedSize
  • CreatedTime
  • Input
  • LastUpdatedTime
  • Name
  • OrchestrationInstance: 包含 InstanceId 和 ExecutionId
  • Output
  • ParentInstance
  • Size
  • Status
  • Tags
  • Version: string  格式,看能否复用。
  • Generation
  • ScheduledStartTime
  • FailureDetails

2 - Activity

核心编程模型的 Activity 定义

2.1 - TaskActivity

核心编程模型之 TaskActivity

src\DurableTask.Core\TaskActivity.cs

TaskActivity 中定义了三个方法:

  • Run()
  • RunAsync()

Run() 方法

public abstract string Run(TaskContext context, string input);

blocked for AsyncTaskActivity:

        /// <summary>
        /// Synchronous execute method, blocked for AsyncTaskActivity
        /// </summary>
        /// <returns>string.Empty</returns>
        public override string Run(TaskContext context, string input)
        {
            // will never run
            return string.Empty;
        }

RunAsync() 方法

        public virtual Task<string> RunAsync(TaskContext context, string input)
        {
            return Task.FromResult(Run(context, input));
        }

会被覆盖为:

public override async Task<string> RunAsync(TaskContext context, string input)
        {
            TInput parameter = default(TInput);

            var jArray = Utils.ConvertToJArray(input);

            int parameterCount = jArray.Count;
            if (parameterCount > 1)
            {
                throw new TaskFailureException(
                    "TaskActivity implementation cannot be invoked due to more than expected input parameters.  Signature mismatch.");
            }
            
            if (parameterCount == 1)
            {
                JToken jToken = jArray[0];
                if (jToken is JValue jValue)
                {
                    parameter = jValue.ToObject<TInput>();
                }
                else
                {
                    string serializedValue = jToken.ToString();
                    parameter = DataConverter.Deserialize<TInput>(serializedValue);
                }
            }

            TResult result;
            try
            {
                result = await ExecuteAsync(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new TaskFailureException(e.Message, e, details)
                    .WithFailureDetails(failureDetails);
            }

            string serializedResult = DataConverter.Serialize(result);
            return serializedResult;
        }
    }

ExecuteAsync() 是一个abstract 方法:

        protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);

GetStatus() 方法

public abstract string GetStatus();

2.2 - TaskContext

核心编程模型之 TaskContext

src\DurableTask.Core\TaskContext.cs

TaskActivity 中定义了以下属性

  • OrchestrationInstance: 包含 InstanceId 和 InstanceId
  • ErrorPropagationMode

3 - OrchestrationExecution

核心编程模型的 OrchestrationExecution 定义

3.1 - OrchestrationExecutionContext

OrchestrationExecutionContext

src\DurableTask.Core\OrchestrationExecutionContext.cs

Context associated with the orchestration being executed.

与正在执行的协调相关的上下文。

    [DataContract]
    public class OrchestrationExecutionContext
    {
    }

OrchestrationTags()

这个类就定义了一个 OrchestrationTags 方法:


        /// <summary>
        /// Gets the orchestration tags
        /// </summary>
        [DataMember]
        public IDictionary<string, string> OrchestrationTags { get; internal set; }

3.2 - OrchestrationExecutionCursor

OrchestrationExecutionCursor

src\DurableTask.Core\OrchestrationExecutionCursor.cs

Context associated with the orchestration being executed.

与正在执行的协调相关的上下文。

类定义:

    internal class OrchestrationExecutionCursor
    {
    }

构造函数:

        public OrchestrationExecutionCursor(
            OrchestrationRuntimeState state,
            TaskOrchestration orchestration,
            TaskOrchestrationExecutor executor,
            IEnumerable<OrchestratorAction> latestDecisions)
        {
            RuntimeState = state;
            TaskOrchestration = orchestration;
            OrchestrationExecutor = executor;
            LatestDecisions = latestDecisions;
        }

get/set 方法:

        public OrchestrationRuntimeState RuntimeState { get; }

        public TaskOrchestration TaskOrchestration { get; }

        public TaskOrchestrationExecutor OrchestrationExecutor { get; }

        public IEnumerable<OrchestratorAction> LatestDecisions { get; set; }

就是一个值对象。

Cursor 游标体现在哪里?

4 - TaskOrchestration

核心编程模型的 worker 端的 TaskOrchestration 定义

4.1 - TaskOrchestration

核心编程模型之 TaskOrchestration

src\DurableTask.Core\TaskOrchestration.cs

TaskOrchestration.cs 中定义了三个方法:

  • Execute()
  • RaiseEvent()
  • GetStatus()

Execute() 方法

public abstract Task<string> Execute(OrchestrationContext context, string input);

方法实现为:

public override async Task<string> Execute(OrchestrationContext context, string input)
        {
            var parameter = DataConverter.Deserialize<TInput>(input);
            TResult result;
            try
            {
                result = await RunTask(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new OrchestrationFailureException(e.Message, details)
                {
                    FailureDetails = failureDetails,
                };
            }

            return DataConverter.Serialize(result);
        }

RunTask() 方法是个抽象方法。

public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);

默认的 DataConverter 是 json:

    public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
    {
        /// <summary>
        /// Creates a new TaskOrchestration with the default DataConverter
        /// </summary>
        protected TaskOrchestration()
        {
            DataConverter = JsonDataConverter.Default;
        }

        /// <summary>
        /// The DataConverter to use for input and output serialization/deserialization
        /// </summary>
        public DataConverter DataConverter { get; protected set; }

RaiseEvent() 方法

public abstract void RaiseEvent(OrchestrationContext context, string name, string input);

方法实现为:

public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
  var parameter = DataConverter.Deserialize<TEvent>(input);
  OnEvent(context, name, parameter);
}

OnEvent() 是一个空实现。

        public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
        {
            // do nothing
        }

GetStatus() 方法

public abstract string GetStatus();

实现

在这个项目中除了 sample 和 test 外没有实现类。

samples\Correlation.Samples\HelloOrchestrator.cs 中有一个最简单的实现:

[KnownType(typeof(Hello))]
    internal class HelloOrchestrator : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
          //  await contextBase.ScheduleTask<string>(typeof(Hello), "world");
          //   if you pass an empty string it throws an error
            return await context.ScheduleTask<string>(typeof(Hello), "world");
        }
    }

    internal class Hello : TaskActivity<string, string>
    {
        protected override string Execute(TaskContext context, string input)
        {
            if (string.IsNullOrEmpty(input))
            {
                throw new ArgumentNullException(nameof(input));
            }

            Console.WriteLine($"Activity: Hello {input}");
            return $"Hello, {input}!";
        }
    }

备注:这个实现和 Dapr workflow java sdk 中的定义最贴近,也最适合用来增加一个 getVersion() 方法来获取当前 worker 的版本,但是为什么 quickstart 中用的是静态方法?

TBD:请教一下 Chris。

4.2 - TaskOrchestrationContext

TaskOrchestrationContext

src\DurableTask.Core\TaskOrchestrationContext.cs

类定义

TaskOrchestrationContext 继承自 OrchestrationContext:

    internal class TaskOrchestrationContext : OrchestrationContext
    {
    }

Execute() 方法

public abstract Task<string> Execute(OrchestrationContext context, string input);

方法实现为:

public override async Task<string> Execute(OrchestrationContext context, string input)
        {
            var parameter = DataConverter.Deserialize<TInput>(input);
            TResult result;
            try
            {
                result = await RunTask(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new OrchestrationFailureException(e.Message, details)
                {
                    FailureDetails = failureDetails,
                };
            }

            return DataConverter.Serialize(result);
        }

RunTask() 方法是个抽象方法。

public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);

默认的 DataConverter 是 json:

    public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
    {
        /// <summary>
        /// Creates a new TaskOrchestration with the default DataConverter
        /// </summary>
        protected TaskOrchestration()
        {
            DataConverter = JsonDataConverter.Default;
        }

        /// <summary>
        /// The DataConverter to use for input and output serialization/deserialization
        /// </summary>
        public DataConverter DataConverter { get; protected set; }

RaiseEvent() 方法

public abstract void RaiseEvent(OrchestrationContext context, string name, string input);

方法实现为:

public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
  var parameter = DataConverter.Deserialize<TEvent>(input);
  OnEvent(context, name, parameter);
}

OnEvent() 是一个空实现。

        public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
        {
            // do nothing
        }

GetStatus() 方法

public abstract string GetStatus();

实现

在这个项目中除了 sample 和 test 外没有实现类。

samples\Correlation.Samples\HelloOrchestrator.cs 中有一个最简单的实现:

[KnownType(typeof(Hello))]
    internal class HelloOrchestrator : TaskOrchestration<string, string>
    {
        public override async Task<string> RunTask(OrchestrationContext context, string input)
        {
          //  await contextBase.ScheduleTask<string>(typeof(Hello), "world");
          //   if you pass an empty string it throws an error
            return await context.ScheduleTask<string>(typeof(Hello), "world");
        }
    }

    internal class Hello : TaskActivity<string, string>
    {
        protected override string Execute(TaskContext context, string input)
        {
            if (string.IsNullOrEmpty(input))
            {
                throw new ArgumentNullException(nameof(input));
            }

            Console.WriteLine($"Activity: Hello {input}");
            return $"Hello, {input}!";
        }
    }

备注:这个实现和 Dapr workflow java sdk 中的定义最贴近,也最适合用来增加一个 getVersion() 方法来获取当前 worker 的版本,但是为什么 quickstart 中用的是静态方法?

TBD:请教一下 Chris。

5 - OrchestrationState

核心编程模型的 OrchestrationState 定义

5.1 - OrchestrationState

核心编程模型的 OrchestrationState 定义

src\DurableTask.Core\OrchestrationState.cs

Represents the state of an orchestration

类定义

OrchestrationState 是一个 DataContract :

    [DataContract]
    public class OrchestrationState : IExtensibleDataObject
    {
    }

定义有如下 DataMember:

name 类型 描述
Name string 协调的名称
Input string 协调的序列化输入
Output string 协调的序列化输出
OrchestrationInstance OrchestrationInstance 该状态代表的协调实例
OrchestrationStatus OrchestrationStatus 当前协调状态
Status string 字符串格式的当前协调状态
ParentInstance ParentInstance 父实例,如果这个协调有
Version string 协调版本
Tags IDictionary<string, string> 与该协调相关的标记和字符串值字典
Generation int 协调的代。重复使用的 instanceIds 将递增该值。
CreatedTime DateTime 协调的创建时间
ScheduledStartTime DateTime 开始协调的时间
CompletedTime DateTime 协调完成时间
LastUpdatedTime DateTime 协调的最后更新时间
Size long 原始(未压缩)序列化运行时状态的大小
CompressedSize long 压缩序列化运行时状态的大小
FailureDetails FailureDetails 获取或设置与协调相关的故障详细信息。

OrchestrationState有 Version 字段定义,另外 OrchestrationState 的 OrchestrationInstance 字段也带有 version。

5.2 - OrchestrationStateQuery

OrchestrationStateQuery

src\DurableTask.Core\OrchestrationStateQuery.cs

Query class that can be used to filter results from the Orchestration instance store.

可用于从协调实例存储中过滤结果的查询类。

注意: 实例方法不是线程安全的。

类定义

 public class OrchestrationStateQuery    {
    }

构造函数

        public OrchestrationStateQuery()
        {
            FilterMap = new Dictionary<Type, OrchestrationStateQueryFilter>();
        }

FilterMap()

        public IDictionary<Type, OrchestrationStateQueryFilter> FilterMap { get; private set; }

GetFilters()

获取查询的 primary_filter、collection_of(secondary_filters)

public Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>> GetFilters()
        {
            ICollection<OrchestrationStateQueryFilter> filters = FilterMap.Values;
            if (filters.Count == 0)
            {
                return null;
            }

            var secondaryFilters = new List<OrchestrationStateQueryFilter>();

            OrchestrationStateQueryFilter primaryFilter = filters.First();
            int primaryFilterPrecedence = SafeGetFilterPrecedence(primaryFilter);

            if (filters.Count > 1)
            {
                foreach (OrchestrationStateQueryFilter filter in filters)
                {
                    int newPrecedence = SafeGetFilterPrecedence(filter);
                    if (newPrecedence > primaryFilterPrecedence)
                    {
                        secondaryFilters.Add(primaryFilter);

                        primaryFilter = filter;
                        primaryFilterPrecedence = newPrecedence;
                    }
                    else
                    {
                        secondaryFilters.Add(filter);
                    }
                }
            }

            return new Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>>(
                primaryFilter, secondaryFilters);
        }

5.3 - OrchestrationStateQueryFilter

OrchestrationStateQueryFilter

src\DurableTask.Core\OrchestrationStateQueryFilter.cs

Abstract class for an orchestration state query filter

协调状态查询过滤器的抽象类

类定义

OrchestrationStateInstanceFilter 实现了 OrchestrationStateQueryFilter

    public abstract class OrchestrationStateQueryFilter
    {
    }

这是一个空的抽象类。

它的实现有:

  • OrchestrationStateInstanceFilter
  • OrchestrationStateNameVersionFilter: 这个可以用来做版本过滤

5.4 - OrchestrationStateInstanceFilter

OrchestrationStateInstanceFilter

src\DurableTask.Core\OrchestrationStateInstanceFilter.cs

Filter for Orchestration instance filter

协调实例过滤器

类定义

OrchestrationStateInstanceFilter 实现了 OrchestrationStateQueryFilter

    public class OrchestrationStateInstanceFilter : OrchestrationStateQueryFilter
    {

构造函数

        // 使用默认设置创建 OrchestrationStateInstanceFilter 的新实例
				public OrchestrationStateInstanceFilter()
        {
            // default is exact match
            StartsWith = false;
        }

只定义有两个方法用来存取 InstanceId / ExecutionId 作为过滤器的匹配方式,另外 StartsWith 设置筛选器的匹配类型:

				// 获取或设置过滤器的 InstanceId
        public string InstanceId { get; set; }

				// 获取或设置过滤器的 ExecutionId
        public string ExecutionId { get; set; }

				// 获取或设置筛选器的匹配类型:以开始或精确匹配开始
        public bool StartsWith { get; set; }

5.5 - OrchestrationStateNameVersionFilter

OrchestrationStateNameVersionFilter

src\DurableTask.Core\OrchestrationStateInstanceFilter.cs

Filter for Orchestration Name and Version

协调名称和版本过滤器

类定义

OrchestrationStateNameVersionFilter 实现了 OrchestrationStateQueryFilter

    public class OrchestrationStateNameVersionFilter : OrchestrationStateQueryFilter
    {
    }

构造函数

        // 使用默认设置创建 OrchestrationStateInstanceFilter 的新实例
				public OrchestrationStateInstanceFilter()
        {
            // default is exact match
            StartsWith = false;
        }

只定义有两个方法用来存取 InstanceId / ExecutionId 作为过滤器的匹配方式,另外 StartsWith 设置筛选器的匹配类型:


        public string Name { get; set; }

        public string Version { get; set; }

6 - Entity

核心编程模型的 Entity 定义

6.1 - TaskEntity

核心编程模型之 TaskEntity

Abstract base class for entities

src\DurableTask.Core\Entities\TaskEntity.cs

TaskActivity 中定义了三个方法:

  • ExecuteOperationBatchAsync()

ExecuteOperationBatchAsync() 方法

public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);

EnztityBatchRequest 类

A request for execution of a batch of operations on an entity.

  • string InstanceId
  • string EntityState
  • List<OperationRequest> Operations

OperationRequest 类

包含属性:

  • string Operation
  • Guid Id
  • string Input

6.2 - EntityId

核心编程模型之 EntityId

A unique identifier for an entity, consisting of entity name and entity key.

src\DurableTask.Core\Entities\EntityId.cs

EntityId 中定义以下属性:

  • string Name
  • string Key

7 - History

核心编程模型的 History event 定义

7.1 - History概述

核心编程模型的 History event 概述

介绍

以下介绍来自 README.md

Durable Task Framework History Events

以下是构成协调状态的一些常见历史事件。您可以在 DTFx 的 Azure StorageMSSQL 存储后端的历史记录表中轻松查看这些事件。在使用 DTFx 代码、调试问题或创建直接读取历史记录的诊断工具(如 Durable Functions Monitor 项目)时,了解这些事件非常有用。

Event Type Description
OrchestratorStarted 协调器函数正在开始新的_执行/execution_。您将在历史记录中看到许多此类事件–每次协调器从 “等待 “状态恢复时都会出现一个。请注意,这并不意味着协调器首次启动–首次执行由 “ExecutionStarted “历史事件表示(见下文)。该事件的 timestamp 时间戳用于填充 CurrentDateTimeUtc 属性。
ExecutionStarted 协调已开始首次执行。该事件包含协调器名称、输入内容和协调器的_scheduled_时间(可能早于历史记录中前面的 OrchestratorStarted事件)。这总是协调历史中的第二个事件。
TaskScheduled 协调器调度了一项活动任务。该事件包括活动名称、输入和一个连续的 “EventId”,可用于将 “TaskScheduled " 事件与相应的 “TaskCompleted “或 “TaskFailed “事件关联起来。请注意,如果一个活动任务被重试,可能会生成多个 Task*** 事件。
TaskCompleted 调度的任务活动已成功完成。TaskScheduledId 字段将与相应 TaskScheduled 事件的 “EventId” 字段匹配。
TaskFailed 计划的任务活动以失败告终。TaskScheduledId 字段将与相应 “TaskScheduled” 事件的 “EventId” 字段匹配。
SubOrchestrationInstanceCreated 协调器已调度子协调器。该事件包含已调度协调器的名称、实例 ID、输入和有序事件 ID,可用于将 SubOrchestrationInstanceCreated 事件与后续的 SubOrchestrationInstanceCompleted 或 SubOrchestrationInstanceFailed 历史事件关联起来。时间戳指的是调度子协调器的时间,它将早于开始执行的时间。请注意,如果一个活动任务被重试,可能会产生多个 SubOrchestrationInstance*** 事件。
SubOrchestrationInstanceCompleted 调度的子协调器已成功完成。TaskScheduledId “字段将与相应 “SubOrchestrationInstanceCreated “事件的 “EventId “字段匹配。
SubOrchestrationInstanceFailed 计划的子协调器已完成,但出现故障。TaskScheduledId 字段将与相应 SubOrchestrationInstanceCreated 事件的 EventId 字段匹配。
TimerCreated 协调器安排了一个持久定时器。FireAt “属性包含定时器启动的日期。
TimerFired 先前安排的持久定时器已启动。TimerId 字段将与相应 TimeCreated 事件的 EventId 字段匹配。
EventRaised 协调(或持久实体中的实体)收到外部事件。该记录包含事件名称、有效载荷和事件_发送_的时间戳(应与历史事件实际被持久化的时间相同或更早)。
EventSent 协调(或entity)向另一个协调(或entity)发送了单向消息。
ExecutionCompleted 协调已完成。该事件包括协调的输出,不区分成功或失败。
ExecutionTerminated 协调被 API 调用强制终止。该事件的时间戳表示计划终止的时间,而不一定是实际终止的时间。
OrchestratorCompleted 协调器函数已等待并提交了任何副作用。您将在历史记录中看到许多此类事件–协调器每次等待时都会出现一个。请注意,这并不意味着协调器已经完成(完成由 ExecutionCompletedExecutionTerminated 表示)。
GenericEvent 通用历史事件,有一个 Data 字段,但没有特定含义。这种历史事件并不常用。在某些情况下,该事件用于触发空闲协调的全新重放,例如在协调重绕之后。
HistoryStateEvent 包含协调历史快照的历史事件。大多数现代后端类型都不使用这种事件类型。

7.2 - HistoryEvent事件

核心编程模型的 HistoryEvent 事件

包含属性:

  • int EventId
  • EventType EventType
  • bool IsPlayed
  • DateTime Timestamp
  • ExtensionDataObject ExtensionData

这个类也是其他 event 的父类。

7.3 - ExecutionStartedEvent事件

核心编程模型的 ExecutionStartedEvent 事件

包含属性:

  • string EventId
  • string Input
  • EventType EventType
  • ParentInstance ParentInstance
  • string Name
  • string Version:可以复用
  • IDictionary<string, string> Tags
  • string Correlation
  • DistributedTraceContext ParentTraceContext
  • DateTime ScheduledStartTime
  • int Generation

7.4 - OrchestratorStartedEvent事件

核心编程模型的 OrchestratorStartedEvent 事件

包含属性:

  • string EventId
  • EventType EventType