Introduction
This article is an advanced-level technical elaboration of how an Orleans grain activation processes a message.
I will cover request/message queuing, processing, interleaving, the activation task scheduler, processing concurrency, single-threaded execution guarantees, as well as how a grain method executes and how it all ties back to the .NET ThreadPool.
I will not cover how a method call is translated into a message, how it gets serialized, how directory lookup, placement and routing work, and finally how that message gets deserialized and re-transformed into a method call.
I refer to "request" and "message" interchangeably throughout the article. They are the same thing.
Primer Grain
Let's take this simple grain as an example. I invoke the Ping method multiple times, and upon each invocation it increments the counter, calls some asynchronous method, and returns the counter value.
var grain = _grainFactory.GetGrain<IPingGrain>(0);
Console.WriteLine(await grain.Ping()); // prints: 1
Console.WriteLine(await grain.Ping()); // prints: 2
Console.WriteLine(await grain.Ping()); // prints: 3
public interface IPingGrain : IGrainWithIntegerKey
{
Task<int> Ping();
}
public class PingGrain : Grain, IPingGrain
{
private int _counter;
public async Task<int> Ping()
{
_counter++;
await SomeIOAsync();
return _counter;
}
}
Grain Activation
Every message sent to a grain activation is actually a request to invoke a method on that activation. But what really is an activation?
From the surface, a grain activation looks to be just an instance of the grain class, but that is only partially true. A grain activation is much more than an instance; it is an ensemble of components that work together to achieve the traits that make up an actor. There are many components / processes that interplay, but these are the most important ones.
- Message Queuing & Processing
- System Command Processing
- Task Scheduling & Execution
- Lifecycle Participation
- State Management
- Migration Management
Request Scheduling
Each grain activation in Orleans is associated with its own message queue. When an activation is created, it kickstarts a task in the background which waits for signals to begin processing one or more messages, and/or system commands. This is the message loop!
When a request is received, it is enqueued in the grain's message queue. Messages can come from different sources such as: direct grain calls, stream message delivery, timer tick delivery, reminder tick delivery, system target commands, etc. But a lot of them are delivered via the same networking components, and fall into one of these buckets:
- Message Center - It is the entry point for external and inter-silo communications.
- Hosted Client - Represents a 'client' which is part of the silo itself, i.e. IGrainFactory or even IClusterClient (internal client only).
- Grain Timer - Timers are special, as they do not require any intermediary component to deliver messages (callback executions) to a given activation.
When a message arrives, the message loop is signaled! The loop begins by checking if there are any pending system commands that need to be executed, but only if the activation is not currently executing requests. If that is the case, it first addresses the commands, and then it proceeds to handle the pending messages. This is all assuming that the activation is in a valid state, and is not currently deactivating.
By default, Orleans executes each message to completion, which means new messages must wait in the queue until the current one finishes, even if the current one is awaiting an asynchronous operation. This default behavior can be changed if the grain type is marked with the Reentrant attribute, if the target method is marked with AlwaysInterleave, or when the less commonly used MayInterleave attribute applies. In those cases the tasks of subsequent messages will start interleaving with ongoing tasks in a turn-based fashion, which I will explain later on.
It is only now that the real work begins!!
Task Scheduling
See, I mentioned above that a grain activation is much more than an instance of a grain class. As a matter of fact, even saying that it is an instance of a grain class is not actually true!
Our primer PingGrain above does not need to inherit from the base Grain type. So technically an activation is an instance of IGrainContext, which represents a grain from the perspective of the runtime, plus the extra stuff mentioned above.
Two core components of an activation are the WorkItemGroup and the ActivationTaskScheduler. Every activation has its own WorkItemGroup instance. This is "just" a data structure that represents a different thing depending on the perspective from which it is looked at.
- From the perspective of the Orleans runtime, it is a place to queue up activation tasks (work items).
- From the perspective of the .NET runtime, it is a work item which can be scheduled for execution onto the ThreadPool.
In addition to the WorkItemGroup, every activation also has its own custom TaskScheduler (called the ActivationTaskScheduler). To clear up a common misconception: Orleans does not have its own thread pool (it used to, though); instead it delegates task execution by enqueuing tasks to the .NET thread pool, and it does so via the ActivationTaskScheduler.
But not directly!!
See, Orleans could enqueue all work items to the ThreadPool as they come, but the activation must always execute in a single-threaded context. Just randomly enqueuing work items means they can be scheduled to random ThreadPool threads by the scheduler, which in turn breaks the single-threaded guarantee of activations!
Let's refer back to our PingGrain, specifically the method implementation.
public async Task<int> Ping()
{
_counter++;
await SomeIOAsync();
return _counter;
}
At its core, a Task can represent one of these:
- Some "work" that may execute on any thread, these are known as delegate tasks.
- Some kind of "event/signal" to wait upon, these are known as promise tasks.
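To make the distinction concrete, here is a small sketch in plain .NET (no Orleans involved). The variable names are made up for illustration. The delegate task wraps a lambda that some thread has to run, while the promise task wraps no code at all and only completes when someone signals it through its TaskCompletionSource.

```csharp
using System;
using System.Threading.Tasks;

// Delegate task: wraps code, so a thread has to execute the lambda.
Task<int> delegateTask = Task.Run(() => 21 * 2);

// Promise task: wraps no code; it completes when someone signals it.
var signal = new TaskCompletionSource<int>();
Task<int> promiseTask = signal.Task;

// Nothing has signaled the promise yet, so it is still pending.
bool pendingBeforeSignal = !promiseTask.IsCompleted;

// The "event" happens: completing the promise requires no work item.
signal.SetResult(42);

int fromDelegate = await delegateTask;
int fromPromise = await promiseTask;

Console.WriteLine($"{fromDelegate} {fromPromise} {pendingBeforeSignal}"); // 42 42 True
```

In real code the signal usually comes from an IO completion or a timer rather than from a direct SetResult call.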
The execution of a method is broken down by the compiler into discrete steps. These steps or breakpoints usually occur at await points, but are not limited to those. Task continuations performed via ContinueWith achieve the same effect, as technically that is what happens under the hood with the async machinery.
So the above code is split into three parts: the "counter incrementing" part, which represents a synchronously completing task; the asynchronous IO operation of SomeIOAsync; and the continuation that runs after SomeIOAsync completes, which is the "method returning" part of the code.
We can write the above method via ContinueWith to get a better grasp, as this will be important moving forward.
public Task<int> Ping()
{
_counter++; // Task 1: completes synchronously
return SomeIOAsync() // Task2: completes asynchronously
.ContinueWith(
task => // Task 3: The continuation is a
// synchronously completing task itself
{
// Some error-handling code would be here,
// because ContinueWith does not automatically
// propagate exceptions, as await'ing does.
return _counter;
});
}
- _counter++ is a delegate task, because it represents some "work" that needs to be executed on the CPU (INC instruction).
- SomeIOAsync() is a promise task, because it represents some "event" that needs to happen (no CPU instruction, other than SYSCALL).
- return _counter is a delegate task, because it too represents some "work" that needs to be executed on the CPU (RET instruction).
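The comment in the ContinueWith version about missing error handling deserves a quick illustration. The sketch below is plain .NET, and FailingIOAsync is a hypothetical stand-in for SomeIOAsync that always fails. It shows that a continuation which never inspects its antecedent silently swallows the failure, whereas await rethrows it.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for SomeIOAsync that always fails.
static Task<int> FailingIOAsync() =>
    Task.FromException<int>(new InvalidOperationException("IO failed"));

// ContinueWith: the continuation runs even though the antecedent faulted,
// and since it never looks at the antecedent, the failure is swallowed.
int viaContinueWith = await FailingIOAsync().ContinueWith(_ => 1);

// await: the same failure surfaces as an exception at the await point.
string viaAwait;
try
{
    await FailingIOAsync();
    viaAwait = "no exception";
}
catch (InvalidOperationException ex)
{
    viaAwait = ex.Message;
}

Console.WriteLine($"{viaContinueWith} / {viaAwait}"); // 1 / IO failed
```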
The .NET runtime schedules the execution of all of them, but is instructed by the current task scheduler, which in the context of grain code is the ActivationTaskScheduler.
It is logical to think that the ActivationTaskScheduler enqueues the tasks in the ThreadPool, but that would break the single-threaded guarantee, as random ThreadPool threads could end up executing those tasks, in any order. As a matter of fact, even the continuation of the promise task may, by default, execute on a different thread than the one that started the promise task.
The WorkItemGroup is the component that preserves the single-threaded guarantee. As mentioned before, every activation has its own, and it is the WorkItemGroup which creates and keeps a strong reference to the ActivationTaskScheduler. Interestingly, the scheduler also keeps a reference to the WorkItemGroup that created it.
When the .NET runtime calls the scheduler's QueueTask method, the scheduler doesn't enqueue the task into the ThreadPool; instead it enqueues it to the parent WorkItemGroup, which puts it inside its internal System.Collections.Generic.Queue<T>.
Why a Queue<T> and not a ConcurrentQueue<T>? Shouldn't this be a thread-safe operation?
Well, it is thread-safe! Thread synchronization is achieved via locking, and the reason for picking a regular queue is that the WorkItemGroup has a status associated with it, namely Waiting, Runnable, and Running, which are important to the next step.
A ConcurrentQueue<T> does not provide a way to atomically check the queue's contents and update the status, as they are two separate pieces of state, so locking would be required anyway.
When the WorkItemGroup enqueues the task, it has a choice to make. If it's Runnable, or is currently Running previously enqueued tasks for the activation, it is done, and the newly enqueued task simply waits its turn in the queue.
Otherwise, if it's Waiting, then it queues itself for execution in the ThreadPool. Remember that WorkItemGroup implements IThreadPoolWorkItem, which represents any work item that can be scheduled to run on the ThreadPool.
The way it schedules itself is very interesting! It does so by calling:
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
First off, it uses the unsafe variant because it is faster than the safe one: it does not capture the ExecutionContext, so the caller's context is not flowed to the work item. But also note that it hints that it wants to be scheduled in the current thread's local queue, because work items queued via UnsafeQueueUserWorkItem or QueueUserWorkItem are otherwise put in the ThreadPool's global queue.
This is an optimization, as it's beneficial for a thread to continue processing related tasks in order to preserve cache locality. This is not a guaranteed behavior though!
Note that tasks which are spawned from the context of other tasks, such as the promise task SomeIOAsync, are always queued up into the thread's local queue.
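The difference between the safe and unsafe variants can be observed with an AsyncLocal value, which travels with the ExecutionContext. The following is a small plain .NET sketch (the variable names are made up) that uses the generic Action<TState> overloads rather than the IThreadPoolWorkItem overload Orleans calls.

```csharp
using System;
using System.Threading;

// AsyncLocal values are stored in the ExecutionContext.
var ambient = new AsyncLocal<string?>();
ambient.Value = "caller-context";

string? seenBySafe = "unset";
string? seenByUnsafe = "unset";
using var done = new CountdownEvent(2);

// Safe variant: captures the caller's ExecutionContext and flows it.
ThreadPool.QueueUserWorkItem<object?>(_ =>
{
    seenBySafe = ambient.Value;
    done.Signal();
}, null, preferLocal: false);

// Unsafe variant: skips the capture, so the callback sees no ambient value.
ThreadPool.UnsafeQueueUserWorkItem<object?>(_ =>
{
    seenByUnsafe = ambient.Value;
    done.Signal();
}, null, preferLocal: false);

done.Wait();
Console.WriteLine($"safe: {seenBySafe ?? "<null>"}, unsafe: {seenByUnsafe ?? "<null>"}");
// safe: caller-context, unsafe: <null>
```

This is why anything the work item needs from its caller has to be passed explicitly or re-established by the work item itself.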
Figure: WorkItemGroup scheduling itself to the ThreadPool.
In the above picture, focus on the green circles numbered 1 & 2, specifically the yellow Schedule block, and the Thread Pool section. After the WorkItemGroup has scheduled itself to the thread pool, whenever a thread is available, it will pick up the WorkItemGroup as a ThreadPool work item and begin to execute it.
In the diagram I have depicted the WorkItemGroup as scheduled in the global queue, with threads #1 & #2 busy. Thread #3 is free, as it doesn't have any work items in its local queue, therefore it goes to the global queue and "steals" the WorkItemGroup for execution.
Anything that implements IThreadPoolWorkItem must do so by providing a void returning method called Execute.
public interface IThreadPoolWorkItem
{
void Execute();
}
The ThreadPool thread will call that method, and since we enqueue via UnsafeQueueUserWorkItem, the ExecutionContext is not automatically propagated. Instead we set (and then reset at the end) the IGrainContext manually within Execute. And it is here where the single-threaded guarantee happens.
See, the way WorkItemGroup implements Execute is that the method processes queued up work items sequentially, draining the queue until it is empty or until a predefined quantum of time has elapsed (which can be controlled via SchedulingOptions.ActivationSchedulingQuantum).
Once execution completes, the method updates its status, and schedules further execution of work items if some remain in the queue. It does so by re-scheduling itself, this time without the work items that it has executed (and thereby dequeued). And since all the items in a round are executed by the same thread (thread #3), all operations are thread-safe.
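The enqueue-then-drain pattern described above can be sketched in plain .NET. This is a simplified illustration, not the Orleans source. TurnScheduler and MaxItemsPerTurn are hypothetical names, one class plays both the scheduler and the work item group, an item-count budget stands in for the time-based quantum, and grain context handling is omitted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var scheduler = new TurnScheduler();
int counter = 0; // deliberately unsynchronized, like a grain field
var tasks = new List<Task>();
for (int i = 0; i < 10_000; i++)
{
    tasks.Add(Task.Factory.StartNew(() => { counter++; },
        CancellationToken.None, TaskCreationOptions.None, scheduler));
}
await Task.WhenAll(tasks);
Console.WriteLine(counter); // 10000

// Hypothetical, simplified stand-in for WorkItemGroup + ActivationTaskScheduler.
sealed class TurnScheduler : TaskScheduler, IThreadPoolWorkItem
{
    private enum Status { Waiting, Runnable, Running }
    private const int MaxItemsPerTurn = 256; // assumption: count-based budget

    private readonly object _gate = new();
    private readonly Queue<Task> _queue = new();
    private Status _status = Status.Waiting;

    protected override void QueueTask(Task task)
    {
        lock (_gate)
        {
            _queue.Enqueue(task);
            // Already scheduled or draining: the task just waits its turn.
            if (_status != Status.Waiting) return;
            _status = Status.Runnable;
        }
        ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
    }

    // Called by a single ThreadPool thread; runs queued tasks one by one.
    public void Execute()
    {
        lock (_gate) _status = Status.Running;
        for (int executed = 0; executed < MaxItemsPerTurn; executed++)
        {
            Task next;
            lock (_gate)
            {
                if (_queue.Count == 0) { _status = Status.Waiting; return; }
                next = _queue.Dequeue();
            }
            TryExecuteTask(next);
        }
        lock (_gate)
        {
            if (_queue.Count == 0) { _status = Status.Waiting; return; }
            _status = Status.Runnable; // budget spent, items remain
        }
        ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
    }

    // Never run inline on an arbitrary thread; that would break the guarantee.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (_gate) return _queue.ToArray();
    }
}
```

Because the status check and the enqueue happen under the same lock, at most one Execute call is ever in flight, which is what lets the unsynchronized counter reach exactly 10000.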
I refer to 'Task' and 'Work Item' interchangeably, but essentially a task is a work item, and so is an Action / Action<T>. But an Action / Action<T> gets scheduled for execution in the ThreadPool by being wrapped by a Task, so it is essentially the same thing.
Task Execution
Every work item that is queued up in the WorkItemGroup will be executed sequentially, in the order in which it was queued. The execution of the work items is thread-safe, as only one thread at a time will execute those work items. This is what is sometimes referred to as "activation turns"; yes, the "turns" are the work items.
This is also where interleaving comes into play. See, when a grain is reentrant, or a method interleaves, work items of different messages are enqueued into the WorkItemGroup's internal queue concurrently!
When a thread executes the WorkItemGroup, those interleaved work items execute in the order they were enqueued. With the default message processing behavior, interleaving is not allowed, which just means that work items of two different grain messages are not allowed to be enqueued concurrently.
A work item is "finished" whenever execution yields back to the task scheduler, which as mentioned before, occurs at await points (or continuations, or custom awaitables). For interleaving to work, the "work" that happens at the await point needs to be asynchronously completing, otherwise no interleaving takes place.
The following code showcases that.
var grain = grainFactory.GetGrain<ITestGrain>(0);
Console.WriteLine("Interleaves:");
await Task.WhenAll([grain.Interleaves(), grain.Interleaves()]);
Console.WriteLine("\nDoesntInterleave:");
await Task.WhenAll([grain.DoesntInterleave(), grain.DoesntInterleave()]);
public interface ITestGrain : IGrainWithIntegerKey
{
[AlwaysInterleave] Task Interleaves();
[AlwaysInterleave] Task DoesntInterleave();
}
public class TestGrain : Grain, ITestGrain
{
public async Task Interleaves()
{
Console.WriteLine("1");
await Task.Delay(100); // Interleaving MAY happen
Console.WriteLine("2");
}
public async Task DoesntInterleave()
{
Console.WriteLine("1");
await SomethingAsync(); // Interleaving WONT happen
Console.WriteLine("2");
}
private Task SomethingAsync() => Task.CompletedTask;
}
The code above ends up printing the output seen below. This tells us that the two concurrent requests interleaved in the case of the Interleaves method (both invoked the work item Console.WriteLine("1") before either reached Console.WriteLine("2")), while that is not the case for the DoesntInterleave method, even though that method is also marked with AlwaysInterleave. This is because the first one awaits a truly asynchronous operation, Task.Delay, while the latter awaits SomethingAsync(), which completes synchronously.
Interleaves:
1
1
2
2
DoesntInterleave:
1
2
1
2
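The reason behind this output can be reproduced without Orleans. The sketch below (method names are made up for illustration) shows that awaiting an already completed task never yields: the async method runs to its end inline, so there is no point at which another work item could slip in.

```csharp
using System;
using System.Threading.Tasks;

static async Task AwaitsCompletedTask()
{
    await Task.CompletedTask; // already complete: no yield, continues inline
}

static async Task AwaitsPendingTask()
{
    await Task.Delay(200); // not complete yet: the method yields here
}

// The whole method has finished by the time the call returns.
Task first = AwaitsCompletedTask();
bool firstRanToEndInline = first.IsCompleted;

// The call returns at the await, with the rest of the method still pending.
Task second = AwaitsPendingTask();
bool secondYielded = !second.IsCompleted;

await second;
Console.WriteLine($"{firstRanToEndInline} {secondYielded}"); // True True
```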
Each work item is executed by the WorkItemGroup.Execute processing loop, by means of calling the ActivationTaskScheduler. Note that other work items can be part of this "batch". For example, one can directly enqueue a work item via the grain context (which is unique to each activation).
public async Task<int> Ping()
{
// Manually queuing the work item '_counter++'
GrainContext.Scheduler.QueueAction(() => _counter++);
await SomeIOAsync();
return _counter;
}
There is no guarantee that further work items, even those that were queued during the same de-queueing round, will be executed on that same thread in the next round.
Below you can see a full end-to-end diagram of an activation processing messages, and the interaction with the thread pool.
Queue-bypass & Thread-safety
The following grain method code is unsafe.
public async Task<int> Ping()
{
_counter++;
Task.Run(() =>
{
// UNSAFE: runs on the ThreadPoolTaskScheduler
_counter++;
});
await SomeIOAsync();
return _counter;
}
Touching grain state, such as incrementing the counter, from scheduling mechanisms like Task.Run is thread-unsafe. This is because Task.Run breaks out of the ActivationTaskScheduler by scheduling the work item on the default task scheduler, which is the ThreadPoolTaskScheduler.
The work item scheduled by Task.Run does not end up in the WorkItemGroup queue, but gets scheduled directly to the thread pool. It is entirely possible that while thread #3 is executing the work items from the WorkItemGroup, thread #1 or #2 becomes free and picks up the work item from Task.Run, which means two threads could be modifying _counter at the same time.
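The escape can be observed in plain .NET by checking TaskScheduler.Current. In this sketch the exclusive scheduler of a ConcurrentExclusiveSchedulerPair is only a stand-in for the ActivationTaskScheduler (an assumption made so the example runs without Orleans). It shows that code inside Task.Run sees the default scheduler, while the code after the await is back on the original one.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for a per-activation scheduler: runs one task at a time.
TaskScheduler exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

(bool before, bool insideRun, bool after) observed = await Task.Factory.StartNew(async () =>
{
    // "Grain code": runs on the custom scheduler.
    bool before = TaskScheduler.Current == exclusive;

    // Task.Run always targets TaskScheduler.Default, escaping the custom one.
    bool insideRun = await Task.Run(() => TaskScheduler.Current == TaskScheduler.Default);

    // await captured the custom scheduler, so the continuation is back on it.
    bool after = TaskScheduler.Current == exclusive;

    return (before, insideRun, after);
}, CancellationToken.None, TaskCreationOptions.None, exclusive).Unwrap();

Console.WriteLine($"{observed.before} {observed.insideRun} {observed.after}"); // True True True
```

Awaiting the result of Task.Run is fine, as the continuation returns to the original scheduler; it is only the code inside the Task.Run delegate that must not touch grain state.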
Any code that modifies the grain's (in-memory) state outside of the grain's message queue is unsafe. Right?!
Every so often, we would want to bypass the grain's queue and directly modify its state. Reasons could vary, but let's say that we want to avoid the overhead of getting queued when we have a special message/work to perform.
For example, take a singleton component called Publisher which exposes an event, and grain activations which, upon activation (as in the lifecycle stage), subscribe to it by attaching an event handler. Whenever the Publisher invokes the event, all handlers subscribed to it get executed.
Imagine many activations having subscribed to it. We don't want to invoke a massively concurrent call to all activations via something like:
await Task.WhenAll([
grainRef1.NotifyPublisherTicked(),
grainRef2.NotifyPublisherTicked(),
...
grainRefN.NotifyPublisherTicked()
]);
Instead we let the .NET runtime invoke all registered event handlers, and bypass all of the activations' queues. Below is some code for a Publisher that periodically does "something" and invokes all event handlers registered to OnPublisherTick.
public class Publisher : ILifecycleParticipant<ISiloLifecycle>
{
private Task? _tickTask;
private PeriodicTimer _timer = new(TimeSpan.FromSeconds(10));
public event Action? OnPublisherTick;
public void Participate(ISiloLifecycle observer) =>
observer.Subscribe<Publisher>(
ServiceLifecycleStage.ApplicationServices,
onStart: ct =>
{
_tickTask = TickPeriodically(ct);
return Task.CompletedTask;
},
onStop: async _ =>
{
if (_tickTask != null)
{
await _tickTask.ConfigureAwait(
ConfigureAwaitOptions.SuppressThrowing);
}
});
private async Task TickPeriodically(CancellationToken token)
{
try
{
while (await _timer.WaitForNextTickAsync(token))
{
await SomethingAsync(token);
// We CompareExchange to make sure we get the
// most up to date list of subscribers, as some
// may have subscribed while this cycle was running.
Interlocked
.CompareExchange(ref OnPublisherTick, null, null)?
.Invoke();
}
}
catch (OperationCanceledException)
{
// Ignore
}
}
}
Below is some code for a Subscriber grain that injects the Publisher as a dependency, and registers an event handler upon activation; and since it is a good citizen, it also deregisters it upon deactivation. Pay attention to each of the comments inside the OnPublisherTickHandler method.
I am not necessarily suggesting you do this, just showing you what can be achieved.
public interface ISubscriberGrain : IGrainWithIntegerKey
{
ValueTask<int> GetTicks();
}
public class SubscriberGrain(Publisher publisher) :
Grain, ISubscriberGrain
{
private int _tickCount;
public override Task OnActivateAsync(
CancellationToken cancellationToken)
{
publisher.OnPublisherTick += OnPublisherTickHandler;
return Task.CompletedTask;
}
public override Task OnDeactivateAsync(
DeactivationReason reason, CancellationToken cancellationToken)
{
publisher.OnPublisherTick -= OnPublisherTickHandler;
return Task.CompletedTask;
}
private void OnPublisherTickHandler()
{
// UNSAFE: runs on the ThreadPoolTaskScheduler
_tickCount++;
// SAFE: runs on the ThreadPoolTaskScheduler
// but queues the message - No different from
// calling a grain method from outside the grain
GrainContext.Scheduler.QueueAction(() =>
{
// SAFE: runs on the ActivationTaskScheduler,
// but as mentioned above, it has to go
// through the message queue.
_tickCount++;
});
// SAFE: runs on the ThreadPoolTaskScheduler,
// but performs atomic read-modify-write,
// and skips the message queue.
Interlocked.Increment(ref _tickCount);
}
public ValueTask<int> GetTicks() =>
ValueTask.FromResult(Volatile.Read(ref _tickCount));
}
If you found this article helpful, please give it a share in your favorite forums.