DurableTask Dispatcher源码学习
1 - TaskOrchestrationDispatcher源码学习
Dispatcher for orchestrations to handle processing and renewing, completion of orchestration events
协调的调度程序,用于处理、更新和完成协调事件
src\DurableTask.Core\TaskOrchestrationDispatcher.cs
类定义
没有实现任何接口,也没有继承任何基类:
// Declaration only: the class lists no base class and no interfaces. The empty body is a
// placeholder in these notes; the members are quoted in the sections that follow.
public class TaskOrchestrationDispatcher{}
类字段
// An already-completed task (created with Task.FromResult).
static readonly Task CompletedTask = Task.FromResult(0);
// Name/version object manager for TaskOrchestration, supplied through the constructor.
readonly INameVersionObjectManager<TaskOrchestration> objectManager;
// Backend orchestration service; used to lock, complete and abandon orchestration work items.
readonly IOrchestrationService orchestrationService;
// Inner work item dispatcher; it is created in the constructor with this class's
// fetch and process callbacks.
readonly WorkItemDispatcher<TaskOrchestrationWorkItem> dispatcher;
// Middleware pipeline supplied through the constructor.
readonly DispatchMiddlewarePipeline dispatchPipeline;
// Structured logging helper supplied through the constructor.
readonly LogHelper logHelper;
// Error propagation mode supplied through the constructor (note: not readonly).
ErrorPropagationMode errorPropagationMode;
// Counting lock that limits how many work items may hold an extended session at once.
readonly NonBlockingCountdownLock concurrentSessionLock;
// orchestrationService viewed as IEntityOrchestrationService; null when it does not implement it.
readonly IEntityOrchestrationService? entityOrchestrationService;
// Entity backend properties read from entityOrchestrationService; null when that service is null.
readonly EntityBackendProperties? entityBackendProperties;
// Entity parameters derived from entityBackendProperties.
readonly TaskOrchestrationEntityParameters? entityParameters;
主要的功能在 objectManager、orchestrationService、dispatcher 和 dispatchPipeline 这几个字段上。
构造函数
/// <summary>
/// Creates the orchestration dispatcher: stores the supplied collaborators, derives the optional
/// entity-related members from the orchestration service, builds the inner work item dispatcher
/// and sizes the extended-session lock.
/// </summary>
/// <exception cref="ArgumentNullException">
/// Thrown when objectManager, orchestrationService, dispatchPipeline or logHelper is null.
/// </exception>
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
ErrorPropagationMode errorPropagationMode)
{
// These fields are all passed in by the caller
this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager));
this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService));
this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline));
this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper));
this.errorPropagationMode = errorPropagationMode;
// View orchestrationService as an IEntityOrchestrationService (null when it does not implement it)
this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService;
this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties;
this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties);
// Initialize the dispatcher: name, work item identifier, fetch callback, process callback
this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
this.OnProcessWorkItemSessionAsync)
{
// Initialize the dispatcher's fields from the orchestration service
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
LogHelper = logHelper,
};
// To avoid starvation, we only allow half of all concurrently executing orchestrations to
// leverage extended sessions.
// The half is rounded up. If MaxConcurrentWorkItems is 0 the result is 0, and the
// NonBlockingCountdownLock constructor throws ArgumentOutOfRangeException for values <= 0.
var maxConcurrentSessions = (int)Math.Ceiling(this.dispatcher.MaxConcurrentWorkItems / 2.0);
this.concurrentSessionLock = new NonBlockingCountdownLock(maxConcurrentSessions);
}
StartAsync() 方法
Starts the dispatcher to start getting and processing orchestration events
启动调度程序,开始获取和处理协调事件
/// <summary>
/// Starts this dispatcher by starting the inner work item dispatcher and awaiting its start-up.
/// </summary>
/// <returns>A task that completes once the inner dispatcher's StartAsync has completed.</returns>
public async Task StartAsync()
{
    Task innerStart = this.dispatcher.StartAsync();
    await innerStart;
}
OnFetchWorkItemAsync() 方法
Method to get the next work item to process within supplied timeout
在提供的超时时间内获取下一个要处理的工作项的方法
/// <summary>
/// Gets the next work item to process within the supplied timeout.
/// </summary>
/// <param name="receiveTimeout">Timeout handed to the orchestration service for the fetch.</param>
/// <param name="cancellationToken">Cancellation token handed to the orchestration service.</param>
/// <returns>The task returned by the orchestration service's lock-next call.</returns>
protected Task<TaskOrchestrationWorkItem> OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
    bool entitiesHaveOwnQueue = this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true;

    if (!entitiesHaveOwnQueue)
    {
        // Both entities and orchestrations are served by this dispatcher,
        // so call the method that may return work items for either.
        return this.orchestrationService.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
    }

    // Only orchestrations should be served by this dispatcher, so call
    // the method which returns work items for orchestrations only.
    // The two console lines are diagnostic output added while studying this code path.
    Console.WriteLine("OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true");
    Console.WriteLine("OnFetchWorkItemAsync: this.entityOrchestrationService=" + this.entityOrchestrationService?.GetType().FullName);

    return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
增加日志打印,看到:
OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true
OnFetchWorkItemAsync: this.entityOrchestrationService=Microsoft.Azure.WebJobs.Extensions.DurableTask.AzureStorageDurabilityProvider
OnProcessWorkItemSessionAsync() 方法
/// <summary>
/// Processes a fetched work item. A work item without a session is processed exactly once via
/// OnProcessWorkItemAsync. A work item with a session is processed in a loop that keeps fetching
/// further messages from the session until processing reports completion/interruption, the
/// extended-session lock cannot be acquired, or the session fetch returns null.
/// A SessionAbortedException is logged and the work item is abandoned.
/// </summary>
async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
{
// DTFx history replay expects that ExecutionStarted comes before other events.
// If this is not already the case, due to a race-condition, we re-order the
// messages to enforce this expectation.
EnsureExecutionStartedIsFirst(workItem.NewMessages);
try
{
if (workItem.Session == null)
{
// Legacy behavior
await this.OnProcessWorkItemAsync(workItem);
return;
}
var isExtendedSession = false;
CorrelationTraceClient.Propagate(
() =>
{
// Check if it is extended session.
// TODO: Remove this code - it looks incorrect and dangerous
// NOTE(review): Release() runs whether or not Acquire() succeeded, and
// NonBlockingCountdownLock.Release() increments unconditionally, so a failed Acquire()
// leaves the counter one higher than before. When Acquire() succeeds, isExtendedSession
// becomes true although the lock is no longer held; the loop below then skips its own
// Acquire() and the finally block calls Release() a second time.
// Whether this lambda runs at all depends on CorrelationTraceClient.Propagate, which is
// not shown here - confirm.
isExtendedSession = this.concurrentSessionLock.Acquire();
this.concurrentSessionLock.Release();
workItem.IsExtendedSession = isExtendedSession;
});
// Number of message batches processed so far in this session.
var processCount = 0;
try
{
while (true)
{
// If the provider provided work items, execute them.
if (workItem.NewMessages?.Count > 0)
{
bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
if (isCompletedOrInterrupted)
{
break;
}
processCount++;
}
// Fetches beyond the first require getting an extended session lock, used to prevent starvation.
if (processCount > 0 && !isExtendedSession)
{
isExtendedSession = this.concurrentSessionLock.Acquire();
if (!isExtendedSession)
{
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
break;
}
}
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session.");
Stopwatch timer = Stopwatch.StartNew();
// Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
// until either new messages are available or until a provider-specific timeout has expired.
workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
// A null result ends the session loop.
if (workItem.NewMessages == null)
{
break;
}
TraceHelper.Trace(
TraceEventType.Verbose,
"OnProcessWorkItemSession-EndFetch",
$"Fetched {workItem.NewMessages.Count} new message(s) after {timer.ElapsedMilliseconds} ms from existing session.");
// Clear the NewEvents collection before the next batch is processed.
workItem.OrchestrationRuntimeState.NewEvents.Clear();
}
}
finally
{
// Release the extended-session lock on every exit path from the loop, including exceptions.
if (isExtendedSession)
{
TraceHelper.Trace(
TraceEventType.Verbose,
"OnProcessWorkItemSession-Release",
$"Releasing extended session after {processCount} batch(es).");
this.concurrentSessionLock.Release();
}
}
}
catch (SessionAbortedException e)
{
// Either the orchestration or the orchestration service explicitly abandoned the session.
// Fall back to an instance built from the work item's InstanceId when no runtime state is available.
OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
this.logHelper.OrchestrationAborted(instance, e.Message);
TraceHelper.TraceInstance(TraceEventType.Warning, "TaskOrchestrationDispatcher-ExecutionAborted", instance, "{0}", e.Message);
await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
}
}
OnProcessWorkItemAsync()
/// <summary>
/// Processes one batch of messages for an orchestration work item: reconciles the new messages
/// with the runtime state, executes (or resumes) the orchestration, converts each resulting
/// orchestrator action into outbound messages, and completes the work item through the
/// orchestration service. The execution loop repeats while the orchestration continues-as-new.
/// </summary>
/// <param name="workItem">The work item to process.</param>
/// <returns>
/// The value of <c>isCompleted || continuedAsNew || isInterrupted</c> at the end of processing;
/// the session loop in OnProcessWorkItemSessionAsync stops when this is true.
/// </returns>
protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
// Outbound messages, collected per category and handed to the orchestration service at the end.
var messagesToSend = new List<TaskMessage>();
var timerMessages = new List<TaskMessage>();
var orchestratorMessages = new List<TaskMessage>();
var isCompleted = false;
var continuedAsNew = false;
var isInterrupted = false;
// correlation
CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext);
ExecutionStartedEvent? continueAsNewExecutionStarted = null;
TaskMessage? continuedAsNewMessage = null;
IList<HistoryEvent>? carryOverEvents = null;
string? carryOverStatus = null;
workItem.OrchestrationRuntimeState.LogHelper = this.logHelper;
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
// Remember the incoming state object; it is put back on the work item before completion
// when RestoreOriginalRuntimeStateDuringCompletion is set.
OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState;
// Distributed tracing support: each orchestration execution is a trace activity
// that derives from an established parent trace context. It is expected that some
// listener will receive these events and publish them to a distributed trace logger.
ExecutionStartedEvent startEvent =
runtimeState.ExecutionStartedEvent ??
workItem.NewMessages.Select(msg => msg.Event).OfType<ExecutionStartedEvent>().FirstOrDefault();
Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent);
OrchestrationState? instanceState = null;
Task? renewTask = null;
using var renewCancellationTokenSource = new CancellationTokenSource();
// A renewal task is started only when LockedUntilUtc is earlier than DateTime.MaxValue.
if (workItem.LockedUntilUtc < DateTime.MaxValue)
{
// start a task to run RenewUntil
// NOTE(review): if RenewUntil returns a Task, Task.Factory.StartNew yields a Task<Task> and
// awaiting renewTask in the finally block would only await the outer task - confirm.
renewTask = Task.Factory.StartNew(
() => RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskOrchestrationDispatcher), renewCancellationTokenSource.Token),
renewCancellationTokenSource.Token);
}
try
{
// Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper))
{
// TODO : mark an orchestration as faulted if there is data corruption
this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
TraceHelper.TraceSession(
TraceEventType.Error,
"TaskOrchestrationDispatcher-DeletedOrchestration",
runtimeState.OrchestrationInstance?.InstanceId!,
"Received work-item for an invalid orchestration");
// Treated as completed so that the work item is still completed below and the caller stops.
isCompleted = true;
traceActivity?.Dispose();
}
else
{
// Runs once per execution; repeats only while the orchestration continues-as-new.
do
{
continuedAsNew = false;
continuedAsNewMessage = null;
this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
TraceHelper.TraceInstance(
TraceEventType.Verbose,
"TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin",
runtimeState.OrchestrationInstance!,
"Executing user orchestration: {0}",
JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));
// No cursor yet: execute from scratch and keep the cursor; otherwise resume from it.
if (workItem.Cursor == null)
{
workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
}
else
{
await this.ResumeOrchestrationAsync(workItem);
}
// Snapshot the decisions into a list before iterating over them.
IReadOnlyList<OrchestratorAction> decisions = workItem.Cursor.LatestDecisions.ToList();
this.logHelper.OrchestrationExecuted(
runtimeState.OrchestrationInstance!,
runtimeState.Name,
decisions);
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-ExecuteUserOrchestration-End",
runtimeState.OrchestrationInstance!,
"Executed user orchestration. Received {0} orchestrator actions: {1}",
decisions.Count,
string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType)));
// TODO: Exception handling for invalid decisions, which is increasingly likely
// when custom middleware is involved (e.g. out-of-process scenarios).
foreach (OrchestratorAction decision in decisions)
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-ProcessOrchestratorAction",
runtimeState.OrchestrationInstance!,
"Processing orchestrator action of type {0}",
decision.OrchestratorActionType);
// Each action type is turned into a message and added to the matching outbound list.
switch (decision.OrchestratorActionType)
{
case OrchestratorActionType.ScheduleOrchestrator:
var scheduleTaskAction = (ScheduleTaskOrchestratorAction)decision;
var message = this.ProcessScheduleTaskDecision(
scheduleTaskAction,
runtimeState,
this.IncludeParameters,
traceActivity);
messagesToSend.Add(message);
break;
case OrchestratorActionType.CreateTimer:
var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision;
timerMessages.Add(this.ProcessCreateTimerDecision(
timerOrchestratorAction,
runtimeState,
isInternal: false));
break;
case OrchestratorActionType.CreateSubOrchestration:
var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision;
orchestratorMessages.Add(
this.ProcessCreateSubOrchestrationInstanceDecision(
createSubOrchestrationAction,
runtimeState,
this.IncludeParameters,
traceActivity));
break;
case OrchestratorActionType.SendEvent:
var sendEventAction = (SendEventOrchestratorAction)decision;
orchestratorMessages.Add(
this.ProcessSendEventDecision(sendEventAction, runtimeState));
break;
case OrchestratorActionType.OrchestrationComplete:
OrchestrationCompleteOrchestratorAction completeDecision = (OrchestrationCompleteOrchestratorAction)decision;
TaskMessage? workflowInstanceCompletedMessage =
this.ProcessWorkflowCompletedTaskDecision(completeDecision, runtimeState, this.IncludeDetails, out continuedAsNew);
if (workflowInstanceCompletedMessage != null)
{
// Send complete message to parent workflow or to itself to start a new execution
// Store the event so we can rebuild the state
carryOverEvents = null;
if (continuedAsNew)
{
continuedAsNewMessage = workflowInstanceCompletedMessage;
continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent;
// Copy the carry-over events out of the decision, then clear them on the decision.
if (completeDecision.CarryoverEvents.Any())
{
carryOverEvents = completeDecision.CarryoverEvents.ToList();
completeDecision.CarryoverEvents.Clear();
}
}
else
{
orchestratorMessages.Add(workflowInstanceCompletedMessage);
}
}
isCompleted = !continuedAsNew;
break;
default:
throw TraceHelper.TraceExceptionInstance(
TraceEventType.Error,
"TaskOrchestrationDispatcher-UnsupportedDecisionType",
runtimeState.OrchestrationInstance!,
new NotSupportedException($"Decision type '{decision.OrchestratorActionType}' not supported"));
}
// Underlying orchestration service provider may have a limit of messages per call, to avoid the situation
// we keep on asking the provider if message count is ok and stop processing new decisions if not.
//
// We also put in a fake timer to force next orchestration task for remaining messages
int totalMessages = messagesToSend.Count + orchestratorMessages.Count + timerMessages.Count;
if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-MaxMessageCountReached",
runtimeState.OrchestrationInstance!,
"MaxMessageCount reached. Adding timer to process remaining events in next attempt.");
// No split timer is added once the orchestration has completed or continued-as-new;
// this break only leaves the foreach over decisions.
if (isCompleted || continuedAsNew)
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted",
runtimeState.OrchestrationInstance!,
"Orchestration already completed. Skip adding timer for splitting messages.");
break;
}
// Internal timer that fires immediately (FireAt = UtcNow), then stop processing decisions.
var dummyTimer = new CreateTimerOrchestratorAction
{
Id = FrameworkConstants.FakeTimerIdToSplitDecision,
FireAt = DateTime.UtcNow
};
timerMessages.Add(this.ProcessCreateTimerDecision(
dummyTimer,
runtimeState,
isInternal: true));
isInterrupted = true;
break;
}
}
// correlation
CorrelationTraceClient.Propagate(() =>
{
if (runtimeState.ExecutionStartedEvent != null)
runtimeState.ExecutionStartedEvent.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});
// finish up processing of the work item
// Append OrchestratorCompleted unless continuing as new or it is already the last event.
if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
{
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
}
if (isCompleted)
{
TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state");
if (runtimeState.ExecutionStartedEvent != null)
{
instanceState = Utils.BuildOrchestrationState(runtimeState);
}
}
else
{
if (continuedAsNew)
{
TraceHelper.TraceSession(
TraceEventType.Information,
"TaskOrchestrationDispatcher-UpdatingStateForContinuation",
workItem.InstanceId,
"Updating state for continuation");
// correlation
CorrelationTraceClient.Propagate(() =>
{
continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});
// Copy the distributed trace context, if any
continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent);
// Build a fresh runtime state for the new execution: started event, the new
// ExecutionStarted event, any carried-over events, then a completed event.
runtimeState = new OrchestrationRuntimeState();
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(continueAsNewExecutionStarted!);
// workItem.OrchestrationRuntimeState still refers to the previous state at this point.
runtimeState.Status = workItem.OrchestrationRuntimeState.Status ?? carryOverStatus;
carryOverStatus = workItem.OrchestrationRuntimeState.Status;
if (carryOverEvents != null)
{
foreach (var historyEvent in carryOverEvents)
{
runtimeState.AddEvent(historyEvent);
}
}
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
workItem.OrchestrationRuntimeState = runtimeState;
// Clearing the cursor makes the next loop iteration execute from scratch.
workItem.Cursor = null;
}
instanceState = Utils.BuildOrchestrationState(runtimeState);
}
} while (continuedAsNew);
}
}
finally
{
// Stop the lock-renewal task, if one was started, and wait for it to finish.
if (renewTask != null)
{
try
{
renewCancellationTokenSource.Cancel();
await renewTask;
}
catch (ObjectDisposedException)
{
// ignore
}
catch (OperationCanceledException)
{
// ignore
}
}
}
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
// some backends expect the original runtime state object
workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState;
}
// Fall back to the carried-over status when the final state has none.
runtimeState.Status = runtimeState.Status ?? carryOverStatus;
if (instanceState != null)
{
instanceState.Status = runtimeState.Status;
}
// Outbound (messagesToSend) and timer messages are passed as null when continuedAsNew is true.
await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
workItem,
runtimeState,
continuedAsNew ? null : messagesToSend,
orchestratorMessages,
continuedAsNew ? null : timerMessages,
continuedAsNewMessage,
instanceState);
// After completion, point the work item at the final runtime state again.
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
workItem.OrchestrationRuntimeState = runtimeState;
}
return isCompleted || continuedAsNew || isInterrupted;
}
StopAsync() 方法
Stops the dispatcher to stop getting and processing orchestration events
停止调度程序,以停止获取和处理协调事件
/// <summary>
/// Stops this dispatcher by stopping the inner work item dispatcher and awaiting its shutdown.
/// </summary>
/// <param name="forced">Passed through unchanged to the inner dispatcher's StopAsync.</param>
/// <returns>A task that completes once the inner dispatcher's StopAsync has completed.</returns>
public async Task StopAsync(bool forced)
{
    Task innerStop = this.dispatcher.StopAsync(forced);
    await innerStop;
}
NonBlockingCountdownLock
一个 CountdownLock 的实现:
/// <summary>
/// A counting lock that never blocks: <see cref="Acquire"/> either takes one of the available
/// slots immediately or reports failure, and <see cref="Release"/> hands one slot back.
/// </summary>
internal class NonBlockingCountdownLock
{
    // Number of slots currently free; modified only through Interlocked operations.
    int remaining;

    /// <summary>
    /// Creates a lock with the given number of slots.
    /// </summary>
    /// <param name="available">Initial (and nominal maximum) number of slots; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="available"/> is zero or negative.</exception>
    public NonBlockingCountdownLock(int available)
    {
        if (available <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(available));
        }

        this.Capacity = available;
        this.remaining = available;
    }

    /// <summary>
    /// The number of slots the lock was created with.
    /// </summary>
    public int Capacity { get; }

    /// <summary>
    /// Tries to take one slot without waiting.
    /// </summary>
    /// <returns>true when a slot was taken; false when none was available.</returns>
    public bool Acquire()
    {
        // Cheap pre-check: skip the interlocked operations when nothing appears to be free.
        if (this.remaining > 0)
        {
            if (Interlocked.Decrement(ref this.remaining) < 0)
            {
                // Another caller took the last slot first; undo our decrement.
                Interlocked.Increment(ref this.remaining);
                return false;
            }

            return true;
        }

        return false;
    }

    /// <summary>
    /// Returns one slot. The counter is incremented unconditionally; it is not capped at <see cref="Capacity"/>.
    /// </summary>
    public void Release()
    {
        Interlocked.Increment(ref this.remaining);
    }
}
2 - WorkItemDispatcher源码学习
Dispatcher class for fetching and processing work items of the supplied type
调度程序类,用于获取和处理所提供类型的工作项
src\DurableTask.Core\WorkItemDispatcher.cs
类定义
public class WorkItemDispatcher<T> : IDisposable
类字段
// Defaults; presumably the initial values of MaxConcurrentWorkItems and DispatcherCount,
// whose declarations are not part of this excerpt - confirm.
const int DefaultMaxConcurrentWorkItems = 20;
const int DefaultDispatcherCount = 1;
// NOTE(review): usage not shown in this excerpt; the name suggests a back-off in seconds.
const int BackOffIntervalOnInvalidOperationSecs = 10;
// NOTE(review): usage not shown in this excerpt.
const int CountDownToZeroDelay = 5;
// ReSharper disable once StaticMemberInGenericType
// Timeout handed to the FetchWorkItem delegate on every fetch.
static readonly TimeSpan DefaultReceiveTimeout = TimeSpan.FromSeconds(30);
// Unique id of this dispatcher instance (a GUID string generated in the constructor).
readonly string id;
// Display name supplied through the constructor; used in log and exception messages.
readonly string name;
// NOTE(review): usage not shown in this excerpt.
readonly object thisLock = new object();
// Serializes the initialization performed in StartAsync (one holder at a time).
readonly SemaphoreSlim initializationLock = new SemaphoreSlim(1, 1);
// Incremented when a fetched work item is scheduled for processing; reported in log messages.
volatile int concurrentWorkItemCount;
// NOTE(review): usage not shown in this excerpt.
volatile int countDownToZeroDelay;
// Lower bound (in seconds) for the delay applied at the end of each dispatch loop iteration.
volatile int delayOverrideSecs;
// Number of dispatch loops currently inside a fetch call.
volatile int activeFetchers;
// True once StartAsync has run; the dispatch loops keep running while it is true.
bool isStarted;
// Created in StartAsync with MaxConcurrentWorkItems slots; a dispatch loop takes one slot
// before each fetch.
SemaphoreSlim concurrencyLock;
// Created in StartAsync; its token is passed to the FetchWorkItem delegate.
CancellationTokenSource shutdownCancellationTokenSource;
方法定义
/// <summary>
/// Method that returns the identifier string of a work item (used when logging a completed fetch)
/// </summary>
readonly Func<T, string> workItemIdentifier;
/// <summary>
/// Method to execute for fetching the next work item; receives a receive timeout and a cancellation token
/// </summary>
Func<TimeSpan, CancellationToken, Task<T>> FetchWorkItem { get; }
/// <summary>
/// Method to execute for processing a work item; supplied through the constructor
/// </summary>
Func<T, Task> ProcessWorkItem { get; }
/// <summary>
/// Method to execute for safely releasing a work item
/// </summary>
public Func<T, Task> SafeReleaseWorkItem;
/// <summary>
/// Method to execute for aborting a work item
/// </summary>
public Func<T, Task> AbortWorkItem;
/// <summary>
/// Method to get a delay to wait after a fetch exception
/// (defaults to a method that returns 0, i.e. no delay)
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnFetchException = (exception) => 0;
/// <summary>
/// Method to get a delay to wait after a process exception
/// (defaults to a method that returns 0, i.e. no delay)
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnProcessException = (exception) => 0;
构造函数
/// <summary>
/// Creates a work item dispatcher with the given name and callbacks.
/// </summary>
/// <param name="name">Display name of the dispatcher; stored as given.</param>
/// <param name="workItemIdentifier">Method that returns the identifier of a work item.</param>
/// <param name="fetchWorkItem">Method used to fetch the next work item.</param>
/// <param name="processWorkItem">Method used to process a fetched work item.</param>
/// <exception cref="ArgumentNullException">Thrown when any of the three callbacks is null.</exception>
public WorkItemDispatcher(
    string name,
    Func<T, string> workItemIdentifier,
    Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
    Func<T, Task> processWorkItem)
{
    this.name = name;

    // Every dispatcher instance gets its own id: a GUID formatted as 32 hex digits.
    Guid instanceGuid = Guid.NewGuid();
    this.id = instanceGuid.ToString("N");

    // Callbacks are validated and stored one by one, in parameter order.
    if (workItemIdentifier == null)
    {
        throw new ArgumentNullException(nameof(workItemIdentifier));
    }

    this.workItemIdentifier = workItemIdentifier;

    if (fetchWorkItem == null)
    {
        throw new ArgumentNullException(nameof(fetchWorkItem));
    }

    this.FetchWorkItem = fetchWorkItem;

    if (processWorkItem == null)
    {
        throw new ArgumentNullException(nameof(processWorkItem));
    }

    this.ProcessWorkItem = processWorkItem;
}
对照 TaskOrchestrationDispatcher 中的初始化代码:
// Excerpt of the TaskOrchestrationDispatcher constructor; the "......" lines mark omitted code.
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
ErrorPropagationMode errorPropagationMode)
{
......
// Initialize the dispatcher: name, work item identifier, fetch callback, process callback
this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
this.OnProcessWorkItemSessionAsync)
{
// Initialize the dispatcher's fields from the orchestration service
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
LogHelper = logHelper,
};
......
}
Func<T, string> workItemIdentifier 的实现是:
item => item == null ? string.Empty : item.InstanceId,
fetchWorkItem 的实现是 TaskOrchestrationDispatcher 的 this.OnFetchWorkItemAsync,processWorkItem 的实现是 TaskOrchestrationDispatcher 的 this.OnProcessWorkItemSessionAsync,
DispatcherCount 由 orchestrationService.TaskOrchestrationDispatcherCount 指定,
MaxConcurrentWorkItems 由 orchestrationService.MaxConcurrentTaskOrchestrationWorkItems 指定。
StartAsync() 方法
Starts the work item dispatcher:
/// <summary>
/// Starts the work item dispatcher: (re)creates the concurrency semaphore and the shutdown
/// cancellation source, marks the dispatcher as started and launches DispatcherCount dispatch
/// loops without waiting for them.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when the dispatcher turns out to be started once the initialization lock has been
/// obtained (i.e. another caller started it in the meantime).
/// </exception>
public async Task StartAsync()
{
// When the dispatcher is already started this call returns without doing anything.
if (!this.isStarted)
{
await this.initializationLock.WaitAsync();
try
{
// Second check under the lock: reached only if another caller started the dispatcher
// while this one was waiting for initializationLock.
if (this.isStarted)
{
throw TraceHelper.TraceException(TraceEventType.Error, "WorkItemDispatcherStart-AlreadyStarted", new InvalidOperationException($"WorkItemDispatcher '{this.name}' has already started"));
}
// Dispose any previous instances before creating fresh ones.
this.concurrencyLock?.Dispose();
this.concurrencyLock = new SemaphoreSlim(this.MaxConcurrentWorkItems);
this.shutdownCancellationTokenSource?.Dispose();
this.shutdownCancellationTokenSource = new CancellationTokenSource();
this.isStarted = true;
TraceHelper.Trace(TraceEventType.Information, "WorkItemDispatcherStart", $"WorkItemDispatcher('{this.name}') starting. Id {this.id}.");
for (var i = 0; i < this.DispatcherCount; i++)
{
string dispatcherId = i.ToString();
// Create the context that identifies this dispatch loop
var context = new WorkItemDispatcherContext(this.name, this.id, dispatcherId);
this.LogHelper.DispatcherStarting(context);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
Task.Run(() => this.DispatchAsync(context));
#pragma warning restore 4014
}
}
finally
{
this.initializationLock.Release();
}
}
}
DispatchAsync() 方法
/// <summary>
/// Body of one dispatch loop. While the dispatcher is started it repeatedly takes a slot from
/// the concurrency semaphore, fetches a work item through the FetchWorkItem delegate, schedules
/// the item for processing on the thread pool, and applies any delay requested after a fetch
/// exception.
/// </summary>
/// <param name="context">Identifies this dispatch loop in log messages.</param>
async Task DispatchAsync(WorkItemDispatcherContext context)
{
string dispatcherId = context.DispatcherId;
bool logThrottle = true;
while (this.isStarted)
{
// Wait up to 5 seconds for a free concurrency slot; on timeout, loop and re-check isStarted.
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
{
if (logThrottle)
{
// This can happen frequently under heavy load.
// To avoid log spam, we log just once until we can proceed.
this.LogHelper.FetchingThrottled(
context,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-MaxOperations",
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
logThrottle = false;
}
continue;
}
// A slot was obtained, so throttle logging is armed again.
logThrottle = true;
var delaySecs = 0;
T workItem = default(T);
try
{
Interlocked.Increment(ref this.activeFetchers);
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-StartFetch",
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
Stopwatch timer = Stopwatch.StartNew();
// The work item fetch starts here
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
if (!IsNull(workItem))
{
string workItemId = this.workItemIdentifier(workItem);
this.LogHelper.FetchWorkItemCompleted(
context,
workItemId,
timer.Elapsed,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
}
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-EndFetch",
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
}
catch (TimeoutException)
{
// A fetch timeout is not logged and adds no delay.
delaySecs = 0;
}
catch (TaskCanceledException exception)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-TaskCanceledException",
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
catch (Exception exception)
{
// Exceptions observed after the dispatcher was stopped are only traced, with no delay.
if (!this.isStarted)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-HarmlessException",
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
}
else
{
this.LogHelper.FetchWorkItemFailure(context, exception);
// TODO : dump full node context here
TraceHelper.TraceException(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-Exception",
exception,
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
}
finally
{
Interlocked.Decrement(ref this.activeFetchers);
}
// Then schedule the fetched work item
var scheduledWorkItem = false;
if (!IsNull(workItem))
{
// The dispatcher was stopped while fetching: hand the item back instead of processing it.
if (!this.isStarted)
{
if (this.SafeReleaseWorkItem != null)
{
await this.SafeReleaseWorkItem(workItem);
}
}
else
{
Interlocked.Increment(ref this.concurrentWorkItemCount);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
// Start processing the work item
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
#pragma warning restore 4014
scheduledWorkItem = true;
}
}
// Use the larger of the override delay and the delay requested by the exception handlers.
delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
if (delaySecs > 0)
{
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
}
// The concurrency slot is returned here only when nothing was scheduled.
// NOTE(review): presumably ProcessWorkItemAsync releases the slot for scheduled items;
// that method is not part of this excerpt - confirm.
if (!scheduledWorkItem)
{
this.concurrencyLock.Release();
}
}
this.LogHelper.DispatcherStopped(context);
}
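这里的 FetchWorkItem 委托是由构造函数传入的(实际由 TaskOrchestrationDispatcher 传递);而 ProcessWorkItemAsync(context, workItem) 是 WorkItemDispatcher 自身的方法(本文未列出其源码),构造函数传入的处理委托是 ProcessWorkItem,推测 ProcessWorkItemAsync() 内部会调用它。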
StopAsync() 方法
Stops the dispatcher to stop getting and processing orchestration events
停止调度程序,以停止获取和处理协调事件
// NOTE(review): this snippet is identical to TaskOrchestrationDispatcher.StopAsync quoted earlier
// in these notes. The WorkItemDispatcher fields listed above do not include a `dispatcher`
// member, so this is probably not WorkItemDispatcher's own StopAsync - confirm against
// src\DurableTask.Core\WorkItemDispatcher.cs.
public async Task StopAsync(bool forced)
{
await this.dispatcher.StopAsync(forced);
}