AzureStorageOrchestrationService
AzureStorageOrchestrationService
src\DurableTask.AzureStorage\AzureStorageOrchestrationService.cs
public sealed class AzureStorageOrchestrationService :
IOrchestrationService,
IOrchestrationServiceClient,
IDisposable,
IOrchestrationServiceQueryClient,
IOrchestrationServicePurgeClient,
IEntityOrchestrationService
{
构造函数
public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings settings, IOrchestrationServiceInstanceStore customInstanceStore)
{
if (settings == null)
{
throw new ArgumentNullException(nameof(settings));
}
ValidateSettings(settings);
this.settings = settings;
this.azureStorageClient = new AzureStorageClient(settings);
this.stats = this.azureStorageClient.Stats;
// “sampleshub-largemessages”
string compressedMessageBlobContainerName = $"{settings.TaskHubName.ToLowerInvariant()}-largemessages";
this.messageManager = new MessageManager(this.settings, this.azureStorageClient, compressedMessageBlobContainerName);
// 初始化 allControlQueues
this.allControlQueues = new ConcurrentDictionary<string, ControlQueue>();
for (int index = 0; index < this.settings.PartitionCount; index++)
{
// 分区初始化controlQueue,然后加入 allControlQueues
// controlQueueName = “sampleshub-control-00”
var controlQueueName = GetControlQueueName(this.settings.TaskHubName, index);
ControlQueue controlQueue = new ControlQueue(this.azureStorageClient, controlQueueName, this.messageManager);
this.allControlQueues.TryAdd(controlQueue.Name, controlQueue);
}
// workItemQueueName = “sampleshub-workitems”
var workItemQueueName = GetWorkItemQueueName(this.settings.TaskHubName);
this.workItemQueue = new WorkItemQueue(this.azureStorageClient, workItemQueueName, this.messageManager);
if (customInstanceStore == null)
{
// 默认用 AzureTableTrackingStore
this.trackingStore = new AzureTableTrackingStore(this.azureStorageClient, this.messageManager);
}
else
{
this.trackingStore = new InstanceStoreBackedTrackingStore(customInstanceStore);
}
this.activeActivitySessions = new ConcurrentDictionary<string, ActivitySession>(StringComparer.OrdinalIgnoreCase);
this.hubCreationLock = new object();
this.taskHubCreator = new ResettableLazy<Task>(
this.GetTaskHubCreatorTask,
LazyThreadSafetyMode.ExecutionAndPublication);
this.leaseManager = GetBlobLeaseManager(
this.azureStorageClient,
"default");
this.orchestrationSessionManager = new OrchestrationSessionManager(
this.azureStorageClient.QueueAccountName,
this.settings,
this.stats,
this.trackingStore);
if (this.settings.UseTablePartitionManagement && this.settings.UseLegacyPartitionManagement)
{
throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager.");
}
else if (this.settings.UseTablePartitionManagement)
{
this.partitionManager = new TablePartitionManager(
this,
this.azureStorageClient);
}
else if (this.settings.UseLegacyPartitionManagement)
{
this.partitionManager = new LegacyPartitionManager(
this,
this.azureStorageClient);
}
else
{
this.partitionManager = new SafePartitionManager(
this,
this.azureStorageClient,
this.orchestrationSessionManager);
}
this.appLeaseManager = new AppLeaseManager(
this.azureStorageClient,
this.partitionManager,
this.settings.TaskHubName.ToLowerInvariant() + "-applease",
this.settings.TaskHubName.ToLowerInvariant() + "-appleaseinfo",
this.settings.AppLeaseOptions);
}
GetControlQueuesAsync()
internal static async Task<Queue[]> GetControlQueuesAsync(
AzureStorageClient azureStorageClient,
int defaultPartitionCount)
{
if (azureStorageClient == null)
{
throw new ArgumentNullException(nameof(azureStorageClient));
}
string taskHub = azureStorageClient.Settings.TaskHubName;
// Need to check for leases in Azure Table Storage. Scale Controller calls into this method.
int partitionCount;
Table partitionTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.PartitionTableName);
// Check if table partition manager is used. If so, get partition count from table.
// Else, get the partition count from the blobs.
if (await partitionTable.ExistsAsync())
{
TableEntitiesResponseInfo<DynamicTableEntity> result = await partitionTable.ExecuteQueryAsync(new TableQuery<DynamicTableEntity>());
partitionCount = result.ReturnedEntities.Count;
}
else
{
BlobLeaseManager inactiveLeaseManager = GetBlobLeaseManager(azureStorageClient, "inactive");
TaskHubInfo hubInfo = await inactiveLeaseManager.GetOrCreateTaskHubInfoAsync(
GetTaskHubInfo(taskHub, defaultPartitionCount),
checkIfStale: false);
partitionCount = hubInfo.PartitionCount;
};
var controlQueues = new Queue[partitionCount];
for (int i = 0; i < partitionCount; i++)
{
controlQueues[i] = azureStorageClient.GetQueueReference(GetControlQueueName(taskHub, i));
}
return controlQueues;
}
GetWorkItemQueue()
internal static Queue GetWorkItemQueue(AzureStorageClient azureStorageClient)
{
string queueName = GetWorkItemQueueName(azureStorageClient.Settings.TaskHubName);
return azureStorageClient.GetQueueReference(queueName);
}
LockNextTaskOrchestrationWorkItemAsync()
public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken);
}
LockNextTaskOrchestrationWorkItemAsync()
async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
Guid traceActivityId = StartNewLogicalTraceScope(useExisting: true);
await this.EnsureTaskHubAsync();
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, this.shutdownSource.Token))
{
OrchestrationSession session = null;
TaskOrchestrationWorkItem orchestrationWorkItem = null;
try
{
// This call will block until the next session is ready
// 获得下一个 session,关键在这里
session = await this.orchestrationSessionManager.GetNextSessionAsync(entitiesOnly, linkedCts.Token);
if (session == null)
{
return null;
}
// Make sure we still own the partition. If not, abandon the session.
if (session.ControlQueue.IsReleased)
{
await this.AbandonAndReleaseSessionAsync(session);
return null;
}
session.StartNewLogicalTraceScope();
List<MessageData> outOfOrderMessages = null;
foreach (MessageData message in session.CurrentMessageBatch)
{
if (session.IsOutOfOrderMessage(message))
{
if (outOfOrderMessages == null)
{
outOfOrderMessages = new List<MessageData>();
}
// This can happen if a lease change occurs and a new node receives a message for an
// orchestration that has not yet checkpointed its history. We abandon such messages
// so that they can be reprocessed after the history checkpoint has completed.
this.settings.Logger.ReceivedOutOfOrderMessage(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
session.Instance.InstanceId,
session.Instance.ExecutionId,
session.ControlQueue.Name,
message.TaskMessage.Event.EventType.ToString(),
Utils.GetTaskEventId(message.TaskMessage.Event),
message.OriginalQueueMessage.Id,
message.Episode.GetValueOrDefault(-1),
session.LastCheckpointTime);
outOfOrderMessages.Add(message);
}
else
{
session.TraceProcessingMessage(message, isExtendedSession: false);
}
}
if (outOfOrderMessages?.Count > 0)
{
// This will also remove the messages from the current batch.
await this.AbandonMessagesAsync(session, outOfOrderMessages);
}
if (session.CurrentMessageBatch.Count == 0)
{
// All messages were removed. Release the work item.
await this.AbandonAndReleaseSessionAsync(session);
return null;
}
// Create or restore Correlation TraceContext
TraceContextBase currentRequestTraceContext = null;
CorrelationTraceClient.Propagate(
() =>
{
var isReplaying = session.RuntimeState.ExecutionStartedEvent?.IsPlayed ?? false;
TraceContextBase parentTraceContext = GetParentTraceContext(session);
currentRequestTraceContext = GetRequestTraceContext(isReplaying, parentTraceContext);
});
// 创建 orchestrationWorkItem
orchestrationWorkItem = new TaskOrchestrationWorkItem
{
InstanceId = session.Instance.InstanceId,
LockedUntilUtc = session.CurrentMessageBatch.Min(msg => msg.OriginalQueueMessage.NextVisibleTime.Value.UtcDateTime),
NewMessages = session.CurrentMessageBatch.Select(m => m.TaskMessage).ToList(),
OrchestrationRuntimeState = session.RuntimeState,
Session = this.settings.ExtendedSessionsEnabled ? session : null,
TraceContext = currentRequestTraceContext,
};
if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, out string warningMessage))
{
// If all messages belong to the same execution ID, then all of them need to be discarded.
// However, it's also possible to have messages for *any* execution ID batched together with messages
// to a *specific* (non-executable) execution ID. Those messages should *not* be discarded since
// they might be consumable by another orchestration with the same instance id but different execution ID.
var messagesToDiscard = new List<MessageData>();
var messagesToAbandon = new List<MessageData>();
foreach (MessageData msg in session.CurrentMessageBatch)
{
if (msg.TaskMessage.OrchestrationInstance.ExecutionId == session.Instance.ExecutionId)
{
messagesToDiscard.Add(msg);
}
else
{
messagesToAbandon.Add(msg);
}
}
// If no messages have a matching execution ID, then delete all of them. This means all the
// messages are external (external events, termination, etc.) and were sent to an instance that
// doesn't exist or is no longer in a running state.
if (messagesToDiscard.Count == 0)
{
messagesToDiscard.AddRange(messagesToAbandon);
messagesToAbandon.Clear();
}
// Add all abandoned messages to the deferred list. These messages will not be deleted right now.
// If they can be matched with another orchestration, then great. Otherwise they will be deleted
// the next time they are picked up.
messagesToAbandon.ForEach(session.DeferMessage);
var eventListBuilder = new StringBuilder(orchestrationWorkItem.NewMessages.Count * 40);
foreach (MessageData msg in messagesToDiscard)
{
eventListBuilder.Append(msg.TaskMessage.Event.EventType.ToString()).Append(',');
}
this.settings.Logger.DiscardingWorkItem(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
session.Instance.InstanceId,
session.Instance.ExecutionId,
orchestrationWorkItem.NewMessages.Count,
session.RuntimeState.Events.Count,
eventListBuilder.ToString(0, eventListBuilder.Length - 1) /* remove trailing comma */,
warningMessage);
// The instance has already completed or never existed. Delete this message batch.
await this.DeleteMessageBatchAsync(session, messagesToDiscard);
await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
return null;
}
System.Console.WriteLine("********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()");
//Console.WriteLine("********* versioning *************: orchestrationWorkItem: name=" + orchestrationWorkItem.OrchestrationRuntimeState!.Name + ", version=" + orchestrationWorkItem.OrchestrationRuntimeState!.Version);
return orchestrationWorkItem;
}
catch (OperationCanceledException)
{
if (session != null)
{
// host is shutting down - release any queued messages
await this.AbandonAndReleaseSessionAsync(session);
}
return null;
}
catch (Exception e)
{
// print out the exception
Console.WriteLine("********* versioning ************* LockNextTaskOrchestrationWorkItemAsync() exception: " + e.ToString());
this.settings.Logger.OrchestrationProcessingFailure(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
session?.Instance.InstanceId ?? string.Empty,
session?.Instance.ExecutionId ?? string.Empty,
e.ToString());
if (orchestrationWorkItem != null)
{
// The work-item needs to be released so that it can be retried later.
await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
}
throw;
}
}
}