这是本节的多页打印视图。
点击此处打印.
返回本页常规视图.
Fetch work item 流程
Fetch work item 流程
入口
src\DurableTask.Core\TaskOrchestrationDispatcher.cs
/// <summary>
/// Fetches the next work item for the orchestration dispatcher.
/// </summary>
/// <remarks>
/// When the backend uses a separate queue for entity work items, this dispatcher only serves orchestrations and
/// asks the entity-aware service for orchestration work items. Otherwise entities and orchestrations share this
/// dispatcher and the combined method is used.
/// </remarks>
protected Task<TaskOrchestrationWorkItem> OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
    bool entitiesHaveOwnQueue = this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true;
    if (!entitiesHaveOwnQueue)
    {
        // Path taken by the DurableTask samples: both entities and orchestrations are served by this
        // dispatcher, so call the method that may return work items for either.
        return this.orchestrationService.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
    }

    // Path taken by the Durable Functions quickstart: only orchestrations should be served by this
    // dispatcher, so call the method that returns work items for orchestrations only.
    Console.WriteLine("OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true");
    Console.WriteLine("OnFetchWorkItemAsync: this.entityOrchestrationService=" + this.entityOrchestrationService?.GetType().FullName);
    return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
AzureStorageOrchestrationService
src\DurableTask.AzureStorage\AzureStorageOrchestrationService.cs
public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
......
return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken);
}
async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
// This call will block until the next session is ready
session = await this.orchestrationSessionManager.GetNextSessionAsync(entitiesOnly, linkedCts.Token);
OrchestrationSessionManager
src\DurableTask.AzureStorage\OrchestrationSessionManager.cs
public async Task<OrchestrationSession?> GetNextSessionAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
// 调用方传入的 entitiesOnly 为 false,所以这里的 readyForProcessingQueue 是 orchestrationsReadyForProcessingQueue
var readyForProcessingQueue = entitiesOnly? this.entitiesReadyForProcessingQueue : this.orchestrationsReadyForProcessingQueue;
while (!cancellationToken.IsCancellationRequested)
{
// This call will block until:
// 1) a batch of messages has been received for a particular instance and
// 2) the history for that instance has been fetched
// 关键在这里
LinkedListNode<PendingMessageBatch> node = await readyForProcessingQueue.DequeueAsync(cancellationToken);
}
这里的 readyForProcessingQueue(也就是 orchestrationsReadyForProcessingQueue)是在哪里被 enqueue 的?下面来看看。
orchestrationsReadyForProcessingQueue
orchestrationsReadyForProcessingQueue 定义在这里:
// Pending batches whose runtime state has been prefetched (ScheduleOrchestrationStatePrefetch enqueues here)
// and which GetNextSessionAsync dequeues to build orchestration sessions. When entity work items use a
// separate queue, entity batches go to entitiesReadyForProcessingQueue instead.
readonly AsyncQueue<LinkedListNode<PendingMessageBatch>> orchestrationsReadyForProcessingQueue = new AsyncQueue<LinkedListNode<PendingMessageBatch>>();
async Task ScheduleOrchestrationStatePrefetch(
LinkedListNode<PendingMessageBatch> node,
Guid traceActivityId,
CancellationToken cancellationToken)
{
PendingMessageBatch batch = node.Value;
AnalyticsEventSource.SetLogicalTraceActivityId(traceActivityId);
try
{
if (batch.OrchestrationState == null)
{
OrchestrationHistory history = await this.trackingStore.GetHistoryEventsAsync(
batch.OrchestrationInstanceId,
batch.OrchestrationExecutionId,
cancellationToken);
batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
batch.ETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
batch.TrackingStoreContext = history.TrackingStoreContext;
}
if (this.settings.UseSeparateQueueForEntityWorkItems
&& DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId))
{
this.entitiesReadyForProcessingQueue.Enqueue(node);
}
else
{
// 在这里 enqueue
this.orchestrationsReadyForProcessingQueue.Enqueue(node);
}
}
......
这个 ScheduleOrchestrationStatePrefetch 有两处引用:
-
自己调用自己:如果发生异常,先等待 5 秒,再通过 Task.Run 重新调度自身(而不是在循环中重试)
async Task ScheduleOrchestrationStatePrefetch(
LinkedListNode<PendingMessageBatch> node,
Guid traceActivityId,
CancellationToken cancellationToken)
{
try
{
catch (Exception e)
{
// Sleep briefly to avoid a tight failure loop.
// 短暂睡眠,避免出现紧密的故障循环。
await Task.Delay(TimeSpan.FromSeconds(5));
// This is a background operation so failure is not an option. All exceptions must be handled.
// To avoid starvation, we need to re-enqueue this async operation instead of retrying in a loop.
// 这是一个后台操作,因此不能失败。必须处理所有异常。
// 为避免出现饥饿(starvation),我们需要把这个异步操作重新排入调度(re-enqueue),而不是在循环中重试。
await Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
}
-
在AddMessageToPendingOrchestration() 方法中调用
internal void AddMessageToPendingOrchestration(
ControlQueue controlQueue,
IEnumerable<MessageData> queueMessages,
Guid traceActivityId,
CancellationToken cancellationToken)
{
foreach (MessageData data in queueMessages)
{
......
// If there is no batch for this instanceID-executionID pair, create one
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);
// Before the batch of messages can be processed, we need to download the latest execution state.
// This is done beforehand in the background as a performance optimization.
Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
}
}
}
而这个 AddMessageToPendingOrchestration() 方法被 DequeueLoop() 方法调用:
async Task DequeueLoop(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken)
{
......
// Every dequeue operation has a common trace ID so that batches of dequeued messages can be correlated together.
// Both the dequeue traces and the processing traces will share the same "related" trace activity ID.
Guid traceActivityId = AzureStorageOrchestrationService.StartNewLogicalTraceScope(useExisting: false);
// This will block until either new messages arrive or the queue is released.
// 从 control queue 中获取 messages
IReadOnlyList<MessageData> messages = await controlQueue.GetMessagesAsync(cancellationToken);
if (messages.Count > 0)
{
// De-dupe any execution started messages
// 对 ExecutionStarted 消息进行去重(de-dupe)
IEnumerable<MessageData> filteredMessages = await this.DedupeExecutionStartedMessagesAsync(
controlQueue,
messages,
traceActivityId,
cancellationToken);
this.AddMessageToPendingOrchestration(controlQueue, filteredMessages, traceActivityId, cancellationToken);
}
......
}
总结
1 - OrchestrationSessionManager
OrchestrationSessionManager
src\DurableTask.AzureStorage\OrchestrationSessionManager.cs
/// <summary>
/// Tracks the control queues handed to this manager, the message batches read from them, and the
/// orchestration sessions currently held in memory. Only the fields are shown in this excerpt.
/// </summary>
class OrchestrationSessionManager : IDisposable
{
// In-memory sessions keyed by orchestration instance ID, compared case-insensitively (OrdinalIgnoreCase).
// Plain Dictionary: the methods that read or mutate it do so under messageAndSessionLock.
readonly Dictionary<string, OrchestrationSession> activeOrchestrationSessions = new Dictionary<string, OrchestrationSession>(StringComparer.OrdinalIgnoreCase);
// Control queues added through AddQueue, keyed by partition ID.
readonly ConcurrentDictionary<string, ControlQueue> ownedControlQueues = new ConcurrentDictionary<string, ControlQueue>();
// Batches of received messages that have not yet been turned into (or merged into) a session.
readonly LinkedList<PendingMessageBatch> pendingOrchestrationMessageBatches = new LinkedList<PendingMessageBatch>();
// Prefetched batches waiting to be picked up as orchestration sessions.
readonly AsyncQueue<LinkedListNode<PendingMessageBatch>> orchestrationsReadyForProcessingQueue = new AsyncQueue<LinkedListNode<PendingMessageBatch>>();
// Prefetched entity batches, used when entity work items are served from a separate queue.
readonly AsyncQueue<LinkedListNode<PendingMessageBatch>> entitiesReadyForProcessingQueue = new AsyncQueue<LinkedListNode<PendingMessageBatch>>();
}
activeOrchestrationSessions
activeOrchestrationSessions 是一个 Dictionary
// In-memory sessions keyed by orchestration instance ID (case-insensitive). This is a plain Dictionary,
// so reads and writes are expected to happen under messageAndSessionLock.
readonly Dictionary<string, OrchestrationSession> activeOrchestrationSessions = new Dictionary<string, OrchestrationSession>(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// Returns whether any in-memory orchestration session was read from the given control queue.
/// </summary>
/// <param name="partitionId">Partition ID, compared ordinally against each session's control queue name.</param>
/// <returns><c>true</c> if at least one active session belongs to that control queue.</returns>
public bool IsControlQueueProcessingMessages(string partitionId)
{
    // activeOrchestrationSessions is a plain Dictionary that GetNextSessionAsync mutates and
    // TryGetExistingSession reads under messageAndSessionLock. Enumerate it under the same lock so a
    // concurrent Add/Remove cannot corrupt or invalidate the enumeration.
    lock (this.messageAndSessionLock)
    {
        return this.activeOrchestrationSessions.Values.Any(
            session => string.Equals(session.ControlQueue.Name, partitionId, StringComparison.Ordinal));
    }
}
构造函数
/// <summary>
/// Creates a session manager bound to one storage account and task hub configuration.
/// </summary>
/// <param name="queueAccountName">Storage account name, included in log messages.</param>
/// <param name="settings">Service settings (logger, task hub name, concurrency limits, ...).</param>
/// <param name="stats">Shared statistics object.</param>
/// <param name="trackingStore">Store used to fetch orchestration history for pending batches.</param>
public OrchestrationSessionManager(
    string queueAccountName,
    AzureStorageOrchestrationServiceSettings settings,
    AzureStorageOrchestrationServiceStats stats,
    ITrackingStore trackingStore)
{
    // Dispatch queue sized by MaxStorageOperationConcurrency (presumably a cap on concurrent
    // runtime-state fetches; DispatchQueue itself is not shown here).
    this.fetchRuntimeStateQueue = new DispatchQueue(settings.MaxStorageOperationConcurrency);

    this.trackingStore = trackingStore;
    this.stats = stats;
    this.settings = settings;
    this.storageAccountName = queueAccountName;
}
ControlQueue 相关的代码
ownedControlQueues 保存当前拥有的多个 ControlQueue,类型为 ConcurrentDictionary:
// Control queues added through AddQueue and removed through RemoveQueue, keyed by partition ID.
readonly ConcurrentDictionary<string, ControlQueue> ownedControlQueues = new ConcurrentDictionary<string, ControlQueue>();
key 是 partitionId。
Queues 属性返回 ownedControlQueues 中的所有值:
/// <summary>Gets the control queues currently held in <c>ownedControlQueues</c>.</summary>
internal IEnumerable<ControlQueue> Queues
{
    get { return this.ownedControlQueues.Values; }
}
AddQueue()
/// <summary>
/// Registers <paramref name="controlQueue"/> for <paramref name="partitionId"/> and starts its dequeue loop.
/// If the partition is already registered, only a warning is logged.
/// </summary>
public void AddQueue(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken)
{
    bool newlyAdded = this.ownedControlQueues.TryAdd(partitionId, controlQueue);
    if (!newlyAdded)
    {
        this.settings.Logger.PartitionManagerWarning(
            this.storageAccountName,
            this.settings.TaskHubName,
            this.settings.WorkerId,
            partitionId,
            $"Attempted to add a control queue {controlQueue.Name} multiple times!");
        return;
    }

    // Fire-and-forget: DequeueLoop runs until the queue is released or cancellation ends it.
    _ = Task.Run(() => this.DequeueLoop(partitionId, controlQueue, cancellationToken));
}
加入成功后,执行 DequeueLoop() 方法:
/// <summary>
/// Reads messages from <paramref name="controlQueue"/> until it is released. Each batch is passed through
/// ExecutionStarted de-duplication and then routed to pending batches or active sessions.
/// </summary>
/// <param name="partitionId">Partition that owns the queue; used in log messages.</param>
/// <param name="controlQueue">The control queue to listen on.</param>
/// <param name="cancellationToken">
/// An OperationCanceledException observed while this token is cancelled ends the loop (shutdown).
/// </param>
/// <remarks>
/// Any other exception is logged as a warning and the loop backs off for one second before continuing.
/// </remarks>
async Task DequeueLoop(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken)
{
    this.settings.Logger.PartitionManagerInfo(
        this.storageAccountName,
        this.settings.TaskHubName,
        this.settings.WorkerId,
        partitionId,
        $"Started listening for messages on queue {controlQueue.Name}.");

    while (!controlQueue.IsReleased)
    {
        try
        {
            // Every dequeue operation has a common trace ID so that batches of dequeued messages can be correlated together.
            // Both the dequeue traces and the processing traces will share the same "related" trace activity ID.
            Guid traceActivityId = AzureStorageOrchestrationService.StartNewLogicalTraceScope(useExisting: false);

            // This will block until either new messages arrive or the queue is released.
            IReadOnlyList<MessageData> messages = await controlQueue.GetMessagesAsync(cancellationToken);
            if (messages.Count > 0)
            {
                // De-dupe any execution started messages
                IEnumerable<MessageData> filteredMessages = await this.DedupeExecutionStartedMessagesAsync(
                    controlQueue,
                    messages,
                    traceActivityId,
                    cancellationToken);

                this.AddMessageToPendingOrchestration(controlQueue, filteredMessages, traceActivityId, cancellationToken);
            }
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // shutting down
            break;
        }
        catch (Exception e)
        {
            this.settings.Logger.PartitionManagerWarning(
                this.storageAccountName,
                this.settings.TaskHubName,
                this.settings.WorkerId,
                partitionId,
                $"Exception in the dequeue loop for control queue {controlQueue.Name}. Exception: {e}");

            // Back off briefly to avoid a tight failure loop. Task.Delay (instead of Thread.Sleep) keeps
            // this async loop from blocking a thread-pool thread while it waits.
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }

    this.settings.Logger.PartitionManagerInfo(
        this.storageAccountName,
        this.settings.TaskHubName,
        this.settings.WorkerId,
        partitionId,
        $"Stopped listening for messages on queue {controlQueue.Name}.");
}
RemoveQueue
/// <summary>
/// Stops owning the control queue for <paramref name="partitionId"/> and releases it. Does nothing if the
/// partition is not owned.
/// </summary>
public void RemoveQueue(string partitionId, CloseReason? reason, string caller)
{
    if (!this.ownedControlQueues.TryRemove(partitionId, out ControlQueue removedQueue))
    {
        return;
    }

    removedQueue.Release(reason, caller);
}
ReleaseQueue()
/// <summary>
/// Releases the control queue for <paramref name="partitionId"/> but keeps it in <c>ownedControlQueues</c>.
/// Does nothing if the partition is not owned.
/// </summary>
public void ReleaseQueue(string partitionId, CloseReason? reason, string caller)
{
    if (!this.ownedControlQueues.TryGetValue(partitionId, out ControlQueue ownedQueue))
    {
        return;
    }

    ownedQueue.Release(reason, caller);
}
IsControlQueueReceivingMessages()
/// <summary>
/// Returns <c>true</c> when the partition's control queue is owned by this manager and has not been released.
/// </summary>
public bool IsControlQueueReceivingMessages(string partitionId)
{
    if (!this.ownedControlQueues.TryGetValue(partitionId, out ControlQueue queue))
    {
        return false;
    }

    return !queue.IsReleased;
}
IsControlQueueProcessingMessages()
/// <summary>
/// Returns whether any in-memory orchestration session was read from the given control queue.
/// </summary>
/// <param name="partitionId">Partition ID, compared ordinally against each session's control queue name.</param>
/// <returns><c>true</c> if at least one active session belongs to that control queue.</returns>
public bool IsControlQueueProcessingMessages(string partitionId)
{
    // activeOrchestrationSessions is a plain Dictionary that GetNextSessionAsync mutates and
    // TryGetExistingSession reads under messageAndSessionLock. Enumerate it under the same lock so a
    // concurrent Add/Remove cannot corrupt or invalidate the enumeration.
    lock (this.messageAndSessionLock)
    {
        return this.activeOrchestrationSessions.Values.Any(
            session => string.Equals(session.ControlQueue.Name, partitionId, StringComparison.Ordinal));
    }
}
需要仔细理解的方法
GetNextSessionAsync()
/// <summary>
/// Waits for the next prefetched message batch and turns it into a new <see cref="OrchestrationSession"/>.
/// </summary>
/// <param name="entitiesOnly">
/// When true, batches are taken from entitiesReadyForProcessingQueue; otherwise from
/// orchestrationsReadyForProcessingQueue.
/// </param>
/// <param name="cancellationToken">Token checked by the loop and passed to DequeueAsync.</param>
/// <returns>
/// The new session, or <c>null</c> once the loop observes that cancellation was requested.
/// NOTE(review): DequeueAsync may throw on cancellation instead of returning — not visible here.
/// </returns>
/// <remarks>
/// A dequeued batch does not always produce a session:
/// - If the instance already has an active session with the same execution ID, the batch's messages are
///   merged into that session and the loop keeps waiting.
/// - If the active session has a different execution ID, the batch is put back into the ready queue.
/// </remarks>
public async Task<OrchestrationSession?> GetNextSessionAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
var readyForProcessingQueue = entitiesOnly? this.entitiesReadyForProcessingQueue : this.orchestrationsReadyForProcessingQueue;
while (!cancellationToken.IsCancellationRequested)
{
// This call will block until:
// 1) a batch of messages has been received for a particular instance and
// 2) the history for that instance has been fetched
LinkedListNode<PendingMessageBatch> node = await readyForProcessingQueue.DequeueAsync(cancellationToken);
lock (this.messageAndSessionLock)
{
PendingMessageBatch nextBatch = node.Value;
// The batch is leaving the pending list; it is re-added below only if it has to be requeued.
this.pendingOrchestrationMessageBatches.Remove(node);
if (!this.activeOrchestrationSessions.TryGetValue(nextBatch.OrchestrationInstanceId, out var existingSession))
{
// Prefer the instance recorded in the prefetched runtime state; fall back to the batch's IDs.
OrchestrationInstance instance = nextBatch.OrchestrationState?.OrchestrationInstance ??
new OrchestrationInstance
{
InstanceId = nextBatch.OrchestrationInstanceId,
ExecutionId = nextBatch.OrchestrationExecutionId,
};
// No active session for this instance yet: start (or reuse) a logical trace scope and build a new
// session from the prefetched batch.
Guid traceActivityId = AzureStorageOrchestrationService.StartNewLogicalTraceScope(useExisting: true);
OrchestrationSession session = new OrchestrationSession(
this.settings,
this.storageAccountName,
instance,
nextBatch.ControlQueue,
nextBatch.Messages,
nextBatch.OrchestrationState,
nextBatch.ETag,
nextBatch.LastCheckpointTime,
nextBatch.TrackingStoreContext,
this.settings.ExtendedSessionIdleTimeout,
traceActivityId);
// Register the session so TryGetExistingSession and AddMessageToPendingOrchestration can find it
// for later messages to this instance.
this.activeOrchestrationSessions.Add(instance.InstanceId, session);
return session;
}
else if (nextBatch.OrchestrationExecutionId == existingSession.Instance?.ExecutionId)
{
// there is already an active session with the same execution id.
// The session might be waiting for more messages. If it is, signal them.
existingSession.AddOrReplaceMessages(node.Value.Messages);
}
else
{
// A message arrived for a different generation of an existing orchestration instance.
// Put it back into the ready queue so that it can be processed once the current generation
// is done executing.
if (readyForProcessingQueue.Count == 0)
{
// To avoid a tight dequeue loop, delay for a bit before putting this node back into the queue.
// This is only necessary when the queue is empty. The main dequeue thread must not be blocked
// by this delay, which is why we use Task.Delay(...).ContinueWith(...) instead of await.
Task.Delay(millisecondsDelay: 200).ContinueWith(_ =>
{
lock (this.messageAndSessionLock)
{
this.pendingOrchestrationMessageBatches.AddLast(node);
readyForProcessingQueue.Enqueue(node);
}
});
}
else
{
this.pendingOrchestrationMessageBatches.AddLast(node);
readyForProcessingQueue.Enqueue(node);
}
}
}
}
return null;
}
TryGetExistingSession()
/// <summary>
/// Looks up the in-memory session for <paramref name="instanceId"/> (case-insensitive), under
/// messageAndSessionLock.
/// </summary>
/// <param name="instanceId">Orchestration instance ID to look up.</param>
/// <param name="session">The active session when found; otherwise the default value.</param>
/// <returns><c>true</c> if an active session exists for the instance.</returns>
public bool TryGetExistingSession(string instanceId, out OrchestrationSession session)
{
    bool found;
    lock (this.messageAndSessionLock)
    {
        found = this.activeOrchestrationSessions.TryGetValue(instanceId, out session);
    }

    return found;
}
暂时先不深入的方法
AddMessageToPendingOrchestration
Adds history messages to an orchestration for its next replay. “Pending” here is unrelated to the Pending runtimeStatus.
为编排(orchestration)的下一次重放添加历史消息。这里的“待处理”(Pending)与运行时状态(runtimeStatus)中的“Pending”无关。
/// <summary>
/// Adds history messages to an orchestration for its next replay. "Pending" here is unrelated to the
/// Pending runtimeStatus.
/// </summary>
/// <remarks>
/// All routing happens under messageAndSessionLock. Each message goes to one of three places:
/// 1. The active in-memory session for the same instance, when the execution ID matches or is null.
///    ExecutionStarted messages never take this route.
/// 2. An existing pending batch for the same instance, searched from the end of
///    pendingOrchestrationMessageBatches.
/// 3. A new pending batch. Its runtime state is prefetched in the background by
///    ScheduleOrchestrationStatePrefetch.
/// Messages for active sessions are collected first and delivered after the loop.
/// </remarks>
/// <param name="controlQueue">The control queue the messages were read from; recorded on new batches.</param>
/// <param name="queueMessages">The messages to route.</param>
/// <param name="traceActivityId">Passed to the background prefetch of each newly created batch.</param>
/// <param name="cancellationToken">Passed to the background prefetch of each newly created batch.</param>
internal void AddMessageToPendingOrchestration(
ControlQueue controlQueue,
IEnumerable<MessageData> queueMessages,
Guid traceActivityId,
CancellationToken cancellationToken)
{
// Conditions to consider:
// 1. Do we need to create a new orchestration session or does one already exist?
// 2. Do we already have a copy of this message?
// 3. Do we need to add messages to a currently executing orchestration?
lock (this.messageAndSessionLock)
{
// Messages destined for already-active sessions, grouped per session and delivered after the loop.
var existingSessionMessages = new Dictionary<OrchestrationSession, List<MessageData>>();
foreach (MessageData data in queueMessages)
{
// The instanceID identifies the orchestration across replays and ContinueAsNew generations.
// The executionID identifies a generation of an orchestration instance, doesn't change across replays.
string instanceId = data.TaskMessage.OrchestrationInstance.InstanceId;
string executionId = data.TaskMessage.OrchestrationInstance.ExecutionId;
// If the target orchestration is already in memory, we can potentially add the message to the session directly
// rather than adding it to the pending list. This behavior applies primarily when extended sessions are enabled.
// We can't do this for ExecutionStarted messages - those must *always* go to the pending list since they are for
// creating entirely new orchestration instances.
if (data.TaskMessage.Event.EventType != EventType.ExecutionStarted &&
this.activeOrchestrationSessions.TryGetValue(instanceId, out OrchestrationSession session))
{
// A null executionId value means that this is a management operation, like RaiseEvent or Terminate, which
// should be delivered to the current session.
if (executionId == null || session.Instance.ExecutionId == executionId)
{
List<MessageData> pendingMessages;
if (!existingSessionMessages.TryGetValue(session, out pendingMessages))
{
pendingMessages = new List<MessageData>();
existingSessionMessages.Add(session, pendingMessages);
}
pendingMessages.Add(data);
continue;
}
// Looks like this message is for another generation of the active orchestration. Let it fall
// into the pending list below. If it's a message for an older generation, it will be eventually
// discarded after we discover that we have no state associated with its execution ID. This is
// most common in scenarios involving durable timers and ContinueAsNew. Otherwise, this message
// will be processed after the current session unloads.
}
PendingMessageBatch? targetBatch = null; // batch for the current instanceID-executionID pair
// Unless the message is an ExecutionStarted event, we attempt to assign the current message to an
// existing batch by walking backwards through the list of batches until we find one with a matching InstanceID.
// This is assumed to be more efficient than walking forward if most messages arrive in the queue in groups.
LinkedListNode<PendingMessageBatch> node = this.pendingOrchestrationMessageBatches.Last;
while (node != null && data.TaskMessage.Event.EventType != EventType.ExecutionStarted)
{
PendingMessageBatch batch = node.Value;
if (batch.OrchestrationInstanceId == instanceId)
{
if (executionId == null || batch.OrchestrationExecutionId == executionId)
{
targetBatch = batch;
break;
}
else if (batch.OrchestrationExecutionId == null)
{
// A batch created without an execution ID adopts the first concrete execution ID routed to it.
targetBatch = batch;
batch.OrchestrationExecutionId = executionId;
break;
}
}
node = node.Previous;
}
// If there is no batch for this instanceID-executionID pair, create one
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);
// Before the batch of messages can be processed, we need to download the latest execution state.
// This is done beforehand in the background as a performance optimization.
// Fire-and-forget: the prefetch enqueues the node into a ready queue once its state is loaded.
Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
}
// New messages are added; duplicate messages are replaced
targetBatch.Messages.AddOrReplace(data);
}
// The session might be waiting for more messages. If it is, signal them.
foreach (var pair in existingSessionMessages)
{
OrchestrationSession session = pair.Key;
List<MessageData> newMessages = pair.Value;
// New messages are added; duplicate messages are replaced
session.AddOrReplaceMessages(newMessages);
}
}
}
其他方法
DrainAsync()
The drain process occurs when the lease is stolen or the worker is shutting down, prompting the worker to cease listening for new messages and to finish processing all the existing information in memory.
当租约被其他 worker 抢占(stolen)或 worker 即将关闭时,就会发生排空(drain)过程:worker 停止监听新消息,并处理完内存中所有现有的消息。
DedupeExecutionStartedMessagesAsync()
This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it queries table storage to ensure that each message has a matching record in the Instances table. If not, this method will either asynchronously discard the message or abandon it for reprocessing in case the Instances table record hasn’t been written yet (this happens asynchronously and there is no guaranteed order). Meanwhile, this method will return the list of filtered messages.
此方法会枚举所有提供的队列消息,查找 ExecutionStarted 消息。如果找到,它将查询表存储,确保每条消息在实例表中都有匹配记录。如果没有,该方法会异步丢弃消息,或者在实例表记录尚未写入的情况下放弃消息以便重新处理(这是异步发生的,没有顺序保证)。同时,该方法将返回已过滤的消息列表。
DedupeExecutionStartedMessagesAsync
2 - AzureStorageOrchestrationService
AzureStorageOrchestrationService
src\DurableTask.AzureStorage\AzureStorageOrchestrationService.cs
public sealed class AzureStorageOrchestrationService :
IOrchestrationService,
IOrchestrationServiceClient,
IDisposable,
IOrchestrationServiceQueryClient,
IOrchestrationServicePurgeClient,
IEntityOrchestrationService
{
构造函数
/// <summary>
/// Creates the Azure Storage orchestration service and wires up its storage components.
/// </summary>
/// <remarks>
/// Initialization order matters: later components are constructed from earlier ones.
/// - The message manager is constructed from the storage client.
/// - The queues are constructed from the message manager.
/// - The session manager is constructed from the tracking store.
/// - The partition manager may be constructed from the session manager.
/// - The app lease manager is constructed from the partition manager.
/// </remarks>
/// <param name="settings">Service settings; must not be null and are validated by ValidateSettings.</param>
/// <param name="customInstanceStore">
/// Optional instance store. When null, an AzureTableTrackingStore is used; otherwise an
/// InstanceStoreBackedTrackingStore wraps this store.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="settings"/> is null.</exception>
/// <exception cref="ArgumentException">Both UseTablePartitionManagement and UseLegacyPartitionManagement are set.</exception>
public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings settings, IOrchestrationServiceInstanceStore customInstanceStore)
{
if (settings == null)
{
throw new ArgumentNullException(nameof(settings));
}
ValidateSettings(settings);
this.settings = settings;
this.azureStorageClient = new AzureStorageClient(settings);
this.stats = this.azureStorageClient.Stats;
// Blob container for large messages, e.g. "sampleshub-largemessages" (task hub name lower-cased).
string compressedMessageBlobContainerName = $"{settings.TaskHubName.ToLowerInvariant()}-largemessages";
this.messageManager = new MessageManager(this.settings, this.azureStorageClient, compressedMessageBlobContainerName);
// Create one ControlQueue per partition and index them by queue name in allControlQueues.
this.allControlQueues = new ConcurrentDictionary<string, ControlQueue>();
for (int index = 0; index < this.settings.PartitionCount; index++)
{
// e.g. controlQueueName = "sampleshub-control-00"
var controlQueueName = GetControlQueueName(this.settings.TaskHubName, index);
ControlQueue controlQueue = new ControlQueue(this.azureStorageClient, controlQueueName, this.messageManager);
this.allControlQueues.TryAdd(controlQueue.Name, controlQueue);
}
// e.g. workItemQueueName = "sampleshub-workitems"
var workItemQueueName = GetWorkItemQueueName(this.settings.TaskHubName);
this.workItemQueue = new WorkItemQueue(this.azureStorageClient, workItemQueueName, this.messageManager);
if (customInstanceStore == null)
{
// Default tracking store.
this.trackingStore = new AzureTableTrackingStore(this.azureStorageClient, this.messageManager);
}
else
{
this.trackingStore = new InstanceStoreBackedTrackingStore(customInstanceStore);
}
this.activeActivitySessions = new ConcurrentDictionary<string, ActivitySession>(StringComparer.OrdinalIgnoreCase);
this.hubCreationLock = new object();
this.taskHubCreator = new ResettableLazy<Task>(
this.GetTaskHubCreatorTask,
LazyThreadSafetyMode.ExecutionAndPublication);
this.leaseManager = GetBlobLeaseManager(
this.azureStorageClient,
"default");
this.orchestrationSessionManager = new OrchestrationSessionManager(
this.azureStorageClient.QueueAccountName,
this.settings,
this.stats,
this.trackingStore);
// Exactly one partition manager is chosen. Table and Legacy are mutually exclusive, and
// SafePartitionManager is the default.
if (this.settings.UseTablePartitionManagement && this.settings.UseLegacyPartitionManagement)
{
throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager.");
}
else if (this.settings.UseTablePartitionManagement)
{
this.partitionManager = new TablePartitionManager(
this,
this.azureStorageClient);
}
else if (this.settings.UseLegacyPartitionManagement)
{
this.partitionManager = new LegacyPartitionManager(
this,
this.azureStorageClient);
}
else
{
this.partitionManager = new SafePartitionManager(
this,
this.azureStorageClient,
this.orchestrationSessionManager);
}
this.appLeaseManager = new AppLeaseManager(
this.azureStorageClient,
this.partitionManager,
this.settings.TaskHubName.ToLowerInvariant() + "-applease",
this.settings.TaskHubName.ToLowerInvariant() + "-appleaseinfo",
this.settings.AppLeaseOptions);
}
GetControlQueuesAsync()
/// <summary>
/// Returns references to every control queue of the task hub configured on <paramref name="azureStorageClient"/>.
/// </summary>
/// <param name="azureStorageClient">Client whose settings name the task hub and partition table.</param>
/// <param name="defaultPartitionCount">Partition count used when the task hub info has to be created.</param>
/// <returns>One queue reference per partition, in partition-index order.</returns>
/// <exception cref="ArgumentNullException"><paramref name="azureStorageClient"/> is null.</exception>
/// <remarks>
/// The Scale Controller calls into this method, so the partition count is read from storage. It comes from
/// the partition table when that table exists (table partition manager), and otherwise from the task hub
/// info stored with the blob leases.
/// </remarks>
internal static async Task<Queue[]> GetControlQueuesAsync(
    AzureStorageClient azureStorageClient,
    int defaultPartitionCount)
{
    if (azureStorageClient == null)
    {
        throw new ArgumentNullException(nameof(azureStorageClient));
    }

    string taskHubName = azureStorageClient.Settings.TaskHubName;

    int partitionCount;
    Table partitionTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.PartitionTableName);
    if (await partitionTable.ExistsAsync())
    {
        // Table partition manager: one row per partition.
        TableEntitiesResponseInfo<DynamicTableEntity> partitionRows =
            await partitionTable.ExecuteQueryAsync(new TableQuery<DynamicTableEntity>());
        partitionCount = partitionRows.ReturnedEntities.Count;
    }
    else
    {
        // Blob lease based partitioning: read (or create) the task hub info.
        BlobLeaseManager inactiveLeaseManager = GetBlobLeaseManager(azureStorageClient, "inactive");
        TaskHubInfo hubInfo = await inactiveLeaseManager.GetOrCreateTaskHubInfoAsync(
            GetTaskHubInfo(taskHubName, defaultPartitionCount),
            checkIfStale: false);
        partitionCount = hubInfo.PartitionCount;
    }

    var queues = new Queue[partitionCount];
    for (int partitionIndex = 0; partitionIndex < queues.Length; partitionIndex++)
    {
        string queueName = GetControlQueueName(taskHubName, partitionIndex);
        queues[partitionIndex] = azureStorageClient.GetQueueReference(queueName);
    }

    return queues;
}
GetWorkItemQueue()
/// <summary>
/// Returns a reference to the task hub's work-item queue (e.g. "sampleshub-workitems").
/// </summary>
internal static Queue GetWorkItemQueue(AzureStorageClient azureStorageClient) =>
    azureStorageClient.GetQueueReference(GetWorkItemQueueName(azureStorageClient.Settings.TaskHubName));
LockNextTaskOrchestrationWorkItemAsync()
/// <summary>
/// Waits for the next orchestration work item, reading from the orchestration ready queue
/// (<c>entitiesOnly: false</c>).
/// </summary>
/// <param name="receiveTimeout">Not used by this implementation.</param>
/// <param name="cancellationToken">Cancels the wait.</param>
/// <remarks>
/// When settings.UseSeparateQueueForEntityWorkItems is set, entity batches are routed to a separate ready
/// queue and are not returned by this method.
/// </remarks>
public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
    TimeSpan receiveTimeout,
    CancellationToken cancellationToken)
    => this.LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken);
LockNextTaskOrchestrationWorkItemAsync()
/// <summary>
/// Waits for the next orchestration session and converts it into a locked <see cref="TaskOrchestrationWorkItem"/>.
/// </summary>
/// <param name="entitiesOnly">Forwarded to GetNextSessionAsync to choose between the entity and orchestration ready queues.</param>
/// <param name="cancellationToken">Linked with the service shutdown token while waiting for a session.</param>
/// <returns>
/// The work item, or <c>null</c> when there is nothing to process. That happens when:
/// - no session was produced;
/// - the session's control queue has been released;
/// - every message was out of order;
/// - the instance is not executable (its messages are discarded or deferred);
/// - the operation was cancelled.
/// </returns>
/// <remarks>
/// Any other exception is logged via OrchestrationProcessingFailure, the work item (if already created) is
/// released so it can be retried, and the exception is rethrown.
/// </remarks>
async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
    Guid traceActivityId = StartNewLogicalTraceScope(useExisting: true);

    await this.EnsureTaskHubAsync();

    using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, this.shutdownSource.Token))
    {
        OrchestrationSession session = null;
        TaskOrchestrationWorkItem orchestrationWorkItem = null;

        try
        {
            // This call will block until the next session is ready. This is where the work item originates.
            session = await this.orchestrationSessionManager.GetNextSessionAsync(entitiesOnly, linkedCts.Token);
            if (session == null)
            {
                return null;
            }

            // Make sure we still own the partition. If not, abandon the session.
            if (session.ControlQueue.IsReleased)
            {
                await this.AbandonAndReleaseSessionAsync(session);
                return null;
            }

            session.StartNewLogicalTraceScope();

            List<MessageData> outOfOrderMessages = null;
            foreach (MessageData message in session.CurrentMessageBatch)
            {
                if (session.IsOutOfOrderMessage(message))
                {
                    if (outOfOrderMessages == null)
                    {
                        outOfOrderMessages = new List<MessageData>();
                    }

                    // This can happen if a lease change occurs and a new node receives a message for an
                    // orchestration that has not yet checkpointed its history. We abandon such messages
                    // so that they can be reprocessed after the history checkpoint has completed.
                    this.settings.Logger.ReceivedOutOfOrderMessage(
                        this.azureStorageClient.QueueAccountName,
                        this.settings.TaskHubName,
                        session.Instance.InstanceId,
                        session.Instance.ExecutionId,
                        session.ControlQueue.Name,
                        message.TaskMessage.Event.EventType.ToString(),
                        Utils.GetTaskEventId(message.TaskMessage.Event),
                        message.OriginalQueueMessage.Id,
                        message.Episode.GetValueOrDefault(-1),
                        session.LastCheckpointTime);
                    outOfOrderMessages.Add(message);
                }
                else
                {
                    session.TraceProcessingMessage(message, isExtendedSession: false);
                }
            }

            if (outOfOrderMessages?.Count > 0)
            {
                // This will also remove the messages from the current batch.
                await this.AbandonMessagesAsync(session, outOfOrderMessages);
            }

            if (session.CurrentMessageBatch.Count == 0)
            {
                // All messages were removed. Release the work item.
                await this.AbandonAndReleaseSessionAsync(session);
                return null;
            }

            // Create or restore Correlation TraceContext
            TraceContextBase currentRequestTraceContext = null;
            CorrelationTraceClient.Propagate(
                () =>
                {
                    var isReplaying = session.RuntimeState.ExecutionStartedEvent?.IsPlayed ?? false;
                    TraceContextBase parentTraceContext = GetParentTraceContext(session);
                    currentRequestTraceContext = GetRequestTraceContext(isReplaying, parentTraceContext);
                });

            // Build the work item from the session. The lock lasts until the earliest
            // NextVisibleTime among the batch's queue messages.
            orchestrationWorkItem = new TaskOrchestrationWorkItem
            {
                InstanceId = session.Instance.InstanceId,
                LockedUntilUtc = session.CurrentMessageBatch.Min(msg => msg.OriginalQueueMessage.NextVisibleTime.Value.UtcDateTime),
                NewMessages = session.CurrentMessageBatch.Select(m => m.TaskMessage).ToList(),
                OrchestrationRuntimeState = session.RuntimeState,
                Session = this.settings.ExtendedSessionsEnabled ? session : null,
                TraceContext = currentRequestTraceContext,
            };

            if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, out string warningMessage))
            {
                // If all messages belong to the same execution ID, then all of them need to be discarded.
                // However, it's also possible to have messages for *any* execution ID batched together with messages
                // to a *specific* (non-executable) execution ID. Those messages should *not* be discarded since
                // they might be consumable by another orchestration with the same instance id but different execution ID.
                var messagesToDiscard = new List<MessageData>();
                var messagesToAbandon = new List<MessageData>();
                foreach (MessageData msg in session.CurrentMessageBatch)
                {
                    if (msg.TaskMessage.OrchestrationInstance.ExecutionId == session.Instance.ExecutionId)
                    {
                        messagesToDiscard.Add(msg);
                    }
                    else
                    {
                        messagesToAbandon.Add(msg);
                    }
                }

                // If no messages have a matching execution ID, then delete all of them. This means all the
                // messages are external (external events, termination, etc.) and were sent to an instance that
                // doesn't exist or is no longer in a running state.
                if (messagesToDiscard.Count == 0)
                {
                    messagesToDiscard.AddRange(messagesToAbandon);
                    messagesToAbandon.Clear();
                }

                // Add all abandoned messages to the deferred list. These messages will not be deleted right now.
                // If they can be matched with another orchestration, then great. Otherwise they will be deleted
                // the next time they are picked up.
                messagesToAbandon.ForEach(session.DeferMessage);

                // messagesToDiscard is non-empty here: the batch is non-empty (checked above) and an empty
                // discard list is refilled from messagesToAbandon, so trimming one trailing comma is safe.
                var eventListBuilder = new StringBuilder(orchestrationWorkItem.NewMessages.Count * 40);
                foreach (MessageData msg in messagesToDiscard)
                {
                    eventListBuilder.Append(msg.TaskMessage.Event.EventType.ToString()).Append(',');
                }

                this.settings.Logger.DiscardingWorkItem(
                    this.azureStorageClient.QueueAccountName,
                    this.settings.TaskHubName,
                    session.Instance.InstanceId,
                    session.Instance.ExecutionId,
                    orchestrationWorkItem.NewMessages.Count,
                    session.RuntimeState.Events.Count,
                    eventListBuilder.ToString(0, eventListBuilder.Length - 1) /* remove trailing comma */,
                    warningMessage);

                // The instance has already completed or never existed. Delete this message batch.
                await this.DeleteMessageBatchAsync(session, messagesToDiscard);
                await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
                return null;
            }

            return orchestrationWorkItem;
        }
        catch (OperationCanceledException)
        {
            if (session != null)
            {
                // host is shutting down - release any queued messages
                await this.AbandonAndReleaseSessionAsync(session);
            }

            return null;
        }
        catch (Exception e)
        {
            // The structured logger already records the full exception text.
            this.settings.Logger.OrchestrationProcessingFailure(
                this.azureStorageClient.QueueAccountName,
                this.settings.TaskHubName,
                session?.Instance.InstanceId ?? string.Empty,
                session?.Instance.ExecutionId ?? string.Empty,
                e.ToString());

            if (orchestrationWorkItem != null)
            {
                // The work-item needs to be released so that it can be retried later.
                await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
            }

            throw;
        }
    }
}