Outbox Pattern Implementation
Overview
The Outbox pattern ensures reliable event processing:
- Atomic persistence - Events saved in same transaction as aggregate
- Guaranteed delivery - Events processed even if app crashes
- Eventual consistency - Async processing with retry
- Idempotency - Handle duplicate processing gracefully
Quick Reference
| Component | Purpose |
|---|---|
| OutboxMessage | Persisted event entity |
| OutboxMessageConfiguration | EF Core mapping |
| ProcessOutboxMessagesJob | Background processor (Quartz) |
| IdempotentDomainEventHandler | Deduplicated handler wrapper |
| OutboxConsumer | Alternative direct DB poller |
Outbox Structure
/Infrastructure/
├── Outbox/
│ ├── OutboxMessage.cs
│ ├── OutboxMessageConfiguration.cs
│ ├── ProcessOutboxMessagesJob.cs
│ ├── ProcessOutboxMessagesJobSetup.cs
│ └── IdempotentDomainEventHandler.cs
└── ApplicationDbContext.cs
Template: Outbox Message Entity
// src/{name}.infrastructure/Outbox/OutboxMessage.cs
namespace {name}.infrastructure.outbox;
/// <summary>
/// Represents a domain event stored for reliable delivery
/// </summary>
public sealed class OutboxMessage
{
    /// <summary>
    /// Identifier of this outbox row.
    /// </summary>
    public Guid Id { get; set; }

    /// <summary>
    /// Assembly-qualified name of the domain event type stored in <see cref="Content"/>.
    /// </summary>
    public string Type { get; set; } = string.Empty;

    /// <summary>
    /// The domain event serialized as JSON.
    /// </summary>
    public string Content { get; set; } = string.Empty;

    /// <summary>
    /// UTC timestamp recorded for the event's occurrence.
    /// </summary>
    public DateTime OccurredOnUtc { get; set; }

    /// <summary>
    /// UTC timestamp of successful processing; <c>null</c> while the message is still pending.
    /// </summary>
    public DateTime? ProcessedOnUtc { get; set; }

    /// <summary>
    /// Failure description from processing, if processing failed.
    /// </summary>
    public string? Error { get; set; }

    /// <summary>
    /// How many times processing has been attempted.
    /// </summary>
    public int RetryCount { get; set; }

    /// <summary>
    /// Creates a message with every property left at its default value.
    /// </summary>
    public OutboxMessage()
    {
    }

    /// <summary>
    /// Creates a message with its identity, event type, payload and occurrence time set.
    /// </summary>
    /// <param name="id">Value for <see cref="Id"/>.</param>
    /// <param name="type">Value for <see cref="Type"/>.</param>
    /// <param name="content">Value for <see cref="Content"/>.</param>
    /// <param name="occurredOnUtc">Value for <see cref="OccurredOnUtc"/>.</param>
    public OutboxMessage(Guid id, string type, string content, DateTime occurredOnUtc)
        => (Id, Type, Content, OccurredOnUtc) = (id, type, content, occurredOnUtc);
}
Template: EF Core Configuration
// src/{name}.infrastructure/Outbox/OutboxMessageConfiguration.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
namespace {name}.infrastructure.outbox;
/// <summary>
/// EF Core mapping for <see cref="OutboxMessage"/>: table name, key, column types and
/// two filtered indexes over <c>ProcessedOnUtc</c>.
/// </summary>
internal sealed class OutboxMessageConfiguration
    : IEntityTypeConfiguration<OutboxMessage>
{
    private const string UnprocessedIndexName = "ix_outbox_message_unprocessed";
    private const string ProcessedIndexName = "ix_outbox_message_processed";

    /// <summary>
    /// Applies the outbox table mapping to the model.
    /// </summary>
    /// <param name="builder">Builder for the <see cref="OutboxMessage"/> entity type.</param>
    public void Configure(EntityTypeBuilder<OutboxMessage> builder)
    {
        builder.ToTable("outbox_message");

        builder.HasKey(o => o.Id);

        // The key is supplied by the application; the database never generates it.
        builder.Property(o => o.Id)
            .ValueGeneratedNever();

        builder.Property(o => o.Type)
            .HasMaxLength(500)
            .IsRequired();

        builder.Property(o => o.Content)
            .HasColumnType("jsonb") // PostgreSQL JSONB
            .IsRequired();

        builder.Property(o => o.OccurredOnUtc)
            .IsRequired();

        builder.Property(o => o.ProcessedOnUtc);

        builder.Property(o => o.Error)
            .HasColumnType("text");

        builder.Property(o => o.RetryCount)
            .HasDefaultValue(0);

        // Both indexes below cover the same property. An unnamed HasIndex call on a
        // property set always resolves to the same index, so a second unnamed call would
        // overwrite the first one's filter and database name and only one index would be
        // created. The named overload (EF Core 5.0+) keeps them as two distinct indexes.
        //
        // NOTE(review): the filter SQL assumes the column is named processed_on_utc
        // (e.g. via a snake_case naming convention) - confirm against the actual schema.

        // Index for efficient polling of unprocessed messages
        builder.HasIndex(o => o.ProcessedOnUtc, UnprocessedIndexName)
            .HasFilter("processed_on_utc IS NULL")
            .HasDatabaseName(UnprocessedIndexName);

        // Index for cleanup of old processed messages
        builder.HasIndex(o => o.ProcessedOnUtc, ProcessedIndexName)
            .HasFilter("processed_on_utc IS NOT NULL")
            .HasDatabaseName(ProcessedIndexName);
    }
}
Template: DbContext Integration
// src/{name}.infrastructure/ApplicationDbContext.cs
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using {name}.domain.abstractions;
using {name}.infrastructure.outbox;
namespace {name}.infrastructure;
/// <summary>
/// Application EF Core context. Before every save it turns the domain events raised by
/// tracked entities into <see cref="OutboxMessage"/> rows, so the events are written by
/// the same save operation as the entity changes that produced them.
/// </summary>
public sealed class ApplicationDbContext : DbContext, IUnitOfWork
{
    // Serializer settings used when writing event payloads to the outbox.
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        WriteIndented = false
    };

    public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
        : base(options)
    {
    }

    /// <summary>
    /// Outbox rows awaiting or having completed processing.
    /// </summary>
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.ApplyConfigurationsFromAssembly(typeof(ApplicationDbContext).Assembly);
        base.OnModelCreating(modelBuilder);
    }

    /// <summary>
    /// Synchronous save. <c>DbContext.SaveChanges()</c> forwards here, so overriding this
    /// overload makes sure synchronous callers also get their events written to the outbox.
    /// </summary>
    /// <param name="acceptAllChangesOnSuccess">Passed through to the base implementation.</param>
    /// <returns>The number of state entries written to the database.</returns>
    public override int SaveChanges(bool acceptAllChangesOnSuccess)
    {
        ConvertDomainEventsToOutboxMessages();
        return base.SaveChanges(acceptAllChangesOnSuccess);
    }

    /// <summary>
    /// Asynchronous save. <c>DbContext.SaveChangesAsync(CancellationToken)</c> forwards
    /// here, so existing callers of that overload behave as before while callers of this
    /// overload no longer bypass the outbox conversion.
    /// </summary>
    /// <param name="acceptAllChangesOnSuccess">Passed through to the base implementation.</param>
    /// <param name="cancellationToken">Token observed while saving.</param>
    /// <returns>The number of state entries written to the database.</returns>
    public override async Task<int> SaveChangesAsync(
        bool acceptAllChangesOnSuccess,
        CancellationToken cancellationToken = default)
    {
        // Outbox rows must be added BEFORE the base save so that they are persisted
        // together with the entity changes that raised the events.
        ConvertDomainEventsToOutboxMessages();
        return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken);
    }

    /// <summary>
    /// Collects the domain events of all tracked entities, clears them from the entities
    /// and adds one <see cref="OutboxMessage"/> per event to the context.
    /// </summary>
    private void ConvertDomainEventsToOutboxMessages()
    {
        // Tracked entities that currently hold at least one domain event.
        var entitiesWithEvents = ChangeTracker
            .Entries<Entity>()
            .Select(entry => entry.Entity)
            .Where(entity => entity.GetDomainEvents().Any())
            .ToList();

        if (entitiesWithEvents.Count == 0)
        {
            return;
        }

        // Snapshot the events before clearing them from their owners.
        var domainEvents = entitiesWithEvents
            .SelectMany(entity => entity.GetDomainEvents())
            .ToList();

        // The events now live in the snapshot; clearing prevents a later save from
        // writing them to the outbox a second time.
        foreach (var entity in entitiesWithEvents)
        {
            entity.ClearDomainEvents();
        }

        foreach (var domainEvent in domainEvents)
        {
            // Serialize with the runtime type so derived event properties are included.
            var eventType = domainEvent.GetType();

            OutboxMessages.Add(new OutboxMessage(
                Guid.NewGuid(),
                eventType.AssemblyQualifiedName!,
                JsonSerializer.Serialize(domainEvent, eventType, JsonOptions),
                DateTime.UtcNow));
        }
    }
}
Template: Outbox Processor Job (Quartz)
// src/{name}.infrastructure/Outbox/ProcessOutboxMessagesJob.cs
using System.Text.Json;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Quartz;
using {name}.domain.abstractions;
namespace {name}.infrastructure.outbox;
/// <summary>
/// Background job that processes outbox messages
/// Uses Quartz for scheduling with configurable interval
/// </summary>
[DisallowConcurrentExecution] // Prevent parallel execution
public sealed class ProcessOutboxMessagesJob : IJob
{
private const int BatchSize = 20;
private const int MaxRetries = 3;
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
private readonly ApplicationDbContext _dbContext;
private readonly IPublisher _publisher;
private readonly ILogger<ProcessOutboxMessagesJob> _logger;
public ProcessOutboxMessagesJob(
ApplicationDbContext dbContext,
IPublisher publisher,
ILogger<ProcessOutboxMessagesJob> logger)
{
_dbContext = dbContext;
_publisher = publisher;
_logger = logger;
}
public async Task Execute(IJobExecutionContext context)
{
_logger.LogDebug("Starting outbox message processing...");
var messages = await GetUnprocessedMessages