Pattern 27: Event Sourcing
Intent
Store all state changes as immutable sequence of events rather than updating current state, enabling complete audit trails, point-in-time reconstruction, event replay for debugging, and foundation for event-driven architecture while preventing data loss and supporting regulatory compliance.
Also Known As
- Event Log Architecture
- Append-Only Storage
- Immutable Event Store
- Event Stream Processing
- Audit Log Pattern
Problem
Traditional state-based storage loses history, makes debugging impossible, and creates data loss risks.
The "update in place" disaster:
Traditional database approach:
-- Martinez family payment status
UPDATE families
SET payment_status = 'overdue',
last_payment_date = '2024-10-15'
WHERE family_id = 123;
-- Later that day...
UPDATE families
SET payment_status = 'paid',
last_payment_date = '2024-10-20'
WHERE family_id = 123;
Six months later, Sarah gets audited:
Auditor: "Martinez family shows 'paid' status. When did they pay? Were they ever late?"
Sarah: "The database shows they're paid now... but I can't tell you the history. The old data was overwritten." 😱
Auditor: "You can't prove compliance with payment policies. That's a serious issue."
The problem: History was DESTROYED by updates! No audit trail, no proof, no way to reconstruct what happened!
The "what happened?" debugging nightmare:
Production incident: Chen family complains: "We got locked out of portal but we paid on time!"
Developer: "Let me check the database..."
SELECT * FROM families WHERE family_id = 456;
-- Result: payment_status = 'paid', portal_access = 'locked'
Developer: "Uh... you show as paid but locked. I don't know WHY you're locked. The data doesn't tell me WHAT HAPPENED, only CURRENT STATE."
Questions we CAN'T answer: - When did they get locked? - What triggered the lock? - Was it automated or manual? - Did payment clear AFTER lock? - What's the sequence of events?
The database only shows NOW, not HOW WE GOT HERE! 😱
The "can't rollback" problem:
Coordinator error: Sarah accidentally marks 50 families as "withdrawn" instead of "on vacation"
UPDATE families SET status = 'withdrawn' WHERE status = 'on_vacation';
-- OH NO! Wrong update!
Sarah: "Can we undo this?"
Developer: "No. The old data is gone. We overwrote it. I can change them back to 'on_vacation' but I can't prove that's what they were before. Maybe some were actually withdrawn?"
Can't rollback because history was destroyed! 😱
The "concurrent update" race condition:
Two processes updating same family simultaneously:
Process A: Read payment_status = 'pending'
Process B: Read payment_status = 'pending'
Process A: Update payment_status = 'paid'
Process B: Update payment_status = 'overdue'
Final state: 'overdue' (Process B overwrote Process A!)
Process A's update was LOST! Race condition destroyed data! 😱
The "data loss" horror:
Hardware failure: Database crashes during UPDATE operation
UPDATE families SET engagement_score = 85 WHERE family_id = 789;
-- CRASH! Power failure!
After recovery: Family 789 has corrupt engagement_score or old value
Was the update applied? Partially applied? Lost? Nobody knows! Data integrity destroyed! 😱
What we need: Event Sourcing
Instead of updating state, APPEND events:
// Don't update state
❌ UPDATE families SET payment_status = 'paid'
// Instead, append event
✅ INSERT INTO events (event_type, data)
VALUES ('PaymentReceived', {family_id: 123, amount: 450, date: '2024-10-20'})
Current state = replay all events:
events = [
{type: 'FamilyEnrolled', date: '2024-09-01', ...},
{type: 'PaymentDue', date: '2024-10-01', amount: 450, ...},
{type: 'PaymentOverdue', date: '2024-10-16', ...},
{type: 'PaymentReceived', date: '2024-10-20', amount: 450, ...},
{type: 'PortalAccessRestored', date: '2024-10-20', ...}
]
current_state = replayEvents(events)
// → payment_status: 'paid', portal_access: 'active'
Benefits:
1. Complete Audit Trail: - Every change recorded forever - Can prove compliance - Can reconstruct any moment in time
2. Debuggable: - "What happened?" → Replay events, see exact sequence - "When did this start?" → Find first relevant event - "Who did this?" → Events include actor
3. No Data Loss: - Events immutable (never deleted) - Append-only (no overwrites) - Crash-safe (either event persisted or not, no partial state)
4. Rollback/Replay: - Can undo by adding compensating event - Can replay to test different scenarios - Can rebuild state from scratch
5. Event-Driven Architecture: - Events feed Pattern 23 (Triggers) - Events feed Pattern 26 (Learning) - Events enable real-time processing
Without event sourcing: - Lost history (updates destroy it) - Impossible debugging (only see current state) - Data loss (overwrites, race conditions, crashes) - No audit trail (can't prove compliance) - Concurrent update problems (race conditions)
With event sourcing: - Complete history (all events preserved) - Perfect debugging (replay events to see what happened) - No data loss (immutable, append-only) - Complete audit trail (every change recorded) - No race conditions (events sequenced, never overwritten)
Context
When this pattern applies:
- Audit trail required (compliance, legal)
- Need to debug complex state changes
- High value of historical data
- Event-driven architecture desired
- Regulatory requirements for data retention
- Need point-in-time reconstruction
When this pattern may not be appropriate:
- Simple CRUD with no history needs
- Storage extremely constrained
- Read-heavy with rare writes
- No compliance requirements
- Throwaway/ephemeral data
Forces
Competing concerns:
1. Complete History vs Storage Cost - History = store every event forever (large) - Current state only = minimal storage (small) - Balance: Snapshots + events
2. Rebuild State vs Query Performance - Rebuild = replay events (slow for reads) - Direct state = query fast (but no history) - Balance: CQRS (Pattern 28) - separate read/write
3. Immutability vs Corrections - Immutable = can't fix mistakes in events - Mutable = can correct but lose audit trail - Balance: Compensating events (add correction, keep mistake visible)
4. Event Granularity - Fine-grained = many small events (detailed but verbose) - Coarse-grained = fewer big events (simple but less detail) - Balance: Domain events (meaningful business events)
5. Complexity vs Benefits - Event sourcing = more complex than CRUD - Benefits = audit, debug, replay (valuable) - Balance: Use for core domain, CRUD for peripherals
Solution
Build event-sourced system with:
1. Event Store
Event = {
event_id: UUID,
event_type: "PaymentReceived",
aggregate_id: "family_123", // What entity
aggregate_type: "Family",
event_data: {...}, // Event-specific payload
metadata: {
timestamp: ISO_timestamp,
user_id: "sarah",
correlation_id: UUID, // Link related events
causation_id: UUID // What caused this event
},
version: 5 // Event sequence number for this aggregate
}
2. Event Stream per Aggregate
All events for entity stored in order:
FamilyEvents(family_123) = [
{version: 1, type: "FamilyEnrolled", ...},
{version: 2, type: "ChildAdded", ...},
{version: 3, type: "PaymentDue", ...},
{version: 4, type: "PaymentOverdue", ...},
{version: 5, type: "PaymentReceived", ...}
]
3. State Reconstruction
Rebuild current state by replaying events:
function reconstructFamily(familyId) {
const events = getEvents(familyId);
let state = createEmptyState();
for (const event of events) {
state = applyEvent(state, event);
}
return state;
}
4. Snapshots for Performance
Cache state at intervals to avoid full replay:
Snapshot = {
aggregate_id: "family_123",
version: 100, // Events 1-100 applied
state: {...}, // Cached state
created_at: timestamp
}
// Rebuild from snapshot
function reconstructFamilyFast(familyId) {
const snapshot = getLatestSnapshot(familyId);
let state = snapshot.state;
// Only replay events AFTER snapshot
const events = getEventsSince(familyId, snapshot.version);
for (const event of events) {
state = applyEvent(state, event);
}
return state;
}
5. Event Versioning
Handle event schema changes over time:
// Version 1
PaymentReceived_v1 = {amount, date}
// Version 2 (added payment method)
PaymentReceived_v2 = {amount, date, method}
// Upcaster converts old events to new schema
function upcastEvent(event) {
if (event.type === "PaymentReceived_v1") {
return {
type: "PaymentReceived_v2",
amount: event.amount,
date: event.date,
method: "unknown" // Default for old events
};
}
return event;
}
6. Compensating Events
Correct mistakes by adding new events (not editing old):
// Mistake: Marked wrong family as withdrawn
{type: "FamilyWithdrawn", family_id: 123}
// Correction: Add compensating event
{type: "FamilyWithdrawalReversed",
family_id: 123,
reason: "Administrative error",
original_event_id: "abc-123"}
// History shows BOTH - mistake visible but corrected
Structure
Event Store Tables
-- Core event store
CREATE TABLE events (
event_id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT NEWID(),
-- What happened
event_type VARCHAR(200) NOT NULL,
event_version INT DEFAULT 1,
-- To what entity
aggregate_id VARCHAR(200) NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
aggregate_version BIGINT NOT NULL, -- Event sequence for this aggregate
-- Event payload
event_data NVARCHAR(MAX) NOT NULL, -- JSON
-- Metadata
event_timestamp DATETIME2 DEFAULT GETDATE(),
user_id VARCHAR(100),
correlation_id UNIQUEIDENTIFIER, -- Group related events
causation_id UNIQUEIDENTIFIER, -- What event caused this
-- Storage
created_date DATETIME2 DEFAULT GETDATE(),
-- Ensure events are sequenced per aggregate
CONSTRAINT UQ_aggregate_version UNIQUE (aggregate_id, aggregate_version)
);
-- Index for reading event streams
CREATE INDEX IX_events_aggregate ON events(aggregate_id, aggregate_version);
-- Index for event type queries
CREATE INDEX IX_events_type ON events(event_type, event_timestamp);
-- Index for correlation (find related events)
CREATE INDEX IX_events_correlation ON events(correlation_id);
-- Snapshots for performance
CREATE TABLE snapshots (
snapshot_id INT PRIMARY KEY IDENTITY(1,1),
aggregate_id VARCHAR(200) NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
aggregate_version BIGINT NOT NULL, -- Events up to this version applied
-- Cached state
snapshot_data NVARCHAR(MAX) NOT NULL, -- JSON
created_date DATETIME2 DEFAULT GETDATE(),
CONSTRAINT UQ_snapshot_aggregate UNIQUE (aggregate_id, aggregate_version)
);
-- Index for finding latest snapshot
CREATE INDEX IX_snapshots_latest ON snapshots(aggregate_id, aggregate_version DESC);
-- Event subscriptions (for Pattern 23 triggers, Pattern 26 learning)
CREATE TABLE event_subscriptions (
subscription_id INT PRIMARY KEY IDENTITY(1,1),
subscription_name VARCHAR(200) NOT NULL,
event_types NVARCHAR(500), -- Comma-separated or JSON
-- Position tracking
last_processed_event_id UNIQUEIDENTIFIER,
last_processed_version BIGINT,
last_processed_timestamp DATETIME2,
-- Handler
handler_type VARCHAR(100), -- 'trigger', 'projection', 'process_manager'
handler_config NVARCHAR(MAX), -- JSON
active BIT DEFAULT 1,
created_date DATETIME2 DEFAULT GETDATE()
);
-- Projections (read models from events - Pattern 28)
CREATE TABLE projection_checkpoints (
projection_name VARCHAR(200) PRIMARY KEY,
last_event_version BIGINT,
last_updated DATETIME2,
status VARCHAR(50) DEFAULT 'running', -- 'running', 'rebuilding', 'stopped'
error_count INT DEFAULT 0,
last_error NVARCHAR(MAX)
);
Implementation
Event Store
class EventStore {
constructor(db) {
this.db = db;
}
// Append new event to stream
async appendEvent(aggregateId, aggregateType, eventType, eventData, metadata = {}) {
// Get current version for this aggregate
const currentVersion = await this.getCurrentVersion(aggregateId);
const newVersion = currentVersion + 1;
// Create event
const event = {
event_id: this.generateId(),
event_type: eventType,
event_version: 1,
aggregate_id: aggregateId,
aggregate_type: aggregateType,
aggregate_version: newVersion,
event_data: JSON.stringify(eventData),
user_id: metadata.user_id || 'system',
correlation_id: metadata.correlation_id || this.generateId(),
causation_id: metadata.causation_id
};
try {
// Append to event store
await this.db.query(`
INSERT INTO events (
event_id,
event_type,
event_version,
aggregate_id,
aggregate_type,
aggregate_version,
event_data,
user_id,
correlation_id,
causation_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [
event.event_id,
event.event_type,
event.event_version,
event.aggregate_id,
event.aggregate_type,
event.aggregate_version,
event.event_data,
event.user_id,
event.correlation_id,
event.causation_id
]);
console.log(`Event appended: ${eventType} for ${aggregateId} (v${newVersion})`);
// Publish event to subscribers (Pattern 23)
await this.publishEvent(event);
return event;
} catch (error) {
if (error.message.includes('UQ_aggregate_version')) {
// Concurrency conflict - someone else appended event with same version
throw new Error('Concurrency conflict - aggregate was modified');
}
throw error;
}
}
async getCurrentVersion(aggregateId) {
const result = await this.db.query(`
SELECT MAX(aggregate_version) as current_version
FROM events
WHERE aggregate_id = ?
`, [aggregateId]);
return result[0]?.current_version || 0;
}
// Get all events for an aggregate
async getEvents(aggregateId, fromVersion = 0) {
const events = await this.db.query(`
SELECT *
FROM events
WHERE aggregate_id = ?
AND aggregate_version > ?
ORDER BY aggregate_version ASC
`, [aggregateId, fromVersion]);
return events.map(e => this.deserializeEvent(e));
}
// Get events by type (for projections)
async getEventsByType(eventType, fromTimestamp) {
const events = await this.db.query(`
SELECT *
FROM events
WHERE event_type = ?
AND event_timestamp >= ?
ORDER BY event_timestamp ASC
`, [eventType, fromTimestamp]);
return events.map(e => this.deserializeEvent(e));
}
// Get events by correlation (find related events)
async getCorrelatedEvents(correlationId) {
const events = await this.db.query(`
SELECT *
FROM events
WHERE correlation_id = ?
ORDER BY event_timestamp ASC
`, [correlationId]);
return events.map(e => this.deserializeEvent(e));
}
deserializeEvent(row) {
return {
event_id: row.event_id,
event_type: row.event_type,
event_version: row.event_version,
aggregate_id: row.aggregate_id,
aggregate_type: row.aggregate_type,
aggregate_version: row.aggregate_version,
event_data: JSON.parse(row.event_data),
event_timestamp: row.event_timestamp,
user_id: row.user_id,
correlation_id: row.correlation_id,
causation_id: row.causation_id
};
}
// Reconstruct aggregate state from events
async reconstructAggregate(aggregateId, aggregateType) {
// Try to load from snapshot first
const snapshot = await this.getLatestSnapshot(aggregateId);
let state;
let fromVersion;
if (snapshot) {
state = JSON.parse(snapshot.snapshot_data);
fromVersion = snapshot.aggregate_version;
} else {
state = this.createEmptyState(aggregateType);
fromVersion = 0;
}
// Get events since snapshot
const events = await this.getEvents(aggregateId, fromVersion);
// Apply each event to rebuild state
for (const event of events) {
state = this.applyEvent(state, event);
}
return state;
}
createEmptyState(aggregateType) {
// Create initial state based on aggregate type
switch (aggregateType) {
case 'Family':
return {
family_id: null,
family_name: null,
status: 'unknown',
enrollment_date: null,
children: [],
payments: [],
engagement_score: 0
};
default:
return {};
}
}
applyEvent(state, event) {
// Apply event to state based on event type
switch (event.event_type) {
case 'FamilyEnrolled':
return {
...state,
family_id: event.event_data.family_id,
family_name: event.event_data.family_name,
status: 'enrolled',
enrollment_date: event.event_data.enrollment_date
};
case 'ChildAdded':
return {
...state,
children: [...state.children, event.event_data.child]
};
case 'PaymentReceived':
return {
...state,
payments: [...state.payments, {
amount: event.event_data.amount,
date: event.event_data.date
}]
};
case 'EngagementScoreUpdated':
return {
...state,
engagement_score: event.event_data.new_score
};
case 'FamilyWithdrawn':
return {
...state,
status: 'withdrawn',
withdrawal_date: event.event_data.withdrawal_date
};
case 'FamilyWithdrawalReversed':
return {
...state,
status: 'enrolled',
withdrawal_date: null
};
default:
console.warn(`Unknown event type: ${event.event_type}`);
return state;
}
}
// Create snapshot for performance
async createSnapshot(aggregateId, state, version) {
await this.db.query(`
INSERT INTO snapshots (
aggregate_id,
aggregate_type,
aggregate_version,
snapshot_data
) VALUES (?, ?, ?, ?)
`, [
aggregateId,
state.aggregate_type || 'Family',
version,
JSON.stringify(state)
]);
console.log(`Snapshot created for ${aggregateId} at version ${version}`);
}
async getLatestSnapshot(aggregateId) {
const result = await this.db.query(`
SELECT TOP 1 *
FROM snapshots
WHERE aggregate_id = ?
ORDER BY aggregate_version DESC
`, [aggregateId]);
return result[0];
}
// Publish event to subscribers
async publishEvent(event) {
// Get active subscriptions for this event type
const subscriptions = await this.db.query(`
SELECT *
FROM event_subscriptions
WHERE active = 1
AND (event_types LIKE ? OR event_types = '*')
`, [`%${event.event_type}%`]);
for (const sub of subscriptions) {
await this.notifySubscriber(sub, event);
}
}
async notifySubscriber(subscription, event) {
// Dispatch to appropriate handler
switch (subscription.handler_type) {
case 'trigger':
// Pattern 23: Triggered Interventions
await this.handleTrigger(event);
break;
case 'projection':
// Pattern 28: Update read model
await this.handleProjection(subscription, event);
break;
case 'learning':
// Pattern 26: Feed learning system
await this.handleLearning(event);
break;
default:
console.warn(`Unknown handler type: ${subscription.handler_type}`);
}
// Update subscription position
await this.db.query(`
UPDATE event_subscriptions
SET
last_processed_event_id = ?,
last_processed_version = ?,
last_processed_timestamp = GETDATE()
WHERE subscription_id = ?
`, [event.event_id, event.aggregate_version, subscription.subscription_id]);
}
async handleTrigger(event) {
// Integration with Pattern 23
const TriggerEngine = require('./trigger-engine');
const triggers = new TriggerEngine(this.db);
await triggers.handleEvent(event.event_type, {
family_id: event.aggregate_id,
event_type: event.event_type,
event_data: event.event_data
});
}
async handleProjection(subscription, event) {
// Integration with Pattern 28 (CQRS)
// Update read models based on events
console.log(`Updating projection: ${subscription.subscription_name}`);
}
async handleLearning(event) {
// Integration with Pattern 26
// Feed events to learning system
console.log(`Learning from event: ${event.event_type}`);
}
generateId() {
// Generate UUID
return require('crypto').randomUUID();
}
}
module.exports = EventStore;
Usage Example
const eventStore = new EventStore(db);
// Scenario: Family enrolls
await eventStore.appendEvent(
'family_123', // aggregate_id
'Family', // aggregate_type
'FamilyEnrolled', // event_type
{
family_id: 'family_123',
family_name: 'Martinez Family',
enrollment_date: '2024-09-01',
parent_name: 'Maria Martinez',
children_count: 2
},
{
user_id: 'sarah',
correlation_id: 'enrollment_abc123'
}
);
// Add child
await eventStore.appendEvent(
'family_123',
'Family',
'ChildAdded',
{
child_name: 'Sofia Martinez',
age: 8,
grade: 3
},
{
user_id: 'sarah',
correlation_id: 'enrollment_abc123'
}
);
// Payment received
await eventStore.appendEvent(
'family_123',
'Family',
'PaymentReceived',
{
amount: 450,
date: '2024-09-15',
payment_method: 'credit_card'
},
{
user_id: 'system',
correlation_id: 'payment_xyz789'
}
);
// Later: Reconstruct current state
const currentState = await eventStore.reconstructAggregate('family_123', 'Family');
console.log(currentState);
// {
// family_id: 'family_123',
// family_name: 'Martinez Family',
// status: 'enrolled',
// enrollment_date: '2024-09-01',
// children: [{child_name: 'Sofia Martinez', age: 8, grade: 3}],
// payments: [{amount: 450, date: '2024-09-15'}],
// engagement_score: 0
// }
// Audit: Show all events
const events = await eventStore.getEvents('family_123');
events.forEach(e => {
console.log(`${e.event_timestamp}: ${e.event_type} by ${e.user_id}`);
});
// Time travel: What was state on 2024-09-20?
const eventsUntil = events.filter(e => new Date(e.event_timestamp) <= new Date('2024-09-20'));
const historicalState = eventsUntil.reduce(
(state, event) => eventStore.applyEvent(state, event),
eventStore.createEmptyState('Family')
);
Variations
By Storage
SQL Database: - Events table with good indexing - Works well, familiar - Transactions ensure consistency
Specialized Event Store: - EventStoreDB, Apache Kafka - Optimized for event streams - Better performance at scale
Hybrid: - Events in specialized store - Snapshots in SQL - Best of both
By Snapshot Strategy
No Snapshots: - Always replay all events - Simple but slow for large streams
Periodic Snapshots: - Every N events (e.g., every 100) - Balance performance and complexity
On-Demand Snapshots: - Create when aggregate read frequently - Adaptive optimization
By Event Granularity
Fine-Grained: - Many small events (FieldChanged) - Very detailed audit trail - Large event volume
Coarse-Grained: - Fewer big events (OrderCompleted) - Less detail - Smaller volume
Domain Events: - Business-meaningful events - Good balance
Consequences
Benefits
1. Complete audit trail Every change recorded forever. Can prove compliance.
2. Time travel debugging Replay events to see exact sequence. "What happened?" always answerable.
3. No data loss Immutable, append-only. Crash-safe.
4. Event-driven architecture foundation Events feed triggers, learning, integrations.
5. Rollback via compensation Can undo by adding compensating events.
6. Flexible projections Can create new read models from event history.
7. Business insight Event log is business process log.
Costs
1. Storage overhead Store all events forever (large).
2. Complexity More complex than CRUD.
3. Eventual consistency Projections may lag events.
4. Learning curve Team needs to understand event sourcing.
5. Event versioning Need to handle schema changes.
6. Query complexity Must rebuild state from events.
Sample Code
Find all events for specific correlation:
// All events related to Martinez enrollment
const enrollmentEvents = await eventStore.getCorrelatedEvents('enrollment_abc123');
enrollmentEvents.forEach(e => {
console.log(`${e.event_type}: ${JSON.stringify(e.event_data)}`);
});
// FamilyEnrolled: {family_name: "Martinez Family", ...}
// ChildAdded: {child_name: "Sofia Martinez", ...}
Known Uses
Homeschool Co-Op Platform - All state changes as events - Complete audit trail for compliance - Time-travel debugging for support - Events feed triggers and learning
Banking/Finance - Account transactions as events - Regulatory compliance - Fraud detection - Never delete financial history
E-Commerce - Order lifecycle as events - Inventory management - Customer journey analysis
Healthcare - Patient record changes as events - HIPAA audit requirements - Medical history reconstruction
Related Patterns
Foundation For: - Pattern 28: CQRS - separate read/write models - Pattern 23: Triggered Interventions - events trigger actions - Pattern 26: Feedback Loop - events feed learning
Requires: - Pattern 1: Universal Event Log - similar concept, different scale
Enables: - Pattern 29: Real-Time Processing - process event streams - Complete audit compliance - Point-in-time reconstruction
References
Academic Foundations
- Fowler, Martin (2005). "Event Sourcing." https://martinfowler.com/eaaDev/EventSourcing.html - Canonical pattern description
- Vernon, Vaughn (2013). Implementing Domain-Driven Design. Addison-Wesley. ISBN: 978-0321834577 - DDD with event sourcing
- Young, Greg (2010). "CQRS and Event Sourcing." https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf - Foundational paper
- Kleppmann, Martin (2017). Designing Data-Intensive Applications. O'Reilly. ISBN: 978-1449373320 - Chapter on event logs
Event Store Implementations
- EventStore: https://www.eventstore.com/ - Purpose-built event sourcing database (formerly Event Store DB)
- Marten: https://martendb.io/ - .NET event sourcing with PostgreSQL
- Axon Framework: https://www.axoniq.io/ - Event sourcing and CQRS for Java
- Akka Persistence: https://doc.akka.io/docs/akka/current/typed/persistence.html - Event sourcing in Akka
Kafka as Event Log
- Apache Kafka: https://kafka.apache.org/ - Distributed event streaming platform
- Kafka Streams: https://kafka.apache.org/documentation/streams/ - Stream processing with Kafka
- Confluent: https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/ - Event sourcing with Kafka
- Jay Kreps Blog: https://www.confluent.io/blog/author/jay-kreps/ - Kafka creator on event logs
Versioning & Schema Evolution
- Schema Registry: https://docs.confluent.io/platform/current/schema-registry/index.html - Confluent schema management
- Avro: https://avro.apache.org/ - Data serialization with schema evolution
- Protobuf: https://developers.google.com/protocol-buffers - Google's serialization format
- Event Versioning: Young, G. "Versioning in an Event Sourced System." https://leanpub.com/esversioning
Related Trilogy Patterns
- Pattern 1: Universal Event Log - Foundation for event sourcing
- Pattern 18: Audit Trail - Event sourcing provides complete audit
- Pattern 19: Version Control - Event sourcing is version control for data
- Pattern 27: Event Sourcing - Store state as sequence of events
- Pattern 28: CQRS - Often paired with event sourcing
- Volume 3, Pattern 24: Webhooks & Event Streaming - Event delivery
Practical Implementation
- EventSourcing.js: https://github.com/adrai/node-eventstore - Node.js event sourcing
- Equinox: https://github.com/jet/equinox - .NET event sourcing library
- EventFlow: https://github.com/eventflow/EventFlow - .NET event sourcing and CQRS
- Lagom: https://www.lagomframework.com/ - Reactive microservices with event sourcing
Tools & Resources
- Event Modeling: https://eventmodeling.org/ - Methodology for event-driven systems
- Event Storming: https://www.eventstorming.com/ - Collaborative modeling technique
- Domain Storytelling: https://domainstorytelling.org/ - Visual event discovery