This article is aimed at readers who already have a good understanding of concepts like Event-Driven Architecture, Event Processing and the Outbox Pattern.
Introduction
In the world of Distributed Systems, specifically the ones adhering to Event-Driven Architecture, one of the key patterns is the Outbox Pattern. It ensures that a message is sent successfully to a message broker at least once. Instead of publishing a message directly, we store it in temporary storage, such as a database table, within the same transaction as the business data.
Eventually this message needs to be retrieved from the temporary storage and published to the message broker. There are two common techniques to accomplish that:
- Polling
- Change Data Capture
Polling
"Polling is the process where the computer or controlling device, waits for an external device to check for its readiness or state." — Wikipedia.
In the context of the outbox pattern, polling refers to the process of continuously checking whether the Outbox table has entries that have not been processed yet. If there are any, the processor picks them up, publishes them to the message broker, and either marks those entries as processed or deletes them.
It is a straightforward process, guarantees at-least-once delivery, and is applicable to any underlying database provider.
The problem is getting the predetermined polling interval right. Too frequent, and you put stress on the database. Too rare, and you miss time-critical business requirements.
Use cases like placing an order and then shipping it are not so time-critical. Users already know that the product probably will not arrive for 2 weeks or so. If it takes 2 weeks and 60 seconds ... no big deal. In this case, a 60-second polling interval is acceptable.
On the other hand, when you are building a scalping system for micro-trading that has to notify the user in real time about trend changes, a 60-second interval may simply not be acceptable, and you may have to poll every second.
Even within the same domain, different types of messages intrinsically have wider or shorter acceptable processing time-windows. With the polling technique, you have no choice but to adjust the polling interval to accommodate the shortest acceptable window.
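The polling cycle described above can be sketched in a few lines. This is only a minimal illustration, not the implementation used later in this article: OutboxEntry, InMemoryOutbox and OutboxPoller are hypothetical names, the outbox is an in-memory stand-in for a database table, and the publish delegate stands in for a real broker client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical row of the Outbox table.
public sealed class OutboxEntry
{
    public Guid Id { get; } = Guid.NewGuid();
    public string Payload { get; }
    public bool Processed { get; set; }

    public OutboxEntry(string payload) => Payload = payload;
}

// In-memory stand-in for the Outbox table (assumption: the real one lives in the database).
public sealed class InMemoryOutbox
{
    private readonly List<OutboxEntry> entries = new List<OutboxEntry>();

    public void Add(string payload) => entries.Add(new OutboxEntry(payload));

    // Returns a snapshot of the entries that have not been published yet.
    public IReadOnlyList<OutboxEntry> GetUnprocessed() =>
        entries.Where(e => !e.Processed).ToList();
}

public static class OutboxPoller
{
    // One polling cycle: publish every unprocessed entry, then mark it as processed.
    // An entry is marked only after its publish succeeded, so a failure leaves it
    // in place for the next cycle (at-least-once delivery).
    public static async Task<int> PollOnceAsync(
        InMemoryOutbox outbox,
        Func<string, Task> publish)
    {
        var published = 0;

        foreach (var entry in outbox.GetUnprocessed())
        {
            await publish(entry.Payload);
            entry.Processed = true;
            published++;
        }

        return published;
    }
}
```

Running PollOnceAsync on a timer gives plain polling, and the timer period is the polling interval discussed above.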
Change Data Capture
"In databases, Change Data Capture (CDC) is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data." — Wikipedia.
In the context of the outbox pattern, CDC refers to the process of an external tool notifying the application that the Outbox table has new entries which need to be processed. It does so by monitoring the transaction log of the underlying database engine. It supports everything that polling can do, but far more efficiently.
The most popular tool for relational table CDC processing is Kafka Connect with Debezium.
Debezium is an open source distributed platform for CDC. At the time of this article, it supports the following database providers (via their connectors):
- MongoDB
- MySQL
- PostgreSQL
- SQL Server
- Oracle
- Db2
- Cassandra (Incubating)
- Vitess (Incubating)
If you use one of the database providers mentioned above, and you also use Kafka as your message bus, then by all means, go with it!
What if I use Azure SQL Database, and my message bus is not Kafka but maybe RabbitMQ or Azure Service Bus?
You are out of luck with Debezium at the moment!
JIT Outbox Polling
My guess is that people already implement this approach, but I have not seen it elaborated anywhere on the internet. If it does not have a name attached to it, then I will coin one ... Just-in-time Polling.
The idea behind it is that you still have your OutboxProcessor (something like a background worker registered as a singleton) monitoring unprocessed entries in the Outbox table, but on a much bigger polling interval. Maybe once every 1 or 5 minutes, as opposed to every second.
The key difference is that you extract an interface for the OutboxProcessor and make it available to the rest of the codebase, which in turn can invoke its processing functionality on demand.
Why is it called Just-in-time?
Because it is you who tells the processor at the last second:
"Hey, a request has generated some messages. I am just letting you know that the transaction has been committed, and those messages have been stored in the Outbox table. Please take care of them for me."
It is the application itself that simulates Change Data Capture.
Why not drop polling altogether?
Because bad stuff happens! What if the broker is unreachable when the processor tries to send out the message? You need a mechanism in place that guarantees at-least-once delivery (even if delayed).
What about retries?
Sure, but retries get exhausted.
I could keep trying!
That might work if your issue is transient in nature. But what if it's not? You might need to patch your application and restart it. The message still hasn't reached the broker, so what do you do? Wait for another request to trigger message processing for a previous request? No, you still need a time-based mechanism to handle those edge cases.
If the above happens, then how does this solve my issue differently from regular polling?
It doesn't! But neither does polling the database every second. Remember, it doesn't matter if the processor picks up the message from the Outbox table. If the broker is unreachable, then the broker is unreachable!
What would that look like?
Implementation
Let's start off with the OutboxProcessor. There are various ways you could implement it. I have chosen to use a hosted service.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

internal sealed class OutboxProcessor : IHostedService, IDisposable
{
    private Timer timer;
    private readonly IBusPublisher publisher;
    private readonly IOutboxAccessor outboxAccessor;

    public OutboxProcessor(
        IBusPublisher publisher,
        IOutboxAccessor outboxAccessor)
    {
        this.publisher = publisher;
        this.outboxAccessor = outboxAccessor;
    }

    public Task StartAsync(CancellationToken stoppingToken)
    {
        // Fire immediately, then once every 5 minutes.
        timer = new Timer(
            callback: PublishMessages,
            state: null,
            dueTime: TimeSpan.Zero,
            period: TimeSpan.FromMinutes(5));

        return Task.CompletedTask;
    }

    // TimerCallback must return void, so the resulting task is discarded here.
    private void PublishMessages(object state)
    {
        _ = PublishMessagesAsync();
    }

    private async Task PublishMessagesAsync()
    {
        var messages = await outboxAccessor.GetUnprocessedAsync();

        foreach (var message in messages)
        {
            await publisher.PublishAsync(message);
        }
    }

    public Task StopAsync(CancellationToken stoppingToken)
    {
        // Stop the timer from firing again.
        timer?.Change(Timeout.Infinite, 0);
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        timer?.Dispose();
    }
}
Notice ⚠️
There are some points to note about the approach above:
- Since the processor is registered as a singleton, we have to register IOutboxAccessor as a singleton too (see: Service lifetimes).
- The Timer doesn't wait for previous executions of PublishMessages to finish, so the approach shown is not really suited for real-world applications.
- The TimerCallback delegate has a return type of void, so it cannot accept an asynchronous method. This is why we created PublishMessages and discard the result of PublishMessagesAsync.
- We are awaiting the publishing of messages one by one. If one of the earlier awaits throws, the remaining messages are skipped for that cycle, and because the task is discarded, the exception goes unobserved and the error information is lost.
For this article, I will stick with this implementation, as it's not the point of focus. I have provided a fully asynchronous implementation with error recovery included. You can grab it from here.
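To illustrate the overlapping-executions point from the list above, one option is to let at most one publishing run be active at a time. The following is a minimal sketch of such a guard, not the implementation linked above. The class name SingleFlight and the choice of SemaphoreSlim are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs an async action unless a previous run is still in progress.
public sealed class SingleFlight : IDisposable
{
    private readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);

    // Returns true if the action ran, false if it was skipped because another run is active.
    public async Task<bool> TryRunAsync(Func<Task> action)
    {
        // A zero timeout means "do not wait": either the gate is free right now or we skip.
        if (!await gate.WaitAsync(TimeSpan.Zero))
        {
            return false;
        }

        try
        {
            await action();
            return true;
        }
        finally
        {
            gate.Release();
        }
    }

    public void Dispose() => gate.Dispose();
}
```

Inside the processor, the body of PublishMessagesAsync could be passed to TryRunAsync so that two runs never publish the same entries concurrently. The trade-off is that a skipped run does not see entries committed after the active run executed its query, so those entries wait for the next cycle.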
Registration
The extension method AddHostedService automatically registers the service as a singleton (at least with .NET Core >= 3.1).
services.AddHostedService<OutboxProcessor>();
In order to invoke the OutboxProcessor when needed, we have to extract an interface for it. The important point is that the interface needs to be public, as opposed to the processor itself, which can be internal and sealed.
The interface has just one method, PublishMessagesAsync, because that is the only thing callers need.
public interface IOutboxProcessor
{
    Task PublishMessagesAsync();
}
The processor needs to implement this interface now, and PublishMessagesAsync has to be changed from private to public.
internal sealed class OutboxProcessor :
    IOutboxProcessor, IHostedService, IDisposable
{
    ...
    public async Task PublishMessagesAsync()
    {
        // Same as above.
    }
    ...
}
The "tricky" part to make this work is the service registration flow. We need to register IOutboxProcessor as the service type, and OutboxProcessor as the implementation type. Afterwards we need to register OutboxProcessor as a hosted service too.
We need to be careful not to register OutboxProcessor with the default overload of AddHostedService, because that would create a second, independent instance. We have to use the implementation factory overload in conjunction with the service provider. We use the service provider to retrieve the implementation we just registered on the first line.
services.AddSingleton<IOutboxProcessor, OutboxProcessor>();

// Wrong: creates a second, separate OutboxProcessor instance.
// services.AddHostedService<OutboxProcessor>();

// Right: reuse the singleton registered above.
services.AddHostedService(provider =>
    (OutboxProcessor)provider.GetRequiredService<IOutboxProcessor>());
The above registration flow allows us to run our OutboxProcessor as a singleton, but also access it on demand through the interface.
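A quick way to convince yourself that both registrations resolve to the same object is to compare the two resolved references. The sketch below uses simplified stand-ins for IOutboxProcessor and OutboxProcessor (no dependencies, empty bodies), the RegistrationCheck class is a hypothetical name, and it assumes the Microsoft.Extensions.DependencyInjection and Microsoft.Extensions.Hosting packages are referenced.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public interface IOutboxProcessor
{
    Task PublishMessagesAsync();
}

// Simplified stand-in for the real processor (assumption: no dependencies, no timer).
internal sealed class OutboxProcessor : IOutboxProcessor, IHostedService
{
    public Task PublishMessagesAsync() => Task.CompletedTask;
    public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

public static class RegistrationCheck
{
    // Returns true when the hosted service and the on-demand service are the same object.
    public static bool ResolvesSameInstance()
    {
        var services = new ServiceCollection();

        services.AddSingleton<IOutboxProcessor, OutboxProcessor>();
        services.AddHostedService(sp =>
            (OutboxProcessor)sp.GetRequiredService<IOutboxProcessor>());

        using var provider = services.BuildServiceProvider();

        var onDemand = provider.GetRequiredService<IOutboxProcessor>();
        var hosted = provider.GetServices<IHostedService>().Single();

        return ReferenceEquals(onDemand, hosted);
    }
}
```

With the factory overload, the hosted service is simply another route to the singleton, so the timer-driven and the on-demand invocations share one processor instance.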
Invocation
There are two ways the processor could be invoked on demand. In both cases, it has to be done after all changes have been committed to the database.
- Within DbContext.
- Through a pipeline (Recommended).
Within DbContext
If you are using EF Core as your ORM, you have access to IDbContextTransaction. This interface represents the ongoing transaction and exposes various methods to work with it. A transaction is started with Database.BeginTransactionAsync, and the important members in our case are CommitAsync and RollbackAsync. You can invoke the processor right after CommitAsync has successfully finished. The implementation varies of course, but it could look like this.
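using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;

public class MyDbContext : DbContext
{
    private IDbContextTransaction currentTransaction;
    private readonly IOutboxProcessor outboxProcessor;
    private readonly List<IMessage> messages = new List<IMessage>();

    public MyDbContext(
        DbContextOptions options,
        IOutboxProcessor outboxProcessor)
        : base(options)
    {
        this.outboxProcessor = outboxProcessor;
    }

    public async Task BeginTransactionAsync()
    {
        // Start a transaction only if none is in progress.
        if (currentTransaction == null)
        {
            currentTransaction = await Database
                .BeginTransactionAsync(IsolationLevel.ReadCommitted);
        }
    }

    public async Task CommitTransactionAsync()
    {
        // Capture this before the finally block clears the list.
        var hasMessages = messages.Count > 0;

        try
        {
            // Save your changes & messages altogether ...
            await currentTransaction.CommitAsync();
        }
        catch
        {
            await currentTransaction.RollbackAsync();
            throw;
        }
        finally
        {
            messages.Clear();
            if (currentTransaction != null)
            {
                currentTransaction.Dispose();
                currentTransaction = null;
            }
        }

        // Invoke the processor only after the commit has succeeded, and outside
        // the try block, so a publishing error never triggers a rollback.
        if (hasMessages)
        {
            await outboxProcessor.PublishMessagesAsync();
        }
    }
}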
Invoking the processor after each database commit (that is, whenever messages have been generated) works, but as elaborated above, not every message needs to be processed immediately. In my experience, only a few of them do!
So you would also invoke the processor for messages which are not time-critical, which in turn means making another database call after the transaction has been committed.
If your application (or suite of applications) doesn't handle many requests, then this might be for you. Otherwise the next approach solves this issue.
Through a pipeline
If you architect your solution via commands, meaning the client interacts with your software via narrow-scoped, specific, task-based operations (as opposed to wide-scoped, generic, CRUD-based operations), you can create pipelines which may (or may not) run before or after your commands.
This is very powerful in the context of JIT Outbox Polling because it lets you invoke the processor on a per-command basis. Commands that generate messages with time-critical processing demands can immediately invoke the processor, whereas commands that generate messages without such demands don't invoke it. Their messages will eventually be processed on the next polling cycle, be that after 60, 120, 240 ... seconds.
We will start off by creating a marker interface.
public interface IGenerateTimeCriticalMessages {}
This is used to distinguish between commands that generate time-critical messages vs others that don't. The pipeline will run only for commands that implement this interface.
Let's create a pipeline called OutboxProcessingBehavior. You can roll your own logic for pipelines, or use a library such as MediatR to handle that part for you. The syntax below is based on MediatR.
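using System.Threading;
using System.Threading.Tasks;
using MediatR;

public class OutboxProcessingBehavior<TRequest, TResponse> :
    IPipelineBehavior<TRequest, TResponse>
    where TRequest : IGenerateTimeCriticalMessages
{
    private readonly IOutboxProcessor outboxProcessor;

    public OutboxProcessingBehavior(IOutboxProcessor outboxProcessor)
    {
        this.outboxProcessor = outboxProcessor;
    }

    // Note: newer MediatR versions (12 and later) order the parameters as
    // (request, next, cancellationToken); adjust to the version you use.
    public async Task<TResponse> Handle(
        TRequest request,
        CancellationToken cancellationToken,
        RequestHandlerDelegate<TResponse> next)
    {
        // Run the command handler first, so the messages are already stored.
        TResponse response = await next();

        await outboxProcessor.PublishMessagesAsync();

        return response;
    }
}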
The pipeline has a generic type constraint on TRequest, so it runs only if the request implements our marker interface IGenerateTimeCriticalMessages. We also inject IOutboxProcessor and invoke its PublishMessagesAsync method, just-in-time 😉.
Let's define two commands. One generates time-critical messages, and the other does not.
public class MyCriticalCommand : IRequest<Unit>,
    IGenerateTimeCriticalMessages
{
    // Some handling logic ...
}

public class MyNormalCommand : IRequest<Unit>
{
    // Some handling logic ...
}
After MyCriticalCommand runs successfully and the transaction has been committed, OutboxProcessingBehavior runs and tells the processor to pick up the messages and dispatch them to the message broker.
On the other hand, when MyNormalCommand runs, the publishing of its messages waits until the next polling cycle of the processor.
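If you prefer to roll your own pipeline instead of using MediatR, the same idea can be sketched as a small decorator around a command handler. This is only an illustration under stated assumptions: ICommandHandler and JitOutboxDecorator are hypothetical names, the two interfaces from this article are repeated so the snippet is self-contained, and the inner handler is assumed to have committed its transaction by the time it returns.

```csharp
using System.Threading.Tasks;

public interface IOutboxProcessor
{
    Task PublishMessagesAsync();
}

public interface IGenerateTimeCriticalMessages { }

// Hypothetical handler abstraction for a hand-rolled command pipeline.
public interface ICommandHandler<TCommand>
{
    Task HandleAsync(TCommand command);
}

// Wraps any handler and triggers the processor only for time-critical commands.
public sealed class JitOutboxDecorator<TCommand> : ICommandHandler<TCommand>
{
    private readonly ICommandHandler<TCommand> inner;
    private readonly IOutboxProcessor outboxProcessor;

    public JitOutboxDecorator(
        ICommandHandler<TCommand> inner,
        IOutboxProcessor outboxProcessor)
    {
        this.inner = inner;
        this.outboxProcessor = outboxProcessor;
    }

    public async Task HandleAsync(TCommand command)
    {
        // Assumption: the inner handler has committed its transaction when it returns.
        await inner.HandleAsync(command);

        // Commands without the marker interface are left to the next polling cycle.
        if (command is IGenerateTimeCriticalMessages)
        {
            await outboxProcessor.PublishMessagesAsync();
        }
    }
}
```

The decorator checks the marker at runtime, whereas the MediatR behavior above relies on a generic constraint. Both express the same rule: only commands marked as time-critical trigger the processor right away.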
Summary
In this article, we have gone through the process of at-least-once delivery via the outbox pattern, and come to the following conclusions.
- Purely time-based polling is not an efficient way to handle message processing.
- Change Data Capture is the way to go, but unfortunately the tools out there don't support every database engine and message broker.
- We have elaborated the idea of the JIT Outbox Polling technique and its effectiveness.
- We showcased two approaches to implement this technique:
  - Via DbContext, which came with the drawback of invoking the processor on every request that generates messages.
  - Via a pipeline, which was clearly superior, as it allows independent invocation cycles based on the business requirements for message processing time-windows.
- And last but not least, we may have coined a new name for this technique 😀.
If you found this article helpful please give it a share in your favorite forums 😉.