Pattern 27: Event Sourcing

Intent

Store all state changes as an immutable sequence of events rather than updating current state in place. This enables complete audit trails, point-in-time reconstruction, and event replay for debugging, provides a foundation for event-driven architecture, prevents data loss, and supports regulatory compliance.

Also Known As

  • Event Log Architecture
  • Append-Only Storage
  • Immutable Event Store
  • Event Stream Processing
  • Audit Log Pattern

Problem

Traditional state-based storage loses history, makes debugging impossible, and creates data loss risks.

The "update in place" disaster:

Traditional database approach:

-- Martinez family payment status
UPDATE families 
SET payment_status = 'overdue', 
    last_payment_date = '2024-10-15'
WHERE family_id = 123;

-- Later that day...
UPDATE families 
SET payment_status = 'paid', 
    last_payment_date = '2024-10-20'
WHERE family_id = 123;

Six months later, Sarah gets audited:

Auditor: "Martinez family shows 'paid' status. When did they pay? Were they ever late?"

Sarah: "The database shows they're paid now... but I can't tell you the history. The old data was overwritten." 😱

Auditor: "You can't prove compliance with payment policies. That's a serious issue."

The problem: History was DESTROYED by updates! No audit trail, no proof, no way to reconstruct what happened!

The "what happened?" debugging nightmare:

Production incident: Chen family complains: "We got locked out of portal but we paid on time!"

Developer: "Let me check the database..."

SELECT * FROM families WHERE family_id = 456;
-- Result: payment_status = 'paid', portal_access = 'locked'

Developer: "Uh... you show as paid but locked. I don't know WHY you're locked. The data doesn't tell me WHAT HAPPENED, only CURRENT STATE."

Questions we CAN'T answer:

  • When did they get locked?
  • What triggered the lock?
  • Was it automated or manual?
  • Did payment clear AFTER lock?
  • What's the sequence of events?

The database only shows NOW, not HOW WE GOT HERE! 😱

The "can't rollback" problem:

Coordinator error: Sarah accidentally marks 50 families as "withdrawn" instead of "on vacation"

UPDATE families SET status = 'withdrawn' WHERE status = 'on_vacation';
-- OH NO! Wrong update!

Sarah: "Can we undo this?"

Developer: "No. The old data is gone. We overwrote it. I can change them back to 'on_vacation' but I can't prove that's what they were before. Maybe some were actually withdrawn?"

Can't rollback because history was destroyed! 😱

The "concurrent update" race condition:

Two processes updating same family simultaneously:

Process A: Read payment_status = 'pending'
Process B: Read payment_status = 'pending'
Process A: Update payment_status = 'paid'
Process B: Update payment_status = 'overdue'

Final state: 'overdue' (Process B overwrote Process A!)

Process A's update was LOST! Race condition destroyed data! 😱

The "data loss" horror:

Hardware failure: Database crashes during UPDATE operation

UPDATE families SET engagement_score = 85 WHERE family_id = 789;
-- CRASH! Power failure!

After recovery: Family 789 has corrupt engagement_score or old value

Was the update applied? Partially applied? Lost? Nobody knows! Data integrity destroyed! 😱

What we need: Event Sourcing

Instead of updating state, APPEND events:

// Don't update state
❌ UPDATE families SET payment_status = 'paid'

// Instead, append event
✅ INSERT INTO events (event_type, data) 
   VALUES ('PaymentReceived', {family_id: 123, amount: 450, date: '2024-10-20'})

Current state = replay all events:

events = [
  {type: 'FamilyEnrolled', date: '2024-09-01', ...},
  {type: 'PaymentDue', date: '2024-10-01', amount: 450, ...},
  {type: 'PaymentOverdue', date: '2024-10-16', ...},
  {type: 'PaymentReceived', date: '2024-10-20', amount: 450, ...},
  {type: 'PortalAccessRestored', date: '2024-10-20', ...}
]

current_state = replayEvents(events)
// → payment_status: 'paid', portal_access: 'active'
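
The replay step is just a left fold over the event stream. A minimal sketch, where `applyEvent`, the event types, and the state fields are illustrative rather than a fixed API:

```javascript
// Sketch: current state is a left fold (reduce) over the event stream.
// Event types and state fields here are illustrative.
function applyEvent(state, event) {
  switch (event.type) {
    case 'PaymentDue':
      return { ...state, payment_status: 'pending' };
    case 'PaymentOverdue':
      return { ...state, payment_status: 'overdue', portal_access: 'locked' };
    case 'PaymentReceived':
      return { ...state, payment_status: 'paid' };
    case 'PortalAccessRestored':
      return { ...state, portal_access: 'active' };
    default:
      return state; // unknown events leave state unchanged
  }
}

function replayEvents(events) {
  return events.reduce(applyEvent, { payment_status: 'none', portal_access: 'active' });
}

const state = replayEvents([
  { type: 'PaymentDue' },
  { type: 'PaymentOverdue' },
  { type: 'PaymentReceived' },
  { type: 'PortalAccessRestored' },
]);
// → { payment_status: 'paid', portal_access: 'active' }
```

Because unknown events fall through unchanged, new event types can be added without breaking old replay code.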

Benefits:

1. Complete Audit Trail
  • Every change recorded forever
  • Can prove compliance
  • Can reconstruct any moment in time

2. Debuggable
  • "What happened?" → Replay events, see exact sequence
  • "When did this start?" → Find first relevant event
  • "Who did this?" → Events include actor

3. No Data Loss
  • Events immutable (never deleted)
  • Append-only (no overwrites)
  • Crash-safe (either event persisted or not, no partial state)

4. Rollback/Replay
  • Can undo by adding compensating event
  • Can replay to test different scenarios
  • Can rebuild state from scratch

5. Event-Driven Architecture
  • Events feed Pattern 23 (Triggers)
  • Events feed Pattern 26 (Learning)
  • Events enable real-time processing

Without event sourcing:

  • Lost history (updates destroy it)
  • Impossible debugging (only see current state)
  • Data loss (overwrites, race conditions, crashes)
  • No audit trail (can't prove compliance)
  • Concurrent update problems (race conditions)

With event sourcing:

  • Complete history (all events preserved)
  • Perfect debugging (replay events to see what happened)
  • No data loss (immutable, append-only)
  • Complete audit trail (every change recorded)
  • No race conditions (events sequenced, never overwritten)
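
The concurrent-update race described earlier disappears when every write is an append guarded by an expected version. A minimal in-memory sketch (in a real store, a unique constraint on the aggregate's version column plays this role):

```javascript
// Sketch: optimistic concurrency. An append succeeds only when the caller's
// expected version matches the stream's current length, so a stale writer
// fails loudly instead of silently overwriting the other writer's change.
const stream = []; // ordered events for one aggregate

function appendEvent(expectedVersion, event) {
  if (stream.length !== expectedVersion) {
    throw new Error('Concurrency conflict - aggregate was modified');
  }
  stream.push({ ...event, version: stream.length + 1 });
}

// Processes A and B both read the stream at version 0
appendEvent(0, { type: 'PaymentReceived' }); // A appends first

let conflict = false;
try {
  appendEvent(0, { type: 'PaymentMarkedOverdue' }); // B's stale append
} catch (e) {
  conflict = true; // B must re-read the stream and retry; A's event survives
}
```

Instead of a lost update, the losing writer gets an explicit conflict it can handle.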

Context

When this pattern applies:

  • Audit trail required (compliance, legal)
  • Need to debug complex state changes
  • High value of historical data
  • Event-driven architecture desired
  • Regulatory requirements for data retention
  • Need point-in-time reconstruction

When this pattern may not be appropriate:

  • Simple CRUD with no history needs
  • Storage extremely constrained
  • Read-heavy with rare writes
  • No compliance requirements
  • Throwaway/ephemeral data

Forces

Competing concerns:

1. Complete History vs Storage Cost
  • History = store every event forever (large)
  • Current state only = minimal storage (small)
  • Balance: snapshots + events

2. Rebuild State vs Query Performance
  • Rebuild = replay events (slow for reads)
  • Direct state = fast queries (but no history)
  • Balance: CQRS (Pattern 28) - separate read/write models

3. Immutability vs Corrections
  • Immutable = can't fix mistakes in events
  • Mutable = can correct but lose audit trail
  • Balance: compensating events (add correction, keep mistake visible)

4. Event Granularity
  • Fine-grained = many small events (detailed but verbose)
  • Coarse-grained = fewer big events (simple but less detail)
  • Balance: domain events (meaningful business events)

5. Complexity vs Benefits
  • Event sourcing = more complex than CRUD
  • Benefits = audit, debug, replay (valuable)
  • Balance: use for core domain, CRUD for peripherals

Solution

Build an event-sourced system with:

1. Event Store

Event = {
  event_id: UUID,
  event_type: "PaymentReceived",
  aggregate_id: "family_123",  // What entity
  aggregate_type: "Family",
  event_data: {...},  // Event-specific payload
  metadata: {
    timestamp: ISO_timestamp,
    user_id: "sarah",
    correlation_id: UUID,  // Link related events
    causation_id: UUID     // What caused this event
  },
  version: 5  // Event sequence number for this aggregate
}

2. Event Stream per Aggregate

All events for an entity are stored in order:

FamilyEvents(family_123) = [
  {version: 1, type: "FamilyEnrolled", ...},
  {version: 2, type: "ChildAdded", ...},
  {version: 3, type: "PaymentDue", ...},
  {version: 4, type: "PaymentOverdue", ...},
  {version: 5, type: "PaymentReceived", ...}
]

3. State Reconstruction

Rebuild current state by replaying events:

function reconstructFamily(familyId) {
  const events = getEvents(familyId);
  let state = createEmptyState();

  for (const event of events) {
    state = applyEvent(state, event);
  }

  return state;
}

4. Snapshots for Performance

Cache state at intervals to avoid full replay:

Snapshot = {
  aggregate_id: "family_123",
  version: 100,  // Events 1-100 applied
  state: {...},  // Cached state
  created_at: timestamp
}

// Rebuild from snapshot
function reconstructFamilyFast(familyId) {
  const snapshot = getLatestSnapshot(familyId);
  let state = snapshot.state;

  // Only replay events AFTER snapshot
  const events = getEventsSince(familyId, snapshot.version);

  for (const event of events) {
    state = applyEvent(state, event);
  }

  return state;
}

5. Event Versioning

Handle event schema changes over time:

// Version 1
PaymentReceived_v1 = {amount, date}

// Version 2 (added payment method)
PaymentReceived_v2 = {amount, date, method}

// Upcaster converts old events to new schema
function upcastEvent(event) {
  if (event.type === "PaymentReceived_v1") {
    return {
      type: "PaymentReceived_v2",
      amount: event.amount,
      date: event.date,
      method: "unknown"  // Default for old events
    };
  }
  return event;
}
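
At read time, stored events pass through the upcaster before being applied, so appliers only ever see the latest schema. A sketch reusing the `upcastEvent` shape above (the `_v1`/`_v2` type suffix is one of several possible versioning conventions):

```javascript
// Sketch: normalize stored events to the latest schema during replay.
function upcastEvent(event) {
  if (event.type === 'PaymentReceived_v1') {
    return { type: 'PaymentReceived_v2', amount: event.amount, date: event.date, method: 'unknown' };
  }
  return event; // already current
}

const stored = [
  { type: 'PaymentReceived_v1', amount: 450, date: '2024-09-15' },                // old schema
  { type: 'PaymentReceived_v2', amount: 450, date: '2024-10-15', method: 'ach' }, // current schema
];

const upcasted = stored.map(upcastEvent);
// Every event now carries `method`; old ones default to 'unknown'.
```

The stored events themselves are never rewritten; upcasting happens only in memory on the read path.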

6. Compensating Events

Correct mistakes by adding new events (not editing old):

// Mistake: Marked wrong family as withdrawn
{type: "FamilyWithdrawn", family_id: 123}

// Correction: Add compensating event
{type: "FamilyWithdrawalReversed", 
 family_id: 123, 
 reason: "Administrative error",
 original_event_id: "abc-123"}

// History shows BOTH - mistake visible but corrected
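
Replaying the mistake together with its compensating event yields the corrected state while the log keeps both entries. A sketch with an illustrative reducer:

```javascript
// Sketch: a compensating event reverses the effect on replayed state,
// while the mistake stays visible in the log for auditing.
function applyEvent(state, event) {
  switch (event.type) {
    case 'FamilyEnrolled':
      return { ...state, status: 'enrolled' };
    case 'FamilyWithdrawn':
      return { ...state, status: 'withdrawn' };
    case 'FamilyWithdrawalReversed':
      return { ...state, status: 'enrolled' };
    default:
      return state;
  }
}

const log = [
  { type: 'FamilyEnrolled' },
  { type: 'FamilyWithdrawn' },                                          // the mistake
  { type: 'FamilyWithdrawalReversed', reason: 'Administrative error' }, // the correction
];

const current = log.reduce(applyEvent, { status: 'unknown' });
// → current.status === 'enrolled', and the log still holds all 3 events
```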

Structure

Event Store Tables

-- Core event store
CREATE TABLE events (
  event_id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT NEWID(),

  -- What happened
  event_type VARCHAR(200) NOT NULL,
  event_version INT DEFAULT 1,

  -- To what entity
  aggregate_id VARCHAR(200) NOT NULL,
  aggregate_type VARCHAR(100) NOT NULL,
  aggregate_version BIGINT NOT NULL,  -- Event sequence for this aggregate

  -- Event payload
  event_data NVARCHAR(MAX) NOT NULL,  -- JSON

  -- Metadata
  event_timestamp DATETIME2 DEFAULT GETDATE(),
  user_id VARCHAR(100),
  correlation_id UNIQUEIDENTIFIER,  -- Group related events
  causation_id UNIQUEIDENTIFIER,    -- What event caused this

  -- Storage
  created_date DATETIME2 DEFAULT GETDATE(),

  -- Ensure events are sequenced per aggregate
  CONSTRAINT UQ_aggregate_version UNIQUE (aggregate_id, aggregate_version)
);

-- Index for reading event streams
CREATE INDEX IX_events_aggregate ON events(aggregate_id, aggregate_version);

-- Index for event type queries
CREATE INDEX IX_events_type ON events(event_type, event_timestamp);

-- Index for correlation (find related events)
CREATE INDEX IX_events_correlation ON events(correlation_id);

-- Snapshots for performance
CREATE TABLE snapshots (
  snapshot_id INT PRIMARY KEY IDENTITY(1,1),

  aggregate_id VARCHAR(200) NOT NULL,
  aggregate_type VARCHAR(100) NOT NULL,
  aggregate_version BIGINT NOT NULL,  -- Events up to this version applied

  -- Cached state
  snapshot_data NVARCHAR(MAX) NOT NULL,  -- JSON

  created_date DATETIME2 DEFAULT GETDATE(),

  CONSTRAINT UQ_snapshot_aggregate UNIQUE (aggregate_id, aggregate_version)
);

-- Index for finding latest snapshot
CREATE INDEX IX_snapshots_latest ON snapshots(aggregate_id, aggregate_version DESC);

-- Event subscriptions (for Pattern 23 triggers, Pattern 26 learning)
CREATE TABLE event_subscriptions (
  subscription_id INT PRIMARY KEY IDENTITY(1,1),

  subscription_name VARCHAR(200) NOT NULL,
  event_types NVARCHAR(500),  -- Comma-separated or JSON

  -- Position tracking
  last_processed_event_id UNIQUEIDENTIFIER,
  last_processed_version BIGINT,
  last_processed_timestamp DATETIME2,

  -- Handler
  handler_type VARCHAR(100),  -- 'trigger', 'projection', 'process_manager'
  handler_config NVARCHAR(MAX),  -- JSON

  active BIT DEFAULT 1,
  created_date DATETIME2 DEFAULT GETDATE()
);

-- Projections (read models from events - Pattern 28)
CREATE TABLE projection_checkpoints (
  projection_name VARCHAR(200) PRIMARY KEY,

  last_event_version BIGINT,
  last_updated DATETIME2,

  status VARCHAR(50) DEFAULT 'running',  -- 'running', 'rebuilding', 'stopped'

  error_count INT DEFAULT 0,
  last_error NVARCHAR(MAX)
);

Implementation

Event Store

class EventStore {
  constructor(db) {
    this.db = db;
  }

  // Append new event to stream
  async appendEvent(aggregateId, aggregateType, eventType, eventData, metadata = {}) {
    // Get current version for this aggregate
    const currentVersion = await this.getCurrentVersion(aggregateId);
    const newVersion = currentVersion + 1;

    // Create event
    const event = {
      event_id: this.generateId(),
      event_type: eventType,
      event_version: 1,
      aggregate_id: aggregateId,
      aggregate_type: aggregateType,
      aggregate_version: newVersion,
      event_data: JSON.stringify(eventData),
      user_id: metadata.user_id || 'system',
      correlation_id: metadata.correlation_id || this.generateId(),
      causation_id: metadata.causation_id
    };

    try {
      // Append to event store
      await this.db.query(`
        INSERT INTO events (
          event_id,
          event_type,
          event_version,
          aggregate_id,
          aggregate_type,
          aggregate_version,
          event_data,
          user_id,
          correlation_id,
          causation_id
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
      `, [
        event.event_id,
        event.event_type,
        event.event_version,
        event.aggregate_id,
        event.aggregate_type,
        event.aggregate_version,
        event.event_data,
        event.user_id,
        event.correlation_id,
        event.causation_id
      ]);

      console.log(`Event appended: ${eventType} for ${aggregateId} (v${newVersion})`);

      // Publish event to subscribers (Pattern 23)
      await this.publishEvent(event);

      return event;

    } catch (error) {
      if (error.message.includes('UQ_aggregate_version')) {
        // Concurrency conflict - someone else appended event with same version
        throw new Error('Concurrency conflict - aggregate was modified');
      }
      throw error;
    }
  }

  async getCurrentVersion(aggregateId) {
    const result = await this.db.query(`
      SELECT MAX(aggregate_version) as current_version
      FROM events
      WHERE aggregate_id = ?
    `, [aggregateId]);

    return result[0]?.current_version || 0;
  }

  // Get all events for an aggregate
  async getEvents(aggregateId, fromVersion = 0) {
    const events = await this.db.query(`
      SELECT *
      FROM events
      WHERE aggregate_id = ?
        AND aggregate_version > ?
      ORDER BY aggregate_version ASC
    `, [aggregateId, fromVersion]);

    return events.map(e => this.deserializeEvent(e));
  }

  // Get events by type (for projections)
  async getEventsByType(eventType, fromTimestamp) {
    const events = await this.db.query(`
      SELECT *
      FROM events
      WHERE event_type = ?
        AND event_timestamp >= ?
      ORDER BY event_timestamp ASC
    `, [eventType, fromTimestamp]);

    return events.map(e => this.deserializeEvent(e));
  }

  // Get events by correlation (find related events)
  async getCorrelatedEvents(correlationId) {
    const events = await this.db.query(`
      SELECT *
      FROM events
      WHERE correlation_id = ?
      ORDER BY event_timestamp ASC
    `, [correlationId]);

    return events.map(e => this.deserializeEvent(e));
  }

  deserializeEvent(row) {
    return {
      event_id: row.event_id,
      event_type: row.event_type,
      event_version: row.event_version,
      aggregate_id: row.aggregate_id,
      aggregate_type: row.aggregate_type,
      aggregate_version: row.aggregate_version,
      event_data: JSON.parse(row.event_data),
      event_timestamp: row.event_timestamp,
      user_id: row.user_id,
      correlation_id: row.correlation_id,
      causation_id: row.causation_id
    };
  }

  // Reconstruct aggregate state from events
  async reconstructAggregate(aggregateId, aggregateType) {
    // Try to load from snapshot first
    const snapshot = await this.getLatestSnapshot(aggregateId);

    let state;
    let fromVersion;

    if (snapshot) {
      state = JSON.parse(snapshot.snapshot_data);
      fromVersion = snapshot.aggregate_version;
    } else {
      state = this.createEmptyState(aggregateType);
      fromVersion = 0;
    }

    // Get events since snapshot
    const events = await this.getEvents(aggregateId, fromVersion);

    // Apply each event to rebuild state
    for (const event of events) {
      state = this.applyEvent(state, event);
    }

    return state;
  }

  createEmptyState(aggregateType) {
    // Create initial state based on aggregate type
    switch (aggregateType) {
      case 'Family':
        return {
          family_id: null,
          family_name: null,
          status: 'unknown',
          enrollment_date: null,
          children: [],
          payments: [],
          engagement_score: 0
        };
      default:
        return {};
    }
  }

  applyEvent(state, event) {
    // Apply event to state based on event type
    switch (event.event_type) {
      case 'FamilyEnrolled':
        return {
          ...state,
          family_id: event.event_data.family_id,
          family_name: event.event_data.family_name,
          status: 'enrolled',
          enrollment_date: event.event_data.enrollment_date
        };

      case 'ChildAdded':
        return {
          ...state,
          children: [...state.children, event.event_data.child]
        };

      case 'PaymentReceived':
        return {
          ...state,
          payments: [...state.payments, {
            amount: event.event_data.amount,
            date: event.event_data.date
          }]
        };

      case 'EngagementScoreUpdated':
        return {
          ...state,
          engagement_score: event.event_data.new_score
        };

      case 'FamilyWithdrawn':
        return {
          ...state,
          status: 'withdrawn',
          withdrawal_date: event.event_data.withdrawal_date
        };

      case 'FamilyWithdrawalReversed':
        return {
          ...state,
          status: 'enrolled',
          withdrawal_date: null
        };

      default:
        console.warn(`Unknown event type: ${event.event_type}`);
        return state;
    }
  }

  // Create snapshot for performance
  async createSnapshot(aggregateId, state, version) {
    await this.db.query(`
      INSERT INTO snapshots (
        aggregate_id,
        aggregate_type,
        aggregate_version,
        snapshot_data
      ) VALUES (?, ?, ?, ?)
    `, [
      aggregateId,
      state.aggregate_type || 'Family',
      version,
      JSON.stringify(state)
    ]);

    console.log(`Snapshot created for ${aggregateId} at version ${version}`);
  }

  async getLatestSnapshot(aggregateId) {
    const result = await this.db.query(`
      SELECT TOP 1 *
      FROM snapshots
      WHERE aggregate_id = ?
      ORDER BY aggregate_version DESC
    `, [aggregateId]);

    return result[0];
  }

  // Publish event to subscribers
  async publishEvent(event) {
    // Get active subscriptions for this event type
    const subscriptions = await this.db.query(`
      SELECT *
      FROM event_subscriptions
      WHERE active = 1
        AND (event_types LIKE ? OR event_types = '*')
    `, [`%${event.event_type}%`]);

    for (const sub of subscriptions) {
      await this.notifySubscriber(sub, event);
    }
  }

  async notifySubscriber(subscription, event) {
    // Dispatch to appropriate handler
    switch (subscription.handler_type) {
      case 'trigger':
        // Pattern 23: Triggered Interventions
        await this.handleTrigger(event);
        break;

      case 'projection':
        // Pattern 28: Update read model
        await this.handleProjection(subscription, event);
        break;

      case 'learning':
        // Pattern 26: Feed learning system
        await this.handleLearning(event);
        break;

      default:
        console.warn(`Unknown handler type: ${subscription.handler_type}`);
    }

    // Update subscription position
    await this.db.query(`
      UPDATE event_subscriptions
      SET 
        last_processed_event_id = ?,
        last_processed_version = ?,
        last_processed_timestamp = GETDATE()
      WHERE subscription_id = ?
    `, [event.event_id, event.aggregate_version, subscription.subscription_id]);
  }

  async handleTrigger(event) {
    // Integration with Pattern 23
    const TriggerEngine = require('./trigger-engine');
    const triggers = new TriggerEngine(this.db);

    await triggers.handleEvent(event.event_type, {
      family_id: event.aggregate_id,
      event_type: event.event_type,
      event_data: event.event_data
    });
  }

  async handleProjection(subscription, event) {
    // Integration with Pattern 28 (CQRS)
    // Update read models based on events
    console.log(`Updating projection: ${subscription.subscription_name}`);
  }

  async handleLearning(event) {
    // Integration with Pattern 26
    // Feed events to learning system
    console.log(`Learning from event: ${event.event_type}`);
  }

  generateId() {
    // Generate UUID
    return require('crypto').randomUUID();
  }
}

module.exports = EventStore;

Usage Example

const eventStore = new EventStore(db);

// Scenario: Family enrolls
await eventStore.appendEvent(
  'family_123',           // aggregate_id
  'Family',               // aggregate_type
  'FamilyEnrolled',       // event_type
  {
    family_id: 'family_123',
    family_name: 'Martinez Family',
    enrollment_date: '2024-09-01',
    parent_name: 'Maria Martinez',
    children_count: 2
  },
  {
    user_id: 'sarah',
    correlation_id: 'enrollment_abc123'
  }
);

// Add child
await eventStore.appendEvent(
  'family_123',
  'Family',
  'ChildAdded',
  {
    child_name: 'Sofia Martinez',
    age: 8,
    grade: 3
  },
  {
    user_id: 'sarah',
    correlation_id: 'enrollment_abc123'
  }
);

// Payment received
await eventStore.appendEvent(
  'family_123',
  'Family',
  'PaymentReceived',
  {
    amount: 450,
    date: '2024-09-15',
    payment_method: 'credit_card'
  },
  {
    user_id: 'system',
    correlation_id: 'payment_xyz789'
  }
);

// Later: Reconstruct current state
const currentState = await eventStore.reconstructAggregate('family_123', 'Family');

console.log(currentState);
// {
//   family_id: 'family_123',
//   family_name: 'Martinez Family',
//   status: 'enrolled',
//   enrollment_date: '2024-09-01',
//   children: [{child_name: 'Sofia Martinez', age: 8, grade: 3}],
//   payments: [{amount: 450, date: '2024-09-15'}],
//   engagement_score: 0
// }

// Audit: Show all events
const events = await eventStore.getEvents('family_123');
events.forEach(e => {
  console.log(`${e.event_timestamp}: ${e.event_type} by ${e.user_id}`);
});

// Time travel: What was state on 2024-09-20?
const eventsUntil = events.filter(e => new Date(e.event_timestamp) <= new Date('2024-09-20'));
const historicalState = eventsUntil.reduce(
  (state, event) => eventStore.applyEvent(state, event),
  eventStore.createEmptyState('Family')
);

Variations

By Storage

SQL Database:
  • Events table with good indexing
  • Works well, familiar
  • Transactions ensure consistency

Specialized Event Store:
  • EventStoreDB, Apache Kafka
  • Optimized for event streams
  • Better performance at scale

Hybrid:
  • Events in specialized store
  • Snapshots in SQL
  • Best of both

By Snapshot Strategy

No Snapshots:
  • Always replay all events
  • Simple but slow for large streams

Periodic Snapshots:
  • Every N events (e.g., every 100)
  • Balance performance and complexity

On-Demand Snapshots:
  • Create when aggregate is read frequently
  • Adaptive optimization
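
The periodic strategy reduces to a simple check after each append; the interval of 100 here is an illustrative tuning knob, not a recommendation:

```javascript
// Sketch: snapshot whenever an aggregate accumulates SNAPSHOT_INTERVAL
// events since its last snapshot (interval value is illustrative).
const SNAPSHOT_INTERVAL = 100;

function shouldSnapshot(currentVersion, lastSnapshotVersion = 0) {
  return currentVersion - lastSnapshotVersion >= SNAPSHOT_INTERVAL;
}

const afterBurst = shouldSnapshot(200, 100); // 100 new events since snapshot → true
const tooSoon = shouldSnapshot(150, 100);    // only 50 new events → false
```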

By Event Granularity

Fine-Grained:
  • Many small events (FieldChanged)
  • Very detailed audit trail
  • Large event volume

Coarse-Grained:
  • Fewer big events (OrderCompleted)
  • Less detail
  • Smaller volume

Domain Events:
  • Business-meaningful events
  • Good balance

Consequences

Benefits

1. Complete audit trail: every change recorded forever; can prove compliance.

2. Time-travel debugging: replay events to see the exact sequence. "What happened?" is always answerable.

3. No data loss: immutable, append-only, crash-safe.

4. Event-driven architecture foundation: events feed triggers, learning, and integrations.

5. Rollback via compensation: undo by adding compensating events.

6. Flexible projections: new read models can be built from event history at any time.

7. Business insight: the event log is a log of the business process itself.

Costs

1. Storage overhead: all events are stored forever (large).

2. Complexity: more complex than CRUD.

3. Eventual consistency: projections may lag behind events.

4. Learning curve: the team needs to understand event sourcing.

5. Event versioning: schema changes must be handled explicitly.

6. Query complexity: state must be rebuilt from events.

Sample Code

Find all events for specific correlation:

// All events related to Martinez enrollment
const enrollmentEvents = await eventStore.getCorrelatedEvents('enrollment_abc123');

enrollmentEvents.forEach(e => {
  console.log(`${e.event_type}: ${JSON.stringify(e.event_data)}`);
});
// FamilyEnrolled: {family_name: "Martinez Family", ...}
// ChildAdded: {child_name: "Sofia Martinez", ...}

Known Uses

Homeschool Co-Op Platform:
  • All state changes as events
  • Complete audit trail for compliance
  • Time-travel debugging for support
  • Events feed triggers and learning

Banking/Finance:
  • Account transactions as events
  • Regulatory compliance
  • Fraud detection
  • Never delete financial history

E-Commerce:
  • Order lifecycle as events
  • Inventory management
  • Customer journey analysis

Healthcare:
  • Patient record changes as events
  • HIPAA audit requirements
  • Medical history reconstruction

Foundation For:
  • Pattern 28: CQRS - separate read/write models
  • Pattern 23: Triggered Interventions - events trigger actions
  • Pattern 26: Feedback Loop - events feed learning

Requires:
  • Pattern 1: Universal Event Log - similar concept, different scale

Enables:
  • Pattern 29: Real-Time Processing - process event streams
  • Complete audit compliance
  • Point-in-time reconstruction

References

  • Pattern 1: Universal Event Log - Foundation for event sourcing
  • Pattern 18: Audit Trail - Event sourcing provides complete audit
  • Pattern 19: Version Control - Event sourcing is version control for data
  • Pattern 28: CQRS - Often paired with event sourcing
  • Volume 3, Pattern 24: Webhooks & Event Streaming - Event delivery
