WorkItemDispatcher源码学习
Dispatcher class for fetching and processing work items of the supplied type
调度程序类,用于获取和处理所提供类型的工作项
src\DurableTask.Core\WorkItemDispatcher.cs
类定义
public class WorkItemDispatcher<T> : IDisposable
类字段
const int DefaultMaxConcurrentWorkItems = 20;
const int DefaultDispatcherCount = 1;
const int BackOffIntervalOnInvalidOperationSecs = 10;
const int CountDownToZeroDelay = 5;
// ReSharper disable once StaticMemberInGenericType
static readonly TimeSpan DefaultReceiveTimeout = TimeSpan.FromSeconds(30);
readonly string id;
readonly string name;
readonly object thisLock = new object();
readonly SemaphoreSlim initializationLock = new SemaphoreSlim(1, 1);
volatile int concurrentWorkItemCount;
volatile int countDownToZeroDelay;
volatile int delayOverrideSecs;
volatile int activeFetchers;
bool isStarted;
SemaphoreSlim concurrencyLock;
CancellationTokenSource shutdownCancellationTokenSource;
方法定义
readonly Func<T, string> workItemIdentifier;
Func<TimeSpan, CancellationToken, Task<T>> FetchWorkItem { get; }
Func<T, Task> ProcessWorkItem { get; }
/// <summary>
/// Method to execute for safely releasing a work item
/// </summary>
public Func<T, Task> SafeReleaseWorkItem;
/// <summary>
/// Method to execute for aborting a work item
/// </summary>
public Func<T, Task> AbortWorkItem;
/// <summary>
/// Method to get a delay to wait after a fetch exception
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnFetchException = (exception) => 0;
/// <summary>
/// Method to get a delay to wait after a process exception
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnProcessException = (exception) => 0;
构造函数
public WorkItemDispatcher(
string name,
Func<T, string> workItemIdentifier,
Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
Func<T, Task> processWorkItem)
{
this.name = name;
this.id = Guid.NewGuid().ToString("N");
this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
this.FetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
this.ProcessWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
}
对照 TaskOrchestrationDispatcher 中的初始化代码:
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
ErrorPropagationMode errorPropagationMode)
{
......
// 初始化 dispatcher
this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
this.OnProcessWorkItemSessionAsync)
{
// 初始化 dispatcher 的字段
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
LogHelper = logHelper,
};
......
}
Func<T, string> workItemIdentifier 的实现是:
item => item == null ? string.Empty : item.InstanceId,
fetchWorkItem 的实现是 TaskOrchestrationDispatcher.this.OnFetchWorkItemAsync,fetchWorkItem 的实现是 TaskOrchestrationDispatcher.this.OnProcessWorkItemSessionAsync,
DispatcherCount 由 orchestrationService.TaskOrchestrationDispatcherCount 指定,
MaxConcurrentWorkItems 由 orchestrationService.MaxConcurrentTaskOrchestrationWorkItems 指定。
StartAsync() 方法
Starts the work item dispatcher:
public async Task StartAsync()
{
if (!this.isStarted)
{
await this.initializationLock.WaitAsync();
try
{
if (this.isStarted)
{
throw TraceHelper.TraceException(TraceEventType.Error, "WorkItemDispatcherStart-AlreadyStarted", new InvalidOperationException($"WorkItemDispatcher '{this.name}' has already started"));
}
this.concurrencyLock?.Dispose();
this.concurrencyLock = new SemaphoreSlim(this.MaxConcurrentWorkItems);
this.shutdownCancellationTokenSource?.Dispose();
this.shutdownCancellationTokenSource = new CancellationTokenSource();
this.isStarted = true;
TraceHelper.Trace(TraceEventType.Information, "WorkItemDispatcherStart", $"WorkItemDispatcher('{this.name}') starting. Id {this.id}.");
for (var i = 0; i < this.DispatcherCount; i++)
{
string dispatcherId = i.ToString();
// 创建 context
var context = new WorkItemDispatcherContext(this.name, this.id, dispatcherId);
this.LogHelper.DispatcherStarting(context);
// We just want this to Run we intentionally don't wait
// 我们只是想让它运行起来,我们故意不等待
#pragma warning disable 4014
Task.Run(() => this.DispatchAsync(context));
#pragma warning restore 4014
}
}
finally
{
this.initializationLock.Release();
}
}
}
DispatchAsync() 方法
async Task DispatchAsync(WorkItemDispatcherContext context)
{
string dispatcherId = context.DispatcherId;
bool logThrottle = true;
while (this.isStarted)
{
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
{
if (logThrottle)
{
// This can happen frequently under heavy load.
// To avoid log spam, we log just once until we can proceed.
this.LogHelper.FetchingThrottled(
context,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-MaxOperations",
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
logThrottle = false;
}
continue;
}
logThrottle = true;
var delaySecs = 0;
T workItem = default(T);
try
{
Interlocked.Increment(ref this.activeFetchers);
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-StartFetch",
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
Stopwatch timer = Stopwatch.StartNew();
// 在这里开始 fetch workitem
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
if (!IsNull(workItem))
{
string workItemId = this.workItemIdentifier(workItem);
this.LogHelper.FetchWorkItemCompleted(
context,
workItemId,
timer.Elapsed,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
}
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-EndFetch",
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
}
catch (TimeoutException)
{
delaySecs = 0;
}
catch (TaskCanceledException exception)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-TaskCanceledException",
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
catch (Exception exception)
{
if (!this.isStarted)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-HarmlessException",
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
}
else
{
this.LogHelper.FetchWorkItemFailure(context, exception);
// TODO : dump full node context here
TraceHelper.TraceException(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-Exception",
exception,
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
}
finally
{
Interlocked.Decrement(ref this.activeFetchers);
}
// 然后开始调度 workitem
var scheduledWorkItem = false;
if (!IsNull(workItem))
{
if (!this.isStarted)
{
if (this.SafeReleaseWorkItem != null)
{
await this.SafeReleaseWorkItem(workItem);
}
}
else
{
Interlocked.Increment(ref this.concurrentWorkItemCount);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
// 开始 workitem 的处理
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
#pragma warning restore 4014
scheduledWorkItem = true;
}
}
delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
if (delaySecs > 0)
{
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
}
if (!scheduledWorkItem)
{
this.concurrencyLock.Release();
}
}
this.LogHelper.DispatcherStopped(context);
}
这里的 FetchWorkItem() 方法和 ProcessWorkItemAsync() 方法都是由构造函数传递的(实际由 TaskOrchestrationDispatcher 传递)。
StopAsync() 方法
Stops the dispatcher to stop getting and processing orchestration events
停止调度程序,以停止获取和处理协调事件
public async Task StopAsync(bool forced)
{
await this.dispatcher.StopAsync(forced);
}