DurableTask Orchestration Source Code Study

Notes from studying the source code of DurableTask orchestration

1 - IOrchestrationService

Source code analysis of IOrchestrationService

Orchestration Service interface for performing task hub management operations and handling orchestrations and work items’ state

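In other words, this interface covers two things: task hub management operations, and handling the state of orchestrations and work items.

The code lives in the Durabletask repository at src\DurableTask.Core\IOrchestrationService.cs

Interface definition

IOrchestrationService is an interface: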

    public interface IOrchestrationService
    {}

Method definitions

Start

        /// <summary>
        /// Starts the service initializing the required resources
        /// </summary>
        Task StartAsync();

Stop

        /// <summary>
        /// Stops the orchestration service gracefully
        /// </summary>
        Task StopAsync();

        /// <summary>
        /// Stops the orchestration service with optional forced flag
        /// </summary>
        Task StopAsync(bool isForced);

Create

        /// <summary>
        /// Deletes and Creates the necessary resources for the orchestration service and the instance store
        /// </summary>
        Task CreateAsync();

        /// <summary>
        /// Deletes and Creates the necessary resources for the orchestration service and optionally the instance store
        /// </summary>
        Task CreateAsync(bool recreateInstanceStore);

        /// <summary>
        /// Creates the necessary resources for the orchestration service and the instance store
        /// </summary>
        Task CreateIfNotExistsAsync();

Delete

        /// <summary>
        /// Deletes the resources for the orchestration service and the instance store
        /// </summary>
        Task DeleteAsync();

        /// <summary>
        /// Deletes the resources for the orchestration service and optionally the instance store
        /// </summary>
        Task DeleteAsync(bool deleteInstanceStore);
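
The lifecycle methods above suggest a call order for a host process: create the resources, start, do the work, then stop. The following is a minimal sketch of that order and is an assumption about typical usage, not code taken from the DurableTask source. `IOrchestrationLifecycle` is a hypothetical stand-in that copies only four signatures from the listings above, and `RecordingService` is a hypothetical fake that records calls instead of touching real storage.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in: only the lifecycle subset of IOrchestrationService,
// with signatures copied from the listings above.
public interface IOrchestrationLifecycle
{
    Task CreateIfNotExistsAsync();
    Task StartAsync();
    Task StopAsync(bool isForced);
    Task DeleteAsync();
}

// Hypothetical fake that records the order of calls.
public sealed class RecordingService : IOrchestrationLifecycle
{
    public List<string> Calls { get; } = new List<string>();

    public Task CreateIfNotExistsAsync() { Calls.Add("CreateIfNotExists"); return Task.CompletedTask; }
    public Task StartAsync() { Calls.Add("Start"); return Task.CompletedTask; }
    public Task StopAsync(bool isForced) { Calls.Add(isForced ? "Stop(forced)" : "Stop"); return Task.CompletedTask; }
    public Task DeleteAsync() { Calls.Add("Delete"); return Task.CompletedTask; }
}

public static class LifecycleDemo
{
    // Assumed call order for a host: provision, start, run the workload, stop.
    public static async Task RunHostAsync(IOrchestrationLifecycle service, Func<Task> workload)
    {
        await service.CreateIfNotExistsAsync();
        await service.StartAsync();
        try
        {
            await workload();
        }
        finally
        {
            // Stop even if the workload throws; a graceful (non-forced) stop is assumed.
            await service.StopAsync(isForced: false);
        }
    }
}
```

Calling `LifecycleDemo.RunHostAsync(new RecordingService(), () => Task.CompletedTask)` from an async entry point exercises the sequence. `DeleteAsync` is deliberately not called, since deleting resources is usually a separate administrative step.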

LockNextTaskOrchestrationWorkItem

Waits for the next orchestration work item and returns it. This is the key method of the interface.

        /// <summary>
        ///     Wait for the next orchestration work item and return the orchestration work item
        /// </summary>
        Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken);

LockNextTaskActivity

        /// <summary>
        ///    Wait for an lock the next task activity to be processed 
        /// </summary>
        Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken);
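
Both `LockNext...` methods take a `receiveTimeout` and a `CancellationToken`, which suggests a polling loop on the caller's side. The sketch below shows one such loop. It assumes that a `null` result means the receive timed out with nothing to process; that convention, along with `WorkItem`, `IWorkItemSource`, `QueueSource`, and `PollingDemo`, is hypothetical and stands in for the real `TaskOrchestrationWorkItem` and dispatcher code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for TaskOrchestrationWorkItem.
public sealed class WorkItem
{
    public string InstanceId { get; set; } = "";
}

// Hypothetical stand-in with the same parameter shape as LockNextTaskOrchestrationWorkItemAsync.
public interface IWorkItemSource
{
    Task<WorkItem> LockNextAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken);
}

// Hypothetical in-memory source: hands out queued items, then returns null
// (assumed here to mean "timed out, nothing to do").
public sealed class QueueSource : IWorkItemSource
{
    private readonly Queue<WorkItem> items;

    public QueueSource(IEnumerable<WorkItem> items) { this.items = new Queue<WorkItem>(items); }

    public Task<WorkItem> LockNextAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        return Task.FromResult(this.items.Count > 0 ? this.items.Dequeue() : null);
    }
}

public static class PollingDemo
{
    // Polls until cancelled or until maxEmptyPolls consecutive polls return nothing.
    public static async Task<List<string>> DrainAsync(IWorkItemSource source, int maxEmptyPolls, CancellationToken token)
    {
        var processed = new List<string>();
        int emptyPolls = 0;
        while (!token.IsCancellationRequested && emptyPolls < maxEmptyPolls)
        {
            WorkItem item = await source.LockNextAsync(TimeSpan.FromSeconds(1), token);
            if (item == null)
            {
                emptyPolls++;
                continue;
            }

            emptyPolls = 0;
            // A real worker would execute the orchestration here; the sketch only records the id.
            processed.Add(item.InstanceId);
        }

        return processed;
    }
}
```

The `maxEmptyPolls` limit exists only so the sketch terminates; a long-running worker would more likely loop until the token is cancelled.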

2 - IOrchestrationServiceClient

Source code analysis of IOrchestrationServiceClient

Interface to allow creation of new task orchestrations and query their status.

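In other words, clients use this interface to create new task orchestrations and to query their status.

The code lives in the Durabletask repository at src\DurableTask.Core\IOrchestrationServiceClient.cs

Interface definition

IOrchestrationServiceClient is an interface: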

    public interface IOrchestrationServiceClient
    {}

Method definitions

CreateTaskOrchestration

        /// <summary>
        /// Creates a new orchestration
        /// </summary>
        /// <param name="creationMessage">Orchestration creation message</param>
        /// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store.</exception>
        /// <returns></returns>
        Task CreateTaskOrchestrationAsync(TaskMessage creationMessage);

        /// <summary>
        /// Creates a new orchestration and specifies a subset of states which should be de duplicated on in the client side
        /// </summary>
        /// <param name="creationMessage">Orchestration creation message</param>
        /// <param name="dedupeStatuses">States of previous orchestration executions to be considered while de-duping new orchestrations on the client</param>
        /// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store and it has a status specified in dedupeStatuses.</exception>
        /// <returns></returns>
        Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses);

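The second overload creates a new orchestration and specifies the subset of statuses to deduplicate against on the client side.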
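
The exception documentation above implies a dedupe rule: creation fails only when an instance with the same id already exists and its status is listed in `dedupeStatuses`. The sketch below illustrates that rule with an in-memory dictionary. `InstanceStatus`, `AlreadyExistsException`, and `InMemoryClient` are hypothetical stand-ins for `OrchestrationStatus`, `OrchestrationAlreadyExistsException`, and a real client; the enum lists only a few values for illustration, the method is synchronous for brevity, and real providers may implement the check differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, reduced stand-in for OrchestrationStatus.
public enum InstanceStatus { Pending, Running, Completed, Failed }

// Hypothetical stand-in for OrchestrationAlreadyExistsException.
public sealed class AlreadyExistsException : Exception
{
    public AlreadyExistsException(string instanceId)
        : base($"An orchestration with instance id '{instanceId}' already exists.") { }
}

// Hypothetical in-memory client that applies the dedupe rule described above.
public sealed class InMemoryClient
{
    private readonly Dictionary<string, InstanceStatus> instances = new Dictionary<string, InstanceStatus>();

    public void Create(string instanceId, InstanceStatus[] dedupeStatuses)
    {
        // Reject only if the id exists AND its current status is one of the dedupe statuses.
        if (this.instances.TryGetValue(instanceId, out InstanceStatus existing)
            && dedupeStatuses.Contains(existing))
        {
            throw new AlreadyExistsException(instanceId);
        }

        this.instances[instanceId] = InstanceStatus.Pending;
    }

    // Test helper to simulate the orchestration moving to another status.
    public void SetStatus(string instanceId, InstanceStatus status) => this.instances[instanceId] = status;
}
```

With `dedupeStatuses` set to `Pending` and `Running`, a second `Create("order-1", ...)` throws while the first instance is still pending, but succeeds once that instance has been marked `Completed`.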

SendTaskOrchestrationMessage

Sends a new message to an orchestration.

TBD: it is not yet clear to me what this method is used for.

        /// <summary>
        /// Send a new message for an orchestration
        /// </summary>
        /// <param name="message">Message to send</param>
        /// <returns></returns>
        Task SendTaskOrchestrationMessageAsync(TaskMessage message);

        /// <summary>
        /// Send a new set of messages for an orchestration
        /// </summary>
        /// <param name="messages">Messages to send</param>
        /// <returns></returns>
        Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages);

WaitForOrchestration

        /// <summary>
        /// Wait for an orchestration to reach any terminal state within the given timeout
        /// </summary>
        /// <param name="instanceId">Instance id of the orchestration</param>
        /// <param name="executionId">Execution id of the orchestration</param>
        /// <param name="timeout">Maximum amount of time to wait</param>
        /// <param name="cancellationToken">Task cancellation token</param>
        Task<OrchestrationState> WaitForOrchestrationAsync(
            string instanceId, 
            string executionId,
            TimeSpan timeout, 
            CancellationToken cancellationToken);
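
One way to picture "wait for a terminal state within the given timeout" is a polling loop over the current status. The sketch below is an illustration only, not how any DurableTask provider implements it. `RunStatus` and `WaitDemo` are hypothetical, the set of statuses treated as terminal is an assumption, and returning `null` on timeout is a choice made for this sketch.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, reduced stand-in for OrchestrationStatus.
public enum RunStatus { Running, Completed, Failed, Terminated }

public static class WaitDemo
{
    // Assumption: these statuses count as terminal for the purposes of this sketch.
    public static bool IsTerminal(RunStatus status) =>
        status == RunStatus.Completed || status == RunStatus.Failed || status == RunStatus.Terminated;

    // Polls getStatus until it reports a terminal status or the timeout elapses.
    // Returns null on timeout; cancelling the token makes Task.Delay throw.
    public static async Task<RunStatus?> WaitAsync(
        Func<Task<RunStatus>> getStatus,
        TimeSpan timeout,
        TimeSpan pollInterval,
        CancellationToken cancellationToken)
    {
        var clock = Stopwatch.StartNew();
        while (true)
        {
            RunStatus current = await getStatus();
            if (IsTerminal(current))
            {
                return current;
            }

            if (clock.Elapsed >= timeout)
            {
                return null;
            }

            await Task.Delay(pollInterval, cancellationToken);
        }
    }
}
```

The status is checked before the timeout, so an orchestration that has already finished is reported even when the timeout is zero.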

ForceTerminateTaskOrchestration

        /// <summary>
        /// Forcefully terminate the specified orchestration instance
        /// </summary>
        /// <param name="instanceId">Instance to terminate</param>
        /// <param name="reason">Reason for termination</param>
        Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason);

GetOrchestrationState

        /// <summary>
        /// Get a list of orchestration states from the instance storage for the most current execution (generation) of the specified instance.
        /// </summary>
        /// <param name="instanceId">Instance id</param>
        /// <param name="allExecutions">True if method should fetch all executions of the instance, false if the method should only fetch the most recent execution</param>
        /// <returns>List of OrchestrationState objects that represents the list of orchestrations in the instance store</returns>
        Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions);

        /// <summary>
        /// Get a list of orchestration states from the instance storage for the specified execution (generation) of the specified instance.
        /// </summary>
        /// <param name="instanceId">Instance id</param>
        /// <param name="executionId">Execution id</param>
        /// <returns>The OrchestrationState of the specified instanceId or null if not found</returns>
        Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId);

GetOrchestrationHistory

        /// <summary>
        /// Get a string dump of the execution history of the specified orchestration instance specified execution (generation) of the specified instance
        /// </summary>
        /// <param name="instanceId">Instance id</param>
        /// <param name="executionId">Execution id</param>
        /// <returns>String with formatted JSON representing the execution history</returns>
        Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId);

PurgeOrchestrationHistory

Purges orchestration instance state and history older than the specified threshold time. Blob storage is purged as well.

        /// <summary>
        /// Purges orchestration instance state and history for orchestrations older than the specified threshold time.
        /// Also purges the blob storage.
        /// </summary>
        /// <param name="thresholdDateTimeUtc">Threshold date time in UTC</param>
        /// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
        Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
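
`PurgeOrchestrationHistoryAsync` takes a UTC threshold plus a filter type that selects which timestamp the threshold is compared against. The sketch below shows that comparison over an in-memory list. `TimeFilter`, `Record`, and `PurgeDemo` are hypothetical stand-ins (the real enum is `OrchestrationStateTimeRangeFilterType`, whose members are not listed in this section), and "older than" is assumed to mean strictly earlier than the threshold.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for OrchestrationStateTimeRangeFilterType.
public enum TimeFilter { Created, Completed, LastUpdated }

// Hypothetical record holding the three timestamps a filter could select.
public sealed class Record
{
    public string InstanceId { get; set; } = "";
    public DateTime CreatedUtc { get; set; }
    public DateTime CompletedUtc { get; set; }
    public DateTime LastUpdatedUtc { get; set; }
}

public static class PurgeDemo
{
    // Selects the timestamp that the threshold is compared against.
    private static DateTime Pick(Record record, TimeFilter filter)
    {
        switch (filter)
        {
            case TimeFilter.Created: return record.CreatedUtc;
            case TimeFilter.Completed: return record.CompletedUtc;
            default: return record.LastUpdatedUtc;
        }
    }

    // Removes every record whose selected timestamp is earlier than the threshold
    // and returns how many records were removed.
    public static int Purge(List<Record> store, DateTime thresholdUtc, TimeFilter filter)
    {
        return store.RemoveAll(record => Pick(record, filter) < thresholdUtc);
    }
}
```

A typical call under these assumptions would be `PurgeDemo.Purge(store, DateTime.UtcNow.AddDays(-30), TimeFilter.Completed)`, which drops everything that completed more than 30 days ago.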

3 - IOrchestrationServiceInstanceStore

Source code analysis of IOrchestrationServiceInstanceStore

Instance Store provider interface to allow storage and lookup for orchestration state and event history

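In other words, an instance store provider implements this interface so that orchestration state and event history can be stored and looked up.

The code lives in the Durabletask repository at src\DurableTask.Core\IOrchestrationServiceInstanceStore.cs

Interface definition

IOrchestrationServiceInstanceStore is an interface: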

    public interface IOrchestrationServiceInstanceStore
    {}

Method definitions

Lifecycle-related members

        /// <summary>
        /// Gets the maximum length a history entry can be so it can be truncated if necessary
        /// </summary>
        /// <returns>The maximum length</returns>
        int MaxHistoryEntryLength { get; }

        /// <summary>
        /// Runs initialization to prepare the instance store for use
        /// </summary>
        /// <param name="recreate">Flag to indicate whether the store should be recreated.</param>
        Task InitializeStoreAsync(bool recreate);

        /// <summary>
        /// Deletes instances instance store
        /// </summary>
        Task DeleteStoreAsync();

Entity-related methods

        /// <summary>
        /// Writes a list of history events to instance store
        /// </summary>
        /// <param name="entities">List of history events to write</param>
        Task<object> WriteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);

        /// <summary>
        /// Get a list of state events from instance store
        /// </summary>
        /// <param name="instanceId">The instance id to return state for</param>
        /// <param name="executionId">The execution id to return state for</param>
        /// <returns>The matching orchestration state or null if not found</returns>
        Task<IEnumerable<OrchestrationStateInstanceEntity>> GetEntitiesAsync(string instanceId, string executionId);

        /// <summary>
        /// Deletes a list of history events from instance store
        /// </summary>
        /// <param name="entities">List of history events to delete</param>
        Task<object> DeleteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);

GetOrchestrationState

Gets the list of orchestration states for the specified instance.

        /// <summary>
        /// Gets a list of orchestration states for a given instance
        /// </summary>
        /// <param name="instanceId">The instance id to return state for</param>
        /// <param name="allInstances">Flag indication whether to get all history execution ids or just the most recent</param>
        /// <returns>List of matching orchestration states</returns>
        Task<IEnumerable<OrchestrationStateInstanceEntity>> GetOrchestrationStateAsync(string instanceId, bool allInstances);

        /// <summary>
        /// Gets the orchestration state for a given instance and execution id
        /// </summary>
        /// <param name="instanceId">The instance id to return state for</param>
        /// <param name="executionId">The execution id to return state for</param>
        /// <returns>The matching orchestration state or null if not found</returns>
        Task<OrchestrationStateInstanceEntity> GetOrchestrationStateAsync(string instanceId, string executionId);

OrchestrationStateInstanceEntity is defined as follows:

    public class OrchestrationStateInstanceEntity : InstanceEntityBase
    {
        /// <summary>
        /// The orchestration state for this instance entity
        /// </summary>
        public OrchestrationState State;
    }

OrchestrationHistory-related methods

        /// <summary>
        /// Gets the list of history events for a given instance and execution id
        /// </summary>
        /// <param name="instanceId">The instance id to return history for</param>
        /// <param name="executionId">The execution id to return history for</param>
        /// <returns>List of history events</returns>
        Task<IEnumerable<OrchestrationWorkItemInstanceEntity>> GetOrchestrationHistoryEventsAsync(string instanceId, string executionId);

        /// <summary>
        /// Purges history from storage for given time range
        /// </summary>
        /// <param name="thresholdDateTimeUtc">The datetime in UTC to use as the threshold for purging history</param>
        /// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
        /// <returns>The number of history events purged.</returns>
        Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);

JumpStartEntities-related methods

        /// <summary>
        /// Writes a list of jump start events to instance store
        /// </summary>
        /// <param name="entities">List of jump start events to write</param>
        Task<object> WriteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);

        /// <summary>
        /// Deletes a list of jump start events from instance store
        /// </summary>
        /// <param name="entities">List of jump start events to delete</param>
        Task<object> DeleteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);

        /// <summary>
        /// Get a list of jump start events from instance store
        /// </summary>
        /// <returns>List of jump start events</returns>
        Task<IEnumerable<OrchestrationJumpStartInstanceEntity>> GetJumpStartEntitiesAsync(int top);