DurableTask Source Code Study
- 1: DurableTask quickstart
- 1.1: DurableTask quickstart
- 1.2: Quickstart raw logs
- 1.3: Quickstart behavior analysis
- 1.4: Quickstart event order
- 2: DurableTask main flows
- 2.1: Start instance flow
- 2.2: Fetch work item flow
- 2.2.1: OrchestrationSessionManager
- 2.2.2: AzureStorageOrchestrationService
- 3: DurableTask Core source code study
- 3.1: Core programming model
- 3.1.1: Orchestration
- 3.1.1.1: OrchestrationInstance
- 3.1.1.2: OrchestrationContext
- 3.1.1.3: OrchestrationState
- 3.1.2: Activity
- 3.1.2.1: TaskActivity
- 3.1.2.2: TaskContext
- 3.1.3: OrchestrationExecution
- 3.1.3.1: OrchestrationExecutionContext
- 3.1.3.2: OrchestrationExecutionCursor
- 3.1.4: TaskOrchestration
- 3.1.4.1: TaskOrchestration
- 3.1.4.2: TaskOrchestrationContext
- 3.1.5: OrchestrationState
- 3.1.5.1: OrchestrationState
- 3.1.5.2: OrchestrationStateQuery
- 3.1.5.3: OrchestrationStateQueryFilter
- 3.1.5.4: OrchestrationStateInstanceFilter
- 3.1.5.5: OrchestrationStateNameVersionFilter
- 3.1.6: Entity
- 3.1.6.1: TaskEntity
- 3.1.6.2: EntityId
- 3.1.7: History
- 3.1.7.1: History overview
- 3.1.7.2: HistoryEvent
- 3.1.7.3: ExecutionStartedEvent
- 3.1.7.4: OrchestratorStartedEvent
- 3.2: DurableTask Dispatcher source code study
- 3.2.1: TaskOrchestrationDispatcher source code study
- 3.2.2: WorkItemDispatcher source code study
- 3.3: DurableTask Orchestration source code study
- 3.3.1: IOrchestrationService
- 3.3.2: IOrchestrationServiceClient
- 3.3.3: IOrchestrationServiceInstanceStore
1 - DurableTask quickstart
1.1 - DurableTask quickstart
This walkthrough uses DurableTask.Samples as the example.
Prerequisites
Azure Storage
To run the samples, first provision an Azure Storage account, then set StorageConnectionString in samples\DurableTask.Samples\App.config:
<appSettings>
<add key="StorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=skyversioningdev;AccountKey=5dxW9E4xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx;EndpointSuffix=core.windows.net" />
<add key="taskHubName" value="SamplesHub" />
<add key="SmtpNetworkCredentials" value="" />
</appSettings>
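A malformed connection string is an easy mistake at this step. The following Python sketch parses an appSettings fragment and reports which parts of the connection string are empty or missing. It assumes the account-key form shown above (AccountName plus AccountKey). The helper names `read_app_settings` and `missing_connection_parts` and the placeholder values in `SAMPLE_CONFIG` are made up for illustration and are not part of DurableTask.

```python
import xml.etree.ElementTree as ET

# Assumption: the account-key form of the connection string is used,
# so both of these parts must be present and non-empty.
REQUIRED_PARTS = ("AccountName", "AccountKey")

# Placeholder values only; substitute the content of your own App.config.
SAMPLE_CONFIG = """
<appSettings>
  <add key="StorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=PLACEHOLDER;AccountKey=PLACEHOLDER==;EndpointSuffix=core.windows.net" />
  <add key="taskHubName" value="SamplesHub" />
  <add key="SmtpNetworkCredentials" value="" />
</appSettings>
"""


def read_app_settings(xml_text):
    """Return the key/value pairs found under every <appSettings> element."""
    root = ET.fromstring(xml_text)
    settings = {}
    # iter() also yields the root itself, so a bare <appSettings> fragment works.
    for section in root.iter("appSettings"):
        for add in section.findall("add"):
            settings[add.get("key")] = add.get("value", "")
    return settings


def missing_connection_parts(connection_string):
    """Return the required parts that are absent or empty."""
    # Split on the first '=' only, because base64 keys usually end with '='.
    parts = dict(
        item.split("=", 1) for item in connection_string.split(";") if "=" in item
    )
    return [name for name in REQUIRED_PARTS if not parts.get(name)]


if __name__ == "__main__":
    settings = read_app_settings(SAMPLE_CONFIG)
    print("taskHubName:", settings["taskHubName"])
    print("missing parts:", missing_connection_parts(settings["StorageConnectionString"]))
```

The check is purely syntactic: it does not contact Azure, so a well-formed but wrong key still passes.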
.NET Framework 4.6.2
Opening samples\DurableTask.Samples\DurableTask.Samples.csproj shows that the project targets .NET Framework 4.6.2 (net462):
<PropertyGroup>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<OutputType>Exe</OutputType>
<TargetFramework>net462</TargetFramework>
</PropertyGroup>
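The sample runs fine with .NET 8.0 installed, without installing .NET Framework 4.6.2 separately. Debugging, however, requires installing .NET Framework 4.6.2 separately.
TBD: verify later whether .NET Framework 4.6.2 really needs to be installed.
Running the samples
Build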
cd samples\DurableTask.Samples\
dotnet build
Run
Taking the Greetings sample as an example:
dotnet run -s Greetings
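Note that this sample pops up a window, so it cannot run in a headless, command-line-only session. Run it from cmd or bash inside a Windows desktop session; otherwise it fails with: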
EventId : 4, Level : Warning, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : abdb59a4-a456-4b14-b1d6-c77a277f9199] [executionId : d23194e7550c4f8da41d333911c80647] [sessionId : ] [message : Instance Id '[InstanceId: abdb59a4-a456-4b14-b1d6-c77a277f9199, ExecutionId: d23194e7550c4f8da41d333911c80647]' completed in state Failed with result: Showing a modal dialog box or form when the application is not running in UserInteractive mode is not a valid operation. Specify the ServiceNotification or DefaultDesktopOnly style to display a notification from a service application.] [exception : ] [eventType : TaskOrchestrationDispatcher-InstanceCompleted] , EventName : WarningInfo, Timestamp : 2024-04-28T01:39:38.0885765Z, ProcessId : 5716, ThreadId : 2952, ActivityId : d03ff6a5-eafb-4577-8b61-89cd486db722
"Reason": "Showing a modal dialog box or form when the application is not running in UserInteractive mode is not a valid operation. Specify the ServiceNotification or DefaultDesktopOnly style to display a notification from a service application.",
1.2 - Quickstart raw logs
Start the command:
$ dotnet run -c debug -s Greetings
Start Orchestration: Greetings
The full log output is:
dotnet run -c debug -s Greetings
sleep 30 seconds
waiting for debugger to attach
done waiting for debugger to attach
Start Orchestration: Greetings
Workflow Instance Started: [InstanceId: 16af7b96-9409-4618-b33a-e7cc3a121a1a, ExecutionId: 410f7f47fdd9444ab3fb3ce6d00e9649]
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : WorkItemDispatcher('TaskOrchestrationDispatcher') starting. Id e0bc782c9aa84a0a8a256cf7a5d4e06e.] [info : ] [eventType : WorkItemDispatcherStart] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:56.3468829Z, ProcessId : 6536, ThreadId : 14068
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : WorkItemDispatcher('TaskActivityDispatcher') starting. Id 38b3e9add82043708a8643b81cf2f643.] [info : ] [eventType : WorkItemDispatcherStart] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:56.3625156Z, ProcessId : 6536, ThreadId : 14068
Waiting up to 60 seconds for completion.
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:56.3625156Z, ProcessId : 6536, ThreadId : 12032
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting fetch with timeout of 00:00:30 (0/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:56.3625156Z, ProcessId : 6536, ThreadId : 9632
********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: After fetch (2961 ms) (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.3155825Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (1/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.3155825Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting to process workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3155825Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing new event with Id -1 and type ExecutionStarted] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessEvent] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3312077Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executing user orchestration: {
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
}
],
"NewEvents": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
}
],
"EventCount": 0,
"NewEventsCount": 0
}] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.3312077Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executed user orchestration. Received 1 orchestrator actions: 0:ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3468324Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing orchestrator action of type ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessOrchestratorAction] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3468324Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
OnProcessWorkItemAsync - Processing orchestrator action of type ScheduleOrchestrator
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: After fetch (3161 ms) (0/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.5187069Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 1e0c99d8-4905-4149-a790-332669c907af
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting fetch with timeout of 00:00:30 (1/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.5187069Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 1e0c99d8-4905-4149-a790-332669c907af
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting to process workItem 585e3de9-e953-4f39-af4f-c4a9b7bcd657] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.5187069Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
Waiting for user to enter name...
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Finished processing workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.8155822Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 1e0c99d8-4905-4149-a790-332669c907af
User Name Entered: sky
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Finished processing workItem 585e3de9-e953-4f39-af4f-c4a9b7bcd657] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.3624392Z, ProcessId : 6536, ThreadId : 11248
********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: After fetch (5214 ms) (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 14068, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (1/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 14068, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting to process workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing new event with Id -1 and type TaskCompleted] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessEvent] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executing user orchestration: {
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
},
{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.GetUserTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 0,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
}
],
"NewEvents": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
}
],
"EventCount": 0,
"NewEventsCount": 0
}] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executed user orchestration. Received 1 orchestrator actions: 1:ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing orchestrator action of type ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessOrchestratorAction] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
OnProcessWorkItemAsync - Processing orchestrator action of type ScheduleOrchestrator
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: After fetch (5183 ms) (0/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.7061869Z, ProcessId : 6536, ThreadId : 11248, ActivityId : 413409db-6e47-413a-b78d-319a802d563c
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting fetch with timeout of 00:00:30 (1/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.7061869Z, ProcessId : 6536, ThreadId : 11248, ActivityId : 413409db-6e47-413a-b78d-319a802d563c
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting to process workItem d9bb55c6-f35e-495b-994e-b6bd106372b1] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.7061869Z, ProcessId : 6536, ThreadId : 9632, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
Sending greetings to user: sky...
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Finished processing workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.9093117Z, ProcessId : 6536, ThreadId : 13976, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
Greeting sent to sky
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Finished processing workItem d9bb55c6-f35e-495b-994e-b6bd106372b1] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:09.8780623Z, ProcessId : 6536, ThreadId : 10076, ActivityId : 1946c0da-30f0-4767-9aff-a5ff1e2bb884
********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: After fetch (5515 ms) (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (1/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting to process workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing new event with Id -1 and type TaskCompleted] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessEvent] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executing user orchestration: {
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
},
{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.GetUserTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 0,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
},
{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.SendGreetingTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}
],
"NewEvents": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}
],
"EventCount": 0,
"NewEventsCount": 0
}] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executed user orchestration. Received 1 orchestrator actions: 2:OrchestrationComplete] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing orchestrator action of type OrchestrationComplete] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessOrchestratorAction] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
OnProcessWorkItemAsync - Processing orchestrator action of type OrchestrationComplete
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Instance Id '[InstanceId: 16af7b96-9409-4618-b33a-e7cc3a121a1a, ExecutionId: 410f7f47fdd9444ab3fb3ce6d00e9649]' completed in state Completed with result: "Greeting sent to sky"] [info : ] [eventType : TaskOrchestrationDispatcher-InstanceCompleted] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : {{
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
}},
{{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
}},
{{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.GetUserTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 0,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
}},
{{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
}},
{{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.SendGreetingTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}},
{{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}},
{{
"$type": "DurableTask.Core.History.ExecutionCompletedEvent, DurableTask.Core",
"EventType": 1,
"OrchestrationStatus": 1,
"Result": "[..snipped..]",
"FailureDetails": null,
"EventId": 2,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}}
],
"NewEvents": [
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}},
{{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}},
{{
"$type": "DurableTask.Core.History.ExecutionCompletedEvent, DurableTask.Core",
"EventType": 1,
"OrchestrationStatus": 1,
"Result": "[..snipped..]",
"FailureDetails": null,
"EventId": 2,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}}
],
"EventCount": 0,
"NewEventsCount": 0
}}] [info : ] [eventType : TaskOrchestrationDispatcher-InstanceCompletionEvents] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [message : Deleting session state] [info : ] [eventType : TaskOrchestrationDispatcher-DeletingSessionState] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Finished processing workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.3624424Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
Task done: Completed
Press any key to quit.
1.3 - quickstart behavior analysis
The command starts running:
$ dotnet run -c debug -s Greetings
Start Orchestration: Greetings
start workflow instance
The workflow instance starts, and an InstanceId and an ExecutionId are generated:
Workflow Instance Started: [InstanceId: 16af7b96-9409-4618-b33a-e7cc3a121a1a, ExecutionId: 410f7f47fdd9444ab3fb3ce6d00e9649]
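Two events are then logged as TaskOrchestrationDispatcher and TaskActivityDispatcher start. (Why is EventId 3 for both? Throughout this log, EventId 3 always appears with Level Informational and EventId 1 with Level Verbose, so this EventId seems to identify the kind of trace entry rather than an individual event.) Both events have eventType WorkItemDispatcherStart: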
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : WorkItemDispatcher('TaskOrchestrationDispatcher') starting. Id e0bc782c9aa84a0a8a256cf7a5d4e06e.] [info : ] [eventType : WorkItemDispatcherStart] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:56.3468829Z, ProcessId : 6536, ThreadId : 14068
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : WorkItemDispatcher('TaskActivityDispatcher') starting. Id 38b3e9add82043708a8643b81cf2f643.] [info : ] [eventType : WorkItemDispatcherStart] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:56.3625156Z, ProcessId : 6536, ThreadId : 14068
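One way to look into the EventId question is to pull EventId, Level and the payload fields out of each single-line log entry and compare them across the log. The sketch below assumes the `[key : value]` layout seen in these lines; LogLineParser is a hypothetical helper, not part of DurableTask, and it does not handle multi-line messages or messages that contain square brackets.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

public static class LogLineParser
{
    // Matches the leading "EventId : 3, Level : Informational" part of a line.
    private static readonly Regex Header = new Regex(@"EventId : (\d+), Level : (\w+)");

    // Matches one "[key : value]" payload field; the value may be empty.
    private static readonly Regex Field = new Regex(@"\[(\w+) : ([^\]]*)\]");

    public static Dictionary<string, string> Parse(string line)
    {
        var fields = new Dictionary<string, string>();
        Match header = Header.Match(line);
        if (header.Success)
        {
            fields["EventId"] = header.Groups[1].Value;
            fields["Level"] = header.Groups[2].Value;
        }
        foreach (Match m in Field.Matches(line))
        {
            fields[m.Groups[1].Value] = m.Groups[2].Value.Trim();
        }
        return fields;
    }

    public static void Main()
    {
        // Shortened copy of a log line from this run.
        string line = "EventId : 3, Level : Informational, Message : , Payload : " +
                      "[source : DurableTask.Core-durabletask.samples] [instanceId : ] " +
                      "[info : ] [eventType : WorkItemDispatcherStart] , EventName : InfoInfo";
        Dictionary<string, string> fields = Parse(line);
        Console.WriteLine(fields["EventId"] + " / " + fields["Level"] + " / " + fields["eventType"]);
    }
}
```

Grouping the parsed lines by EventId and Level would show whether the two values always move together in a given log.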
The sample waits up to 60 seconds for completion:
Waiting up to 60 seconds for completion.
TaskOrchestrationDispatcher and TaskActivityDispatcher start fetching:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:56.3625156Z, ProcessId : 6536, ThreadId : 12032
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting fetch with timeout of 00:00:30 (0/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:56.3625156Z, ProcessId : 6536, ThreadId : 9632
The next line is extra logging that I added; it shows that the LockNextTaskOrchestrationWorkItemAsync method was called:
********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()
Orchestration round 1
ExecutionStarted
TaskOrchestrationDispatcher successfully fetches a work item and then immediately starts fetching the next one:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: After fetch (2961 ms) (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.3155825Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (1/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.3155825Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
TaskOrchestrationDispatcher starts processing the work item it just fetched:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting to process workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3155825Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
TaskOrchestrationDispatcher starts processing the ExecutionStarted event:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing new event with Id -1 and type ExecutionStarted] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessEvent] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3312077Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
The user orchestration starts executing:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executing user orchestration:
At this point there are two events (OrchestratorStartedEvent and ExecutionStartedEvent), and both of them also appear under NewEvents:
{
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
}
],
"NewEvents": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
}
],
"EventCount": 0,
"NewEventsCount": 0
}
Executing user code: GreetingsOrchestration
The user's orchestration code starts executing:
[eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.3312077Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
The line GreetingsOrchestration.RunTask called is logging that I added. This is the implementation of the orchestration in the sample:
public class GreetingsOrchestration : TaskOrchestration<string,string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        Console.WriteLine("GreetingsOrchestration.RunTask called");
        string user = await context.ScheduleTask<string>(typeof(GetUserTask));
        string greeting = await context.ScheduleTask<string>(typeof(SendGreetingTask), user);
        return greeting;
    }
}
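The orchestration execution finishes (Executed user orchestration), and one orchestrator action, ScheduleOrchestrator, is received.
This looks suspicious, because the task has not completed yet; possibly the await returned control early?? To be checked against the source code later: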
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executed user orchestration. Received 1 orchestrator actions: 0:ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3468324Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
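On the doubt above: in C#, an async method returns to its caller as soon as it awaits a task that has not completed yet, which would be consistent with Executed user orchestration being logged while the first task is still pending. The sketch below illustrates only that language behavior. FakeContext, PendingActions and AwaitDemo are hypothetical names, not DurableTask.Core types, and what the dispatcher really does still has to be confirmed in the source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for OrchestrationContext; not the DurableTask.Core type.
public sealed class FakeContext
{
    // Actions recorded during one execution, loosely analogous to orchestrator actions.
    public List<string> PendingActions { get; } = new List<string>();

    public Task<string> ScheduleTask(string name, string input = null)
    {
        PendingActions.Add("Schedule:" + name);
        // Nobody sets a result in this sketch, so the returned task stays incomplete.
        return new TaskCompletionSource<string>().Task;
    }
}

public static class AwaitDemo
{
    public static async Task<string> RunTask(FakeContext context)
    {
        string user = await context.ScheduleTask("GetUserTask");
        string greeting = await context.ScheduleTask("SendGreetingTask", user);
        return greeting;
    }

    // Returns whether the orchestration task is completed after the first call,
    // and how many actions were recorded by then.
    public static Tuple<bool, int> Run()
    {
        var context = new FakeContext();
        Task<string> orchestration = RunTask(context); // control comes back at the first await
        return Tuple.Create(orchestration.IsCompleted, context.PendingActions.Count);
    }

    public static void Main()
    {
        Tuple<bool, int> result = Run();
        Console.WriteLine("completed=" + result.Item1 + ", actions=" + result.Item2);
    }
}
```

Running it prints completed=False, actions=1: RunTask has handed control back with one scheduled action recorded and the second ScheduleTask call not yet reached.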
TaskOrchestrationDispatcher starts processing the ScheduleOrchestrator orchestrator action:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing orchestrator action of type ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessOrchestratorAction] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.3468324Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
OnProcessWorkItemAsync - Processing orchestrator action of type ScheduleOrchestrator
The line OnProcessWorkItemAsync - Processing orchestrator action of type ScheduleOrchestrator is logging that I added.
Executing user code: GetUserTask
At this point TaskActivityDispatcher finally fetches a work item:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: After fetch (3161 ms) (0/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.5187069Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 1e0c99d8-4905-4149-a790-332669c907af
TaskActivityDispatcher starts fetching the next one:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting fetch with timeout of 00:00:30 (1/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:50:59.5187069Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 1e0c99d8-4905-4149-a790-332669c907af
TaskActivityDispatcher starts processing the work item it just fetched:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting to process workItem 585e3de9-e953-4f39-af4f-c4a9b7bcd657] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.5187069Z, ProcessId : 6536, ThreadId : 12032, ActivityId : d85171a0-5ef1-4e78-a32b-0d31cb396de7
This sample contains only two tasks, and the first one is GetUserTask:
string user = await context.ScheduleTask<string>(typeof(GetUserTask));
string greeting = await context.ScheduleTask<string>(typeof(SendGreetingTask), user);
return greeting;
A window pops up asking for the user's name, and the log shows:
Waiting for user to enter name...
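The next log line, Finished processing workItem, is printed by TaskOrchestrationDispatcher. Its work item id (16af7b96-...) and timestamp (01:50:59.8) indicate that it closes the first orchestration round, which ended while the activity was still waiting for input. The name sky is then entered: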
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Finished processing workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:50:59.8155822Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 1e0c99d8-4905-4149-a790-332669c907af
User Name Entered: sky
Once the name has been entered, the task completes and TaskActivityDispatcher prints its own Finished processing workItem log:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Finished processing workItem 585e3de9-e953-4f39-af4f-c4a9b7bcd657] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.3624392Z, ProcessId : 6536, ThreadId : 11248
TaskOrchestrationDispatcher fetches a work item again:
********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: After fetch (5214 ms) (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 14068, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
TaskOrchestrationDispatcher starts fetching the next work item:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (1/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 14068, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
TaskOrchestrationDispatcher starts processing the work item it just fetched:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting to process workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
It turns out to be a TaskCompleted event:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing new event with Id -1 and type TaskCompleted] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessEvent] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executing user orchestration:
Besides the two earlier events (OrchestratorStartedEvent and ExecutionStartedEvent), the history of the user orchestration now also contains TaskScheduledEvent (GetUserTask), OrchestratorCompletedEvent, OrchestratorStartedEvent, and TaskCompletedEvent.
{
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
},
{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.GetUserTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 0,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
}
],
"NewEvents": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
}
],
"EventCount": 0,
"NewEventsCount": 0
}
Of these, the second OrchestratorStartedEvent and the TaskCompletedEvent are new.
Orchestration round 2
TaskOrchestrationDispatcher executes the user orchestration again. Here I added logging that prints every event passed to ProcessEvent:
[info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
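The ProcessEvent lines above suggest that each round runs RunTask again from the top and feeds the recorded history back into it. Below is a minimal sketch of that replay idea, assuming a simplified history with only TaskScheduled and TaskCompleted entries. HistoryEntry, ReplayContext and ReplayDemo are hypothetical names, and this is not the DurableTask.Core implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified history record.
public sealed class HistoryEntry
{
    public string Type;   // "TaskScheduled" or "TaskCompleted"
    public int Id;        // EventId for TaskScheduled, TaskScheduledId for TaskCompleted
    public string Result; // only used by TaskCompleted
}

public sealed class ReplayContext
{
    private readonly Dictionary<int, TaskCompletionSource<string>> open =
        new Dictionary<int, TaskCompletionSource<string>>();
    private int nextId;

    // Tasks scheduled by the code that the history has not accounted for yet.
    public SortedDictionary<int, string> NewActions { get; } = new SortedDictionary<int, string>();

    public Task<string> ScheduleTask(string name, string input = null)
    {
        int id = nextId++;
        open[id] = new TaskCompletionSource<string>();
        NewActions[id] = name;
        return open[id].Task;
    }

    public void Apply(HistoryEntry e)
    {
        if (e.Type == "TaskScheduled")
            NewActions.Remove(e.Id);           // already emitted in an earlier round
        else if (e.Type == "TaskCompleted")
            open[e.Id].SetResult(e.Result);    // resumes the code awaiting this task
    }
}

public static class ReplayDemo
{
    private static async Task<string> RunTask(ReplayContext context)
    {
        string user = await context.ScheduleTask("GetUserTask");
        return await context.ScheduleTask("SendGreetingTask", user);
    }

    // Re-runs the orchestrator against the history and reports what is new in this round.
    public static string Replay(IEnumerable<HistoryEntry> history)
    {
        // Assumption of this sketch: no synchronization context, so that
        // await continuations run inline inside SetResult.
        SynchronizationContext.SetSynchronizationContext(null);
        var context = new ReplayContext();
        Task<string> run = RunTask(context);
        foreach (HistoryEntry e in history) context.Apply(e);
        if (run.IsCompleted) return "OrchestrationComplete:" + run.Result;
        return "Schedule:" + string.Join(",", context.NewActions.Values);
    }

    public static void Main()
    {
        var history = new List<HistoryEntry>();
        Console.WriteLine(Replay(history)); // round 1
        history.Add(new HistoryEntry { Type = "TaskScheduled", Id = 0 });
        history.Add(new HistoryEntry { Type = "TaskCompleted", Id = 0, Result = "sky" });
        Console.WriteLine(Replay(history)); // round 2
        history.Add(new HistoryEntry { Type = "TaskScheduled", Id = 1 });
        history.Add(new HistoryEntry { Type = "TaskCompleted", Id = 1, Result = "Greeting sent to sky" });
        Console.WriteLine(Replay(history)); // round 3
    }
}
```

The three rounds print Schedule:GetUserTask, Schedule:SendGreetingTask and OrchestrationComplete:Greeting sent to sky, which mirrors the one new action per round seen in the log.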
After Executed user orchestration, another orchestrator action, ScheduleOrchestrator, is received:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executed user orchestration. Received 1 orchestrator actions: 1:ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
TaskOrchestrationDispatcher starts processing the ScheduleOrchestrator orchestrator action:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing orchestrator action of type ScheduleOrchestrator] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessOrchestratorAction] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.5343127Z, ProcessId : 6536, ThreadId : 9632, ActivityId : fec9f5fa-cfa6-4115-95f1-7f40e3e9d6aa
OnProcessWorkItemAsync - Processing orchestrator action of type ScheduleOrchestrator
Executing user code: SendGreetingTask
TaskActivityDispatcher fetches a work item again:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: After fetch (5183 ms) (0/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.7061869Z, ProcessId : 6536, ThreadId : 11248, ActivityId : 413409db-6e47-413a-b78d-319a802d563c
TaskActivityDispatcher starts fetching the next one:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting fetch with timeout of 00:00:30 (1/10 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:04.7061869Z, ProcessId : 6536, ThreadId : 11248, ActivityId : 413409db-6e47-413a-b78d-319a802d563c
TaskActivityDispatcher starts processing the work item it just fetched:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Starting to process workItem d9bb55c6-f35e-495b-994e-b6bd106372b1] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.7061869Z, ProcessId : 6536, ThreadId : 9632, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
That is, the second task, SendGreetingTask, is executed, which shows up as the following log lines:
Sending greetings to user: sky...
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Finished processing workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:04.9093117Z, ProcessId : 6536, ThreadId : 13976, ActivityId : f698e9a6-907c-4092-bd48-e19e7ba6a2a9
Greeting sent to sky
Between the two log lines Sending greetings to user: sky... and Greeting sent to sky there is a Thread.Sleep(5 * 1000); :
public sealed class SendGreetingTask : TaskActivity<string, string>
{
    protected override string Execute(DurableTask.Core.TaskContext context, string user)
    {
        string message;
        if (!string.IsNullOrWhiteSpace(user) && user.Equals("TimedOut"))
        {
            message = "GetUser Timed out!!!";
            Console.WriteLine(message);
        }
        else
        {
            Console.WriteLine("Sending greetings to user: " + user + "...");
            Thread.Sleep(5 * 1000);
            message = "Greeting sent to " + user;
            Console.WriteLine(message);
        }
        return message;
    }
}
TBD: this sleep looks questionable; in principle the code should not sleep directly like this.
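If the concern is that Thread.Sleep blocks the thread running the activity for five seconds, one possible alternative is an awaitable delay. The sketch below is plain C# and deliberately does not derive from any DurableTask base class. AsyncGreetingDemo and SendGreetingAsync are hypothetical names, the delay is a parameter only to keep the demo short, and how such a method would be hosted as an activity is left open.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncGreetingDemo
{
    // Same message logic as SendGreetingTask.Execute, with the wait made awaitable.
    public static async Task<string> SendGreetingAsync(string user, TimeSpan delay)
    {
        string message;
        if (!string.IsNullOrWhiteSpace(user) && user.Equals("TimedOut"))
        {
            message = "GetUser Timed out!!!";
            Console.WriteLine(message);
        }
        else
        {
            Console.WriteLine("Sending greetings to user: " + user + "...");
            await Task.Delay(delay); // releases the thread while waiting instead of blocking it
            message = "Greeting sent to " + user;
            Console.WriteLine(message);
        }
        return message;
    }

    public static void Main()
    {
        // Blocking on the result is acceptable in a console Main; the sample uses 5 seconds.
        string result = SendGreetingAsync("sky", TimeSpan.FromMilliseconds(50))
            .GetAwaiter().GetResult();
        Console.WriteLine(result);
    }
}
```

Whether the blocking sleep is actually a problem for an activity, as opposed to orchestration code, is a separate question that the TBD above leaves open.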
TaskActivityDispatcher finishes processing the second work item (SendGreetingTask):
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskActivityDispatcher-38b3e9add82043708a8643b81cf2f643-0: Finished processing workItem d9bb55c6-f35e-495b-994e-b6bd106372b1] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:09.8780623Z, ProcessId : 6536, ThreadId : 10076, ActivityId : 1946c0da-30f0-4767-9aff-a5ff1e2bb884
TaskOrchestrationDispatcher fetches the next work item:
********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: After fetch (5515 ms) (0/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-EndFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
TaskOrchestrationDispatcher starts fetching the next work item:
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting fetch with timeout of 00:00:30 (1/100 max)] [info : ] [eventType : WorkItemDispatcherDispatch-StartFetch] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
TaskOrchestrationDispatcher starts processing the fetched work item:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Starting to process workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-Begin] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
TaskOrchestrationDispatcher processes the TaskCompleted event:
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing new event with Id -1 and type TaskCompleted] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessEvent] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
Orchestration round 3
EventId : 1, Level : Verbose, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executing user orchestration:
{
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
},
{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
},
{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.GetUserTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 0,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
},
{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.SendGreetingTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
},
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}
],
"NewEvents": [
{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
},
{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}
],
"EventCount": 0,
"NewEventsCount": 0
}
[eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin] , EventName : TraceInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Executed user orchestration. Received 1 orchestrator actions: 2:OrchestrationComplete] [info : ] [eventType : TaskOrchestrationDispatcher-ExecuteUserOrchestration-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Processing orchestrator action of type OrchestrationComplete] [info : ] [eventType : TaskOrchestrationDispatcher-ProcessOrchestratorAction] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
OnProcessWorkItemAsync - Processing orchestrator action of type OrchestrationComplete
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : Instance Id '[InstanceId: 16af7b96-9409-4618-b33a-e7cc3a121a1a, ExecutionId: 410f7f47fdd9444ab3fb3ce6d00e9649]' completed in state Completed with result: "Greeting sent to sky"] [info : ] [eventType : TaskOrchestrationDispatcher-InstanceCompleted] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [executionId : 410f7f47fdd9444ab3fb3ce6d00e9649] [sessionId : ] [message : {{
"$type": "DurableTask.Core.OrchestrationRuntimeStateDump, DurableTask.Core",
"Events": [
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:50:59.3312077Z"
}},
{{
"$type": "DurableTask.Core.History.ExecutionStartedEvent, DurableTask.Core",
"OrchestrationInstance": null,
"EventType": 0,
"ParentInstance": null,
"Name": null,
"Version": null,
"Input": "[..snipped..]",
"Tags": null,
"Correlation": null,
"ParentTraceContext": null,
"ScheduledStartTime": null,
"Generation": null,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:54.3937812Z"
}},
{{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.GetUserTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 0,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:50:59.3468324Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
}},
{{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 0,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.1905617Z"
}},
{{
"$type": "DurableTask.Core.History.TaskScheduledEvent, DurableTask.Core",
"EventType": 4,
"Name": "DurableTask.Samples.Greetings.SendGreetingTask",
"Version": "",
"Input": "[..snipped..]",
"ParentTraceContext": null,
"EventId": 1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorCompletedEvent, DurableTask.Core",
"EventType": 13,
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:04.5343127Z"
}},
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}},
{{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}},
{{
"$type": "DurableTask.Core.History.ExecutionCompletedEvent, DurableTask.Core",
"EventType": 1,
"OrchestrationStatus": 1,
"Result": "[..snipped..]",
"FailureDetails": null,
"EventId": 2,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}}
],
"NewEvents": [
{{
"$type": "DurableTask.Core.History.OrchestratorStartedEvent, DurableTask.Core",
"EventType": 12,
"EventId": -1,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}},
{{
"$type": "DurableTask.Core.History.TaskCompletedEvent, DurableTask.Core",
"EventType": 5,
"TaskScheduledId": 1,
"Result": "[..snipped..]",
"EventId": -1,
"IsPlayed": true,
"Timestamp": "2024-04-28T01:51:09.7222239Z"
}},
{{
"$type": "DurableTask.Core.History.ExecutionCompletedEvent, DurableTask.Core",
"EventType": 1,
"OrchestrationStatus": 1,
"Result": "[..snipped..]",
"FailureDetails": null,
"EventId": 2,
"IsPlayed": false,
"Timestamp": "2024-04-28T01:51:10.0499421Z"
}}
],
"EventCount": 0,
"NewEventsCount": 0
}}] [info : ] [eventType : TaskOrchestrationDispatcher-InstanceCompletionEvents] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : 16af7b96-9409-4618-b33a-e7cc3a121a1a] [message : Deleting session state] [info : ] [eventType : TaskOrchestrationDispatcher-DeletingSessionState] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.0499421Z, ProcessId : 6536, ThreadId : 12032, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
EventId : 3, Level : Informational, Message : , Payload : [source : DurableTask.Core-durabletask.samples] [instanceId : ] [executionId : ] [sessionId : ] [message : TaskOrchestrationDispatcher-e0bc782c9aa84a0a8a256cf7a5d4e06e-0: Finished processing workItem 16af7b96-9409-4618-b33a-e7cc3a121a1a] [info : ] [eventType : WorkItemDispatcherProcess-End] , EventName : InfoInfo, Timestamp : 2024-04-28T01:51:10.3624424Z, ProcessId : 6536, ThreadId : 13976, ActivityId : 4d43f051-56be-45ca-912c-7043390e8cd3
Task done: Completed
Press any key to quit.
1.4 - quickstart event order
| Orchestration Fetch | Activity Fetch | Run user code | |
|---|---|---|---|
| Start | Start | | |
| Round 1 | | | |
| StartFetch | StartFetch | | |
| --> ExecutionStarted | | | |
| EndFetch | | | |
| StartFetch | | | |
| GreetingsOrchestration | | | |
| --> ?? | | | |
| EndFetch | | | |
| StartFetch | | | |
| GetUserTask | | | |
| Round 2 | | | |
| --> TaskCompleted | | | |
| EndFetch | | | |
| StartFetch | | | |
| SendGreetingTask | | | |
| Round 3 | | | |
| EndFetch | | | |
LockNextTaskOrchestrationWorkItemAsync
The log line that marks a call to LockNextTaskOrchestrationWorkItemAsync appears three times:
********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()
On the first call, the event history list contains:
- OrchestratorStartedEvent (new)
- ExecutionStartedEvent (new)
Corresponding log:
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
On the second call, the event history list contains:
- OrchestratorStartedEvent
- ExecutionStartedEvent
- TaskScheduledEvent
- OrchestratorCompletedEvent
- OrchestratorStartedEvent (new)
- TaskCompletedEvent (new)
Corresponding log:
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
On the third call, the event history list contains:
- OrchestratorStartedEvent
- ExecutionStartedEvent
- TaskScheduledEvent
- OrchestratorCompletedEvent
- OrchestratorStartedEvent
- TaskCompletedEvent
- TaskScheduledEvent
- OrchestratorCompletedEvent
- OrchestratorStartedEvent (new)
- TaskCompletedEvent (new)
Corresponding log:
ProcessEvent: EventType=ExecutionStarted
GreetingsOrchestration.RunTask called
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
ProcessEvent: EventType=TaskScheduled
ProcessEvent: EventType=OrchestratorCompleted
ProcessEvent: EventType=TaskCompleted
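The three rounds above follow one pattern: each work item carries the full past history plus a few new events, and the orchestrator code replays from the top every time. The following is a minimal sketch of that bookkeeping only, not DurableTask's implementation. `ReplayModel`, `Arrivals` and `Produced` are hypothetical names, and events are reduced to plain strings taken from the lists above.

```csharp
using System;
using System.Collections.Generic;

// Print how many past and new events each round sees.
var rounds = ReplayModel.BuildRounds();
for (int i = 0; i < rounds.Count; i++)
{
    var (past, added) = rounds[i];
    Console.WriteLine($"Round {i + 1}: {past.Count} past + {added.Count} new events");
}

// Hypothetical model of the quickstart run; event names are taken from the log.
static class ReplayModel
{
    // Events delivered with each work item (marked "(new)" in the lists above).
    static readonly string[][] Arrivals =
    {
        new[] { "OrchestratorStarted", "ExecutionStarted" },
        new[] { "OrchestratorStarted", "TaskCompleted" },
        new[] { "OrchestratorStarted", "TaskCompleted" },
    };

    // Events appended to the history as a result of running the orchestrator in that round.
    static readonly string[][] Produced =
    {
        new[] { "TaskScheduled", "OrchestratorCompleted" },
        new[] { "TaskScheduled", "OrchestratorCompleted" },
        new[] { "ExecutionCompleted" },
    };

    public static List<(List<string> Past, List<string> New)> BuildRounds()
    {
        var history = new List<string>();
        var rounds = new List<(List<string> Past, List<string> New)>();
        for (int i = 0; i < Arrivals.Length; i++)
        {
            // Snapshot what the orchestrator sees at the start of this round.
            rounds.Add((new List<string>(history), new List<string>(Arrivals[i])));
            history.AddRange(Arrivals[i]);
            history.AddRange(Produced[i]);
        }
        return rounds;
    }
}
```

The past-event counts come out as 0, 4 and 8, which matches the three history lists above: every round replays everything recorded so far before it handles the two new events.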
2 - DurableTask main flows
2.1 - Start instance flow
Entry point
instance = taskHubClient.CreateOrchestrationInstanceAsync(typeof(GreetingsOrchestration), instanceId, null).Result;
The call goes into src\DurableTask.Core\TaskHubClient.cs
async Task<OrchestrationInstance> InternalCreateOrchestrationInstanceWithRaisedEventAsync() {
......
try
{
// Raised events and create orchestration calls use different methods so get handled separately
await this.ServiceClient.CreateTaskOrchestrationAsync(startMessage, dedupeStatuses);
}
}
The call then goes into src\DurableTask.AzureStorage\AzureStorageOrchestrationService.cs
public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
{
ExecutionStartedEvent executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;
......
ControlQueue controlQueue = await this.GetControlQueueAsync(creationMessage.OrchestrationInstance.InstanceId);
// the controlQueue name looks like sampleshub-control-01
MessageData startMessage = await this.SendTaskOrchestrationMessageInternalAsync(
EmptySourceInstance,
controlQueue,
creationMessage);
}
Task<MessageData> SendTaskOrchestrationMessageInternalAsync(
OrchestrationInstance sourceInstance,
ControlQueue controlQueue,
TaskMessage message)
{
return controlQueue.AddMessageAsync(message, sourceInstance);
}
src\DurableTask.AzureStorage\Messaging\TaskHubQueue.cs
public Task<MessageData> AddMessageAsync(TaskMessage message, OrchestrationInstance sourceInstance)
{
return this.AddMessageAsync(message, sourceInstance, session: null);
}
async Task<MessageData> AddMessageAsync(TaskMessage taskMessage, OrchestrationInstance sourceInstance, SessionBase? session)
{
......
await this.storageQueue.AddMessageAsync(
queueMessage,
GetVisibilityDelay(taskMessage),
session?.TraceActivityId);
}
Summary
In the end, a message carrying an ExecutionStartedEvent is sent to the control queue.
2.2 - Fetch work item flow
Entry point
src\DurableTask.Core\TaskOrchestrationDispatcher.cs
protected Task<TaskOrchestrationWorkItem> OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
if (this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true)
{
// the Durable Functions quickstart takes this branch
// only orchestrations should be served by this dispatcher, so we call
// the method which returns work items for orchestrations only.
Console.WriteLine("OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true");
Console.WriteLine("OnFetchWorkItemAsync: this.entityOrchestrationService=" + this.entityOrchestrationService?.GetType().FullName);
return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
else
{
// the sample takes this branch
// both entities and orchestrations are served by this dispatcher,
// so we call the method that may return work items for either.
return this.orchestrationService.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
}
AzureStorageOrchestrationService
src\DurableTask.AzureStorage\AzureStorageOrchestrationService.cs
public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
......
return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken);
}
async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
// This call will block until the next session is ready
session = await this.orchestrationSessionManager.GetNextSessionAsync(entitiesOnly, linkedCts.Token);
OrchestrationSessionManager
src\DurableTask.AzureStorage\OrchestrationSessionManager.cs
public async Task<OrchestrationSession?> GetNextSessionAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
// entitiesOnly is false here, so readyForProcessingQueue is orchestrationsReadyForProcessingQueue
var readyForProcessingQueue = entitiesOnly? this.entitiesReadyForProcessingQueue : this.orchestrationsReadyForProcessingQueue;
while (!cancellationToken.IsCancellationRequested)
{
// This call will block until:
// 1) a batch of messages has been received for a particular instance and
// 2) the history for that instance has been fetched
// this is the key call
LinkedListNode<PendingMessageBatch> node = await readyForProcessingQueue.DequeueAsync(cancellationToken);
}
The readyForProcessingQueue here is orchestrationsReadyForProcessingQueue, so the next question is how items get enqueued into it.
orchestrationsReadyForProcessingQueue
orchestrationsReadyForProcessingQueue is defined here:
readonly AsyncQueue<LinkedListNode<PendingMessageBatch>> orchestrationsReadyForProcessingQueue = new AsyncQueue<LinkedListNode<PendingMessageBatch>>();
async Task ScheduleOrchestrationStatePrefetch(
LinkedListNode<PendingMessageBatch> node,
Guid traceActivityId,
CancellationToken cancellationToken)
{
PendingMessageBatch batch = node.Value;
AnalyticsEventSource.SetLogicalTraceActivityId(traceActivityId);
try
{
if (batch.OrchestrationState == null)
{
OrchestrationHistory history = await this.trackingStore.GetHistoryEventsAsync(
batch.OrchestrationInstanceId,
batch.OrchestrationExecutionId,
cancellationToken);
batch.OrchestrationState = new OrchestrationRuntimeState(history.Events);
batch.ETag = history.ETag;
batch.LastCheckpointTime = history.LastCheckpointTime;
batch.TrackingStoreContext = history.TrackingStoreContext;
}
if (this.settings.UseSeparateQueueForEntityWorkItems
&& DurableTask.Core.Common.Entities.IsEntityInstance(batch.OrchestrationInstanceId))
{
this.entitiesReadyForProcessingQueue.Enqueue(node);
}
else
{
// the enqueue happens here
this.orchestrationsReadyForProcessingQueue.Enqueue(node);
}
}
......
ScheduleOrchestrationStatePrefetch is referenced in two places:
- It calls itself: if an exception occurs, it re-invokes itself.
async Task ScheduleOrchestrationStatePrefetch(
LinkedListNode<PendingMessageBatch> node,
Guid traceActivityId,
CancellationToken cancellationToken)
{
try
{
......
}
catch (Exception e)
{
// Sleep briefly to avoid a tight failure loop.
await Task.Delay(TimeSpan.FromSeconds(5));
// This is a background operation so failure is not an option. All exceptions must be handled.
// To avoid starvation, we need to re-enqueue this async operation instead of retrying in a loop.
await Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
}
}
- It is called from the AddMessageToPendingOrchestration() method.
internal void AddMessageToPendingOrchestration(
ControlQueue controlQueue,
IEnumerable<MessageData> queueMessages,
Guid traceActivityId,
CancellationToken cancellationToken)
{
foreach (MessageData data in queueMessages)
{
......
// If there is no batch for this instanceID-executionID pair, create one
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);
// Before the batch of messages can be processed, we need to download the latest execution state.
// This is done beforehand in the background as a performance optimization.
Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
}
}
}
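The self-call in the catch block is a "delay, then reschedule" retry, not a retry loop. The sketch below shows that pattern in isolation under simplified assumptions: `FlakyPrefetch`, its simulated failure and the short delay are hypothetical and only stand in for the history download that ScheduleOrchestrationStatePrefetch performs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var prefetch = new FlakyPrefetch(failuresBeforeSuccess: 2, retryDelay: TimeSpan.FromMilliseconds(10));
await prefetch.RunAsync();
Console.WriteLine($"attempts: {prefetch.Attempts}, done: {prefetch.Done}");

// Hypothetical background operation that fails a fixed number of times before succeeding.
class FlakyPrefetch
{
    readonly int failuresBeforeSuccess;
    readonly TimeSpan retryDelay;
    int attempts;

    public FlakyPrefetch(int failuresBeforeSuccess, TimeSpan retryDelay)
    {
        this.failuresBeforeSuccess = failuresBeforeSuccess;
        this.retryDelay = retryDelay;
    }

    public int Attempts => Volatile.Read(ref this.attempts);

    public bool Done { get; private set; }

    public async Task RunAsync()
    {
        try
        {
            // Simulated storage call: throws until enough attempts have been made.
            if (Interlocked.Increment(ref this.attempts) <= this.failuresBeforeSuccess)
            {
                throw new InvalidOperationException("simulated storage failure");
            }

            this.Done = true;
        }
        catch (Exception)
        {
            // Back off briefly, then schedule a fresh run on the thread pool
            // instead of looping inside this invocation.
            await Task.Delay(this.retryDelay);
            await Task.Run(() => this.RunAsync());
        }
    }
}
```

With two simulated failures the operation succeeds on the third attempt. Each retry goes back through the scheduler, which is the starvation-avoidance point made in the original comment.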
AddMessageToPendingOrchestration() is in turn called from the DequeueLoop() method:
async Task DequeueLoop(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken)
{
......
// Every dequeue operation has a common trace ID so that batches of dequeued messages can be correlated together.
// Both the dequeue traces and the processing traces will share the same "related" trace activity ID.
Guid traceActivityId = AzureStorageOrchestrationService.StartNewLogicalTraceScope(useExisting: false);
// This will block until either new messages arrive or the queue is released.
// fetch messages from the control queue
IReadOnlyList<MessageData> messages = await controlQueue.GetMessagesAsync(cancellationToken);
if (messages.Count > 0)
{
// De-dupe any execution started messages
IEnumerable<MessageData> filteredMessages = await this.DedupeExecutionStartedMessagesAsync(
controlQueue,
messages,
traceActivityId,
cancellationToken);
this.AddMessageToPendingOrchestration(controlQueue, filteredMessages, traceActivityId, cancellationToken);
}
......
}
Summary
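Putting the pieces traced above together: DequeueLoop() reads messages from a control queue, AddMessageToPendingOrchestration() starts a background prefetch for each new batch, the prefetch enqueues the batch into the ready queue, and GetNextSessionAsync() blocks on that queue until something arrives. A minimal producer/consumer sketch of that shape follows. It uses `System.Threading.Channels` as a stand-in for the library's AsyncQueue; `MiniPipeline` and its methods are hypothetical, and the batch is reduced to an instance ID string.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var pipeline = new MiniPipeline();
pipeline.OnMessagesDequeued("instance-1");     // role of DequeueLoop -> AddMessageToPendingOrchestration
string ready = await pipeline.GetNextAsync();  // role of GetNextSessionAsync
Console.WriteLine(ready);

// Hypothetical stand-in for the session manager's ready queue.
class MiniPipeline
{
    readonly Channel<string> readyQueue = Channel.CreateUnbounded<string>();

    // Producer side: kick off the prefetch in the background and return immediately.
    public void OnMessagesDequeued(string instanceId)
    {
        _ = Task.Run(() => this.PrefetchAsync(instanceId));
    }

    // Pretend to download the history, then mark the batch as ready for processing.
    async Task PrefetchAsync(string instanceId)
    {
        await Task.Delay(10);
        await this.readyQueue.Writer.WriteAsync(instanceId);
    }

    // Consumer side: waits until a prefetched batch is available.
    public async Task<string> GetNextAsync()
    {
        return await this.readyQueue.Reader.ReadAsync();
    }
}
```

The consumer never polls storage itself. It only sees a batch after both conditions named in the source comment hold: messages were received, and the history for that instance was fetched.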
2.2.1 - OrchestrationSessionManager
src\DurableTask.AzureStorage\OrchestrationSessionManager.cs
class OrchestrationSessionManager : IDisposable
{
readonly Dictionary<string, OrchestrationSession> activeOrchestrationSessions = new Dictionary<string, OrchestrationSession>(StringComparer.OrdinalIgnoreCase);
readonly ConcurrentDictionary<string, ControlQueue> ownedControlQueues = new ConcurrentDictionary<string, ControlQueue>();
readonly LinkedList<PendingMessageBatch> pendingOrchestrationMessageBatches = new LinkedList<PendingMessageBatch>();
readonly AsyncQueue<LinkedListNode<PendingMessageBatch>> orchestrationsReadyForProcessingQueue = new AsyncQueue<LinkedListNode<PendingMessageBatch>>();
readonly AsyncQueue<LinkedListNode<PendingMessageBatch>> entitiesReadyForProcessingQueue = new AsyncQueue<LinkedListNode<PendingMessageBatch>>();
}
activeOrchestrationSessions
activeOrchestrationSessions is a Dictionary:
readonly Dictionary<string, OrchestrationSession> activeOrchestrationSessions = new Dictionary<string, OrchestrationSession>(StringComparer.OrdinalIgnoreCase);
public bool IsControlQueueProcessingMessages(string partitionId)
{
return this.activeOrchestrationSessions.Values.Where(session => string.Equals(session.ControlQueue.Name, partitionId)).Any();
}
Constructor
public OrchestrationSessionManager(
string queueAccountName,
AzureStorageOrchestrationServiceSettings settings,
AzureStorageOrchestrationServiceStats stats,
ITrackingStore trackingStore)
{
this.storageAccountName = queueAccountName;
this.settings = settings;
this.stats = stats;
this.trackingStore = trackingStore;
this.fetchRuntimeStateQueue = new DispatchQueue(this.settings.MaxStorageOperationConcurrency);
}
ControlQueue-related code
The owned control queues are held in a ConcurrentDictionary, because several ControlQueue instances have to be kept:
readonly ConcurrentDictionary<string, ControlQueue> ownedControlQueues = new ConcurrentDictionary<string, ControlQueue>();
The key is the partitionId.
The Queues property returns the values of ownedControlQueues:
internal IEnumerable<ControlQueue> Queues => this.ownedControlQueues.Values;
AddQueue()
public void AddQueue(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken)
{
if (this.ownedControlQueues.TryAdd(partitionId, controlQueue))
{
_ = Task.Run(() => this.DequeueLoop(partitionId, controlQueue, cancellationToken));
}
else
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Attempted to add a control queue {controlQueue.Name} multiple times!");
}
}
After the queue is added successfully, the DequeueLoop() method is started:
async Task DequeueLoop(string partitionId, ControlQueue controlQueue, CancellationToken cancellationToken)
{
this.settings.Logger.PartitionManagerInfo(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Started listening for messages on queue {controlQueue.Name}.");
while (!controlQueue.IsReleased)
{
try
{
// Every dequeue operation has a common trace ID so that batches of dequeued messages can be correlated together.
// Both the dequeue traces and the processing traces will share the same "related" trace activity ID.
Guid traceActivityId = AzureStorageOrchestrationService.StartNewLogicalTraceScope(useExisting: false);
// This will block until either new messages arrive or the queue is released.
IReadOnlyList<MessageData> messages = await controlQueue.GetMessagesAsync(cancellationToken);
if (messages.Count > 0)
{
// De-dupe any execution started messages
IEnumerable<MessageData> filteredMessages = await this.DedupeExecutionStartedMessagesAsync(
controlQueue,
messages,
traceActivityId,
cancellationToken);
this.AddMessageToPendingOrchestration(controlQueue, filteredMessages, traceActivityId, cancellationToken);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// shutting down
break;
}
catch (Exception e)
{
this.settings.Logger.PartitionManagerWarning(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Exception in the dequeue loop for control queue {controlQueue.Name}. Exception: {e}");
Thread.Sleep(TimeSpan.FromSeconds(1));
}
}
this.settings.Logger.PartitionManagerInfo(
this.storageAccountName,
this.settings.TaskHubName,
this.settings.WorkerId,
partitionId,
$"Stopped listening for messages on queue {controlQueue.Name}.");
}
RemoveQueue()
public void RemoveQueue(string partitionId, CloseReason? reason, string caller)
{
if (this.ownedControlQueues.TryRemove(partitionId, out ControlQueue controlQueue))
{
controlQueue.Release(reason, caller);
}
}
ReleaseQueue()
public void ReleaseQueue(string partitionId, CloseReason? reason, string caller)
{
if (this.ownedControlQueues.TryGetValue(partitionId, out ControlQueue controlQueue))
{
controlQueue.Release(reason, caller);
}
}
IsControlQueueReceivingMessages()
public bool IsControlQueueReceivingMessages(string partitionId)
{
return this.ownedControlQueues.TryGetValue(partitionId, out ControlQueue controlQueue)
&& !controlQueue.IsReleased;
}
IsControlQueueProcessingMessages()
public bool IsControlQueueProcessingMessages(string partitionId)
{
return this.activeOrchestrationSessions.Values.Where(session => string.Equals(session.ControlQueue.Name, partitionId)).Any();
}
Methods that need careful study
GetNextSessionAsync()
public async Task<OrchestrationSession?> GetNextSessionAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
var readyForProcessingQueue = entitiesOnly? this.entitiesReadyForProcessingQueue : this.orchestrationsReadyForProcessingQueue;
while (!cancellationToken.IsCancellationRequested)
{
// This call will block until:
// 1) a batch of messages has been received for a particular instance and
// 2) the history for that instance has been fetched
LinkedListNode<PendingMessageBatch> node = await readyForProcessingQueue.DequeueAsync(cancellationToken);
lock (this.messageAndSessionLock)
{
PendingMessageBatch nextBatch = node.Value;
this.pendingOrchestrationMessageBatches.Remove(node);
if (!this.activeOrchestrationSessions.TryGetValue(nextBatch.OrchestrationInstanceId, out var existingSession))
{
OrchestrationInstance instance = nextBatch.OrchestrationState?.OrchestrationInstance ??
new OrchestrationInstance
{
InstanceId = nextBatch.OrchestrationInstanceId,
ExecutionId = nextBatch.OrchestrationExecutionId,
};
// Not in activeOrchestrationSessions yet: start a logical trace scope for the new session
Guid traceActivityId = AzureStorageOrchestrationService.StartNewLogicalTraceScope(useExisting: true);
OrchestrationSession session = new OrchestrationSession(
this.settings,
this.storageAccountName,
instance,
nextBatch.ControlQueue,
nextBatch.Messages,
nextBatch.OrchestrationState,
nextBatch.ETag,
nextBatch.LastCheckpointTime,
nextBatch.TrackingStoreContext,
this.settings.ExtendedSessionIdleTimeout,
traceActivityId);
// Then create a session and store it in activeOrchestrationSessions,
// so every session in activeOrchestrationSessions carries a trace activity ID
this.activeOrchestrationSessions.Add(instance.InstanceId, session);
return session;
}
else if (nextBatch.OrchestrationExecutionId == existingSession.Instance?.ExecutionId)
{
// there is already an active session with the same execution id.
// The session might be waiting for more messages. If it is, signal them.
existingSession.AddOrReplaceMessages(node.Value.Messages);
}
else
{
// A message arrived for a different generation of an existing orchestration instance.
// Put it back into the ready queue so that it can be processed once the current generation
// is done executing.
if (readyForProcessingQueue.Count == 0)
{
// To avoid a tight dequeue loop, delay for a bit before putting this node back into the queue.
// This is only necessary when the queue is empty. The main dequeue thread must not be blocked
// by this delay, which is why we use Task.Delay(...).ContinueWith(...) instead of await.
Task.Delay(millisecondsDelay: 200).ContinueWith(_ =>
{
lock (this.messageAndSessionLock)
{
this.pendingOrchestrationMessageBatches.AddLast(node);
readyForProcessingQueue.Enqueue(node);
}
});
}
else
{
this.pendingOrchestrationMessageBatches.AddLast(node);
readyForProcessingQueue.Enqueue(node);
}
}
}
}
return null;
}
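Stripped of locking and queue handling, GetNextSessionAsync() makes a three-way decision for each dequeued batch. The sketch below isolates that decision as a pure function. `BatchAction` and `SessionRouting.Decide` are hypothetical names, and the active sessions are reduced to a map from instance ID to the active execution ID.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// The real dictionary is also created with StringComparer.OrdinalIgnoreCase.
var active = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase)
{
    ["inst-1"] = "exec-A",
};

Console.WriteLine(SessionRouting.Decide(active, "inst-2", "exec-X"));
Console.WriteLine(SessionRouting.Decide(active, "INST-1", "exec-A"));
Console.WriteLine(SessionRouting.Decide(active, "inst-1", "exec-B"));

// Hypothetical labels for the three branches of GetNextSessionAsync().
enum BatchAction
{
    StartNewSession,            // no active session: create one and return it
    MergeIntoExistingSession,   // same execution ID: hand the messages to the live session
    RequeueForLaterGeneration,  // different execution ID: put the batch back in the ready queue
}

static class SessionRouting
{
    public static BatchAction Decide(
        IReadOnlyDictionary<string, string?> activeExecutionIdByInstance,
        string instanceId,
        string? batchExecutionId)
    {
        if (!activeExecutionIdByInstance.TryGetValue(instanceId, out string? activeExecutionId))
        {
            return BatchAction.StartNewSession;
        }

        if (batchExecutionId == activeExecutionId)
        {
            return BatchAction.MergeIntoExistingSession;
        }

        return BatchAction.RequeueForLaterGeneration;
    }
}
```

The three calls print StartNewSession, MergeIntoExistingSession and RequeueForLaterGeneration in that order. Only the first branch returns a session to the caller; the other two keep looping, so the method keeps blocking until a batch for an instance without an active session shows up.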
TryGetExistingSession()
public bool TryGetExistingSession(string instanceId, out OrchestrationSession session)
{
lock (this.messageAndSessionLock)
{
return this.activeOrchestrationSessions.TryGetValue(instanceId, out session);
}
}
Methods not examined in depth for now
AddMessageToPendingOrchestration
Adds history messages to an orchestration for its next replay. "Pending" here is unrelated to the Pending runtimeStatus.
internal void AddMessageToPendingOrchestration(
ControlQueue controlQueue,
IEnumerable<MessageData> queueMessages,
Guid traceActivityId,
CancellationToken cancellationToken)
{
// Conditions to consider:
// 1. Do we need to create a new orchestration session or does one already exist?
// 2. Do we already have a copy of this message?
// 3. Do we need to add messages to a currently executing orchestration?
lock (this.messageAndSessionLock)
{
var existingSessionMessages = new Dictionary<OrchestrationSession, List<MessageData>>();
foreach (MessageData data in queueMessages)
{
// The instanceID identifies the orchestration across replays and ContinueAsNew generations.
// The executionID identifies a generation of an orchestration instance, doesn't change across replays.
string instanceId = data.TaskMessage.OrchestrationInstance.InstanceId;
string executionId = data.TaskMessage.OrchestrationInstance.ExecutionId;
// If the target orchestration is already in memory, we can potentially add the message to the session directly
// rather than adding it to the pending list. This behavior applies primarily when extended sessions are enabled.
// We can't do this for ExecutionStarted messages - those must *always* go to the pending list since they are for
// creating entirely new orchestration instances.
if (data.TaskMessage.Event.EventType != EventType.ExecutionStarted &&
this.activeOrchestrationSessions.TryGetValue(instanceId, out OrchestrationSession session))
{
// A null executionId value means that this is a management operation, like RaiseEvent or Terminate, which
// should be delivered to the current session.
if (executionId == null || session.Instance.ExecutionId == executionId)
{
List<MessageData> pendingMessages;
if (!existingSessionMessages.TryGetValue(session, out pendingMessages))
{
pendingMessages = new List<MessageData>();
existingSessionMessages.Add(session, pendingMessages);
}
pendingMessages.Add(data);
continue;
}
// Looks like this message is for another generation of the active orchestration. Let it fall
// into the pending list below. If it's a message for an older generation, it will be eventually
// discarded after we discover that we have no state associated with its execution ID. This is
// most common in scenarios involving durable timers and ContinueAsNew. Otherwise, this message
// will be processed after the current session unloads.
}
PendingMessageBatch? targetBatch = null; // batch for the current instanceID-executionID pair
// Unless the message is an ExecutionStarted event, we attempt to assign the current message to an
// existing batch by walking backwards through the list of batches until we find one with a matching InstanceID.
// This is assumed to be more efficient than walking forward if most messages arrive in the queue in groups.
LinkedListNode<PendingMessageBatch> node = this.pendingOrchestrationMessageBatches.Last;
while (node != null && data.TaskMessage.Event.EventType != EventType.ExecutionStarted)
{
PendingMessageBatch batch = node.Value;
if (batch.OrchestrationInstanceId == instanceId)
{
if (executionId == null || batch.OrchestrationExecutionId == executionId)
{
targetBatch = batch;
break;
}
else if (batch.OrchestrationExecutionId == null)
{
targetBatch = batch;
batch.OrchestrationExecutionId = executionId;
break;
}
}
node = node.Previous;
}
// If there is no batch for this instanceID-executionID pair, create one
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);
// Before the batch of messages can be processed, we need to download the latest execution state.
// This is done beforehand in the background as a performance optimization.
Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
}
// New messages are added; duplicate messages are replaced
targetBatch.Messages.AddOrReplace(data);
}
// The session might be waiting for more messages. If it is, signal them.
foreach (var pair in existingSessionMessages)
{
OrchestrationSession session = pair.Key;
List<MessageData> newMessages = pair.Value;
// New messages are added; duplicate messages are replaced
session.AddOrReplaceMessages(newMessages);
}
}
}
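The least obvious part of AddMessageToPendingOrchestration() is the backwards walk that picks the batch for a message. The sketch below reproduces just that matching rule on a plain LinkedList. `Batch` and `BatchLookup.FindTarget` are hypothetical simplifications: the real method also skips the walk for ExecutionStarted messages and creates a new batch when nothing matches.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var pending = new LinkedList<Batch>();
pending.AddLast(new Batch("inst-1", "exec-A"));
pending.AddLast(new Batch("inst-2", null));
pending.AddLast(new Batch("inst-1", "exec-B"));

// Exact instance and execution match, found by walking past the newer exec-B batch.
Console.WriteLine(BatchLookup.FindTarget(pending, "inst-1", "exec-A")?.ExecutionId);
// A null execution ID (management operation) goes to the newest batch for the instance.
Console.WriteLine(BatchLookup.FindTarget(pending, "inst-1", null)?.ExecutionId);
// A batch without an execution ID adopts the ID of the first message that carries one.
Console.WriteLine(BatchLookup.FindTarget(pending, "inst-2", "exec-C")?.ExecutionId);
// No batch for this instance: the caller would create a new one.
Console.WriteLine(BatchLookup.FindTarget(pending, "inst-3", "exec-D") == null);

// Hypothetical stand-in for PendingMessageBatch.
class Batch
{
    public Batch(string instanceId, string? executionId)
    {
        this.InstanceId = instanceId;
        this.ExecutionId = executionId;
    }

    public string InstanceId { get; }

    public string? ExecutionId { get; set; }
}

static class BatchLookup
{
    // Walk from the newest batch to the oldest, as the original loop does.
    public static Batch? FindTarget(LinkedList<Batch> pending, string instanceId, string? executionId)
    {
        for (LinkedListNode<Batch>? node = pending.Last; node != null; node = node.Previous)
        {
            Batch batch = node.Value;
            if (batch.InstanceId != instanceId)
            {
                continue;
            }

            if (executionId == null || batch.ExecutionId == executionId)
            {
                return batch;
            }

            if (batch.ExecutionId == null)
            {
                batch.ExecutionId = executionId;
                return batch;
            }
        }

        return null;
    }
}
```

The four lookups print exec-A, exec-B, exec-C and True. Walking from the tail is cheap when messages for the same instance arrive close together, which is the assumption stated in the source comment.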
Other methods
DrainAsync()
The drain process occurs when the lease is stolen or the worker is shutting down, prompting the worker to cease listening for new messages and to finish processing all the existing information in memory.
DedupeExecutionStartedMessagesAsync()
This method enumerates all the provided queue messages looking for ExecutionStarted messages. If any are found, it queries table storage to ensure that each message has a matching record in the Instances table. If not, this method will either asynchronously discard the message or abandon it for reprocessing in case the Instances table record hasn't been written yet (this happens asynchronously and there is no guaranteed order). Meanwhile, this method will return the list of filtered messages.
2.2.2 - AzureStorageOrchestrationService
src\DurableTask.AzureStorage\AzureStorageOrchestrationService.cs
public sealed class AzureStorageOrchestrationService :
IOrchestrationService,
IOrchestrationServiceClient,
IDisposable,
IOrchestrationServiceQueryClient,
IOrchestrationServicePurgeClient,
IEntityOrchestrationService
{
Constructor
public AzureStorageOrchestrationService(AzureStorageOrchestrationServiceSettings settings, IOrchestrationServiceInstanceStore customInstanceStore)
{
if (settings == null)
{
throw new ArgumentNullException(nameof(settings));
}
ValidateSettings(settings);
this.settings = settings;
this.azureStorageClient = new AzureStorageClient(settings);
this.stats = this.azureStorageClient.Stats;
// “sampleshub-largemessages”
string compressedMessageBlobContainerName = $"{settings.TaskHubName.ToLowerInvariant()}-largemessages";
this.messageManager = new MessageManager(this.settings, this.azureStorageClient, compressedMessageBlobContainerName);
// initialize allControlQueues
this.allControlQueues = new ConcurrentDictionary<string, ControlQueue>();
for (int index = 0; index < this.settings.PartitionCount; index++)
{
// create one controlQueue per partition and add it to allControlQueues
// controlQueueName = "sampleshub-control-00"
var controlQueueName = GetControlQueueName(this.settings.TaskHubName, index);
ControlQueue controlQueue = new ControlQueue(this.azureStorageClient, controlQueueName, this.messageManager);
this.allControlQueues.TryAdd(controlQueue.Name, controlQueue);
}
// workItemQueueName = “sampleshub-workitems”
var workItemQueueName = GetWorkItemQueueName(this.settings.TaskHubName);
this.workItemQueue = new WorkItemQueue(this.azureStorageClient, workItemQueueName, this.messageManager);
if (customInstanceStore == null)
{
// AzureTableTrackingStore is the default
this.trackingStore = new AzureTableTrackingStore(this.azureStorageClient, this.messageManager);
}
else
{
this.trackingStore = new InstanceStoreBackedTrackingStore(customInstanceStore);
}
this.activeActivitySessions = new ConcurrentDictionary<string, ActivitySession>(StringComparer.OrdinalIgnoreCase);
this.hubCreationLock = new object();
this.taskHubCreator = new ResettableLazy<Task>(
this.GetTaskHubCreatorTask,
LazyThreadSafetyMode.ExecutionAndPublication);
this.leaseManager = GetBlobLeaseManager(
this.azureStorageClient,
"default");
this.orchestrationSessionManager = new OrchestrationSessionManager(
this.azureStorageClient.QueueAccountName,
this.settings,
this.stats,
this.trackingStore);
if (this.settings.UseTablePartitionManagement && this.settings.UseLegacyPartitionManagement)
{
throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement. For improved reliability, consider using the TablePartitionManager.");
}
else if (this.settings.UseTablePartitionManagement)
{
this.partitionManager = new TablePartitionManager(
this,
this.azureStorageClient);
}
else if (this.settings.UseLegacyPartitionManagement)
{
this.partitionManager = new LegacyPartitionManager(
this,
this.azureStorageClient);
}
else
{
this.partitionManager = new SafePartitionManager(
this,
this.azureStorageClient,
this.orchestrationSessionManager);
}
this.appLeaseManager = new AppLeaseManager(
this.azureStorageClient,
this.partitionManager,
this.settings.TaskHubName.ToLowerInvariant() + "-applease",
this.settings.TaskHubName.ToLowerInvariant() + "-appleaseinfo",
this.settings.AppLeaseOptions);
}
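Two things in the constructor are easy to check by hand: the resource names derived from the task hub name, and the precedence between the three partition managers. The sketch below is illustrative only. `TaskHubNames` is a hypothetical helper, and the control-queue format string is an assumption inferred from the names quoted in the comments ("sampleshub-control-00", "sampleshub-control-01"), not copied from GetControlQueueName().

```csharp
using System;

Console.WriteLine(TaskHubNames.ControlQueue("SamplesHub", 0));
Console.WriteLine(TaskHubNames.WorkItemQueue("SamplesHub"));
Console.WriteLine(TaskHubNames.ChoosePartitionManager(useTable: false, useLegacy: false));

// Hypothetical helper; the real code builds these names inside AzureStorageOrchestrationService.
static class TaskHubNames
{
    // Assumed format: lower-cased hub name, "-control-", two-digit partition index.
    public static string ControlQueue(string taskHub, int index) =>
        $"{taskHub.ToLowerInvariant()}-control-{index:00}";

    public static string WorkItemQueue(string taskHub) =>
        $"{taskHub.ToLowerInvariant()}-workitems";

    // Same branch order as the constructor: both flags set is an error,
    // then table, then legacy, and SafePartitionManager as the fallback.
    public static string ChoosePartitionManager(bool useTable, bool useLegacy)
    {
        if (useTable && useLegacy)
        {
            throw new ArgumentException("Cannot use both TablePartitionManagement and LegacyPartitionManagement.");
        }

        if (useTable)
        {
            return "TablePartitionManager";
        }

        return useLegacy ? "LegacyPartitionManager" : "SafePartitionManager";
    }
}
```

For the sample's task hub this prints sampleshub-control-00, sampleshub-workitems and SafePartitionManager. The last value is only what the fallback branch yields when both flags are false; which flags the sample actually sets is not shown in this section.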
GetControlQueuesAsync()
internal static async Task<Queue[]> GetControlQueuesAsync(
AzureStorageClient azureStorageClient,
int defaultPartitionCount)
{
if (azureStorageClient == null)
{
throw new ArgumentNullException(nameof(azureStorageClient));
}
string taskHub = azureStorageClient.Settings.TaskHubName;
// Need to check for leases in Azure Table Storage. Scale Controller calls into this method.
int partitionCount;
Table partitionTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.PartitionTableName);
// Check if table partition manager is used. If so, get partition count from table.
// Else, get the partition count from the blobs.
if (await partitionTable.ExistsAsync())
{
TableEntitiesResponseInfo<DynamicTableEntity> result = await partitionTable.ExecuteQueryAsync(new TableQuery<DynamicTableEntity>());
partitionCount = result.ReturnedEntities.Count;
}
else
{
BlobLeaseManager inactiveLeaseManager = GetBlobLeaseManager(azureStorageClient, "inactive");
TaskHubInfo hubInfo = await inactiveLeaseManager.GetOrCreateTaskHubInfoAsync(
GetTaskHubInfo(taskHub, defaultPartitionCount),
checkIfStale: false);
partitionCount = hubInfo.PartitionCount;
};
var controlQueues = new Queue[partitionCount];
for (int i = 0; i < partitionCount; i++)
{
controlQueues[i] = azureStorageClient.GetQueueReference(GetControlQueueName(taskHub, i));
}
return controlQueues;
}
GetWorkItemQueue()
internal static Queue GetWorkItemQueue(AzureStorageClient azureStorageClient)
{
string queueName = GetWorkItemQueueName(azureStorageClient.Settings.TaskHubName);
return azureStorageClient.GetQueueReference(queueName);
}
LockNextTaskOrchestrationWorkItemAsync()
public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
TimeSpan receiveTimeout,
CancellationToken cancellationToken)
{
return LockNextTaskOrchestrationWorkItemAsync(entitiesOnly: false, cancellationToken);
}
LockNextTaskOrchestrationWorkItemAsync()
async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(bool entitiesOnly, CancellationToken cancellationToken)
{
Guid traceActivityId = StartNewLogicalTraceScope(useExisting: true);
await this.EnsureTaskHubAsync();
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, this.shutdownSource.Token))
{
OrchestrationSession session = null;
TaskOrchestrationWorkItem orchestrationWorkItem = null;
try
{
// This call will block until the next session is ready
// Get the next session; this is the key step
session = await this.orchestrationSessionManager.GetNextSessionAsync(entitiesOnly, linkedCts.Token);
if (session == null)
{
return null;
}
// Make sure we still own the partition. If not, abandon the session.
if (session.ControlQueue.IsReleased)
{
await this.AbandonAndReleaseSessionAsync(session);
return null;
}
session.StartNewLogicalTraceScope();
List<MessageData> outOfOrderMessages = null;
foreach (MessageData message in session.CurrentMessageBatch)
{
if (session.IsOutOfOrderMessage(message))
{
if (outOfOrderMessages == null)
{
outOfOrderMessages = new List<MessageData>();
}
// This can happen if a lease change occurs and a new node receives a message for an
// orchestration that has not yet checkpointed its history. We abandon such messages
// so that they can be reprocessed after the history checkpoint has completed.
this.settings.Logger.ReceivedOutOfOrderMessage(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
session.Instance.InstanceId,
session.Instance.ExecutionId,
session.ControlQueue.Name,
message.TaskMessage.Event.EventType.ToString(),
Utils.GetTaskEventId(message.TaskMessage.Event),
message.OriginalQueueMessage.Id,
message.Episode.GetValueOrDefault(-1),
session.LastCheckpointTime);
outOfOrderMessages.Add(message);
}
else
{
session.TraceProcessingMessage(message, isExtendedSession: false);
}
}
if (outOfOrderMessages?.Count > 0)
{
// This will also remove the messages from the current batch.
await this.AbandonMessagesAsync(session, outOfOrderMessages);
}
if (session.CurrentMessageBatch.Count == 0)
{
// All messages were removed. Release the work item.
await this.AbandonAndReleaseSessionAsync(session);
return null;
}
// Create or restore Correlation TraceContext
TraceContextBase currentRequestTraceContext = null;
CorrelationTraceClient.Propagate(
() =>
{
var isReplaying = session.RuntimeState.ExecutionStartedEvent?.IsPlayed ?? false;
TraceContextBase parentTraceContext = GetParentTraceContext(session);
currentRequestTraceContext = GetRequestTraceContext(isReplaying, parentTraceContext);
});
// Create the orchestrationWorkItem
orchestrationWorkItem = new TaskOrchestrationWorkItem
{
InstanceId = session.Instance.InstanceId,
LockedUntilUtc = session.CurrentMessageBatch.Min(msg => msg.OriginalQueueMessage.NextVisibleTime.Value.UtcDateTime),
NewMessages = session.CurrentMessageBatch.Select(m => m.TaskMessage).ToList(),
OrchestrationRuntimeState = session.RuntimeState,
Session = this.settings.ExtendedSessionsEnabled ? session : null,
TraceContext = currentRequestTraceContext,
};
if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, out string warningMessage))
{
// If all messages belong to the same execution ID, then all of them need to be discarded.
// However, it's also possible to have messages for *any* execution ID batched together with messages
// to a *specific* (non-executable) execution ID. Those messages should *not* be discarded since
// they might be consumable by another orchestration with the same instance id but different execution ID.
var messagesToDiscard = new List<MessageData>();
var messagesToAbandon = new List<MessageData>();
foreach (MessageData msg in session.CurrentMessageBatch)
{
if (msg.TaskMessage.OrchestrationInstance.ExecutionId == session.Instance.ExecutionId)
{
messagesToDiscard.Add(msg);
}
else
{
messagesToAbandon.Add(msg);
}
}
// If no messages have a matching execution ID, then delete all of them. This means all the
// messages are external (external events, termination, etc.) and were sent to an instance that
// doesn't exist or is no longer in a running state.
if (messagesToDiscard.Count == 0)
{
messagesToDiscard.AddRange(messagesToAbandon);
messagesToAbandon.Clear();
}
// Add all abandoned messages to the deferred list. These messages will not be deleted right now.
// If they can be matched with another orchestration, then great. Otherwise they will be deleted
// the next time they are picked up.
messagesToAbandon.ForEach(session.DeferMessage);
var eventListBuilder = new StringBuilder(orchestrationWorkItem.NewMessages.Count * 40);
foreach (MessageData msg in messagesToDiscard)
{
eventListBuilder.Append(msg.TaskMessage.Event.EventType.ToString()).Append(',');
}
this.settings.Logger.DiscardingWorkItem(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
session.Instance.InstanceId,
session.Instance.ExecutionId,
orchestrationWorkItem.NewMessages.Count,
session.RuntimeState.Events.Count,
eventListBuilder.ToString(0, eventListBuilder.Length - 1) /* remove trailing comma */,
warningMessage);
// The instance has already completed or never existed. Delete this message batch.
await this.DeleteMessageBatchAsync(session, messagesToDiscard);
await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
return null;
}
System.Console.WriteLine("********* versioning ************* LockNextTaskOrchestrationWorkItemAsync()");
//Console.WriteLine("********* versioning *************: orchestrationWorkItem: name=" + orchestrationWorkItem.OrchestrationRuntimeState!.Name + ", version=" + orchestrationWorkItem.OrchestrationRuntimeState!.Version);
return orchestrationWorkItem;
}
catch (OperationCanceledException)
{
if (session != null)
{
// host is shutting down - release any queued messages
await this.AbandonAndReleaseSessionAsync(session);
}
return null;
}
catch (Exception e)
{
// print out the exception
Console.WriteLine("********* versioning ************* LockNextTaskOrchestrationWorkItemAsync() exception: " + e.ToString());
this.settings.Logger.OrchestrationProcessingFailure(
this.azureStorageClient.QueueAccountName,
this.settings.TaskHubName,
session?.Instance.InstanceId ?? string.Empty,
session?.Instance.ExecutionId ?? string.Empty,
e.ToString());
if (orchestrationWorkItem != null)
{
// The work-item needs to be released so that it can be retried later.
await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
}
throw;
}
}
}
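The discard-or-defer decision in the non-executable branch above can be isolated as a small pure function. The following is a minimal sketch under stated assumptions, not the library's code: `Msg` is a hypothetical stand-in for `MessageData` that carries only an execution ID and an event type, and `MessageTriage` is a name invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// Usage: two messages target the session's execution ID, one targets another execution.
var batch = new List<Msg> { new("e1", "TaskCompleted"), new("e2", "EventRaised"), new("e1", "TimerFired") };
var (discard, defer) = MessageTriage.Split(batch, "e1");
Console.WriteLine($"discard={discard.Count}, defer={defer.Count}"); // prints "discard=2, defer=1"

// Hypothetical stand-in for MessageData: only the fields the triage needs.
public record Msg(string ExecutionId, string EventType);

public static class MessageTriage
{
    public static (List<Msg> Discard, List<Msg> Defer) Split(IEnumerable<Msg> batch, string sessionExecutionId)
    {
        var discard = new List<Msg>();
        var defer = new List<Msg>();
        foreach (Msg msg in batch)
        {
            // Messages for the (non-executable) session execution are discarded;
            // the rest may still be consumable by another execution.
            if (msg.ExecutionId == sessionExecutionId) discard.Add(msg);
            else defer.Add(msg);
        }
        // If nothing matched the session's execution ID, all messages are treated
        // as external and the whole batch is discarded.
        if (discard.Count == 0)
        {
            discard.AddRange(defer);
            defer.Clear();
        }
        return (discard, defer);
    }
}
```

Keeping this step free of I/O makes the two edge cases (mixed batch, fully external batch) easy to check in isolation.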
3 - DurableTask Core source code study
3.1 - Core programming model
3.1.1 - Orchestration
3.1.1.1 - OrchestrationInstance
src\DurableTask.Core\OrchestrationInstance.cs
OrchestrationInstance defines the following properties:
- InstanceId
- ExecutionId
[DataContract]
public class OrchestrationInstance : IExtensibleDataObject
{
/// <summary>
/// The instance id, assigned as unique to the orchestration
/// </summary>
[DataMember]
public string InstanceId { get; set; }
/// <summary>
/// The execution id, unique to the execution of this instance
/// </summary>
[DataMember]
public string ExecutionId { get; set; }
To support versioning, the plan is to modify OrchestrationInstance by adding an InstanceVersion field of type string:
/// <summary>
/// The version of this orchestration instance
/// </summary>
[DataMember]
public string InstanceVersion { get; set; }
3.1.1.2 - OrchestrationContext
src\DurableTask.Core\OrchestrationContext.cs
Context for an orchestration containing the instance, replay status, orchestration methods and proxy methods
Class definition
OrchestrationContext is an abstract class:
public abstract class OrchestrationContext
{
}
The implementing class is TaskOrchestrationContext.
OrchestrationContext defines the following public (virtual or abstract) methods:
- CreateClient()
- CreateClientV2()
- CreateRetryableClient()
- ScheduleWithRetry()
- ScheduleTask()
- CreateTimer()
- CreateSubOrchestrationInstance()
- CreateSubOrchestrationInstanceWithRetry()
- SendEvent()
- ContinueAsNew()
ScheduleTask()
/// <param name="version">Name of the orchestration as specified by the ObjectCreator</param>
public abstract Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters);
Note that the notion of a version already exists here.
ScheduleWithRetry()
public virtual Task<T> ScheduleWithRetry<T>(Type taskActivityType, RetryOptions retryOptions,
params object[] parameters)
{
return ScheduleWithRetry<T>(NameVersionHelper.GetDefaultName(taskActivityType),
NameVersionHelper.GetDefaultVersion(taskActivityType),
retryOptions, parameters);
}
NameVersionHelper.GetDefaultVersion(taskActivityType) is currently not implemented; it is hard-coded to return an empty string:
public static string GetDefaultVersion(object obj)
{
return string.Empty;
}
It then calls the overload that takes a version parameter:
public virtual Task<T> ScheduleWithRetry<T>(string name, string version, RetryOptions retryOptions,
params object[] parameters)
{
Task<T> RetryCall() => ScheduleTask<T>(name, version, parameters);
var retryInterceptor = new RetryInterceptor<T>(this, retryOptions, RetryCall);
return retryInterceptor.Invoke();
}
In the end this still calls ScheduleTask().
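The delegation pattern here, where a retry wrapper repeatedly invokes a `Func<Task<T>>` that closes over the real call, can be sketched on its own. This is a simplified illustration, assuming a fixed attempt count and no delay between attempts; `SimpleRetry` is a hypothetical helper and is far simpler than `RetryInterceptor<T>`, which is driven by `RetryOptions`.

```csharp
using System;
using System.Threading.Tasks;

int calls = 0;
// The call fails twice and then succeeds, mimicking a flaky activity.
string result = await SimpleRetry.InvokeAsync(() =>
{
    calls++;
    if (calls < 3) throw new InvalidOperationException("transient failure");
    return Task.FromResult("done");
}, maxAttempts: 5);
Console.WriteLine($"{result} after {calls} calls"); // prints "done after 3 calls"

public static class SimpleRetry
{
    // Hypothetical helper: re-invokes retryCall until it succeeds or maxAttempts is reached.
    public static async Task<T> InvokeAsync<T>(Func<Task<T>> retryCall, int maxAttempts)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await retryCall();
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Swallow the failure and try again; the final attempt's exception propagates.
            }
        }
    }
}
```

Because the wrapper only sees a delegate, the name and version arguments stay captured inside the closure, which matches how `RetryCall()` above forwards them to `ScheduleTask<T>()`.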
CreateTimer()
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state);
public abstract Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken);
CreateSubOrchestrationInstance()
public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, object input)
{
return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
NameVersionHelper.GetDefaultVersion(orchestrationType), input);
}
public virtual Task<T> CreateSubOrchestrationInstance<T>(Type orchestrationType, string instanceId, object input)
{
return CreateSubOrchestrationInstance<T>(NameVersionHelper.GetDefaultName(orchestrationType),
NameVersionHelper.GetDefaultVersion(orchestrationType), instanceId, input);
}
public abstract Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input);
This method also declares a version parameter, but it still goes unused, because GetDefaultVersion() returns an empty string.
SendEvent()
public abstract void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData);
Once a version field is added to OrchestrationInstance, this method also carries the notion of a version.
ContinueAsNew()
public abstract void ContinueAsNew(object input);
There is no notion of a version here; ideally the implementation should reuse the version of the current instance (if one was specified).
CreateClient()
Create a proxy client class to schedule remote TaskActivities via a strongly typed interface.
public virtual T CreateClient<T>() where T : class
{
return CreateClient<T>(false);
}
public virtual T CreateClient<T>(bool useFullyQualifiedMethodNames) where T : class
{
return CreateClient<T>(() => new ScheduleProxy(this, useFullyQualifiedMethodNames));
}
private static T CreateClient<T>(Func<IInterceptor> createScheduleProxy) where T : class
{
if (!typeof(T).IsInterface && !typeof(T).IsClass)
{
throw new InvalidOperationException($"{nameof(T)} must be an interface or class.");
}
IInterceptor scheduleProxy = createScheduleProxy();
if (typeof(T).IsClass)
{
if (typeof(T).IsSealed)
{
throw new InvalidOperationException("Class cannot be sealed.");
}
return ProxyGenerator.CreateClassProxy<T>(scheduleProxy);
}
return ProxyGenerator.CreateInterfaceProxyWithoutTarget<T>(scheduleProxy);
}
There is no notion of a version here.
3.1.1.3 - OrchestrationState
src\DurableTask.Core\OrchestrationState.cs
OrchestrationState defines the following properties:
- CompletedTime
- CompressedSize
- CreatedTime
- Input
- LastUpdatedTime
- Name
- OrchestrationInstance: contains InstanceId and ExecutionId
- Output
- ParentInstance
- Size
- Status
- Tags
- Version: a string; check whether it can be reused.
- Generation
- ScheduledStartTime
- FailureDetails
3.1.2 - Activity
3.1.2.1 - TaskActivity
src\DurableTask.Core\TaskActivity.cs
TaskActivity defines the following methods:
- Run()
- RunAsync()
Run() method
public abstract string Run(TaskContext context, string input);
In AsyncTaskActivity, the synchronous Run() is blocked:
/// <summary>
/// Synchronous execute method, blocked for AsyncTaskActivity
/// </summary>
/// <returns>string.Empty</returns>
public override string Run(TaskContext context, string input)
{
// will never run
return string.Empty;
}
RunAsync() method
public virtual Task<string> RunAsync(TaskContext context, string input)
{
return Task.FromResult(Run(context, input));
}
In AsyncTaskActivity<TInput, TResult> it is overridden as:
public override async Task<string> RunAsync(TaskContext context, string input)
{
TInput parameter = default(TInput);
var jArray = Utils.ConvertToJArray(input);
int parameterCount = jArray.Count;
if (parameterCount > 1)
{
throw new TaskFailureException(
"TaskActivity implementation cannot be invoked due to more than expected input parameters. Signature mismatch.");
}
if (parameterCount == 1)
{
JToken jToken = jArray[0];
if (jToken is JValue jValue)
{
parameter = jValue.ToObject<TInput>();
}
else
{
string serializedValue = jToken.ToString();
parameter = DataConverter.Deserialize<TInput>(serializedValue);
}
}
TResult result;
try
{
result = await ExecuteAsync(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new TaskFailureException(e.Message, e, details)
.WithFailureDetails(failureDetails);
}
string serializedResult = DataConverter.Serialize(result);
return serializedResult;
}
}
ExecuteAsync() is an abstract method:
protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);
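The parameter handling at the top of RunAsync() (at most one element in the input array, otherwise a signature mismatch) can be illustrated independently. The sketch below assumes the serialized input is a JSON array and uses System.Text.Json instead of the Newtonsoft types (`JArray`, `JToken`) that the original relies on; `ActivityInput.ReadSingle` is a hypothetical helper, not part of DurableTask.

```csharp
using System;
using System.Text.Json;

Console.WriteLine(ActivityInput.ReadSingle<int>("[42]"));       // prints "42"
Console.WriteLine(ActivityInput.ReadSingle<string>("[\"hi\"]")); // prints "hi"

public static class ActivityInput
{
    // Hypothetical helper: expects a JSON array holding zero or one element.
    public static T ReadSingle<T>(string input)
    {
        using JsonDocument doc = JsonDocument.Parse(input);
        if (doc.RootElement.ValueKind != JsonValueKind.Array)
        {
            throw new ArgumentException("Input must be a JSON array.", nameof(input));
        }

        int parameterCount = doc.RootElement.GetArrayLength();
        if (parameterCount > 1)
        {
            // Same guard as in RunAsync(): a single-input activity received too many arguments.
            throw new InvalidOperationException("More input parameters than expected. Signature mismatch.");
        }

        if (parameterCount == 0)
        {
            return default; // no argument supplied: fall back to default(T)
        }

        return JsonSerializer.Deserialize<T>(doc.RootElement[0].GetRawText());
    }
}
```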
GetStatus() method
public abstract string GetStatus();
3.1.2.2 - TaskContext
src\DurableTask.Core\TaskContext.cs
TaskContext defines the following properties:
- OrchestrationInstance: contains InstanceId and ExecutionId
- ErrorPropagationMode
3.1.3 - OrchestrationExecution
3.1.3.1 - OrchestrationExecutionContext
src\DurableTask.Core\OrchestrationExecutionContext.cs
Context associated with the orchestration being executed.
[DataContract]
public class OrchestrationExecutionContext
{
}
OrchestrationTags
This class defines a single property, OrchestrationTags:
/// <summary>
/// Gets the orchestration tags
/// </summary>
[DataMember]
public IDictionary<string, string> OrchestrationTags { get; internal set; }
3.1.3.2 - OrchestrationExecutionCursor
src\DurableTask.Core\OrchestrationExecutionCursor.cs
Context associated with the orchestration being executed.
Class definition:
internal class OrchestrationExecutionCursor
{
}
Constructor:
public OrchestrationExecutionCursor(
OrchestrationRuntimeState state,
TaskOrchestration orchestration,
TaskOrchestrationExecutor executor,
IEnumerable<OrchestratorAction> latestDecisions)
{
RuntimeState = state;
TaskOrchestration = orchestration;
OrchestrationExecutor = executor;
LatestDecisions = latestDecisions;
}
Properties:
public OrchestrationRuntimeState RuntimeState { get; }
public TaskOrchestration TaskOrchestration { get; }
public TaskOrchestrationExecutor OrchestrationExecutor { get; }
public IEnumerable<OrchestratorAction> LatestDecisions { get; set; }
It is simply a value object.
Open question: where does the "cursor" aspect actually show up?
3.1.4 - TaskOrchestration
3.1.4.1 - TaskOrchestration
src\DurableTask.Core\TaskOrchestration.cs
TaskOrchestration.cs defines three methods:
- Execute()
- RaiseEvent()
- GetStatus()
Execute() method
public abstract Task<string> Execute(OrchestrationContext context, string input);
The generic TaskOrchestration<TResult, TInput, TEvent, TStatus> class implements it as:
public override async Task<string> Execute(OrchestrationContext context, string input)
{
var parameter = DataConverter.Deserialize<TInput>(input);
TResult result;
try
{
result = await RunTask(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new OrchestrationFailureException(e.Message, details)
{
FailureDetails = failureDetails,
};
}
return DataConverter.Serialize(result);
}
RunTask() is an abstract method:
public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);
The default DataConverter is JSON:
public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
{
/// <summary>
/// Creates a new TaskOrchestration with the default DataConverter
/// </summary>
protected TaskOrchestration()
{
DataConverter = JsonDataConverter.Default;
}
/// <summary>
/// The DataConverter to use for input and output serialization/deserialization
/// </summary>
public DataConverter DataConverter { get; protected set; }
RaiseEvent() method
public abstract void RaiseEvent(OrchestrationContext context, string name, string input);
It is implemented as:
public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
var parameter = DataConverter.Deserialize<TEvent>(input);
OnEvent(context, name, parameter);
}
OnEvent() has an empty implementation:
public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
{
// do nothing
}
GetStatus() method
public abstract string GetStatus();
Implementations
Apart from the samples and tests, this project contains no implementing classes.
samples\Correlation.Samples\HelloOrchestrator.cs contains a minimal implementation:
[KnownType(typeof(Hello))]
internal class HelloOrchestrator : TaskOrchestration<string, string>
{
public override async Task<string> RunTask(OrchestrationContext context, string input)
{
// await contextBase.ScheduleTask<string>(typeof(Hello), "world");
// if you pass an empty string it throws an error
return await context.ScheduleTask<string>(typeof(Hello), "world");
}
}
internal class Hello : TaskActivity<string, string>
{
protected override string Execute(TaskContext context, string input)
{
if (string.IsNullOrEmpty(input))
{
throw new ArgumentNullException(nameof(input));
}
Console.WriteLine($"Activity: Hello {input}");
return $"Hello, {input}!";
}
}
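Note: this implementation is the closest match to the definition in the Dapr workflow Java SDK, and it is also the most natural place to add a getVersion() method that returns the current worker's version. But why does the quickstart use static methods instead?
TBD: ask Chris.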
3.1.4.2 - TaskOrchestrationContext
src\DurableTask.Core\TaskOrchestrationContext.cs
Class definition
TaskOrchestrationContext inherits from OrchestrationContext:
internal class TaskOrchestrationContext : OrchestrationContext
{
}
3.1.5 - OrchestrationState
3.1.5.1 - OrchestrationState
src\DurableTask.Core\OrchestrationState.cs
Represents the state of an orchestration
Class definition
OrchestrationState is a DataContract:
[DataContract]
public class OrchestrationState : IExtensibleDataObject
{
}
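It defines the following DataMembers:
| Name | Type | Description |
|---|---|---|
| Name | string | Name of the orchestration |
| Input | string | Serialized input of the orchestration |
| Output | string | Serialized output of the orchestration |
| OrchestrationInstance | OrchestrationInstance | The orchestration instance this state represents |
| OrchestrationStatus | OrchestrationStatus | The current orchestration status |
| Status | string | The current orchestration status in string format |
| ParentInstance | ParentInstance | The parent instance, if this orchestration has one |
| Version | string | Version of the orchestration |
| Tags | IDictionary<string, string> | Dictionary of tags and string values associated with this orchestration |
| Generation | int | The orchestration generation. Reused instanceIds increment this value. |
| CreatedTime | DateTime | Creation time of the orchestration |
| ScheduledStartTime | DateTime | Time at which the orchestration is scheduled to start |
| CompletedTime | DateTime | Completion time of the orchestration |
| LastUpdatedTime | DateTime | Last updated time of the orchestration |
| Size | long | Size of the raw (uncompressed) serialized runtime state |
| CompressedSize | long | Size of the compressed serialized runtime state |
| FailureDetails | FailureDetails | Gets or sets the failure details associated with the orchestration |
OrchestrationState defines a Version field; in addition, its OrchestrationInstance field would also carry a version through the InstanceVersion field proposed in 3.1.1.1.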
3.1.5.2 - OrchestrationStateQuery
src\DurableTask.Core\OrchestrationStateQuery.cs
Query class that can be used to filter results from the Orchestration instance store.
Note: instance methods are not thread safe.
Class definition
public class OrchestrationStateQuery {
}
Constructor
public OrchestrationStateQuery()
{
FilterMap = new Dictionary<Type, OrchestrationStateQueryFilter>();
}
FilterMap
public IDictionary<Type, OrchestrationStateQueryFilter> FilterMap { get; private set; }
GetFilters()
Gets the primary_filter and collection_of(secondary_filters) for the query:
public Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>> GetFilters()
{
ICollection<OrchestrationStateQueryFilter> filters = FilterMap.Values;
if (filters.Count == 0)
{
return null;
}
var secondaryFilters = new List<OrchestrationStateQueryFilter>();
OrchestrationStateQueryFilter primaryFilter = filters.First();
int primaryFilterPrecedence = SafeGetFilterPrecedence(primaryFilter);
if (filters.Count > 1)
{
foreach (OrchestrationStateQueryFilter filter in filters)
{
int newPrecedence = SafeGetFilterPrecedence(filter);
if (newPrecedence > primaryFilterPrecedence)
{
secondaryFilters.Add(primaryFilter);
primaryFilter = filter;
primaryFilterPrecedence = newPrecedence;
}
else
{
secondaryFilters.Add(filter);
}
}
}
return new Tuple<OrchestrationStateQueryFilter, IEnumerable<OrchestrationStateQueryFilter>>(
primaryFilter, secondaryFilters);
}
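The primary/secondary split in GetFilters() amounts to a single pass that keeps the highest-precedence filter and demotes the rest. Below is a minimal sketch, assuming a hypothetical `Filter` record whose precedence is already resolved; the precedence numbers are made up, since the real values come from SafeGetFilterPrecedence(), which is not shown here. Unlike the listing above, whose `foreach` also visits the first filter, this sketch starts at the second element so the initial candidate is never compared against itself, and it throws on an empty list instead of returning null.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var filters = new List<Filter> { new("NameVersion", 20), new("Instance", 30), new("Status", 10) };
var (primary, secondary) = FilterSelection.Split(filters);
// prints "primary=Instance, secondary=NameVersion,Status"
Console.WriteLine($"primary={primary.Name}, secondary={string.Join(",", secondary.Select(f => f.Name))}");

// Hypothetical stand-in for a query filter with an already-resolved precedence.
public record Filter(string Name, int Precedence);

public static class FilterSelection
{
    public static (Filter Primary, List<Filter> Secondary) Split(IReadOnlyList<Filter> filters)
    {
        if (filters.Count == 0)
        {
            throw new ArgumentException("At least one filter is required.", nameof(filters));
        }

        Filter primary = filters[0];
        var secondary = new List<Filter>();
        for (int i = 1; i < filters.Count; i++)
        {
            if (filters[i].Precedence > primary.Precedence)
            {
                secondary.Add(primary); // demote the previous candidate
                primary = filters[i];
            }
            else
            {
                secondary.Add(filters[i]);
            }
        }
        return (primary, secondary);
    }
}
```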
3.1.5.3 - OrchestrationStateQueryFilter
src\DurableTask.Core\OrchestrationStateQueryFilter.cs
Abstract class for an orchestration state query filter
Class definition
OrchestrationStateQueryFilter is an abstract class:
public abstract class OrchestrationStateQueryFilter
{
}
This is an empty abstract class.
Its implementations include:
- OrchestrationStateInstanceFilter
- OrchestrationStateNameVersionFilter: this one can be used for version filtering
3.1.5.4 - OrchestrationStateInstanceFilter
src\DurableTask.Core\OrchestrationStateInstanceFilter.cs
Filter for Orchestration instance filter
Class definition
OrchestrationStateInstanceFilter extends OrchestrationStateQueryFilter:
public class OrchestrationStateInstanceFilter : OrchestrationStateQueryFilter
{
}
Constructor
// Creates a new instance of OrchestrationStateInstanceFilter with default settings
public OrchestrationStateInstanceFilter()
{
// default is exact match
StartsWith = false;
}
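Only two properties, InstanceId and ExecutionId, are defined as the filter's match values; StartsWith selects the match type:
// Gets or sets the InstanceId for the filter
public string InstanceId { get; set; }
// Gets or sets the ExecutionId for the filter
public string ExecutionId { get; set; }
// Gets or sets the match type of the filter: prefix (starts with) or exact match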
public bool StartsWith { get; set; }
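How a backend applies StartsWith is up to the instance store; the following is only a sketch of the intended semantics (prefix match versus exact match), assuming ordinal string comparison. `InstanceMatch.IsMatch` is a hypothetical helper, not a DurableTask API.

```csharp
using System;

Console.WriteLine(InstanceMatch.IsMatch("order-123", "order-", startsWith: true));  // prints "True"
Console.WriteLine(InstanceMatch.IsMatch("order-123", "order-", startsWith: false)); // prints "False"

public static class InstanceMatch
{
    // Hypothetical helper: prefix match when startsWith is true, exact match otherwise.
    public static bool IsMatch(string instanceId, string filterValue, bool startsWith) =>
        startsWith
            ? instanceId.StartsWith(filterValue, StringComparison.Ordinal)
            : string.Equals(instanceId, filterValue, StringComparison.Ordinal);
}
```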
3.1.5.5 - OrchestrationStateNameVersionFilter
src\DurableTask.Core\OrchestrationStateNameVersionFilter.cs
Filter for Orchestration Name and Version
Class definition
OrchestrationStateNameVersionFilter extends OrchestrationStateQueryFilter:
public class OrchestrationStateNameVersionFilter : OrchestrationStateQueryFilter
{
}
Only two properties, Name and Version, are defined as the filter's match values:
public string Name { get; set; }
public string Version { get; set; }
3.1.6 - Entity
3.1.6.1 - TaskEntity
Abstract base class for entities
src\DurableTask.Core\Entities\TaskEntity.cs
TaskEntity defines one method:
- ExecuteOperationBatchAsync()
ExecuteOperationBatchAsync() method
public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);
EntityBatchRequest class
A request for execution of a batch of operations on an entity.
- string InstanceId
- string EntityState
- List<OperationRequest> Operations
OperationRequest class
It has the following properties:
- string Operation
- Guid Id
- string Input
3.1.6.2 - EntityId
A unique identifier for an entity, consisting of entity name and entity key.
src\DurableTask.Core\Entities\EntityId.cs
EntityId defines the following properties:
- string Name
- string Key
3.1.7 - History
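3.1.7.1 - History overview
Introduction
The following introduction comes from README.md
Durable Task Framework History Events
The following are some of the common history events that make up the state of an orchestration. You can easily see these events in the history tables of the Azure Storage and MSSQL storage backends for DTFx. Understanding these events is useful when working with DTFx code, debugging issues, or creating diagnostic tools that read the history directly (such as the Durable Functions Monitor project).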
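| Event Type | Description |
|---|---|
| OrchestratorStarted | The orchestrator function is starting a new _execution_. You will see many of these events in a history, one for each time the orchestrator resumes from an "await" state. Note that this does not mean the orchestrator is starting for the first time; first-time execution is denoted by the ExecutionStarted history event (see below). The timestamp of this event is used to populate the CurrentDateTimeUtc property. |
| ExecutionStarted | The orchestration has started executing for the first time. The event contains the orchestrator name, the input, and the _scheduled_ time of the orchestrator (which may be earlier than the preceding OrchestratorStarted event in the history). This is always the second event in an orchestration's history. |
| TaskScheduled | The orchestrator scheduled an activity task. The event includes the activity name, the input, and a sequential EventId that can be used to correlate the TaskScheduled event with the corresponding TaskCompleted or TaskFailed event. Note that multiple Task*** events may be generated if an activity task is retried. |
| TaskCompleted | A scheduled task activity has completed successfully. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| TaskFailed | A scheduled task activity has completed with a failure. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| SubOrchestrationInstanceCreated | The orchestrator has scheduled a sub-orchestrator. The event contains the name of the scheduled orchestrator, the instance ID, the input, and the ordinal event ID that can be used to correlate the SubOrchestrationInstanceCreated event with a subsequent SubOrchestrationInstanceCompleted or SubOrchestrationInstanceFailed history event. The timestamp refers to the time at which the sub-orchestrator was scheduled, which is earlier than the time it starts executing. Note that multiple SubOrchestrationInstance*** events may be generated if an activity task is retried. |
| SubOrchestrationInstanceCompleted | A scheduled sub-orchestrator has completed successfully. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| SubOrchestrationInstanceFailed | A scheduled sub-orchestrator has completed with a failure. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| TimerCreated | The orchestrator scheduled a durable timer. The FireAt property contains the date at which the timer will fire. |
| TimerFired | A previously scheduled durable timer has fired. The TimerId field matches the EventId field of the corresponding TimerCreated event. |
| EventRaised | The orchestration (or an entity, in the case of Durable Entities) has received an external event. The record contains the event name, the payload, and the timestamp at which the event was _sent_ (which should be the same as or earlier than the time the history event was actually persisted). |
| EventSent | The orchestration (or entity) sent a one-way message to another orchestration (or entity). |
| ExecutionCompleted | The orchestration has completed. The event includes the output of the orchestration and does not distinguish between success and failure. |
| ExecutionTerminated | The orchestration was forcefully terminated by an API call. The timestamp of this event represents the time at which termination was scheduled, not necessarily the time at which it actually terminated. |
| OrchestratorCompleted | The orchestrator function has awaited and committed any side effects. You will see many of these events in a history, one for each time the orchestrator awaits. Note that this does not mean the orchestrator has completed (completion is denoted by either ExecutionCompleted or ExecutionTerminated). |
| GenericEvent | A generic history event with a Data field but no specific meaning. This history event is not commonly used. In some cases it is used to trigger a fresh replay of an idle orchestration, for example after an orchestration rewind. |
| HistoryStateEvent | A history event that contains a snapshot of the orchestration history. Most modern backend types do not use this event type. |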
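The correlation rule described for TaskScheduled, TaskCompleted and TaskFailed (match TaskScheduledId against the EventId of the scheduling event) can be sketched as a small lookup. This is an illustration over made-up data, assuming a hypothetical flattened `Event` record rather than the real HistoryEvent class hierarchy.

```csharp
using System;
using System.Collections.Generic;

var history = new List<Event>
{
    new("TaskScheduled", EventId: 0, TaskScheduledId: null, Name: "Hello"),
    new("TaskScheduled", EventId: 1, TaskScheduledId: null, Name: "Goodbye"),
    new("TaskCompleted", EventId: -1, TaskScheduledId: 1, Name: null),
};
// prints "Hello: pending" then "Goodbye: TaskCompleted"
foreach (var (name, outcome) in HistoryCorrelation.Pair(history))
{
    Console.WriteLine($"{name}: {outcome}");
}

// Hypothetical flattened view of a history event; not the DurableTask types.
public record Event(string EventType, int EventId, int? TaskScheduledId, string Name);

public static class HistoryCorrelation
{
    public static List<(string Name, string Outcome)> Pair(IEnumerable<Event> history)
    {
        var scheduled = new SortedDictionary<int, string>(); // EventId -> activity name
        var outcomes = new Dictionary<int, string>();        // TaskScheduledId -> outcome event type
        foreach (Event e in history)
        {
            if (e.EventType == "TaskScheduled")
            {
                scheduled[e.EventId] = e.Name;
            }
            else if (e.TaskScheduledId is int id &&
                     (e.EventType == "TaskCompleted" || e.EventType == "TaskFailed"))
            {
                outcomes[id] = e.EventType;
            }
        }

        var result = new List<(string, string)>();
        foreach (var pair in scheduled)
        {
            // A scheduled task without a matching outcome is still in flight.
            result.Add((pair.Value, outcomes.TryGetValue(pair.Key, out string o) ? o : "pending"));
        }
        return result;
    }
}
```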
3.1.7.2 - HistoryEvent event
It has the following properties:
- int EventId
- EventType EventType
- bool IsPlayed
- DateTime Timestamp
- ExtensionDataObject ExtensionData
This class is also the parent class of the other events.
3.1.7.3 - ExecutionStartedEvent event
It has the following properties:
- int EventId
- string Input
- EventType EventType
- ParentInstance ParentInstance
- string Name
- string Version: can be reused
- IDictionary<string, string> Tags
- string Correlation
- DistributedTraceContext ParentTraceContext
- DateTime ScheduledStartTime
- int Generation
3.1.7.4 - OrchestratorStartedEvent event
It has the following properties:
- int EventId
- EventType EventType
3.2 - DurableTask Dispatcher source code study
3.2.1 - TaskOrchestrationDispatcher source code study
Dispatcher for orchestrations to handle processing and renewing, completion of orchestration events
src\DurableTask.Core\TaskOrchestrationDispatcher.cs
Class definition
It implements no interface and has no base class:
public class TaskOrchestrationDispatcher{}
Class fields
static readonly Task CompletedTask = Task.FromResult(0);
readonly INameVersionObjectManager<TaskOrchestration> objectManager;
readonly IOrchestrationService orchestrationService;
readonly WorkItemDispatcher<TaskOrchestrationWorkItem> dispatcher;
readonly DispatchMiddlewarePipeline dispatchPipeline;
readonly LogHelper logHelper;
ErrorPropagationMode errorPropagationMode;
readonly NonBlockingCountdownLock concurrentSessionLock;
readonly IEntityOrchestrationService? entityOrchestrationService;
readonly EntityBackendProperties? entityBackendProperties;
readonly TaskOrchestrationEntityParameters? entityParameters;
The main functionality lives in objectManager, orchestrationService, dispatcher and dispatchPipeline.
Constructor
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
ErrorPropagationMode errorPropagationMode)
{
// These fields are all passed in
this.objectManager = objectManager ?? throw new ArgumentNullException(nameof(objectManager));
this.orchestrationService = orchestrationService ?? throw new ArgumentNullException(nameof(orchestrationService));
this.dispatchPipeline = dispatchPipeline ?? throw new ArgumentNullException(nameof(dispatchPipeline));
this.logHelper = logHelper ?? throw new ArgumentNullException(nameof(logHelper));
this.errorPropagationMode = errorPropagationMode;
// Cast orchestrationService to IEntityOrchestrationService
this.entityOrchestrationService = orchestrationService as IEntityOrchestrationService;
this.entityBackendProperties = this.entityOrchestrationService?.EntityBackendProperties;
this.entityParameters = TaskOrchestrationEntityParameters.FromEntityBackendProperties(this.entityBackendProperties);
// Initialize the dispatcher
this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
this.OnProcessWorkItemSessionAsync)
{
// Initialize the dispatcher's fields
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
LogHelper = logHelper,
};
// To avoid starvation, we only allow half of all concurrently executing orchestrations to
// leverage extended sessions.
var maxConcurrentSessions = (int)Math.Ceiling(this.dispatcher.MaxConcurrentWorkItems / 2.0);
this.concurrentSessionLock = new NonBlockingCountdownLock(maxConcurrentSessions);
}
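The extended-session cap at the end of the constructor is plain arithmetic, so it can be checked in isolation. The following is a minimal sketch; the class and method names (ExtendedSessionLimit, Compute) are hypothetical and exist only for illustration, while the expression itself is copied from the constructor.

```csharp
using System;

public static class ExtendedSessionLimit
{
    // Same expression as in the constructor: half of the concurrency budget, rounded up.
    public static int Compute(int maxConcurrentWorkItems) =>
        (int)Math.Ceiling(maxConcurrentWorkItems / 2.0);
}
```

With these inputs, 1 gives 1, 5 gives 3 and 100 gives 50. An input of 0 gives 0, which the NonBlockingCountdownLock constructor shown later in this section rejects with an ArgumentOutOfRangeException.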
StartAsync() method
Starts the dispatcher so that it begins fetching and processing orchestration events.
public async Task StartAsync()
{
await this.dispatcher.StartAsync();
}
OnFetchWorkItemAsync() method
Gets the next work item to process within the supplied timeout.
protected Task<TaskOrchestrationWorkItem> OnFetchWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
{
if (this.entityBackendProperties?.UseSeparateQueueForEntityWorkItems == true)
{
// only orchestrations should be served by this dispatcher, so we call
// the method which returns work items for orchestrations only.
Console.WriteLine("OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true");
Console.WriteLine("OnFetchWorkItemAsync: this.entityOrchestrationService=" + this.entityOrchestrationService?.GetType().FullName);
return this.entityOrchestrationService!.LockNextOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
else
{
// both entities and orchestrations are served by this dispatcher,
// so we call the method that may return work items for either.
return this.orchestrationService.LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken);
}
}
With the Console.WriteLine calls added above (debugging additions made for these notes), the output is:
OnFetchWorkItemAsync: UseSeparateQueueForEntityWorkItems == true
OnFetchWorkItemAsync: this.entityOrchestrationService=Microsoft.Azure.WebJobs.Extensions.DurableTask.AzureStorageDurabilityProvider
OnProcessWorkItemSessionAsync() method
async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
{
// DTFx history replay expects that ExecutionStarted comes before other events.
// If this is not already the case, due to a race-condition, we re-order the
// messages to enforce this expectation.
EnsureExecutionStartedIsFirst(workItem.NewMessages);
try
{
if (workItem.Session == null)
{
// Legacy behavior
await this.OnProcessWorkItemAsync(workItem);
return;
}
var isExtendedSession = false;
CorrelationTraceClient.Propagate(
() =>
{
// Check if it is extended session.
// TODO: Remove this code - it looks incorrect and dangerous
isExtendedSession = this.concurrentSessionLock.Acquire();
this.concurrentSessionLock.Release();
workItem.IsExtendedSession = isExtendedSession;
});
var processCount = 0;
try
{
while (true)
{
// If the provider provided work items, execute them.
if (workItem.NewMessages?.Count > 0)
{
bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
if (isCompletedOrInterrupted)
{
break;
}
processCount++;
}
// Fetches beyond the first require getting an extended session lock, used to prevent starvation.
if (processCount > 0 && !isExtendedSession)
{
isExtendedSession = this.concurrentSessionLock.Acquire();
if (!isExtendedSession)
{
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
break;
}
}
TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session.");
Stopwatch timer = Stopwatch.StartNew();
// Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
// until either new messages are available or until a provider-specific timeout has expired.
workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
if (workItem.NewMessages == null)
{
break;
}
TraceHelper.Trace(
TraceEventType.Verbose,
"OnProcessWorkItemSession-EndFetch",
$"Fetched {workItem.NewMessages.Count} new message(s) after {timer.ElapsedMilliseconds} ms from existing session.");
workItem.OrchestrationRuntimeState.NewEvents.Clear();
}
}
finally
{
if (isExtendedSession)
{
TraceHelper.Trace(
TraceEventType.Verbose,
"OnProcessWorkItemSession-Release",
$"Releasing extended session after {processCount} batch(es).");
this.concurrentSessionLock.Release();
}
}
}
catch (SessionAbortedException e)
{
// Either the orchestration or the orchestration service explicitly abandoned the session.
OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
this.logHelper.OrchestrationAborted(instance, e.Message);
TraceHelper.TraceInstance(TraceEventType.Warning, "TaskOrchestrationDispatcher-ExecutionAborted", instance, "{0}", e.Message);
await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
}
}
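The first thing the method does is call EnsureExecutionStartedIsFirst, whose body is not reproduced in these notes. As a rough illustration of the idea only, the sketch below moves the first "ExecutionStarted" entry to the front of a batch. It uses plain strings instead of TaskMessage objects, the names MessageOrdering and MoveExecutionStartedFirst are hypothetical, and the real method may well handle more cases than this.

```csharp
using System;
using System.Collections.Generic;

public static class MessageOrdering
{
    // Moves the first "ExecutionStarted" entry (if any) to index 0,
    // keeping the relative order of all other entries.
    public static void MoveExecutionStartedFirst(IList<string> eventTypes)
    {
        int index = eventTypes.IndexOf("ExecutionStarted");
        if (index > 0)
        {
            string started = eventTypes[index];
            eventTypes.RemoveAt(index);
            eventTypes.Insert(0, started);
        }
    }
}
```

Pass a List<string> rather than an array, since arrays do not support RemoveAt through the IList<T> interface.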
OnProcessWorkItemAsync() method
protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
var messagesToSend = new List<TaskMessage>();
var timerMessages = new List<TaskMessage>();
var orchestratorMessages = new List<TaskMessage>();
var isCompleted = false;
var continuedAsNew = false;
var isInterrupted = false;
// correlation
CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext);
ExecutionStartedEvent? continueAsNewExecutionStarted = null;
TaskMessage? continuedAsNewMessage = null;
IList<HistoryEvent>? carryOverEvents = null;
string? carryOverStatus = null;
workItem.OrchestrationRuntimeState.LogHelper = this.logHelper;
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState;
// Distributed tracing support: each orchestration execution is a trace activity
// that derives from an established parent trace context. It is expected that some
// listener will receive these events and publish them to a distributed trace logger.
ExecutionStartedEvent startEvent =
runtimeState.ExecutionStartedEvent ??
workItem.NewMessages.Select(msg => msg.Event).OfType<ExecutionStartedEvent>().FirstOrDefault();
Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent);
OrchestrationState? instanceState = null;
Task? renewTask = null;
using var renewCancellationTokenSource = new CancellationTokenSource();
if (workItem.LockedUntilUtc < DateTime.MaxValue)
{
// start a task to run RenewUntil
renewTask = Task.Factory.StartNew(
() => RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskOrchestrationDispatcher), renewCancellationTokenSource.Token),
renewCancellationTokenSource.Token);
}
try
{
// Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper))
{
// TODO : mark an orchestration as faulted if there is data corruption
this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
TraceHelper.TraceSession(
TraceEventType.Error,
"TaskOrchestrationDispatcher-DeletedOrchestration",
runtimeState.OrchestrationInstance?.InstanceId!,
"Received work-item for an invalid orchestration");
isCompleted = true;
traceActivity?.Dispose();
}
else
{
do
{
continuedAsNew = false;
continuedAsNewMessage = null;
this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name);
TraceHelper.TraceInstance(
TraceEventType.Verbose,
"TaskOrchestrationDispatcher-ExecuteUserOrchestration-Begin",
runtimeState.OrchestrationInstance!,
"Executing user orchestration: {0}",
JsonDataConverter.Default.Serialize(runtimeState.GetOrchestrationRuntimeStateDump(), true));
if (workItem.Cursor == null)
{
workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem);
}
else
{
await this.ResumeOrchestrationAsync(workItem);
}
IReadOnlyList<OrchestratorAction> decisions = workItem.Cursor.LatestDecisions.ToList();
this.logHelper.OrchestrationExecuted(
runtimeState.OrchestrationInstance!,
runtimeState.Name,
decisions);
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-ExecuteUserOrchestration-End",
runtimeState.OrchestrationInstance!,
"Executed user orchestration. Received {0} orchestrator actions: {1}",
decisions.Count,
string.Join(", ", decisions.Select(d => d.Id + ":" + d.OrchestratorActionType)));
// TODO: Exception handling for invalid decisions, which is increasingly likely
// when custom middleware is involved (e.g. out-of-process scenarios).
foreach (OrchestratorAction decision in decisions)
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-ProcessOrchestratorAction",
runtimeState.OrchestrationInstance!,
"Processing orchestrator action of type {0}",
decision.OrchestratorActionType);
switch (decision.OrchestratorActionType)
{
case OrchestratorActionType.ScheduleOrchestrator:
var scheduleTaskAction = (ScheduleTaskOrchestratorAction)decision;
var message = this.ProcessScheduleTaskDecision(
scheduleTaskAction,
runtimeState,
this.IncludeParameters,
traceActivity);
messagesToSend.Add(message);
break;
case OrchestratorActionType.CreateTimer:
var timerOrchestratorAction = (CreateTimerOrchestratorAction)decision;
timerMessages.Add(this.ProcessCreateTimerDecision(
timerOrchestratorAction,
runtimeState,
isInternal: false));
break;
case OrchestratorActionType.CreateSubOrchestration:
var createSubOrchestrationAction = (CreateSubOrchestrationAction)decision;
orchestratorMessages.Add(
this.ProcessCreateSubOrchestrationInstanceDecision(
createSubOrchestrationAction,
runtimeState,
this.IncludeParameters,
traceActivity));
break;
case OrchestratorActionType.SendEvent:
var sendEventAction = (SendEventOrchestratorAction)decision;
orchestratorMessages.Add(
this.ProcessSendEventDecision(sendEventAction, runtimeState));
break;
case OrchestratorActionType.OrchestrationComplete:
OrchestrationCompleteOrchestratorAction completeDecision = (OrchestrationCompleteOrchestratorAction)decision;
TaskMessage? workflowInstanceCompletedMessage =
this.ProcessWorkflowCompletedTaskDecision(completeDecision, runtimeState, this.IncludeDetails, out continuedAsNew);
if (workflowInstanceCompletedMessage != null)
{
// Send complete message to parent workflow or to itself to start a new execution
// Store the event so we can rebuild the state
carryOverEvents = null;
if (continuedAsNew)
{
continuedAsNewMessage = workflowInstanceCompletedMessage;
continueAsNewExecutionStarted = workflowInstanceCompletedMessage.Event as ExecutionStartedEvent;
if (completeDecision.CarryoverEvents.Any())
{
carryOverEvents = completeDecision.CarryoverEvents.ToList();
completeDecision.CarryoverEvents.Clear();
}
}
else
{
orchestratorMessages.Add(workflowInstanceCompletedMessage);
}
}
isCompleted = !continuedAsNew;
break;
default:
throw TraceHelper.TraceExceptionInstance(
TraceEventType.Error,
"TaskOrchestrationDispatcher-UnsupportedDecisionType",
runtimeState.OrchestrationInstance!,
new NotSupportedException($"Decision type '{decision.OrchestratorActionType}' not supported"));
}
// Underlying orchestration service provider may have a limit of messages per call, to avoid the situation
// we keep on asking the provider if message count is ok and stop processing new decisions if not.
//
// We also put in a fake timer to force next orchestration task for remaining messages
int totalMessages = messagesToSend.Count + orchestratorMessages.Count + timerMessages.Count;
if (this.orchestrationService.IsMaxMessageCountExceeded(totalMessages, runtimeState))
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-MaxMessageCountReached",
runtimeState.OrchestrationInstance!,
"MaxMessageCount reached. Adding timer to process remaining events in next attempt.");
if (isCompleted || continuedAsNew)
{
TraceHelper.TraceInstance(
TraceEventType.Information,
"TaskOrchestrationDispatcher-OrchestrationAlreadyCompleted",
runtimeState.OrchestrationInstance!,
"Orchestration already completed. Skip adding timer for splitting messages.");
break;
}
var dummyTimer = new CreateTimerOrchestratorAction
{
Id = FrameworkConstants.FakeTimerIdToSplitDecision,
FireAt = DateTime.UtcNow
};
timerMessages.Add(this.ProcessCreateTimerDecision(
dummyTimer,
runtimeState,
isInternal: true));
isInterrupted = true;
break;
}
}
// correlation
CorrelationTraceClient.Propagate(() =>
{
if (runtimeState.ExecutionStartedEvent != null)
runtimeState.ExecutionStartedEvent.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});
// finish up processing of the work item
if (!continuedAsNew && runtimeState.Events.Last().EventType != EventType.OrchestratorCompleted)
{
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
}
if (isCompleted)
{
TraceHelper.TraceSession(TraceEventType.Information, "TaskOrchestrationDispatcher-DeletingSessionState", workItem.InstanceId, "Deleting session state");
if (runtimeState.ExecutionStartedEvent != null)
{
instanceState = Utils.BuildOrchestrationState(runtimeState);
}
}
else
{
if (continuedAsNew)
{
TraceHelper.TraceSession(
TraceEventType.Information,
"TaskOrchestrationDispatcher-UpdatingStateForContinuation",
workItem.InstanceId,
"Updating state for continuation");
// correlation
CorrelationTraceClient.Propagate(() =>
{
continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});
// Copy the distributed trace context, if any
continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent);
runtimeState = new OrchestrationRuntimeState();
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(continueAsNewExecutionStarted!);
runtimeState.Status = workItem.OrchestrationRuntimeState.Status ?? carryOverStatus;
carryOverStatus = workItem.OrchestrationRuntimeState.Status;
if (carryOverEvents != null)
{
foreach (var historyEvent in carryOverEvents)
{
runtimeState.AddEvent(historyEvent);
}
}
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
workItem.OrchestrationRuntimeState = runtimeState;
workItem.Cursor = null;
}
instanceState = Utils.BuildOrchestrationState(runtimeState);
}
} while (continuedAsNew);
}
}
finally
{
if (renewTask != null)
{
try
{
renewCancellationTokenSource.Cancel();
await renewTask;
}
catch (ObjectDisposedException)
{
// ignore
}
catch (OperationCanceledException)
{
// ignore
}
}
}
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
// some backends expect the original runtime state object
workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState;
}
runtimeState.Status = runtimeState.Status ?? carryOverStatus;
if (instanceState != null)
{
instanceState.Status = runtimeState.Status;
}
await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
workItem,
runtimeState,
continuedAsNew ? null : messagesToSend,
orchestratorMessages,
continuedAsNew ? null : timerMessages,
continuedAsNewMessage,
instanceState);
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
workItem.OrchestrationRuntimeState = runtimeState;
}
return isCompleted || continuedAsNew || isInterrupted;
}
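One detail in the decision loop that is easy to miss is the message-count check: after each decision the dispatcher asks the provider whether the outgoing message count is too high, and if so it appends a fake timer and stops, so that the remaining decisions are handled in a later pass. The sketch below reduces this to strings and a fixed limit. The names DecisionBatcher, Process and FakeTimer are hypothetical, and the "count > maxMessages" rule is an assumption standing in for the provider-specific IsMaxMessageCountExceeded.

```csharp
using System;
using System.Collections.Generic;

public static class DecisionBatcher
{
    // Hypothetical stand-in for the timer the dispatcher adds to split a batch.
    public const string FakeTimer = "fake-timer";

    // Turns decisions into outgoing messages in order. Once the message count
    // exceeds maxMessages, a fake timer is appended and processing stops.
    public static (List<string> Messages, bool Interrupted) Process(
        IReadOnlyList<string> decisions, int maxMessages)
    {
        var messages = new List<string>();
        foreach (string decision in decisions)
        {
            messages.Add(decision);
            if (messages.Count > maxMessages)
            {
                messages.Add(FakeTimer);
                return (messages, true);
            }
        }

        return (messages, false);
    }
}
```

In the real method the interrupted flag feeds into the return value (isCompleted || continuedAsNew || isInterrupted), which tells the session loop to stop after this batch.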
StopAsync() method
Stops the dispatcher so that it no longer fetches or processes orchestration events. The forced flag is passed straight through to the inner WorkItemDispatcher.
public async Task StopAsync(bool forced)
{
await this.dispatcher.StopAsync(forced);
}
NonBlockingCountdownLock
A countdown lock implementation whose Acquire() never blocks: it returns false immediately when no slot is available:
internal class NonBlockingCountdownLock
{
int available;
public NonBlockingCountdownLock(int available)
{
if (available <= 0)
{
throw new ArgumentOutOfRangeException(nameof(available));
}
this.available = available;
this.Capacity = available;
}
public int Capacity { get; }
public bool Acquire()
{
if (this.available <= 0)
{
return false;
}
if (Interlocked.Decrement(ref this.available) >= 0)
{
return true;
}
// the counter went negative - fix it
Interlocked.Increment(ref this.available);
return false;
}
public void Release()
{
Interlocked.Increment(ref this.available);
}
}
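To see the Acquire/Release behaviour in action, the sketch below re-creates the lock under a different name (the real class is internal to DurableTask.Core, so it cannot be used directly from outside) and exercises it with a capacity of 2. CountdownLockSketch and CountdownLockDemo are hypothetical names, and the initial read uses Volatile.Read, which is a small deviation from the listing above.

```csharp
using System;
using System.Threading;

// Stand-in copy of the lock shown above, for experimentation only.
public sealed class CountdownLockSketch
{
    int available;

    public CountdownLockSketch(int available)
    {
        if (available <= 0) throw new ArgumentOutOfRangeException(nameof(available));
        this.available = available;
    }

    public bool Acquire()
    {
        if (Volatile.Read(ref this.available) <= 0) return false;
        if (Interlocked.Decrement(ref this.available) >= 0) return true;

        // The counter went negative because another thread took the last slot: undo.
        Interlocked.Increment(ref this.available);
        return false;
    }

    public void Release() => Interlocked.Increment(ref this.available);
}

public static class CountdownLockDemo
{
    // Returns the results of four Acquire() calls, with one Release() before the last.
    public static bool[] Run()
    {
        var gate = new CountdownLockSketch(2);
        bool first = gate.Acquire();
        bool second = gate.Acquire();
        bool third = gate.Acquire();   // no slots left, so this fails without waiting
        gate.Release();
        bool fourth = gate.Acquire();  // the released slot is available again
        return new[] { first, second, third, fourth };
    }
}
```

Note that nothing stops a caller from calling Release() more often than Acquire(), which would push the counter above its initial capacity; callers have to keep the two balanced themselves.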
3.2.2 - WorkItemDispatcher source code study
Dispatcher class for fetching and processing work items of the supplied type.
src\DurableTask.Core\WorkItemDispatcher.cs
Class definition
public class WorkItemDispatcher<T> : IDisposable
Class fields
const int DefaultMaxConcurrentWorkItems = 20;
const int DefaultDispatcherCount = 1;
const int BackOffIntervalOnInvalidOperationSecs = 10;
const int CountDownToZeroDelay = 5;
// ReSharper disable once StaticMemberInGenericType
static readonly TimeSpan DefaultReceiveTimeout = TimeSpan.FromSeconds(30);
readonly string id;
readonly string name;
readonly object thisLock = new object();
readonly SemaphoreSlim initializationLock = new SemaphoreSlim(1, 1);
volatile int concurrentWorkItemCount;
volatile int countDownToZeroDelay;
volatile int delayOverrideSecs;
volatile int activeFetchers;
bool isStarted;
SemaphoreSlim concurrencyLock;
CancellationTokenSource shutdownCancellationTokenSource;
Method definitions
readonly Func<T, string> workItemIdentifier;
Func<TimeSpan, CancellationToken, Task<T>> FetchWorkItem { get; }
Func<T, Task> ProcessWorkItem { get; }
/// <summary>
/// Method to execute for safely releasing a work item
/// </summary>
public Func<T, Task> SafeReleaseWorkItem;
/// <summary>
/// Method to execute for aborting a work item
/// </summary>
public Func<T, Task> AbortWorkItem;
/// <summary>
/// Method to get a delay to wait after a fetch exception
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnFetchException = (exception) => 0;
/// <summary>
/// Method to get a delay to wait after a process exception
/// </summary>
public Func<Exception, int> GetDelayInSecondsAfterOnProcessException = (exception) => 0;
Constructor
public WorkItemDispatcher(
string name,
Func<T, string> workItemIdentifier,
Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
Func<T, Task> processWorkItem)
{
this.name = name;
this.id = Guid.NewGuid().ToString("N");
this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
this.FetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
this.ProcessWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
}
Compare this with the initialization code in TaskOrchestrationDispatcher:
internal TaskOrchestrationDispatcher(
IOrchestrationService orchestrationService,
INameVersionObjectManager<TaskOrchestration> objectManager,
DispatchMiddlewarePipeline dispatchPipeline,
LogHelper logHelper,
ErrorPropagationMode errorPropagationMode)
{
......
// initialize the dispatcher
this.dispatcher = new WorkItemDispatcher<TaskOrchestrationWorkItem>(
"TaskOrchestrationDispatcher",
item => item == null ? string.Empty : item.InstanceId,
this.OnFetchWorkItemAsync,
this.OnProcessWorkItemSessionAsync)
{
// initialize the dispatcher's fields
GetDelayInSecondsAfterOnFetchException = orchestrationService.GetDelayInSecondsAfterOnFetchException,
GetDelayInSecondsAfterOnProcessException = orchestrationService.GetDelayInSecondsAfterOnProcessException,
SafeReleaseWorkItem = orchestrationService.ReleaseTaskOrchestrationWorkItemAsync,
AbortWorkItem = orchestrationService.AbandonTaskOrchestrationWorkItemAsync,
DispatcherCount = orchestrationService.TaskOrchestrationDispatcherCount,
MaxConcurrentWorkItems = orchestrationService.MaxConcurrentTaskOrchestrationWorkItems,
LogHelper = logHelper,
};
......
}
The workItemIdentifier argument (a Func<T, string>) is implemented as:
item => item == null ? string.Empty : item.InstanceId,
fetchWorkItem is implemented by TaskOrchestrationDispatcher.OnFetchWorkItemAsync, and processWorkItem is implemented by TaskOrchestrationDispatcher.OnProcessWorkItemSessionAsync.
DispatcherCount is taken from orchestrationService.TaskOrchestrationDispatcherCount,
and MaxConcurrentWorkItems from orchestrationService.MaxConcurrentTaskOrchestrationWorkItems.
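The wiring described above (three delegates through the constructor, tuning values through an object initializer) can be tried out on a much smaller class. The following is a hypothetical miniature, not the real WorkItemDispatcher<T>: MiniDispatcher and RunOnceAsync are made-up names, and the 30-second timeout simply mirrors DefaultReceiveTimeout from the field list.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical miniature of the constructor shape; not the real WorkItemDispatcher<T>.
public sealed class MiniDispatcher<T> where T : class
{
    readonly Func<T, string> workItemIdentifier;
    readonly Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem;
    readonly Func<T, Task> processWorkItem;

    public MiniDispatcher(
        string name,
        Func<T, string> workItemIdentifier,
        Func<TimeSpan, CancellationToken, Task<T>> fetchWorkItem,
        Func<T, Task> processWorkItem)
    {
        this.Name = name;
        this.workItemIdentifier = workItemIdentifier ?? throw new ArgumentNullException(nameof(workItemIdentifier));
        this.fetchWorkItem = fetchWorkItem ?? throw new ArgumentNullException(nameof(fetchWorkItem));
        this.processWorkItem = processWorkItem ?? throw new ArgumentNullException(nameof(processWorkItem));
    }

    public string Name { get; }

    // Set through an object initializer, like the real dispatcher's tuning values.
    public int MaxConcurrentWorkItems { get; set; } = 20;

    // Fetches one item, processes it, and returns its id (null when nothing was fetched).
    public async Task<string> RunOnceAsync(CancellationToken token)
    {
        T item = await this.fetchWorkItem(TimeSpan.FromSeconds(30), token);
        if (item == null) return null;
        await this.processWorkItem(item);
        return this.workItemIdentifier(item);
    }
}
```

The design point is that the dispatcher knows nothing about orchestrations: everything specific to a work item type arrives as a delegate, which presumably is what lets the same class serve other work item types as well.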
StartAsync() method
Starts the work item dispatcher:
public async Task StartAsync()
{
if (!this.isStarted)
{
await this.initializationLock.WaitAsync();
try
{
if (this.isStarted)
{
throw TraceHelper.TraceException(TraceEventType.Error, "WorkItemDispatcherStart-AlreadyStarted", new InvalidOperationException($"WorkItemDispatcher '{this.name}' has already started"));
}
this.concurrencyLock?.Dispose();
this.concurrencyLock = new SemaphoreSlim(this.MaxConcurrentWorkItems);
this.shutdownCancellationTokenSource?.Dispose();
this.shutdownCancellationTokenSource = new CancellationTokenSource();
this.isStarted = true;
TraceHelper.Trace(TraceEventType.Information, "WorkItemDispatcherStart", $"WorkItemDispatcher('{this.name}') starting. Id {this.id}.");
for (var i = 0; i < this.DispatcherCount; i++)
{
string dispatcherId = i.ToString();
// create the context
var context = new WorkItemDispatcherContext(this.name, this.id, dispatcherId);
this.LogHelper.DispatcherStarting(context);
// We just want this to run; we intentionally don't wait
#pragma warning disable 4014
Task.Run(() => this.DispatchAsync(context));
#pragma warning restore 4014
}
}
finally
{
this.initializationLock.Release();
}
}
}
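The start guard in StartAsync() has a subtle consequence: a second call made after the first has finished returns silently (the outer check fails), while a call that loses a race with a concurrent start throws inside the lock. The sketch below isolates that guard. StartOnce and StartCount are hypothetical names, and marking the flag volatile is an addition of this sketch (the field in the listing above is a plain bool).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class StartOnce
{
    readonly SemaphoreSlim initializationLock = new SemaphoreSlim(1, 1);
    volatile bool isStarted;

    // Number of times the start logic actually ran.
    public int StartCount { get; private set; }

    public async Task StartAsync()
    {
        if (this.isStarted)
        {
            return; // already started earlier: silent no-op
        }

        await this.initializationLock.WaitAsync();
        try
        {
            if (this.isStarted)
            {
                // Only reachable when another caller started while we waited for the lock.
                throw new InvalidOperationException("already started");
            }

            this.isStarted = true;
            this.StartCount++;
        }
        finally
        {
            this.initializationLock.Release();
        }
    }
}
```

Calling StartAsync() twice in sequence leaves StartCount at 1 and throws nothing.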
DispatchAsync() method
async Task DispatchAsync(WorkItemDispatcherContext context)
{
string dispatcherId = context.DispatcherId;
bool logThrottle = true;
while (this.isStarted)
{
if (!await this.concurrencyLock.WaitAsync(TimeSpan.FromSeconds(5)))
{
if (logThrottle)
{
// This can happen frequently under heavy load.
// To avoid log spam, we log just once until we can proceed.
this.LogHelper.FetchingThrottled(
context,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-MaxOperations",
this.GetFormattedLog(dispatcherId, $"Max concurrent operations ({this.concurrentWorkItemCount}) are already in progress. Still waiting for next accept."));
logThrottle = false;
}
continue;
}
logThrottle = true;
var delaySecs = 0;
T workItem = default(T);
try
{
Interlocked.Increment(ref this.activeFetchers);
this.LogHelper.FetchWorkItemStarting(context, DefaultReceiveTimeout, this.concurrentWorkItemCount, this.MaxConcurrentWorkItems);
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-StartFetch",
this.GetFormattedLog(dispatcherId, $"Starting fetch with timeout of {DefaultReceiveTimeout} ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
Stopwatch timer = Stopwatch.StartNew();
// the work item fetch starts here
workItem = await this.FetchWorkItem(DefaultReceiveTimeout, this.shutdownCancellationTokenSource.Token);
if (!IsNull(workItem))
{
string workItemId = this.workItemIdentifier(workItem);
this.LogHelper.FetchWorkItemCompleted(
context,
workItemId,
timer.Elapsed,
this.concurrentWorkItemCount,
this.MaxConcurrentWorkItems);
}
TraceHelper.Trace(
TraceEventType.Verbose,
"WorkItemDispatcherDispatch-EndFetch",
this.GetFormattedLog(dispatcherId, $"After fetch ({timer.ElapsedMilliseconds} ms) ({this.concurrentWorkItemCount}/{this.MaxConcurrentWorkItems} max)"));
}
catch (TimeoutException)
{
delaySecs = 0;
}
catch (TaskCanceledException exception)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-TaskCanceledException",
this.GetFormattedLog(dispatcherId, $"TaskCanceledException while fetching workItem, should be harmless: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
catch (Exception exception)
{
if (!this.isStarted)
{
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherDispatch-HarmlessException",
this.GetFormattedLog(dispatcherId, $"Harmless exception while fetching workItem after Stop(): {exception.Message}"));
}
else
{
this.LogHelper.FetchWorkItemFailure(context, exception);
// TODO : dump full node context here
TraceHelper.TraceException(
TraceEventType.Warning,
"WorkItemDispatcherDispatch-Exception",
exception,
this.GetFormattedLog(dispatcherId, $"Exception while fetching workItem: {exception.Message}"));
delaySecs = this.GetDelayInSecondsAfterOnFetchException(exception);
}
}
finally
{
Interlocked.Decrement(ref this.activeFetchers);
}
// then schedule the work item
var scheduledWorkItem = false;
if (!IsNull(workItem))
{
if (!this.isStarted)
{
if (this.SafeReleaseWorkItem != null)
{
await this.SafeReleaseWorkItem(workItem);
}
}
else
{
Interlocked.Increment(ref this.concurrentWorkItemCount);
// We just want this to Run we intentionally don't wait
#pragma warning disable 4014
// start processing the work item
Task.Run(() => this.ProcessWorkItemAsync(context, workItem));
#pragma warning restore 4014
scheduledWorkItem = true;
}
}
delaySecs = Math.Max(this.delayOverrideSecs, delaySecs);
if (delaySecs > 0)
{
await Task.Delay(TimeSpan.FromSeconds(delaySecs));
}
if (!scheduledWorkItem)
{
this.concurrencyLock.Release();
}
}
this.LogHelper.DispatcherStopped(context);
}
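The FetchWorkItem and ProcessWorkItem delegates used here are both passed in through the constructor (in practice by TaskOrchestrationDispatcher); ProcessWorkItemAsync() is the WorkItemDispatcher method that runs the fetched work item on a separate task.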
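The throttling in DispatchAsync() follows a common pattern: a semaphore slot is taken before each fetch, the loop gives the slot back itself only when no work item was scheduled, and otherwise the slot stays taken until processing ends. The sketch below shows that pattern in isolation. It assumes the processing side releases the slot when it finishes (ProcessWorkItemAsync is not shown in these notes), and ThrottleSketch and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs all jobs with at most maxConcurrent in flight and
    // returns the highest concurrency that was observed.
    public static async Task<int> RunAsync(int jobs, int maxConcurrent)
    {
        using var slots = new SemaphoreSlim(maxConcurrent);
        var running = new List<Task>();
        object gate = new object();
        int inFlight = 0;
        int peak = 0;

        for (int i = 0; i < jobs; i++)
        {
            await slots.WaitAsync(); // take a slot before "fetching"

            running.Add(Task.Run(async () =>
            {
                try
                {
                    lock (gate)
                    {
                        inFlight++;
                        peak = Math.Max(peak, inFlight);
                    }

                    await Task.Delay(10); // stand-in for processing the work item
                }
                finally
                {
                    lock (gate)
                    {
                        inFlight--;
                    }

                    slots.Release(); // the worker, not the loop, returns the slot
                }
            }));
        }

        await Task.WhenAll(running);
        return peak;
    }
}
```

Whatever the timing, the returned peak never exceeds maxConcurrent, which is the guarantee the dispatcher relies on for MaxConcurrentWorkItems.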
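StopAsync() method
Stops the dispatcher so that it no longer fetches or processes work items. Note that the snippet below is the TaskOrchestrationDispatcher wrapper shown earlier (it uses the this.dispatcher field, which does not appear among the WorkItemDispatcher fields listed above); it delegates to WorkItemDispatcher.StopAsync(forced), whose own body is not reproduced here.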
public async Task StopAsync(bool forced)
{
await this.dispatcher.StopAsync(forced);
}
3.3 - DurableTask Orchestration source code study
3.3.1 - IOrchestrationService
Orchestration Service interface for performing task hub management operations and handling orchestrations and work items’ state.
The code is in the durabletask repository at src\DurableTask.Core\IOrchestrationService.cs
Interface definition
IOrchestrationService is an interface:
public interface IOrchestrationService
{}
Method definitions
Start
/// <summary>
/// Starts the service initializing the required resources
/// </summary>
Task StartAsync();
Stop
/// <summary>
/// Stops the orchestration service gracefully
/// </summary>
Task StopAsync();
/// <summary>
/// Stops the orchestration service with optional forced flag
/// </summary>
Task StopAsync(bool isForced);
Create
/// <summary>
/// Deletes and Creates the necessary resources for the orchestration service and the instance store
/// </summary>
Task CreateAsync();
/// <summary>
/// Deletes and Creates the necessary resources for the orchestration service and optionally the instance store
/// </summary>
Task CreateAsync(bool recreateInstanceStore);
/// <summary>
/// Creates the necessary resources for the orchestration service and the instance store
/// </summary>
Task CreateIfNotExistsAsync();
Delete
/// <summary>
/// Deletes the resources for the orchestration service and the instance store
/// </summary>
Task DeleteAsync();
/// <summary>
/// Deletes the resources for the orchestration service and optionally the instance store
/// </summary>
Task DeleteAsync(bool deleteInstanceStore);
LockNextTaskOrchestrationWorkItem
Waits for the next orchestration work item and returns it. This method is the key one.
/// <summary>
/// Wait for the next orchestration work item and return the orchestration work item
/// </summary>
Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken);
LockNextTaskActivity
/// <summary>
/// Wait for and lock the next task activity to be processed
/// </summary>
Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken);
3.3.2 - IOrchestrationServiceClient
Interface to allow creation of new task orchestrations and query their status.
The code is in the durabletask repository at src\DurableTask.Core\IOrchestrationServiceClient.cs
Interface definition
IOrchestrationServiceClient is an interface:
public interface IOrchestrationServiceClient
{}
Method definitions
CreateTaskOrchestration
/// <summary>
/// Creates a new orchestration
/// </summary>
/// <param name="creationMessage">Orchestration creation message</param>
/// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store.</exception>
/// <returns></returns>
Task CreateTaskOrchestrationAsync(TaskMessage creationMessage);
/// <summary>
/// Creates a new orchestration and specifies a subset of states which should be de duplicated on in the client side
/// </summary>
/// <param name="creationMessage">Orchestration creation message</param>
/// <param name="dedupeStatuses">States of previous orchestration executions to be considered while de-duping new orchestrations on the client</param>
/// <exception cref="OrchestrationAlreadyExistsException">Will throw an OrchestrationAlreadyExistsException exception If any orchestration with the same instance Id exists in the instance store and it has a status specified in dedupeStatuses.</exception>
/// <returns></returns>
Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses);
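The second overload creates a new orchestration and specifies the subset of statuses that should be de-duplicated on the client side.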
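The doc comment on the second overload says the call throws only when an instance with the same id exists and its status is one of dedupeStatuses. A minimal in-memory sketch of that rule follows. Everything in it is hypothetical (SketchStatus, InMemoryInstanceTable, and InvalidOperationException standing in for OrchestrationAlreadyExistsException), and real providers may implement the check differently.

```csharp
using System;
using System.Collections.Generic;

public enum SketchStatus { Running, Completed, Failed }

// Hypothetical in-memory stand-in for an instance store.
public sealed class InMemoryInstanceTable
{
    readonly Dictionary<string, SketchStatus> instances = new Dictionary<string, SketchStatus>();

    // Creates the instance unless one with the same id exists in one of dedupeStatuses.
    public void Create(string instanceId, SketchStatus[] dedupeStatuses)
    {
        if (this.instances.TryGetValue(instanceId, out SketchStatus existing)
            && Array.IndexOf(dedupeStatuses, existing) >= 0)
        {
            throw new InvalidOperationException(
                $"Instance '{instanceId}' already exists with status {existing}.");
        }

        this.instances[instanceId] = SketchStatus.Running;
    }

    public void SetStatus(string instanceId, SketchStatus status) =>
        this.instances[instanceId] = status;
}
```

With dedupeStatuses set to { Running }, creating the same id twice fails while the first instance is still running, but succeeds again once that instance has been marked Completed.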
SendTaskOrchestrationMessage
Sends a new message to an orchestration.
TBD: it is not yet clear to me what this method is used for.
/// <summary>
/// Send a new message for an orchestration
/// </summary>
/// <param name="message">Message to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageAsync(TaskMessage message);
/// <summary>
/// Send a new set of messages for an orchestration
/// </summary>
/// <param name="messages">Messages to send</param>
/// <returns></returns>
Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages);
WaitForOrchestration
/// <summary>
/// Wait for an orchestration to reach any terminal state within the given timeout
/// </summary>
/// <param name="instanceId">Instance id of the orchestration</param>
/// <param name="executionId">Execution id of the orchestration</param>
/// <param name="timeout">Maximum amount of time to wait</param>
/// <param name="cancellationToken">Task cancellation token</param>
Task<OrchestrationState> WaitForOrchestrationAsync(
string instanceId,
string executionId,
TimeSpan timeout,
CancellationToken cancellationToken);
ForceTerminateTaskOrchestration
/// <summary>
/// Forcefully terminate the specified orchestration instance
/// </summary>
/// <param name="instanceId">Instance to terminate</param>
/// <param name="reason">Reason for termination</param>
Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason);
GetOrchestrationState
/// <summary>
/// Get a list of orchestration states from the instance storage for the most current execution (generation) of the specified instance.
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="allExecutions">True if method should fetch all executions of the instance, false if the method should only fetch the most recent execution</param>
/// <returns>List of OrchestrationState objects that represents the list of orchestrations in the instance store</returns>
Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions);
/// <summary>
/// Get a list of orchestration states from the instance storage for the specified execution (generation) of the specified instance.
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="executionId">Execution id</param>
/// <returns>The OrchestrationState of the specified instanceId or null if not found</returns>
Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId);
GetOrchestrationHistory
/// <summary>
/// Get a string dump of the execution history of the specified orchestration instance specified execution (generation) of the specified instance
/// </summary>
/// <param name="instanceId">Instance id</param>
/// <param name="executionId">Execution id</param>
/// <returns>String with formatted JSON representing the execution history</returns>
Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId);
PurgeOrchestrationHistory
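Purges orchestration instance state and history for orchestrations older than the specified threshold time, and also purges the blob storage.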
/// <summary>
/// Purges orchestration instance state and history for orchestrations older than the specified threshold time.
/// Also purges the blob storage.
/// </summary>
/// <param name="thresholdDateTimeUtc">Threshold date time in UTC</param>
/// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
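To make the interplay of thresholdDateTimeUtc and timeRangeFilterType more concrete, here is a minimal sketch that selects in-memory records for purging by comparing one of three timestamps against the threshold. The `TimeRangeFilter` enum, `InstanceRecord` class, and `PurgeSketch` helper are hypothetical stand-ins written for this note, not DurableTask types; a real provider would run the comparison against its storage backend.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for OrchestrationStateTimeRangeFilterType (assumption).
public enum TimeRangeFilter { Created, Completed, LastUpdated }

// Hypothetical record holding the timestamps a purge could compare against.
public class InstanceRecord
{
    public string InstanceId { get; set; } = "";
    public DateTime CreatedUtc { get; set; }
    public DateTime CompletedUtc { get; set; }
    public DateTime LastUpdatedUtc { get; set; }
}

public static class PurgeSketch
{
    // Returns the records whose selected timestamp is older than the threshold.
    public static List<InstanceRecord> SelectForPurge(
        IEnumerable<InstanceRecord> records, DateTime thresholdUtc, TimeRangeFilter filter)
    {
        return records.Where(r => PickTime(r, filter) < thresholdUtc).ToList();
    }

    // Chooses which timestamp the threshold is compared against.
    private static DateTime PickTime(InstanceRecord r, TimeRangeFilter filter)
    {
        switch (filter)
        {
            case TimeRangeFilter.Created: return r.CreatedUtc;
            case TimeRangeFilter.Completed: return r.CompletedUtc;
            default: return r.LastUpdatedUtc;
        }
    }
}
```

With a threshold of "seven days ago", an instance created nine days ago but completed yesterday would be selected under the created-time filter and kept under the completed-time filter, which is why the choice of filter type matters.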
3.3.3 - IOrchestrationServiceInstanceStore
Instance Store provider interface to allow storage and lookup for orchestration state and event history.
The code is in src\DurableTask.Core\IOrchestrationServiceInstanceStore.cs in the DurableTask repository.
Interface definition
IOrchestrationServiceInstanceStore is an interface:
public interface IOrchestrationServiceInstanceStore
{}
Method definitions
Lifecycle-related members
/// <summary>
/// Gets the maximum length a history entry can be so it can be truncated if necessary
/// </summary>
/// <returns>The maximum length</returns>
int MaxHistoryEntryLength { get; }
/// <summary>
/// Runs initialization to prepare the instance store for use
/// </summary>
/// <param name="recreate">Flag to indicate whether the store should be recreated.</param>
Task InitializeStoreAsync(bool recreate);
/// <summary>
/// Deletes instances instance store
/// </summary>
Task DeleteStoreAsync();
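The doc comment on MaxHistoryEntryLength says the value exists so that a history entry "can be truncated if necessary". A minimal sketch of what such a truncation could look like is shown below; `HistoryEntrySketch.Truncate` is a hypothetical helper for illustration, and how a real store actually truncates (for example, whether it appends a marker) is not stated in the source.

```csharp
using System;

public static class HistoryEntrySketch
{
    // Truncates a serialized history entry so it fits within maxLength characters.
    // Entries that already fit (or are null/empty) are returned unchanged.
    public static string Truncate(string entry, int maxLength)
    {
        if (maxLength < 0) throw new ArgumentOutOfRangeException(nameof(maxLength));
        if (string.IsNullOrEmpty(entry) || entry.Length <= maxLength) return entry;
        return entry.Substring(0, maxLength);
    }
}
```

A caller would pass the store's MaxHistoryEntryLength as maxLength before writing the entry.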
Entity-related methods
/// <summary>
/// Writes a list of history events to instance store
/// </summary>
/// <param name="entities">List of history events to write</param>
Task<object> WriteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);
/// <summary>
/// Get a list of state events from instance store
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="executionId">The execution id to return state for</param>
/// <returns>The matching orchestration state or null if not found</returns>
Task<IEnumerable<OrchestrationStateInstanceEntity>> GetEntitiesAsync(string instanceId, string executionId);
/// <summary>
/// Deletes a list of history events from instance store
/// </summary>
/// <param name="entities">List of history events to delete</param>
Task<object> DeleteEntitiesAsync(IEnumerable<InstanceEntityBase> entities);
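The write / get / delete trio above can be pictured as a keyed store addressed by instance id and execution id. The following is a minimal in-memory sketch under that assumption; `SketchEntity` and `InMemoryEntityStore` are hypothetical types invented for this note and do not implement the real interface, and returning a count as the `object` result is this sketch's own choice.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for an instance store entity; not the DurableTask.Core type.
public class SketchEntity
{
    public string InstanceId { get; set; } = "";
    public string ExecutionId { get; set; } = "";
    public string Payload { get; set; } = "";
}

// Hypothetical in-memory store that mimics the write / get / delete shape.
public class InMemoryEntityStore
{
    private readonly Dictionary<(string, string), SketchEntity> entities =
        new Dictionary<(string, string), SketchEntity>();

    // Inserts or overwrites each entity under its (instanceId, executionId) key.
    public Task<object> WriteEntitiesAsync(IEnumerable<SketchEntity> items)
    {
        int written = 0;
        foreach (var e in items)
        {
            entities[(e.InstanceId, e.ExecutionId)] = e;
            written++;
        }
        return Task.FromResult<object>(written);
    }

    // Returns the entity stored for the given key, or an empty sequence.
    public Task<IEnumerable<SketchEntity>> GetEntitiesAsync(string instanceId, string executionId)
    {
        IEnumerable<SketchEntity> result =
            entities.TryGetValue((instanceId, executionId), out var e)
                ? new[] { e }
                : Array.Empty<SketchEntity>();
        return Task.FromResult(result);
    }

    // Removes each entity by key and reports how many were actually removed.
    public Task<object> DeleteEntitiesAsync(IEnumerable<SketchEntity> items)
    {
        int removed = 0;
        foreach (var e in items)
        {
            if (entities.Remove((e.InstanceId, e.ExecutionId))) removed++;
        }
        return Task.FromResult<object>(removed);
    }
}
```

The methods complete synchronously via Task.FromResult only because everything lives in memory; a storage-backed provider would perform real asynchronous I/O here.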
GetOrchestrationState
Gets the list of orchestration states for the specified instance.
/// <summary>
/// Gets a list of orchestration states for a given instance
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="allInstances">Flag indication whether to get all history execution ids or just the most recent</param>
/// <returns>List of matching orchestration states</returns>
Task<IEnumerable<OrchestrationStateInstanceEntity>> GetOrchestrationStateAsync(string instanceId, bool allInstances);
/// <summary>
/// Gets the orchestration state for a given instance and execution id
/// </summary>
/// <param name="instanceId">The instance id to return state for</param>
/// <param name="executionId">The execution id to return state for</param>
/// <returns>The matching orchestration state or null if not found</returns>
Task<OrchestrationStateInstanceEntity> GetOrchestrationStateAsync(string instanceId, string executionId);
OrchestrationStateInstanceEntity is defined as follows:
public class OrchestrationStateInstanceEntity : InstanceEntityBase
{
/// <summary>
/// The orchestration state for this instance entity
/// </summary>
public OrchestrationState State;
}
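The allInstances flag on the first GetOrchestrationStateAsync overload switches between "every execution of this instance" and "only the most recent one". A minimal sketch of that selection is given below. `ExecutionState` and `StateLookupSketch` are hypothetical stand-ins, and using the last-updated time to decide which execution is "most recent" is an assumption of this sketch, not something the source confirms.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-in for one execution's state of an instance.
public class ExecutionState
{
    public string InstanceId { get; set; } = "";
    public string ExecutionId { get; set; } = "";
    public DateTime LastUpdatedUtc { get; set; }
}

public static class StateLookupSketch
{
    // Returns all executions of the instance (newest first) when allInstances is true,
    // otherwise only the most recently updated execution.
    public static List<ExecutionState> GetStates(
        IEnumerable<ExecutionState> all, string instanceId, bool allInstances)
    {
        var matches = all
            .Where(s => s.InstanceId == instanceId)
            .OrderByDescending(s => s.LastUpdatedUtc);
        return allInstances ? matches.ToList() : matches.Take(1).ToList();
    }
}
```

An instance id with no stored executions yields an empty list in both modes.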
OrchestrationHistory-related methods
/// <summary>
/// Gets the list of history events for a given instance and execution id
/// </summary>
/// <param name="instanceId">The instance id to return history for</param>
/// <param name="executionId">The execution id to return history for</param>
/// <returns>List of history events</returns>
Task<IEnumerable<OrchestrationWorkItemInstanceEntity>> GetOrchestrationHistoryEventsAsync(string instanceId, string executionId);
/// <summary>
/// Purges history from storage for given time range
/// </summary>
/// <param name="thresholdDateTimeUtc">The datetime in UTC to use as the threshold for purging history</param>
/// <param name="timeRangeFilterType">What to compare the threshold date time against</param>
/// <returns>The number of history events purged.</returns>
Task<int> PurgeOrchestrationHistoryEventsAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType);
JumpStartEntities-related methods
/// <summary>
/// Writes a list of jump start events to instance store
/// </summary>
/// <param name="entities">List of jump start events to write</param>
Task<object> WriteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);
/// <summary>
/// Deletes a list of jump start events from instance store
/// </summary>
/// <param name="entities">List of jump start events to delete</param>
Task<object> DeleteJumpStartEntitiesAsync(IEnumerable<OrchestrationJumpStartInstanceEntity> entities);
/// <summary>
/// Get a list of jump start events from instance store
/// </summary>
/// <returns>List of jump start events</returns>
Task<IEnumerable<OrchestrationJumpStartInstanceEntity>> GetJumpStartEntitiesAsync(int top);