Introduction
The other day I asked a question on Twitter. I was curious whether there was a way (other than a wrapping strategy) to convert domain events to integration events.
@oskar_at_net @kamgrzybek Have you guys ever had to convert domain events to integrations event? If yes, is a wrapper class the only way to achieve that?
- Ledjon Behluli (@BehluliLedjon) July 24, 2021
This was the first response...
I think those two things (Domain and Integration events) should be decoupled. You should not expose the domain event as a part of the integration one.
- Cezary Piatek (@cezary_piatek) July 24, 2021
Cezary is right here! We should all strive to keep our domain events inside the boundary of the domain they reside in. Ideally, when you want to inform other domains that something significant has happened in your domain, then you should define and publish an integration event for that use case.
But guess what? Software is not ideal, and implementation details matter, regardless of how much we like to think they don't!
The Twitter thread grew, and this was Oskar's point of view.
It's hard to give precise advice without knowing the details, but let's try. You don't have to use interfaces; you can create custom "routers" to decide where to forward events. You can also have explicit Event Handlers. You can also have multiple markers, e.g INoification 1/
- Oskar Dudycz (@oskar_at_net) July 25, 2021
We can extract two very important concepts here: the domain event router and the domain event mapper.
Two approaches, one outcome, different implications
So, what do we mean by domain event router and domain event mapper, and how do they fit into the picture of domain to integration event conversion?
Domain event router approach
A router is a networking device that forwards data packets between computer networks. A domain event router would be a piece of code that can forward domain events living in the originating domain, to the consuming domain.
This means that those domain events are the ones that actually get sent over to other domains, wrapped within an integration event.
Here are some benefits:
- No need for code duplication: You don't have to create an equivalent integration event, because the domain event itself gets serialized and sent over via an event bus.
- No need for mapping logic: You don't have to write mapping logic to an equivalent integration event, because the domain event is wrapped within a generic integration event and sent over via an event bus.
- No need for reflecting field changes: Adding new fields to the domain event doesn't require adding those fields to a corresponding integration event. They will become part of the wrapped domain event, contained within the generic integration event.
Here are some drawbacks:
- Internal workings of the domain are exposed: The domain event may very well carry information in its payload that aggregates in the same domain need to react to, but the structure of that payload is not relevant to an external domain. If your internal domain logic changes, you are forced to keep that payload, because someone outside the domain might be using it.
As with any public API, if you expose it, you can bet someone will be using it!!!
- Poor data type choices for information carrying: Because the domain event crosses the domain boundary, it should carry information via primitive data types only. If the event had value objects that reside in the originating domain as part of its payload, those objects may very well not exist in the consuming domain. This makes deserializing the event problematic, and promotes domain logic leakage (Avoid this!!!)
- Domain event evolution is constrained: Since this domain event is being consumed by another domain, we need to make sure we don't introduce breaking changes by modifying it. Adding new fields is OK, since deserialization can work if more data fields are present than expected, but renaming or removing fields will break the consumers' code.
Domain event mapper approach
With this approach we accomplish complete domain event isolation from foreign domains. The idea is to map each domain event, to a corresponding integration event.
Here are some benefits:
- Internal workings of the domain are concealed: Because you are keeping your domain events within your domain, and exposing only the necessary information via integration events, you are protecting your internal workings. You are free to change them as you wish, because you know there won't be any side effects outside the domain.
- Rich data type choices for information carrying: The domain event is free to carry information via value objects. This is OK, since this event will be consumed only by aggregates that reside in the same domain the event originates from. This means the aggregates have access to those value objects.
- Domain event evolution is unconstrained: Modifications to this domain event, whether adding, modifying or removing fields, can easily be accommodated, since the consumer is an aggregate that resides within the same domain.
If there is a need for such a change, that probably means the domain business logic has changed, and even those consumers would have to adapt to that change. There is a good reason why these aggregates live in the same domain.
You can implement the domain to integration event mapping via two techniques:
- Distributed mapping.
- Centralized mapping.
Distributed mapping technique
With the distributed mapping technique you create many domain event handlers. Each one maps (whether manually or via a mapping library) a domain event to its corresponding integration event, and publishes it via an event bus. Be sure to publish these integration events after the running transaction has been committed!!!
This technique does promote loose coupling. But beware of it as your application may become a "spider web", due to all the places that mappings are happening.
Centralized mapping technique
With the centralized mapping technique you create one mapper (something like a static class, singleton, or class with singleton lifetime), inside which you perform all the event mapping logic.
You won't be publishing an integration event from within the mapper; it only maps a domain event to an integration event and returns it. It's the calling code which is responsible for publishing it. This could be done somewhere in your code (after committing the transaction), like within a DbContext, post-processing pipeline, or middleware.
This technique does promote code readability. But make sure to assign it one responsibility only, namely mapping domain events to integration events, because otherwise it may become a god object with time.
We have covered the concepts, but let's see them in action. After all, I mentioned above that "implementation details matter", so I would be a hypocrite if I didn't come up with an example.
Example
You have been tasked to build the next Facebook (lucky you!). You have heard about Domain-driven design and Event-driven architecture, but you don't have experience with them. You also know right from the get-go that the project is doomed, so your mind switches to Resume-driven development.
The first set of requirements comes along:
- The app is a multi-user platform
- Users can create relationships via sending friend requests to other users
- Whenever a friend request has been sent, we need to send a real-time notification to the user whom the request is directed to
- We need to keep a history log of each sent friend request
- We also need the whole app done by Friday
You create your usual base classes and interfaces like Aggregate, ValueObject, IDomainEvent, ICommand..., and identify based on the requirements that we have two aggregates in place.
One might be the User aggregate and the other might be a FriendshipHistory aggregate. It is said that users create relationships via sending friend requests to other users, so you create a method in the User class called Befriend. This method accepts a FriendId object and creates a domain event called FriendRequestSentDomainEvent.
public abstract class Aggregate
{
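// Initialized here; otherwise AddDomainEvent would throw a NullReferenceException.
private readonly List<IDomainEvent> domainEvents = new List<IDomainEvent>();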
public IReadOnlyCollection<IDomainEvent> DomainEvents => domainEvents.AsReadOnly();
protected void AddDomainEvent(IDomainEvent domainEvent)
{
domainEvents.Add(domainEvent);
}
}
public class User : Aggregate
{
public UserId Id { get; }
public string Name { get; }
private readonly List<FriendId> friendIds = new List<FriendId>();
public IReadOnlyCollection<FriendId> FriendIds => friendIds.AsReadOnly();
public User(UserId id, string name)
{
Id = id;
Name = name;
}
public void Befriend(FriendId friendId)
{
friendIds.Add(friendId);
this.AddDomainEvent(new FriendRequestSentDomainEvent(
Id, Name, friendId));
}
}
public interface IDomainEvent
{
}
public class FriendRequestSentDomainEvent : IDomainEvent
{
public UserId From { get; }
public string Name { get; }
public FriendId To { get; }
public FriendRequestSentDomainEvent(
UserId @from, string name, FriendId to)
{
From = @from;
Name = name;
To = to;
}
}
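The UserId and FriendId value objects are used above but never shown. Here is a minimal sketch, assuming each one simply wraps a Guid and exposes it through a Value property (the handlers further down read From.Value and To.Value). The GuidId base class and the equality members are my assumptions about how such a value object would typically behave, not the actual implementation:

```csharp
using System;

// Hypothetical base class for Guid-backed identifiers (not part of the original code).
public abstract class GuidId : IEquatable<GuidId>
{
    public Guid Value { get; }

    protected GuidId(Guid value)
    {
        if (value == Guid.Empty)
            throw new ArgumentException("An identifier cannot be empty.", nameof(value));
        Value = value;
    }

    // Two ids are equal when they have the same concrete type and the same Guid.
    public bool Equals(GuidId other) =>
        other != null && other.GetType() == GetType() && other.Value == Value;

    public override bool Equals(object obj) => Equals(obj as GuidId);

    public override int GetHashCode() => HashCode.Combine(GetType(), Value);
}

public sealed class UserId : GuidId
{
    public UserId(Guid value) : base(value) { }
}

public sealed class FriendId : GuidId
{
    public FriendId(Guid value) : base(value) { }
}
```

Value-based equality matters here: a check such as friendIds.Contains(friendId) only works as intended if two FriendId instances wrapping the same Guid compare as equal.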
You can see this being one of the use cases, so you create a command for that.
public class SendFriendRequestCommand : ICommand
{
public Guid SenderId { get; set; }
public Guid ReceiverId { get; set; }
public class SendFriendRequestCommandHandler :
ICommandHandler<SendFriendRequestCommand>
{
private readonly IUserRepo userRepo;
public SendFriendRequestCommandHandler(IUserRepo userRepo)
{
this.userRepo = userRepo;
}
public void Handle(SendFriendRequestCommand command)
{
User sender = userRepo.Get(command.SenderId);
FriendId receiverId = new FriendId(command.ReceiverId);
sender.Befriend(receiverId);
userRepo.Save(sender);
}
}
}
Inside IUserRepo.Save, you check all available domain events and dispatch them (in-memory).
public class UserRepo : IUserRepo
{
private readonly IDbStore dbStore;
private readonly IDomainEventDispatcher dispatcher;
public UserRepo(IDbStore dbStore, IDomainEventDispatcher dispatcher)
{
this.dbStore = dbStore;
this.dispatcher = dispatcher;
}
public void Save(User user)
{
foreach (IDomainEvent domainEvent in user.DomainEvents)
{
dispatcher.Dispatch(domainEvent);
}
// You would do some updates to user here ...
dbStore.Commit();
}
}
Right before the transaction is committed, the IDomainEventDispatcher invokes the FriendRequestSentDomainEventHandler, which creates a new FriendshipHistoryRecord.
If anything fails during this transaction, everything will be rolled back!!!
public interface IDomainEventHandler<T> where T : IDomainEvent
{
void Handle(T @event);
}
public class FriendRequestSentDomainEventHandler :
IDomainEventHandler<FriendRequestSentDomainEvent>
{
private readonly IFriendshipHistoryRepo friendshipHistoryRepo;
public FriendRequestSentDomainEventHandler(
IFriendshipHistoryRepo friendshipHistoryRepo)
{
this.friendshipHistoryRepo = friendshipHistoryRepo;
}
public void Handle(FriendRequestSentDomainEvent @event)
{
// From.Value is the Sender Id
// To.Value is the Receiver Id
FriendshipHistoryRecord record =
new FriendshipHistoryRecord(@event.From.Value, @event.To.Value);
friendshipHistoryRepo.Add(record);
}
}
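The IDomainEventDispatcher that UserRepo relies on is not shown in the article. Below is one possible in-memory version, assuming handlers are registered by hand. InMemoryDomainEventDispatcher, Register, SampleEvent and CountingHandler are hypothetical names used only for illustration; a real application would more likely resolve handlers from a DI container:

```csharp
using System;
using System.Collections.Generic;

public interface IDomainEvent { }

public interface IDomainEventHandler<T> where T : IDomainEvent
{
    void Handle(T @event);
}

public interface IDomainEventDispatcher
{
    void Dispatch(IDomainEvent domainEvent);
}

// Hypothetical in-memory dispatcher with manual handler registration.
public class InMemoryDomainEventDispatcher : IDomainEventDispatcher
{
    private readonly Dictionary<Type, List<Action<IDomainEvent>>> handlers =
        new Dictionary<Type, List<Action<IDomainEvent>>>();

    public void Register<T>(IDomainEventHandler<T> handler) where T : IDomainEvent
    {
        if (!handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Action<IDomainEvent>>();
            handlers[typeof(T)] = list;
        }
        // Wrap the typed handler so it can be stored in a non-generic list.
        list.Add(e => handler.Handle((T)e));
    }

    // Synchronously invokes every handler registered for the event's runtime type.
    public void Dispatch(IDomainEvent domainEvent)
    {
        if (handlers.TryGetValue(domainEvent.GetType(), out var list))
        {
            foreach (var handle in list)
            {
                handle(domainEvent);
            }
        }
    }
}

// Sample event and handler, present only to exercise the dispatcher.
public class SampleEvent : IDomainEvent { }

public class CountingHandler : IDomainEventHandler<SampleEvent>
{
    public int Calls { get; private set; }
    public void Handle(SampleEvent @event) => Calls++;
}
```

Because the handlers run synchronously inside Dispatch, an exception thrown by any of them surfaces before dbStore.Commit() is reached, which is what makes the "everything will be rolled back" guarantee plausible in this setup.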
Meanwhile, in a parallel universe, we need to notify the receiver of the friend request in real-time. We decide to separate responsibilities and create a dedicated "Notifications" service which deals with sending real-time notifications. This service lives in a different process, and we send over the necessary information as an integration event, via an event bus.
Now the burning question is ... How?
In the current state of the app, we can declare an integration event for that, and use the event bus to publish it from within the SendFriendRequestCommandHandler.
In real-world applications, we would use the Outbox pattern to ensure At-least once delivery and the Inbox pattern to ensure Exactly-once processing. The Outbox & Inbox patterns are out of scope for this article, but I highly recommend Oskar's article on them.
Notice how FriendRequestSentIntegrationEvent only contains primitive data types, as opposed to the FriendRequestSentDomainEvent which contains rich value objects. This is to ensure easy deserialization and interoperability for the consumers. They might not have our custom UserId and FriendId classes defined in their codebase.
The "Notifications" service would pick up this event from the bus, and send out a real-time notification over WebSockets.
public interface IIntegrationEvent
{
Guid Id { get; }
DateTime OccurredOn { get; }
}
public class FriendRequestSentIntegrationEvent : IIntegrationEvent
{
public Guid Id { get; }
public DateTime OccurredOn { get; }
public Guid SenderId { get; }
public string SenderName { get; }
public Guid ReceiverId { get; }
public FriendRequestSentIntegrationEvent(
Guid senderId, string senderName, Guid receiverId)
{
Id = Guid.NewGuid();
OccurredOn = DateTime.UtcNow;
SenderId = senderId;
SenderName = senderName;
ReceiverId = receiverId;
}
}
public class SendFriendRequestCommandHandler :
ICommandHandler<SendFriendRequestCommand>
{
private readonly IEventBus eventBus;
private readonly IUserRepo userRepo;
public SendFriendRequestCommandHandler(
IEventBus eventBus,
IUserRepo userRepo)
{
this.eventBus = eventBus;
this.userRepo = userRepo;
}
public void Handle(SendFriendRequestCommand command)
{
User sender = userRepo.Get(command.SenderId);
FriendId receiverId = new FriendId(command.ReceiverId);
sender.Befriend(receiverId);
userRepo.Save(sender);
// In real-world applications, use the Outbox pattern!!!
IIntegrationEvent @event = new FriendRequestSentIntegrationEvent(
senderId: command.SenderId,
senderName: sender.Name,
receiverId: command.ReceiverId
);
// In real-world applications, use an async way to publish the event.
eventBus.Publish(@event);
}
}
If you followed closely, by now you should have a question in mind.
Q: Where's the "conversion" of the domain event?
A: Well, nowhere... yet!
We are sending the FriendRequestSentIntegrationEvent from within the SendFriendRequestCommandHandler, because there are no domain invariants in place.
Let's be honest for a moment. If a domain has no invariants (so it is free to be modified as we please), then what is the point in using Domain-driven design?
Ohh, yeah I forgot... Resume-driven development.
Let's make it a bit tougher!
So business comes around and complains that users are seeing the same person multiple times in their friend list. They ask to implement the following requirements:
- User A cannot befriend user B if user B is already in user A's friend list.
- Any user can have at most 5000 friends.
We can do that easily, right? We just have to put some checks in the Befriend method, inside the User aggregate.
public class User : Aggregate
{
...
public void Befriend(FriendId friendId)
{
if (!friendIds.Contains(friendId))
{
if (friendIds.Count + 1 <= 5000)
{
friendIds.Add(friendId);
this.AddDomainEvent(new FriendRequestSentDomainEvent(
Id, Name, friendId));
}
}
}
}
The question is, what are the implications of this change in our program flow?
We cannot just send out a FriendRequestSentIntegrationEvent from within the command handler, because it is not guaranteed that a FriendRequestSentDomainEvent is generated every time we invoke the method.
The solution is easy in this case: we just change the method's return type from void to bool. If it succeeds we send the integration event, otherwise we don't.
But what if we need to log why we could not send the friend request? Was it because the user is already in the list, or because the friendship limit has been exceeded?
You can do many "workarounds", but you will notice that more and more of your domain logic creeps up into the upper layers, and you end up using if...else all over the place (Avoid this!!!).
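To make the "workaround" problem concrete, here is a hedged sketch of where the bool idea tends to lead once the reason for failure matters. BefriendResult, SendFriendRequestFlow and this trimmed-down User (with ids as plain Guids) are hypothetical and exist only for illustration:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical result type: the domain now has to report *why* nothing happened.
public enum BefriendResult
{
    RequestSent,
    AlreadyFriends,
    FriendLimitReached
}

// Trimmed-down User for illustration only; ids are plain Guids here.
public class User
{
    private const int MaxFriends = 5000;
    private readonly HashSet<Guid> friendIds = new HashSet<Guid>();

    public BefriendResult Befriend(Guid friendId)
    {
        if (friendIds.Contains(friendId)) return BefriendResult.AlreadyFriends;
        if (friendIds.Count >= MaxFriends) return BefriendResult.FriendLimitReached;

        friendIds.Add(friendId);
        return BefriendResult.RequestSent;
    }
}

public static class SendFriendRequestFlow
{
    // The caller must now branch on domain outcomes, which is exactly the
    // leakage of domain logic into the upper layer that we want to avoid.
    public static string Handle(User sender, Guid receiverId)
    {
        switch (sender.Befriend(receiverId))
        {
            case BefriendResult.RequestSent:
                return "publish FriendRequestSentIntegrationEvent";
            case BefriendResult.AlreadyFriends:
                return "log: receiver is already in the friend list";
            default:
                return "log: friend limit reached";
        }
    }
}
```

Every new invariant adds another enum member and another branch in the caller, so the application layer slowly becomes a mirror of the aggregate's rules.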
Domain event conversion
Enough with the build-up. How can domain event conversion help us keep the domain logic in the domain layer, and how do we convert these events into their integration counterparts?
The answer to the first question is: we don't fire integration events from within command handlers.
This ensures our handlers are "dumb" and don't contain domain logic. Their role is to kick off domain processes. This is done by collecting the necessary data, passing it down to the domain objects, and invoking their methods.
And the answer to "how?" is: via the techniques we mentioned in the introductory part of the article.
- Event Router
- Event Mapper
  - Distributed
  - Centralized
Event router
Routing can be accomplished by wrapping an IDomainEvent inside of an IIntegrationEvent. This can be done per use case, or in a generic fashion. Let's see what a generic approach would look like.
DomainEventWrapper<T> is an integration event, but it contains a domain event inside.
public class DomainEventWrapper<T> : IIntegrationEvent where T : IDomainEvent
{
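public Guid Id { get; }
public DateTime OccurredOn { get; }
public T DomainEvent { get; }
public DomainEventWrapper(T domainEvent)
{
Id = Guid.NewGuid();
OccurredOn = DateTime.UtcNow;
DomainEvent = domainEvent;
}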
}
We can do the routing within the UserRepo.
public class UserRepo : IUserRepo
{
public void Save(User user)
{
...
dbStore.Commit();
foreach (IDomainEvent domainEvent in user.DomainEvents)
{
Type genericType = typeof(DomainEventWrapper<>)
.MakeGenericType(domainEvent.GetType());
IIntegrationEvent integrationEvent = (IIntegrationEvent)Activator
.CreateInstance(genericType, domainEvent);
eventBus.Publish(integrationEvent);
}
}
}
Notice ⚠️
In real-world applications, I suggest doing the conversion within a DbContext/Unit of Work type of class, or in a pipeline/middleware.
In real-world applications, I urge you to store these integration events in an Outbox table, within the same transaction. Afterwards a processor could pick them up and publish them via the event bus.
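The Outbox itself is out of scope here, but a rough sketch may help to picture it. Everything below is an assumption made for illustration: the OutboxMessage columns, the InMemoryOutbox stand-in for a real table, and the OutboxProcessor that takes a publish delegate instead of a concrete event bus:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Hypothetical row of an Outbox table; the column names are assumptions.
public class OutboxMessage
{
    public Guid Id { get; set; }
    public string Type { get; set; }
    public string Payload { get; set; }
    public DateTime OccurredOn { get; set; }
    public DateTime? ProcessedOn { get; set; }
}

// Stand-in for the real table. In a real application the rows would be written
// through the same database transaction that saves the aggregate.
public class InMemoryOutbox
{
    private readonly List<OutboxMessage> rows = new List<OutboxMessage>();

    public void Add(object integrationEvent, Guid id, DateTime occurredOn)
    {
        rows.Add(new OutboxMessage
        {
            Id = id,
            Type = integrationEvent.GetType().FullName,
            // Serialize by runtime type so all public properties are included.
            Payload = JsonSerializer.Serialize(integrationEvent, integrationEvent.GetType()),
            OccurredOn = occurredOn
        });
    }

    public IReadOnlyList<OutboxMessage> Pending() =>
        rows.Where(r => r.ProcessedOn == null).OrderBy(r => r.OccurredOn).ToList();
}

// Hypothetical processor: publishes pending rows and marks them as processed.
// If it crashes between publishing and marking, the row is published again
// on the next run, hence at-least-once delivery.
public class OutboxProcessor
{
    private readonly InMemoryOutbox outbox;
    private readonly Action<OutboxMessage> publish;

    public OutboxProcessor(InMemoryOutbox outbox, Action<OutboxMessage> publish)
    {
        this.outbox = outbox;
        this.publish = publish;
    }

    public int ProcessPending()
    {
        var pending = outbox.Pending();
        foreach (var message in pending)
        {
            publish(message);
            message.ProcessedOn = DateTime.UtcNow;
        }
        return pending.Count;
    }
}
```

The point of the pattern is that saving the aggregate and recording the event either both succeed or both fail, while the actual publishing happens later and can be retried.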
The processor could be implemented via:
One important point to cover with this approach is that all domain events get converted to integration counterparts. This is probably not the behaviour you want.
To stop this from happening, you can define a marker interface or attribute. This lets you differentiate between domain events that are supposed to be mapped to integration events and those that are supposed to live only inside their own domain.
public interface IExternalEvent : IDomainEvent
{
}
public class FriendRequestSentDomainEvent : IExternalEvent
{
...
}
public class UserRepo : IUserRepo
{
public void Save(User user)
{
...
dbStore.Commit();
foreach (IDomainEvent domainEvent in
user.DomainEvents.Where(e => e is IExternalEvent))
{
Type genericType = typeof(DomainEventWrapper<>)
.MakeGenericType(domainEvent.GetType());
IIntegrationEvent integrationEvent = (IIntegrationEvent)Activator
.CreateInstance(genericType, domainEvent);
eventBus.Publish(integrationEvent);
}
}
}
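The attribute variant mentioned earlier could look roughly like this. ExternalEventAttribute, DomainEventFilter and FriendListViewedDomainEvent are hypothetical names introduced for the sketch, and the events are reduced to empty classes:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IDomainEvent { }

// Hypothetical marker attribute; the name is an assumption.
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public sealed class ExternalEventAttribute : Attribute { }

[ExternalEvent]
public class FriendRequestSentDomainEvent : IDomainEvent { }

// Hypothetical event without the attribute: it stays inside the domain.
public class FriendListViewedDomainEvent : IDomainEvent { }

public static class DomainEventFilter
{
    // Keeps only the events whose class carries [ExternalEvent].
    public static IEnumerable<IDomainEvent> ExternalOnly(IEnumerable<IDomainEvent> events) =>
        events.Where(e => e.GetType().IsDefined(typeof(ExternalEventAttribute), inherit: false));
}
```

Compared to the marker interface, the attribute keeps the event's type hierarchy untouched, at the cost of a reflection lookup per event.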
Below you can see a JSON representation of the integration event. As we can see the properties From and To, which are UserId and FriendId respectively, are serialized too! The consumers of the event would have to create classes with this structure, in order for them to consume this event.
{
"Id": "34878764-bb23-4494-b4eb-eee0f21706fd",
"OccurredOn": "7/31/2021 8:55:45 PM",
"DomainEvent": {
"From":{
"Value":"68ff1adb-1148-457d-a05d-651570255d1c"
},
"Name":"Ledjon Behluli",
"To":{
"Value":"7b42fb27-f3ac-40d7-b621-ec2bf72e9ff1"
}
}
}
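On the consuming side, that means mirroring the producer's structure. A minimal sketch using System.Text.Json follows; the class names IdDto, FriendRequestSentPayload, WrappedFriendRequestSent and NotificationsConsumer are hypothetical, and keeping OccurredOn as a string is my choice for this sample, not something the article prescribes:

```csharp
using System;
using System.Text.Json;

// Hypothetical consumer-side copies of the producer's types. Note that the
// consumer is forced to mirror the producer's value objects (the "Value" wrappers).
public class IdDto
{
    public Guid Value { get; set; }
}

public class FriendRequestSentPayload
{
    public IdDto From { get; set; }
    public string Name { get; set; }
    public IdDto To { get; set; }
}

public class WrappedFriendRequestSent
{
    public Guid Id { get; set; }

    // Kept as a string: the sample date format is not ISO 8601, which
    // System.Text.Json expects by default when reading a DateTime.
    public string OccurredOn { get; set; }

    public FriendRequestSentPayload DomainEvent { get; set; }
}

public static class NotificationsConsumer
{
    public static WrappedFriendRequestSent Parse(string json) =>
        JsonSerializer.Deserialize<WrappedFriendRequestSent>(json);
}
```

This also shows why evolution is constrained: if the producer later renames From, the consumer still deserializes without an exception, but DomainEvent.From silently ends up null.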
Event mapper (Distributed)
Distributed mapping can be accomplished by creating domain event handlers, inside of which we can map a domain event to an integration counterpart.
We already saw the FriendRequestSentDomainEventHandler, which gets invoked before the transaction has been committed. Inside of it, we create a new FriendshipHistoryRecord to log the fact that a new friend request has been sent. We can create a new handler specifically to map the event.
It is perfectly valid to define multiple event handlers for any kind of event that gets raised. An event is a fact that has already happened. There can be zero or many subscribers who are interested in this fact. Just make sure that either the class names or the namespaces of the handlers are unique.
namespace A
{
public class FriendRequestSentDomainEventHandler :
IDomainEventHandler<FriendRequestSentDomainEvent>
{
...
public void Handle(FriendRequestSentDomainEvent @event)
{
// Store friend request history record...
}
}
}
namespace B
{
public class FriendRequestSentDomainEventHandler :
IDomainEventHandler<FriendRequestSentDomainEvent>
{
...
public void Handle(FriendRequestSentDomainEvent domainEvent)
{
// Map the domain event to an integration event.
// Mapping can be done manually, or via a mapping library.
var integrationEvent = new FriendRequestSentIntegrationEvent(
senderId: domainEvent.From.Value,
senderName: domainEvent.Name,
receiverId: domainEvent.To.Value
);
// In real-world applications, store this event in Outbox.
// The Outbox processor will publish it via the event bus.
eventBus.Publish(integrationEvent);
}
}
}
Event mapper (Centralized)
Centralized mapping can be accomplished by creating one mapper (something like a static class, singleton, or class with singleton lifetime), inside which you perform all domain to integration event mapping logic.
In this example we are going to do this via an interface-class combo, with a registered singleton lifetime.
public interface IEventMapper
{
IIntegrationEvent Map(IDomainEvent @event);
IEnumerable<IIntegrationEvent> MapAll(IEnumerable<IDomainEvent> events);
}
public class EventMapper : IEventMapper
{
public IEnumerable<IIntegrationEvent> MapAll(IEnumerable<IDomainEvent> events)
=> events.Select(Map).Where(e => e != null); // skip events that have no mapping
public IIntegrationEvent Map(IDomainEvent @event) =>
@event switch
{
FriendRequestSentDomainEvent e =>
new FriendRequestSentIntegrationEvent(
senderId: e.From.Value,
senderName: e.Name,
receiverId: e.To.Value),
// If we had more events to map, we would extend this switch
// statement to include those too. This is why it is called
// "Centralized" mapping ...
_ => null
};
}
Similar to the router approach, we can call the mapper inside the UserRepo. Although, as mentioned above, I suggest doing this within a DbContext/Unit of Work type of class, or in a pipeline/middleware.
public class UserRepo : IUserRepo
{
public void Save(User user)
{
...
dbStore.Commit();
foreach(IIntegrationEvent @event in eventMapper.MapAll(user.DomainEvents))
{
eventBus.Publish(@event);
}
}
}
Summary
In this article we have addressed different approaches to convert domain events to integration events.
- We've listed the benefits and drawbacks of each approach, and showcased examples for each one of them.
- We have also gone through a complete use case, from the initial requirements, to the implementation via the above-mentioned approaches, and the necessary adjustments after the requirements have changed.
- We have also seen the impact that domain invariants can have on the process of propagating domain events.
I saw the need to write this article because converting domain events to integration events is very often a necessity in distributed systems, and in my opinion deserves proper treatment. I also want to thank Cezary and Oskar for always being awesome members of the programming community.
If you found this article helpful, please share it in your favorite forums.