Introduction

This article is an advanced-level technical elaboration of how an Orleans grain activation processes a message.

I will cover request/message queuing, processing, interleaving, the activation task scheduler, processing concurrency, and single-threaded execution guarantees, as well as how a grain method executes and how it all ties back to the .NET ThreadPool.

I will not cover how a method call is translated into a message, how it gets serialized, how directory lookup, placement and routing work, or how that message finally gets deserialized and transformed back into a method call.

I refer to "request" and "message" interchangeably throughout the article. They are the same thing.

Primer Grain

Let's take this simple grain as an example. I invoke the Ping method multiple times, and upon each invocation it increments the counter, calls some asynchronous method, and returns the counter value.


var grain = _grainFactory.GetGrain<IPingGrain>(0);

Console.WriteLine(await grain.Ping()); // prints: 1
Console.WriteLine(await grain.Ping()); // prints: 2
Console.WriteLine(await grain.Ping()); // prints: 3

public interface IPingGrain : IGrainWithIntegerKey
{
    Task<int> Ping();
}

public class PingGrain : Grain, IPingGrain
{
    private int _counter;

    public async Task<int> Ping()
    {
        _counter++;
        await SomeIOAsync();
        return _counter;
    }
}

Grain Activation

Every message sent to a grain activation is actually a request to invoke a method on that activation. But what really is an activation?

On the surface, a grain activation looks to be just an instance of the grain class, but that is only partially true. A grain activation is much more than an instance: it is an ensemble of components that work together to achieve the traits that make up an actor. There are many components / processes that interplay, but these are the most important ones:

  • Message Queuing & Processing
  • System Command Processing
  • Task Scheduling & Execution
  • Lifecycle Participation
  • State Management
  • Migration Management

Request Scheduling

Each grain activation in Orleans is associated with its own message queue. When an activation is created, it kickstarts a task in the background which waits for signals to begin processing one or more messages, and/or system commands. This is the message loop!

When a request is received, it is enqueued in the grain's message queue. Messages can come from different sources such as: direct grain calls, stream message delivery, timer tick delivery, reminder tick delivery, system target commands, etc. But a lot of them are delivered via the same networking components, and fall into one of these buckets:

  • Message Center - It is the entry point for external and inter-silo communications.
  • Hosted Client - Represents a 'client' which is part of the silo itself i.e. IGrainFactory or even IClusterClient (internal client only).
  • Grain Timer - Timers are special, as they do not require any intermediary component to deliver messages (callback executions) to a given activation.

Sources of message queuing.

When a message arrives, it signals that fact! The message loop begins by checking if there are any pending system commands that need to be executed, but only if the activation is not currently executing requests. If that is the case, it first addresses the commands, and then it proceeds to handle the pending messages. This is all assuming that the activation is in a valid state, and is not currently deactivating.

By default, Orleans executes all messages to completion, which means new messages must wait in the queue until the current one finishes, even if the current one is performing an asynchronous operation. This default behavior can be changed if the grain type is marked with the Reentrant attribute, or if the target method is marked with AlwaysInterleave, or the less commonly used MayInterleave attribute. In those cases, the tasks of subsequent messages start interleaving with ongoing tasks in a turn-based fashion, which I will explain later on.
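To make the default "run to completion" behavior concrete, here is a minimal sketch in plain .NET. ToyMailbox and MailboxDemo are hypothetical names of mine, not Orleans types, and the real message loop does far more (system commands, activation state checks, interleaving). The sketch only shows a queue, a background loop that waits for a signal, and the fact that message B does not start until message A has finished, even though A awaits.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Toy model of a non-reentrant activation (hypothetical, not an Orleans type).
public sealed class ToyMailbox
{
    private readonly Channel<Func<Task>> _queue =
        Channel.CreateUnbounded<Func<Task>>();
    private readonly Task _loop;

    // The loop is kickstarted in the background when the mailbox is created.
    public ToyMailbox() => _loop = RunMessageLoopAsync();

    // Enqueues a message; the returned task completes once it was processed.
    public Task Enqueue(Func<Task> message)
    {
        var done = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);

        _queue.Writer.TryWrite(async () =>
        {
            try { await message(); done.SetResult(); }
            catch (Exception ex) { done.SetException(ex); }
        });

        return done.Task;
    }

    public Task StopAsync()
    {
        _queue.Writer.Complete();
        return _loop;
    }

    private async Task RunMessageLoopAsync()
    {
        // Waits until a message is available, then processes in FIFO order.
        await foreach (var message in _queue.Reader.ReadAllAsync())
        {
            // The next message waits here, even while this one is awaiting.
            await message();
        }
    }
}

public static class MailboxDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var mailbox = new ToyMailbox();

        Func<Task> Message(string name) => async () =>
        {
            log.Add(name + ":start");
            await Task.Delay(20); // asynchronous work inside the message
            log.Add(name + ":end");
        };

        await Task.WhenAll(
            mailbox.Enqueue(Message("A")),
            mailbox.Enqueue(Message("B")));

        await mailbox.StopAsync();
        return log; // A:start, A:end, B:start, B:end
    }
}
```

Calling MailboxDemo.RunAsync() returns the log in strict A-then-B order. A reentrant variant would start B's first step while A is parked at its await, which is the interleaving covered later.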

It is only now that the real work begins!!

Task Scheduling

See, I mentioned above that a grain activation is much more than an instance of a grain class. As a matter of fact, even saying that it is an instance of a grain class is not actually true!

Our primer PingGrain above does not need to inherit from the base Grain type. So technically an activation is an instance of the IGrainContext, which represents a grain from the perspective of the runtime, plus the extra stuff mentioned above.

Two core components of an activation are the WorkItemGroup and the ActivationTaskScheduler. Every activation has its own WorkItemGroup instance. This is "just" a data structure that represents a different thing depending on the perspective from which it is looked at.

  • From the perspective of the Orleans runtime, it is a place to queue up activation tasks (work items).
  • From the perspective of the .NET runtime, it is a work item which can be scheduled for execution onto the ThreadPool.

In addition to the WorkItemGroup, every activation also has its own custom TaskScheduler (called the ActivationTaskScheduler). To clear up a common misconception: Orleans does not have its own thread pool (it used to, though). Instead, it delegates task execution by enqueuing tasks to the .NET thread pool, and it does so via the ActivationTaskScheduler.

But not directly!!

See, Orleans could enqueue all work items to the ThreadPool as they come, but the activation must always execute in a single-threaded context. Just randomly enqueuing work items means they can be scheduled to random ThreadPool threads by the scheduler, which in turn breaks the single-threaded guarantee of activations!

Let's refer back to our PingGrain, specifically the method implementation.

public async Task<int> Ping()
{
    _counter++;
    await SomeIOAsync();
    return _counter;
}

At its core, a Task can represent one of these:

  • Some "work" that may execute on any thread, these are known as delegate tasks.
  • Some kind of "event/signal" to wait upon, these are known as promise tasks.

The execution of an async method is broken down by the compiler into discrete steps. These steps, or breakpoints, usually occur at await points, but are not limited to those. Task continuations performed via ContinueWith achieve the same effect, as that is essentially what happens under the hood with the async machinery.

So the above code is split into three parts: the "counter incrementing" part, which represents a synchronously completing task; the asynchronous IO operation of SomeIOAsync; and the continuation created after SomeIOAsync completes, which is the "method returning" part of the code.

We can write the above method via ContinueWith to have a better grasp, as this will be important moving forwards.

public Task<int> Ping()
{
    _counter++; // Task 1: completes synchronously

    return SomeIOAsync() // Task 2: completes asynchronously
        .ContinueWith(
            task => // Task 3: The continuation is a
                    // synchronously completing task itself
            {
                // Some error-handling code would be here,
                // because ContinueWith does not automatically
                // propagate exceptions, as await'ing does.

                return _counter;
            });
}

  • _counter++ is a delegate task, because it represents some "work" that needs to be executed on the CPU (INC instruction)
  • SomeIOAsync() is a promise task, because it represents some "event" that needs to happen (no CPU instruction, other than SYSCALL).
  • return _counter is a delegate task, because it too represents some "work" that needs to be executed on the CPU (RET instruction).
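The distinction between the two kinds of tasks can be seen in plain .NET, outside of Orleans. This is a small sketch of mine (TaskKindsDemo is a hypothetical name): Task.Run produces a delegate task that wraps code to execute, while a TaskCompletionSource produces a promise task that has no code attached and completes only when someone signals it.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskKindsDemo
{
    public static async Task<(int FromDelegate, int FromPromise, bool PendingBeforeSignal)> RunAsync()
    {
        // Delegate task: wraps a piece of code that some thread must execute.
        Task<int> delegateTask = Task.Run(() => 40 + 2);

        // Promise task: nothing to execute, it just waits for a signal.
        var promise = new TaskCompletionSource<int>();
        Task<int> promiseTask = promise.Task;

        // Nobody has signaled yet, so the promise task is still pending.
        bool pendingBeforeSignal = !promiseTask.IsCompleted;

        // The "event" happens: this is what completes the promise task.
        promise.SetResult(7);

        return (await delegateTask, await promiseTask, pendingBeforeSignal);
    }
}
```

An IO method like SomeIOAsync typically hands back a promise task in the same spirit: the operating system signals completion, and no thread is busy in the meantime.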

The .NET runtime schedules the execution of all of them, but is instructed by the current task scheduler, which in the context of grain code, is the ActivationTaskScheduler.

Grain tasks being scheduled by the activation scheduler.

It is logical to think that the ActivationTaskScheduler enqueues the tasks in the ThreadPool, but that would break the single-threaded guarantees, as random ThreadPool-threads could end up executing those tasks, in any order. As a matter of fact, even the continuation of the promise task by default executes on a different thread than the one the promise task ran on.

The WorkItemGroup is the component that preserves the single-threaded guarantee. As mentioned before, every activation has its own, and it is the WorkItemGroup which creates and keeps a strong reference to the ActivationTaskScheduler. Interestingly, the scheduler also keeps a reference to the WorkItemGroup that created it.

When the .NET runtime calls the scheduler's QueueTask method, the scheduler doesn't enqueue that into the ThreadPool, instead it enqueues the task to the parent WorkItemGroup, by means of putting it inside of its internal System.Collections.Generic.Queue<T>.

Why a Queue<T> and not a ConcurrentQueue<T>? Shouldn't this be a thread-safe operation?

Well, it is thread-safe! Thread synchronization is achieved via locking, and the reason for picking a regular queue is that the WorkItemGroup has a status associated with it, namely Waiting, Runnable, or Running, which is important to the next step.

A ConcurrentQueue does not provide a way to atomically check the queue's state and update the status, as the two are separate pieces of state, so locking would be required anyway.

When the WorkItemGroup enqueues the task, it has a choice to make. If it is Runnable, or is currently Running previously enqueued tasks for the activation, it is done, and the newly enqueued task has to wait for another round.

Otherwise, if it is Waiting, then it queues itself for execution in the ThreadPool. Remember that WorkItemGroup implements IThreadPoolWorkItem, which represents any work item that can be scheduled to run on the ThreadPool.

The way it schedules itself is very interesting! It does so by calling:

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);

First off, the unsafe variant is faster than the safe one, because it does not capture the ExecutionContext and therefore does not flow it to the work item. Also note the hint that it wants to be scheduled in the current thread's local queue, because work items queued via UnsafeQueueUserWorkItem or QueueUserWorkItem are otherwise put in the ThreadPool's global queue.

This is an optimization, as it is beneficial for a thread to continue processing related tasks in order to preserve cache locality. This is not a guaranteed behavior though!

Note that tasks which are spawned from the context of other tasks, such as the promise task SomeIOAsync, are always queued up into the thread's local queue.

WorkItemGroup scheduling itself to the ThreadPool.

In the above picture, focus on the 1 & 2 numbered, green circles, specifically the Schedule yellow block, and the Thread Pool section. After the WorkItemGroup has scheduled itself to the thread pool, whenever there is a thread available, it will pick the WorkItemGroup as a ThreadPool work item, and will begin to execute it.

In the diagram, I have depicted the WorkItemGroup being scheduled in the global queue while threads #1 & #2 are busy. Thread #3 is free, as it doesn't have any work items in its local queue; therefore it goes to the global queue and "steals" the WorkItemGroup for execution.

Anything that implements IThreadPoolWorkItem must do so by providing a void returning method called Execute.

public interface IThreadPoolWorkItem
{
    void Execute();
}

The ThreadPool thread will call that method, and since we enqueue via UnsafeQueueUserWorkItem, the ExecutionContext is not automatically propagated. Instead, we set (and then reset at the end) the IGrainContext manually within Execute. And it is here that the single-threaded guarantees happen.

See, the way WorkItemGroup implements Execute is that the method processes queued up work items sequentially, draining the queue until it is empty, or until a predefined quantum of time has elapsed (which can be controlled via SchedulingOptions.ActivationSchedulingQuantum).

Once execution completes, the method updates its status, and schedules further execution of work items if some remain in the queue. It does so by re-scheduling itself, this time without the work items that it has executed (and thereby also dequeued). And since all those items have been executed by the same thread (thread #3), all operations are thread-safe.
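The pieces described so far can be put together in a small model. To be clear, this is a simplified sketch of the idea and not the Orleans source: ToyWorkItemGroup, ToyScheduler and WorkItemGroupDemo are hypothetical names, there is no grain context handling, and inlining is simply disabled. It does use the real .NET building blocks mentioned above: a TaskScheduler subclass whose QueueTask forwards to the group, a Queue<T> guarded by a lock together with the status, IThreadPoolWorkItem, and UnsafeQueueUserWorkItem with preferLocal.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified model of the idea. This is not the Orleans source.
public sealed class ToyWorkItemGroup : IThreadPoolWorkItem
{
    private enum Status { Waiting, Runnable, Running }

    private readonly object _lock = new();
    private readonly Queue<Task> _tasks = new();
    private readonly ToyScheduler _scheduler;
    private readonly TimeSpan _quantum;
    private Status _status = Status.Waiting;

    public ToyWorkItemGroup(TimeSpan quantum)
    {
        _quantum = quantum;
        _scheduler = new ToyScheduler(this);
    }

    public TaskScheduler Scheduler => _scheduler;

    private void Enqueue(Task task)
    {
        lock (_lock)
        {
            // Queue and status are checked and updated atomically.
            _tasks.Enqueue(task);
            if (_status != Status.Waiting) return; // already scheduled or running
            _status = Status.Runnable;
        }
        ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
    }

    // Invoked by a ThreadPool thread; at most one invocation is in flight.
    public void Execute()
    {
        lock (_lock) { _status = Status.Running; }
        var elapsed = Stopwatch.StartNew();
        do
        {
            Task task;
            lock (_lock)
            {
                if (_tasks.Count == 0) { _status = Status.Waiting; return; }
                task = _tasks.Dequeue();
            }
            _scheduler.Run(task); // one "turn"
        } while (elapsed.Elapsed < _quantum);

        lock (_lock)
        {
            if (_tasks.Count == 0) { _status = Status.Waiting; return; }
            _status = Status.Runnable; // quantum used up, work remains
        }
        ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
    }

    private sealed class ToyScheduler : TaskScheduler
    {
        private readonly ToyWorkItemGroup _group;
        public ToyScheduler(ToyWorkItemGroup group) => _group = group;

        public void Run(Task task) => TryExecuteTask(task);

        // Tasks never go to the ThreadPool directly, only to the group.
        protected override void QueueTask(Task task) => _group.Enqueue(task);

        // Simplification: never inline, always go through the queue.
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            lock (_group._lock) return _group._tasks.ToArray();
        }
    }
}

public static class WorkItemGroupDemo
{
    public static async Task<int> RunAsync()
    {
        var group = new ToyWorkItemGroup(TimeSpan.FromMilliseconds(100));
        int counter = 0; // deliberately unsynchronized, like a grain field
        var calls = new List<Task>();

        for (int i = 0; i < 1000; i++)
        {
            calls.Add(Task.Factory.StartNew(async () =>
            {
                counter++;
                await Task.Yield(); // continuation is queued to the same scheduler
                counter++;
            }, CancellationToken.None, TaskCreationOptions.None, group.Scheduler).Unwrap());
        }

        await Task.WhenAll(calls);
        return counter;
    }
}
```

WorkItemGroupDemo.RunAsync() returns 2000 even though the counter is incremented without any locking, because every turn runs inside Execute and only one Execute is ever in flight. Different rounds may land on different ThreadPool threads, but never on two at once.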

I refer to 'Task' and 'Work Item' interchangeably, but essentially a task is a work item, so is an Action / Action<T>. But an Action / Action<T> gets scheduled for execution in the ThreadPool by being wrapped by a Task, so it is essentially the same thing.

Task Execution

Every work item that is queued up in the WorkItemGroup will be executed sequentially, in the order they have been queued up. The execution of the work items is thread-safe, as only one thread will execute those work items. This is what is sometimes referred to as "activation turns"; yes, the "turns" are the work items.

This is also where interleaving comes into play. See, when a grain is reentrant, or a method interleaves, work items are enqueued into the WorkItemGroup's internal queue concurrently!

When a thread executes the WorkItemGroup, those interleaved work items execute in the order they were enqueued. With the default message processing behavior, interleaving is not allowed, which just means that work items of two different grain messages are not allowed to be enqueued concurrently.

A work item is "finished" whenever an execution yields back to the task scheduler, which, as mentioned before, occurs at await points (or continuations, or custom awaitables). For interleaving to work, the "work" that happens at the await point needs to be asynchronously completing, otherwise no interleaving takes place.

The following code showcases that.

var grain = grainFactory.GetGrain<ITestGrain>(0);

Console.WriteLine("Interleaves:");
await Task.WhenAll([grain.Interleaves(), grain.Interleaves()]);

Console.WriteLine("\nDoesntInterleave:");
await Task.WhenAll([grain.DoesntInterleave(), grain.DoesntInterleave()]);

public interface ITestGrain : IGrainWithIntegerKey
{
    [AlwaysInterleave] Task Interleaves();
    [AlwaysInterleave] Task DoesntInterleave();
}

public class TestGrain : Grain, ITestGrain
{
    public async Task Interleaves()
    {
        Console.WriteLine("1");
        await Task.Delay(100); // Interleaving MAY happen
        Console.WriteLine("2");
    }

    public async Task DoesntInterleave()
    {
        Console.WriteLine("1");
        await SomethingAsync(); // Interleaving WONT happen
        Console.WriteLine("2");
    }

    private Task SomethingAsync() => Task.CompletedTask;
}

The code above prints the output seen below. It shows that the two concurrent requests interleaved in the case of the Interleaves method: both invoked the work item Console.WriteLine("1") before either one reached Console.WriteLine("2"). That is not the case for the DoesntInterleave method, even though that method is also marked with AlwaysInterleave. This is because the first one calls a truly asynchronous method, Task.Delay, while the latter calls SomethingAsync(), which completes synchronously.

Interleaves:
1
1
2
2

DoesntInterleave:
1
2
1
2
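The reason behind that output is a general property of await, which can be checked without Orleans. In this sketch of mine (YieldDemo is a hypothetical name), awaiting an already completed task does not yield to the caller, while awaiting a pending task does.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class YieldDemo
{
    private static async Task Step(Task inner, List<string> log)
    {
        log.Add("before await");
        await inner; // yields only if 'inner' has not completed yet
        log.Add("after await");
    }

    public static async Task<(string Completed, string Pending)> RunAsync()
    {
        // Case 1: the awaited task is already complete, so Step runs
        // straight through before control returns to the caller.
        var log1 = new List<string>();
        Task first = Step(Task.CompletedTask, log1);
        log1.Add("caller resumes");
        await first;

        // Case 2: the awaited task is pending, so Step yields at the
        // await and the caller gets to run in between.
        var log2 = new List<string>();
        var signal = new TaskCompletionSource();
        Task second = Step(signal.Task, log2);
        log2.Add("caller resumes");
        signal.SetResult();
        await second;

        return (string.Join(" > ", log1), string.Join(" > ", log2));
    }
}
```

The first log reads "before await > after await > caller resumes", the second reads "before await > caller resumes > after await". In a grain, that gap in the second case is where another request's work item can slip in.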

Each work item is executed by the WorkItemGroup.Execute processing loop, which calls into the ActivationTaskScheduler. Note that other work items can be part of this "batch". For example, one can directly enqueue a work item via the grain context (each activation has its own).

public async Task<int> Ping()
{
    // Manually queuing the work item '_counter++'
    GrainContext.Scheduler.QueueAction(() => _counter++);
    await SomeIOAsync();
    return _counter;
}

There is no guarantee that further work items, even those that were part of the de-queueing round, will be executed on that same thread, the next round.

Below you can see a full end-to-end diagram of an activation processing messages, and the interaction with the thread pool.

End-to-end, activation message processing.

Queue-bypass & Thread-safety

The following grain method code is unsafe.

public async Task<int> Ping()
{
    _counter++;

    Task.Run(() =>
    {
        // UNSAFE: runs on the ThreadPoolTaskScheduler
        _counter++;
    });

    await SomeIOAsync();
    return _counter;
}

Calling upon grain code such as incrementing the counter from scheduling mechanisms like Task.Run is thread-unsafe. This is because Task.Run breaks out of the ActivationTaskScheduler by scheduling the work item in the default task scheduler, which is the ThreadPoolTaskScheduler.

The work item scheduled by Task.Run does not end up in the WorkItemGroup queue, but gets directly scheduled to the thread pool. It is quite possible that while thread #3 is executing the work items from the WorkItemGroup, thread #1 or #2 becomes free and picks up the work item from Task.Run, which means two threads could be modifying _counter at the same time.
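The scheduler "escape" can be observed directly through TaskScheduler.Current. In the sketch below (SchedulerEscapeDemo is a hypothetical name) I use the built-in ConcurrentExclusiveSchedulerPair as a stand-in for the ActivationTaskScheduler, which is an assumption made purely so the example runs without Orleans. Task.Run always targets the default scheduler, whereas Task.Factory.StartNew without an explicit scheduler stays on the current one.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerEscapeDemo
{
    public static Task<(bool InsideIsCustom, bool TaskRunIsDefault, bool StartNewStaysCustom)> RunAsync()
    {
        // Stand-in for the activation's scheduler (assumption for the demo):
        // a built-in scheduler that runs one task at a time.
        TaskScheduler custom = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

        return Task.Factory.StartNew(async () =>
        {
            // We are "inside the grain": the current scheduler is the custom one.
            bool insideIsCustom = TaskScheduler.Current == custom;

            // Task.Run ignores the current scheduler and uses the default one.
            bool taskRunIsDefault = await Task.Run(
                () => TaskScheduler.Current == TaskScheduler.Default);

            // StartNew without a scheduler argument uses TaskScheduler.Current.
            bool startNewStaysCustom = await Task.Factory.StartNew(
                () => TaskScheduler.Current == custom);

            return (InsideIsCustom: insideIsCustom,
                    TaskRunIsDefault: taskRunIsDefault,
                    StartNewStaysCustom: startNewStaysCustom);
        }, CancellationToken.None, TaskCreationOptions.None, custom).Unwrap();
    }
}
```

All three flags come back true. Note that the await after Task.Run resumes on the custom scheduler again, so only the delegate passed to Task.Run runs outside of it, and that delegate is exactly the part that must not touch grain state.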

Any code that modifies the grain's (in-memory) state outside of the grain's message queue is unsafe. Right?!

Every so often, we may want to bypass the grain's queue and directly modify its state. Reasons vary, but let's say that we want to avoid the overhead of getting queued when we have a special message/work to perform.

For example, take a singleton component called Publisher which exposes an event type property, and grain activations, upon activation (as in the lifecycle stage) subscribe by attaching an EventHandler. Whenever the Publisher invokes the event, all handlers subscribed to it get executed.

Imagine many activations having subscribed to it. We don't want to invoke a massively concurrent call to all activations via something like:

await Task.WhenAll([
    grainRef1.NotifyPublisherTicked(),
    grainRef2.NotifyPublisherTicked(),
    ...
    grainRefN.NotifyPublisherTicked()
]);

Instead, we let the .NET runtime invoke all registered event handlers, and bypass all of the activations' queues. Below is some code for a Publisher that periodically does "something" and invokes all event handlers registered to OnPublisherTick.

public class Publisher : ILifecycleParticipant<ISiloLifecycle>
{
    private Task? _tickTask;
    private PeriodicTimer _timer = new(TimeSpan.FromSeconds(10));

    public event Action? OnPublisherTick;

    public void Participate(ISiloLifecycle observer) =>
        observer.Subscribe<Publisher>(
            ServiceLifecycleStage.ApplicationServices,
            onStart: ct =>
            {
                _tickTask = TickPeriodically(ct);
                return Task.CompletedTask;
            },
            onStop: async _ =>
            {
                if (_tickTask != null)
                {
                    await _tickTask.ConfigureAwait(
                        ConfigureAwaitOptions.SuppressThrowing);
                }
            });

    private async Task TickPeriodically(CancellationToken token)
    {
        try
        {
            while (await _timer.WaitForNextTickAsync(token))
            {
                await SomethingAsync(token);

                // We CompareExchange to make sure we get the
                // most up to date list of subscribers, as some
                // may have subscribed while this cycle was running.

                Interlocked
                    .CompareExchange(ref OnPublisherTick, null, null)?
                    .Invoke();
            }
        }
        catch (OperationCanceledException)
        {
            // Ignore
        }
    }
}

Below is some code for a Subscriber grain that injects the Publisher as a dependency, and registers an event handler upon activation, and since it is a good citizen, it also deregisters it upon deactivation. Pay attention to each of the comments inside the OnPublisherTickHandler method.

I am not necessarily suggesting that you do this, just showing you what can be achieved.

public interface ISubscriberGrain : IGrainWithIntegerKey
{
    ValueTask<int> GetTicks();
}

public class SubscriberGrain(Publisher publisher) :
    Grain, ISubscriberGrain
{
    private int _tickCount;

    public override Task OnActivateAsync(
        CancellationToken cancellationToken)
    {
        publisher.OnPublisherTick += OnPublisherTickHandler;
        return Task.CompletedTask;
    }

    public override Task OnDeactivateAsync(
        DeactivationReason reason, CancellationToken cancellationToken)
    {
        publisher.OnPublisherTick -= OnPublisherTickHandler;
        return Task.CompletedTask;
    }

    private void OnPublisherTickHandler()
    {
        // UNSAFE: runs on the ThreadPoolTaskScheduler
        _tickCount++;

        // SAFE: runs on the ThreadPoolTaskScheduler
        // but queues the message - No different from
        // calling a grain method from outside the grain
        GrainContext.Scheduler.QueueAction(() =>
        {
            // SAFE: runs on the ActivationTaskScheduler,
            // but as mentioned above, it has to go
            // through the message queue.
            _tickCount++;
        });

        // SAFE: runs on the ThreadPoolTaskScheduler,
        // but performs atomic read-modify-write,
        // and skips the message queue.
        Interlocked.Increment(ref _tickCount);
    }

    // The return type must match the interface: ValueTask<int>.
    public ValueTask<int> GetTicks() =>
        ValueTask.FromResult(Volatile.Read(ref _tickCount));
}
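As a small sanity check of the last option, here is a sketch of mine (AtomicCounterDemo is a hypothetical name, unrelated to Orleans) that hammers a counter from several ThreadPool threads at once. It stands in for many publisher ticks racing with grain turns, and relies only on Interlocked.Increment being an atomic read-modify-write.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AtomicCounterDemo
{
    public static int CountWithInterlocked(int writers, int ticksPerWriter)
    {
        int tickCount = 0;

        // Several threads increment the same field concurrently,
        // with no queue and no lock in between.
        Parallel.For(0, writers, _ =>
        {
            for (int i = 0; i < ticksPerWriter; i++)
            {
                Interlocked.Increment(ref tickCount);
            }
        });

        return Volatile.Read(ref tickCount);
    }
}
```

AtomicCounterDemo.CountWithInterlocked(8, 10_000) returns exactly 80000. Keep in mind that this only protects that single field: if the handler had to update two fields consistently, the queue (or a lock) would be needed again.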

If you found this article helpful, please give it a share in your favorite forums 😉.