DurableTask Orchestration Source Code Study
1 - IOrchestrationService
Orchestration Service interface for performing task hub management operations and handling orchestrations and work items' state.
The code lives in the DurableTask repository at src\DurableTask.Core\IOrchestrationService.cs
Interface definition
IOrchestrationService is an interface:
public interface IOrchestrationService
{}
Method definitions
Start
/// <summary>
/// Starts the service initializing the required resources
/// </summary>
Task StartAsync();
Stop
/// <summary>
/// Stops the orchestration service gracefully
/// </summary>
Task StopAsync();
/// <summary>
/// Stops the orchestration service with optional forced flag
/// </summary>
Task StopAsync(bool isForced);
Create
/// <summary>
/// Deletes and Creates the necessary resources for the orchestration service and the instance store
/// </summary>
Task CreateAsync();
/// <summary>
/// Deletes and Creates the necessary resources for the orchestration service and optionally the instance store
/// </summary>
Task CreateAsync(bool recreateInstanceStore);
/// <summary>
/// Creates the necessary resources for the orchestration service and the instance store
/// </summary>
Task CreateIfNotExistsAsync();
Delete
/// <summary>
/// Deletes the resources for the orchestration service and the instance store
/// </summary>
Task DeleteAsync();
/// <summary>
/// Deletes the resources for the orchestration service and optionally the instance store
/// </summary>
Task DeleteAsync(bool deleteInstanceStore);
LockNextTaskOrchestrationWorkItem
Waits for the next orchestration work item and returns it. This is the key method of the interface.
/// <summary>
/// Wait for the next orchestration work item and return the orchestration work item
/// </summary>
Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken);
LockNextTaskActivity
/// <summary>
/// Wait for an lock the next task activity to be processed
/// </summary>
Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken);
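To make the role of LockNextTaskOrchestrationWorkItemAsync more concrete, here is a minimal sketch of the kind of polling loop a dispatcher might run against it. IWorkItemSource, FakeWorkItem, InMemorySource, and Dispatcher are hypothetical stand-ins defined only for this example, not DurableTask types. The sketch also assumes the method returns null when nothing arrives within receiveTimeout.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Seed a hypothetical in-memory source with two work items, then drain it.
var source = new InMemorySource(new[] { "instance-1", "instance-2" });
List<string> processed = await Dispatcher.DrainAsync(source, TimeSpan.FromMilliseconds(50));
Console.WriteLine(string.Join(", ", processed));

// Hypothetical stand-in for TaskOrchestrationWorkItem; only the instance id is modeled.
public sealed record FakeWorkItem(string InstanceId);

// Hypothetical slice of IOrchestrationService: just the lock-next method.
public interface IWorkItemSource
{
    Task<FakeWorkItem?> LockNextTaskOrchestrationWorkItemAsync(
        TimeSpan receiveTimeout, CancellationToken cancellationToken);
}

public sealed class InMemorySource : IWorkItemSource
{
    private readonly ConcurrentQueue<FakeWorkItem> queue = new();

    public InMemorySource(IEnumerable<string> instanceIds)
    {
        foreach (string id in instanceIds) queue.Enqueue(new FakeWorkItem(id));
    }

    public async Task<FakeWorkItem?> LockNextTaskOrchestrationWorkItemAsync(
        TimeSpan receiveTimeout, CancellationToken cancellationToken)
    {
        if (queue.TryDequeue(out FakeWorkItem? item)) return item;
        // Nothing queued: wait out the timeout, then report "no work" as null (assumption).
        await Task.Delay(receiveTimeout, cancellationToken);
        return null;
    }
}

public static class Dispatcher
{
    // Polls until the source reports no work, collecting the instance ids it saw.
    public static async Task<List<string>> DrainAsync(IWorkItemSource source, TimeSpan receiveTimeout)
    {
        var seen = new List<string>();
        while (true)
        {
            FakeWorkItem? item = await source.LockNextTaskOrchestrationWorkItemAsync(
                receiveTimeout, CancellationToken.None);
            if (item is null) break; // a long-running dispatcher would keep polling instead
            seen.Add(item.InstanceId);
        }
        return seen;
    }
}
```

The loop stops on the first empty poll only to keep the example finite. A long-running worker would instead loop until its cancellation token is signaled.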
2 - IOrchestrationServiceClient
Interface to allow creation of new task orchestrations and query their status.
The code lives in the DurableTask repository at src\DurableTask.Core\IOrchestrationServiceClient.cs
Interface definition
IOrchestrationServiceClient is an interface:
public interface IOrchestrationServiceClient
{}
Method definitions
CreateTaskOrchestration
/// <summary>
/// Creates a new orchestration
/// </summary>
/// <param name="creationMessage">Orchestration creation message</param>
/// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store.</exception>
/// <returns></returns>
Task CreateTaskOrchestrationAsync(TaskMessage creationMessage);
/// <summary>
/// Creates a new orchestration and specifies a subset of states which should be de duplicated on in the client side
/// </summary>
/// <param name="creationMessage">Orchestration creation message</param>
/// <param name="dedupeStatuses">States of previous orchestration executions to be considered while de-duping new orchestrations on the client</param>
/// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store and it has a status specified in dedupeStatuses.</exception>
/// <returns></returns>
Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses);
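The second overload creates a new orchestration and specifies the subset of statuses that should be de-duplicated on the client side.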
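The de-duplication rule described in the doc comments can be sketched as a simple predicate. Status and Dedupe are hypothetical stand-ins for this example. A real provider is documented to throw OrchestrationAlreadyExistsException in the case where this sketch returns false.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var store = new Dictionary<string, Status> { ["order-1"] = Status.Completed };

// Completed is not in the dedupe set, so reusing "order-1" is allowed.
Console.WriteLine(Dedupe.CanCreate(store, "order-1", new[] { Status.Pending, Status.Running }));
// Completed is in the dedupe set, so creation would be rejected.
Console.WriteLine(Dedupe.CanCreate(store, "order-1", new[] { Status.Completed }));

// Hypothetical stand-in for OrchestrationStatus, reduced to three values.
public enum Status { Pending, Running, Completed }

public static class Dedupe
{
    // Returns false when an existing instance has a status listed in dedupeStatuses.
    public static bool CanCreate(
        IReadOnlyDictionary<string, Status> store, string instanceId, Status[] dedupeStatuses)
    {
        return !(store.TryGetValue(instanceId, out Status existing)
                 && dedupeStatuses.Contains(existing));
    }
}
```

An instance id that is not in the store at all can always be created, whatever dedupeStatuses contains.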
SendTaskOrchestrationMessage
Sends a new message to an orchestration.
TBD: it is not yet clear to me what this method is used for.
/// <summary>
/// Send a new message for an orchestration
/// </summary>
/// <param name="message">Message to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageAsync(TaskMessage message);
/// <summary>
/// Send a new set of messages for an orchestration
/// </summary>
/// <param name="messages">Messages to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages);
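One way to read these two methods is as the delivery path for messages addressed to an existing orchestration instance. As far as I can tell (an assumption, not something this file confirms), TaskHubClient.RaiseEventAsync wraps an event in a TaskMessage and passes it to SendTaskOrchestrationMessageAsync. The sketch below only illustrates that shape. FakeTaskMessage and RecordingClient are hypothetical stand-ins, not DurableTask types.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var client = new RecordingClient();
await client.SendTaskOrchestrationMessageAsync(
    new FakeTaskMessage("order-1", "Approved", "{\"by\":\"alice\"}"));
await client.SendTaskOrchestrationMessageBatchAsync(
    new FakeTaskMessage("order-2", "Approved", "{}"),
    new FakeTaskMessage("order-3", "Rejected", "{}"));
Console.WriteLine(client.Sent.Count);

// Hypothetical stand-in for TaskMessage: target instance plus a named event payload.
public sealed record FakeTaskMessage(string InstanceId, string EventName, string Payload);

// Hypothetical client that only records what it was asked to deliver.
public sealed class RecordingClient
{
    public List<FakeTaskMessage> Sent { get; } = new();

    public Task SendTaskOrchestrationMessageAsync(FakeTaskMessage message)
    {
        // A real provider would enqueue the message for the target orchestration.
        Sent.Add(message);
        return Task.CompletedTask;
    }

    public Task SendTaskOrchestrationMessageBatchAsync(params FakeTaskMessage[] messages)
    {
        Sent.AddRange(messages);
        return Task.CompletedTask;
    }
}
```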
WaitForOrchestration
/// <summary>
/// Wait for an orchestration to reach any terminal state within the given timeout
/// </summary>
/// <param name="instanceId">Instance id of the orchestration</param>
/// <param name="executionId">Execution id of the orchestration</param>
/// <param name="timeout">Maximum amount of time to wait</param>
/// <param name="cancellationToken">Task cancellation token</param>
Task<OrchestrationState> WaitForOrchestrationAsync(
string instanceId,
string executionId,
TimeSpan timeout,
CancellationToken cancellationToken);
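The contract "wait until a terminal state or until the timeout" can be sketched as a polling loop. This is only one possible approach. State and Waiter are hypothetical, and returning null on timeout is an assumption of this example rather than documented behavior.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// The fake lookup reports Running twice, then Completed.
int calls = 0;
Func<Task<State>> lookup = () => Task.FromResult(++calls < 3 ? State.Running : State.Completed);

State? result = await Waiter.WaitAsync(lookup, TimeSpan.FromSeconds(2), CancellationToken.None);
Console.WriteLine(result);

// Hypothetical stand-in for OrchestrationStatus; everything except Running is terminal here.
public enum State { Running, Completed, Failed, Terminated }

public static class Waiter
{
    // Polls getState until it returns a terminal state; returns null once the timeout elapses.
    public static async Task<State?> WaitAsync(
        Func<Task<State>> getState, TimeSpan timeout, CancellationToken cancellationToken)
    {
        DateTime deadline = DateTime.UtcNow + timeout;
        while (DateTime.UtcNow < deadline)
        {
            cancellationToken.ThrowIfCancellationRequested();
            State state = await getState();
            if (state != State.Running) return state;
            await Task.Delay(TimeSpan.FromMilliseconds(20), cancellationToken);
        }
        return null;
    }
}
```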
ForceTerminateTaskOrchestration
/// <summary>
/// Forcefully terminate the specified orchestration instance
/// </summary>
/// <param name="instanceId">Instance to terminate</param>
/// <param name="reason">Reason for termination</param>
Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason);
GetOrchestrationState
/// <summary>
/// Get a list of orchestration states from the instance storage for the most current execution (generation) of the specified instance.
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="allExecutions">True if method should fetch all executions of the instance, false if the method should only fetch the most recent execution</param>
/// <returns>List of OrchestrationState objects that represents the list of orchestrations in the instance store</returns>
Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions);
/// <summary>
/// Get a list of orchestration states from the instance storage for the specified execution (generation) of the specified instance.
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="executionId">Execution id</param>
/// <returns>The OrchestrationState of the specified instanceId or null if not found</returns>
Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId);
GetOrchestrationHistory
/// <summary>
/// Get a string dump of the execution history of the specified orchestration instance specified execution (generation) of the specified instance
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="executionId">Execution id</param>
/// <returns>String with formatted JSON representing the execution history</returns>
Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId);
PurgeOrchestrationHistory
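Purges orchestration instance state and history for orchestrations older than the specified threshold time. Also purges the blob storage.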
/// <summary>
/// Purges orchestration instance state and history for orchestrations older than the specified threshold time.
/// Also purges the blob storage.
/// </summary>
/// <param name="thresholdDateTimeUtc">Threshold date time in UTC</param>
/// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
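The interplay of thresholdDateTimeUtc and timeRangeFilterType can be illustrated with a small in-memory filter. FilterType, InstanceRecord, and Purger are hypothetical names for this sketch, and the enum is reduced to two values. The point is only that the filter type chooses which timestamp is compared against the threshold.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

DateTime now = new DateTime(2024, 1, 31, 0, 0, 0, DateTimeKind.Utc);
var records = new List<InstanceRecord>
{
    new InstanceRecord("old", now.AddDays(-40), now.AddDays(-39)),
    new InstanceRecord("recent", now.AddDays(-2), now.AddDays(-1)),
};

// Purge everything that completed more than 30 days before "now".
List<InstanceRecord> kept = Purger.Purge(records, now.AddDays(-30), FilterType.CompletedTime);
Console.WriteLine(string.Join(", ", kept.Select(r => r.InstanceId)));

// Hypothetical stand-in for OrchestrationStateTimeRangeFilterType.
public enum FilterType { CreatedTime, CompletedTime }

public sealed record InstanceRecord(string InstanceId, DateTime CreatedUtc, DateTime CompletedUtc);

public static class Purger
{
    // Returns the records that survive: those whose selected timestamp is not older than the threshold.
    public static List<InstanceRecord> Purge(
        IEnumerable<InstanceRecord> records, DateTime thresholdUtc, FilterType filter)
    {
        return records
            .Where(r => (filter == FilterType.CreatedTime ? r.CreatedUtc : r.CompletedUtc) >= thresholdUtc)
            .ToList();
    }
}
```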
3 - IOrchestrationServiceInstanceStore
Instance Store provider interface to allow storage and lookup for orchestration state and event history.
The code lives in the DurableTask repository at src\DurableTask.Core\IOrchestrationServiceInstanceStore.cs
Interface definition
IOrchestrationServiceInstanceStore is an interface:
public interface IOrchestrationServiceInstanceStore
{}
Method definitions
Lifecycle-related methods
/// <summary>
/// Gets the maximum length a history entry can be so it can be truncated if necessary
/// </summary>
/// <returns>The maximum length</returns>
int MaxHistoryEntryLength { get; }
/// <summary>
/// Runs initialization to prepare the instance store for use
/// </summary>
/// <param name="recreate">Flag to indicate whether the store should be recreated.</param>
Task InitializeStoreAsync(bool recreate);
/// <summary>
/// Deletes instances instance store
/// </summary>
Task DeleteStoreAsync();
Entity-related methods
/// <summary>
/// Writes a list of history events to instance store
/// </summary>
/// <param name="entities">List of history events to write</param>
Task<object> WriteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);
/// <summary>
/// Get a list of state events from instance store
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="executionId">The execution id to return state for</param>
/// <returns>The matching orchestration state or null if not found</returns>
Task<IEnumerable<OrchestrationStateInstanceEntity>> GetEntitiesAsync(string instanceId, string executionId);
/// <summary>
/// Deletes a list of history events from instance store
/// </summary>
/// <param name="entities">List of history events to delete</param>
Task<object> DeleteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);
GetOrchestrationState
Gets a list of orchestration states for a given instance.
/// <summary>
/// Gets a list of orchestration states for a given instance
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="allInstances">Flag indication whether to get all history execution ids or just the most recent</param>
/// <returns>List of matching orchestration states</returns>
Task<IEnumerable<OrchestrationStateInstanceEntity>> GetOrchestrationStateAsync(string instanceId, bool allInstances);
/// <summary>
/// Gets the orchestration state for a given instance and execution id
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="executionId">The execution id to return state for</param>
/// <returns>The matching orchestration state or null if not found</returns>
Task<OrchestrationStateInstanceEntity> GetOrchestrationStateAsync(string instanceId, string executionId);
OrchestrationStateInstanceEntity is defined as follows:
public class OrchestrationStateInstanceEntity : InstanceEntityBase
{
/// <summary>
/// The orchestration state for this instance entity
/// </summary>
public OrchestrationState State;
}
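The two GetOrchestrationStateAsync overloads can be pictured with a tiny in-memory store. StateEntity and InMemoryStateStore are hypothetical and synchronous for brevity. Treating "most recent" as "latest creation time" is an assumption of this sketch, not something the interface specifies.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

var store = new InMemoryStateStore();
store.Write(new StateEntity("order-1", "exec-a", new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc)));
store.Write(new StateEntity("order-1", "exec-b", new DateTime(2024, 1, 2, 0, 0, 0, DateTimeKind.Utc)));

Console.WriteLine(store.GetOrchestrationState("order-1", allInstances: true).Count);
Console.WriteLine(store.GetOrchestrationState("order-1", allInstances: false)[0].ExecutionId);

// Hypothetical stand-in for OrchestrationStateInstanceEntity.
public sealed record StateEntity(string InstanceId, string ExecutionId, DateTime CreatedUtc);

public sealed class InMemoryStateStore
{
    private readonly List<StateEntity> entities = new();

    public void Write(StateEntity entity) => entities.Add(entity);

    // allInstances = true returns every execution; false returns only the newest one.
    public List<StateEntity> GetOrchestrationState(string instanceId, bool allInstances)
    {
        List<StateEntity> matches = entities
            .Where(e => e.InstanceId == instanceId)
            .OrderByDescending(e => e.CreatedUtc)
            .ToList();
        return allInstances ? matches : matches.Take(1).ToList();
    }

    // Exact lookup by instance id and execution id; null when nothing matches.
    public StateEntity? GetOrchestrationState(string instanceId, string executionId) =>
        entities.FirstOrDefault(e => e.InstanceId == instanceId && e.ExecutionId == executionId);
}
```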
OrchestrationHistory-related methods
/// <summary>
/// Gets the list of history events for a given instance and execution id
/// </summary>
/// <param name="instanceId">The instance id to return history for</param>
/// <param name="executionId">The execution id to return history for</param>
/// <returns>List of history events</returns>
Task<IEnumerable<OrchestrationWorkItemInstanceEntity>> GetOrchestrationHistoryEventsAsync(string instanceId, string executionId);
/// <summary>
/// Purges history from storage for given time range
/// </summary>
/// <param name="thresholdDateTimeUtc">The datetime in UTC to use as the threshold for purging history</param>
/// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
/// <returns>The number of history events purged.</returns>
Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
JumpStartEntities-related methods
/// <summary>
/// Writes a list of jump start events to instance store
/// </summary>
/// <param name="entities">List of jump start events to write</param>
Task<object> WriteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);
/// <summary>
/// Deletes a list of jump start events from instance store
/// </summary>
/// <param name="entities">List of jump start events to delete</param>
Task<object> DeleteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);
/// <summary>
/// Get a list of jump start events from instance store
/// </summary>
/// <returns>List of jump start events</returns>
Task<IEnumerable<OrchestrationJumpStartInstanceEntity>> GetJumpStartEntitiesAsync(int top);