Volume 2: Organizational Intelligence Platforms

Pattern 28: CQRS (Command Query Responsibility Segregation)

Intent

Separate write operations (commands that change state) from read operations (queries that retrieve data) into distinct models, each optimized for its own purpose. The write side uses event sourcing for a complete audit trail, while the read side maintains denormalized read models that answer queries in single-digit milliseconds.

Also Known As

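  • Command Query Separation (CQS, the method-level principle that CQRS extends to whole models)
  • Read/Write Model Separation
  • Polyglot Persistence (a related practice: different stores for different workloads)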
  • Dual-Model Architecture
  • Optimized Query Models

Problem

A single model can't be optimized for both writes and reads. Event sourcing gives perfect writes but terrible read performance.

The event sourcing read problem:

Pattern 27 (Event Sourcing) stores everything as events - PERFECT for writes:

// Write: Append event (1ms, fast!) ✅
await eventStore.appendEvent('FamilyEnrolled', {...});

// Read: Replay 10,000 events to get current state (5000ms, SLOW!) ❌
const state = await eventStore.reconstructAggregate('family_123');
// Must apply 10,000 events one by one... 5 seconds!

The problem: Reads are 5000× slower than writes! Unacceptable for user-facing queries!

The dashboard disaster:

Sarah opens coordinator dashboard: "Show me all at-risk families"

Naive event sourcing approach:

// Get all families (500 families)
const atRiskFamilies = [];
for (const familyId of allFamilyIds) {
  // Reconstruct each family's state from events
  const state = await reconstructAggregate(familyId);
  if (state.risk_score > 70) {
    atRiskFamilies.push(state);
  }
}

// Time: 500 families × 2 seconds = 1,000 seconds ≈ 17 MINUTES! 😱

Sarah quits using the dashboard because it's too slow!

The complex query problem:

CEO wants report: "Show me families by risk tier, grouped by enrollment month, with average engagement score per tier"

With event sourcing only:

// Must reconstruct ALL families
// Then filter/group/aggregate in memory
// Time: MANY minutes for complex queries

Traditional SQL would be instant:

SELECT 
  DATE_TRUNC('month', enrollment_date) as month,
  risk_tier,
  COUNT(*) as families,
  AVG(engagement_score) as avg_engagement
FROM families
GROUP BY month, risk_tier
-- Time: 50ms! ⚡

The problem: Event sourcing can't do complex queries efficiently!

The "two different needs" problem:

Writes need:

  • Audit trail (every change recorded)
  • Integrity (no data loss)
  • Versioning (concurrency control)
  • Events (for triggers, learning)
  • Don't care about query speed

Reads need:

  • SPEED (sub-100ms response)
  • Complex queries (JOIN, GROUP BY, aggregations)
  • Denormalized data (no expensive JOINs)
  • Indexes (for fast lookups)
  • Don't care about write history

A single model can't optimize for both! 😱

The "normalized vs denormalized" problem:

Normalized (good for writes):

families (id, name)
children (id, family_id, name)
payments (id, family_id, amount, date)

Query: "Get family with all children and payment history"

SELECT * FROM families
JOIN children ON families.id = children.family_id
JOIN payments ON families.id = payments.family_id
WHERE families.id = 123
-- Time: 50ms (2 JOINs across 3 tables) 😐
-- Note: this returns one row per child/payment combination

Denormalized (good for reads):

family_view (id, name, children_json, payments_json, total_paid, last_payment_date)

Query: "Get family with all children and payment history"

SELECT * FROM family_view WHERE id = 123
-- Time: 2ms (no JOINs!) ⚡

25× faster with denormalized! But denormalized is terrible for writes (duplicated data, update anomalies)!
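As a rough illustration of what the denormalized row holds, the sketch below folds the three normalized tables into one `family_view`-shaped object in plain JavaScript. The sample rows and the `buildFamilyView` helper are hypothetical and exist only for this example; in the pattern itself this work is done by projections, not by a query-time function.

```javascript
// Hypothetical sample rows from the normalized tables above.
const families = [{ id: 123, name: 'Martinez Family' }];
const children = [
  { id: 1, family_id: 123, name: 'Ana' },
  { id: 2, family_id: 123, name: 'Luis' }
];
const payments = [
  { id: 10, family_id: 123, amount: 450, date: '2024-09-01' },
  { id: 11, family_id: 123, amount: 450, date: '2024-10-01' }
];

// Build one denormalized row per family: everything a read needs, in one place.
function buildFamilyView(family, allChildren, allPayments) {
  const kids = allChildren.filter(c => c.family_id === family.id);
  const paid = allPayments.filter(p => p.family_id === family.id);
  const dates = paid.map(p => p.date).sort(); // ISO dates sort lexicographically
  return {
    id: family.id,
    name: family.name,
    children_json: JSON.stringify(kids.map(c => c.name)),
    payments_json: JSON.stringify(paid.map(p => ({ amount: p.amount, date: p.date }))),
    total_paid: paid.reduce((sum, p) => sum + p.amount, 0),
    last_payment_date: dates.length ? dates[dates.length - 1] : null
  };
}

const familyView = buildFamilyView(families[0], children, payments);
console.log(familyView.total_paid, familyView.last_payment_date);
```

The duplicated values (`total_paid`, `last_payment_date`) are exactly what makes this shape awkward to write to directly: every payment would have to update them consistently.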

What we need: CQRS

Separate models for writes and reads:

Write Model (Command Side):

  • Event sourcing (Pattern 27)
  • Append-only, perfect audit
  • Optimized for integrity
  • Handles: EnrollFamily, ReceivePayment, UpdateEngagement

Read Model (Query Side):

  • Denormalized projections
  • Optimized for speed
  • Pre-computed aggregations
  • Handles: GetFamily, ListAtRiskFamilies, GenerateReport

Event flow:

Command → Event Store → Events → Projections → Read Models

Write path:

EnrollFamily command
→ Append 'FamilyEnrolled' event (1ms)
→ Event published
→ Update read model async (10ms later)
→ Command complete! (Total: 1ms from user perspective!)

Read path:

GetFamily query
→ SELECT from denormalized read model (2ms)
→ Return data (Total: 2ms!)

Benefits:

1. Write performance: 1ms (append event only)
2. Read performance: 2ms (query optimized model)
3. Write integrity: Event sourcing (perfect audit)
4. Read flexibility: Can create MANY read models for different use cases
5. Scale independently: More reads? Add read replicas. More writes? Add event store capacity.

Without CQRS:

  • Single model = compromise (okay at both, great at neither)
  • Event sourcing only = slow reads (unacceptable)
  • Traditional DB only = no audit trail (compliance fail)

With CQRS:

  • Write model = perfect integrity (event sourcing)
  • Read models = blazing speed (denormalized, indexed)
  • Best of both worlds! ⚡🛡️
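The event flow above can be sketched end to end with in-memory stand-ins. Everything here is an assumption made for illustration: `InMemoryEventStore`, `FamilyProjection`, and `drain` are not the Pattern 27 API, and the background delivery is replaced by an explicit `drain` call so the lag between write and read is visible and deterministic.

```javascript
// Minimal in-memory stand-ins; names are illustrative, not the Pattern 27 API.
class InMemoryEventStore {
  constructor() {
    this.events = [];  // append-only log (write model)
    this.pending = []; // events not yet delivered to projections
  }
  append(aggregateId, eventType, data) {
    const event = { aggregateId, eventType, data, version: this.events.length + 1 };
    this.events.push(event);
    this.pending.push(event);
    return event;
  }
}

class FamilyProjection {
  constructor() {
    this.rows = new Map(); // denormalized read model, keyed by family id
  }
  apply(event) {
    if (event.eventType === 'FamilyEnrolled') {
      this.rows.set(event.aggregateId, { name: event.data.familyName, totalPaid: 0 });
    } else if (event.eventType === 'PaymentReceived') {
      const row = this.rows.get(event.aggregateId);
      if (row) row.totalPaid += event.data.amount;
    }
  }
}

// Deliver queued events to the projection (a background task in a real system).
function drain(store, projection) {
  while (store.pending.length > 0) projection.apply(store.pending.shift());
}

const store = new InMemoryEventStore();
const projection = new FamilyProjection();

// Command side: append only.
store.append('family_123', 'FamilyEnrolled', { familyName: 'Martinez Family' });
store.append('family_123', 'PaymentReceived', { amount: 450 });

// Query side before the projection catches up: nothing to read yet.
const beforeDrain = projection.rows.get('family_123');

drain(store, projection);

// Query side after catch-up: a single keyed lookup, no replay.
const afterDrain = projection.rows.get('family_123');
console.log(beforeDrain, afterDrain);
```

The `beforeDrain` lookup returning nothing is the eventual-consistency window in miniature: the write has succeeded, but the read model has not seen it yet.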

Context

When this pattern applies:

  • Complex domain with different read/write needs
  • High read volume (100:1 or more read-to-write ratio)
  • Complex reporting requirements
  • Need event sourcing for writes but fast queries
  • Can tolerate eventual consistency (reads lag writes by milliseconds)

When this pattern may not be needed:

  • Simple CRUD (reads and writes similar)
  • Real-time consistency required (no lag acceptable)
  • Very low volume (< 100 requests/day)
  • No complex queries needed

Forces

Competing concerns:

1. Write Integrity vs Read Speed

  • Integrity = event sourcing (slow reads)
  • Speed = denormalized (no audit trail)
  • Balance: CQRS gives BOTH

2. Consistency vs Performance

  • Strong consistency = wait for read model update
  • Eventual consistency = return immediately, update later
  • Balance: Eventual (milliseconds lag acceptable)

3. Simplicity vs Optimization

  • Single model = simple, compromised performance
  • Dual model = complex, optimized performance
  • Balance: CQRS worth it for high-traffic systems

4. One Read Model vs Many

  • One model = simple, may be slow for some queries
  • Many models = optimized for each use case, more complexity
  • Balance: Start with one, add as needed

5. Sync vs Async Updates

  • Sync = immediate consistency, slower writes
  • Async = fast writes, slight lag in reads
  • Balance: Async (milliseconds lag fine)

Solution

Build CQRS system with:

1. Command Side (Writes)

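// Commands change state
class EnrollFamilyCommand {
  constructor({ familyId, familyName, enrollmentDate }) {
    this.familyId = familyId;             // string
    this.familyName = familyName;         // string
    this.enrollmentDate = enrollmentDate; // Date
  }
}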

// Command handler appends events
async function handleEnrollFamily(command) {
  // Validate
  if (!command.familyName) throw new Error('Name required');

  // Append event
  await eventStore.appendEvent(
    command.familyId,
    'Family',
    'FamilyEnrolled',
    {
      family_name: command.familyName,
      enrollment_date: command.enrollmentDate
    }
  );

  // Done! (Event will trigger read model update async)
  return {success: true, familyId: command.familyId};
}

2. Query Side (Reads)

// Queries retrieve from optimized read models
class GetFamilyQuery {
  constructor({ familyId }) {
    this.familyId = familyId; // string
  }
}

// Query handler reads from projection
async function handleGetFamily(query) {
  // Query denormalized read model (FAST!)
  const family = await db.query(`
    SELECT * FROM family_read_model WHERE family_id = ?
  `, [query.familyId]);

  return family[0];  // 2ms!
}

3. Projections (Event Handlers)

// Listen to events and update read models
eventStore.subscribe('FamilyEnrolled', async (event) => {
  // Update read model
  await db.query(`
    INSERT INTO family_read_model (
      family_id,
      family_name,
      enrollment_date,
      status,
      children_count,
      total_paid,
      engagement_score
    ) VALUES (?, ?, ?, 'enrolled', 0, 0, 0)
  `, [
    event.aggregate_id,
    event.event_data.family_name,
    event.event_data.enrollment_date
  ]);
});

eventStore.subscribe('PaymentReceived', async (event) => {
  // Update denormalized payment totals
  await db.query(`
    UPDATE family_read_model
    SET 
      total_paid = total_paid + ?,
      last_payment_date = ?,
      payment_count = payment_count + 1
    WHERE family_id = ?
  `, [
    event.event_data.amount,
    event.event_data.date,
    event.aggregate_id
  ]);
});

4. Multiple Read Models for Different Use Cases

// Read Model 1: Family detail (for dashboards)
family_read_model {
  family_id,
  family_name,
  enrollment_date,
  children_count,
  engagement_score,
  risk_score,
  total_paid,
  last_payment_date
}

// Read Model 2: At-risk families (for intervention lists)
at_risk_families_view {
  family_id,
  family_name,
  risk_score,
  risk_factors,  // JSON
  recommended_intervention,
  last_contact_date
}

// Read Model 3: Payment analytics (for CFO reports)
payment_analytics_view {
  month,
  families_paid,
  families_overdue,
  total_revenue,
  avg_payment_days,
  payment_method_breakdown  // JSON
}
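To make the fan-out concrete, here is a minimal sketch in which one event feeds two read models. The Maps stand in for the SQL tables, the flat event shape (`type`, `familyId`, `newScore`) is a simplification invented for this example, and the threshold of 70 is the cut-off used in the dashboard example earlier.

```javascript
// Each read model is a plain Map here; in this pattern they are SQL tables.
const familyReadModel = new Map();
const atRiskView = new Map();
const AT_RISK_THRESHOLD = 70; // same cut-off as the dashboard example

// One small function per read model; each decides which events it cares about.
const projections = [
  function projectFamilyDetail(event) {
    if (event.type !== 'RiskScoreUpdated') return;
    const row = familyReadModel.get(event.familyId) || { family_id: event.familyId };
    familyReadModel.set(event.familyId, { ...row, risk_score: event.newScore });
  },
  function projectAtRisk(event) {
    if (event.type !== 'RiskScoreUpdated') return;
    if (event.newScore >= AT_RISK_THRESHOLD) {
      atRiskView.set(event.familyId, { family_id: event.familyId, risk_score: event.newScore });
    } else {
      atRiskView.delete(event.familyId); // family recovered: drop it from the list
    }
  }
];

// Every event is offered to every projection.
function dispatch(event) {
  for (const project of projections) project(event);
}

dispatch({ type: 'RiskScoreUpdated', familyId: 'family_1', newScore: 85 });
dispatch({ type: 'RiskScoreUpdated', familyId: 'family_2', newScore: 40 });
dispatch({ type: 'RiskScoreUpdated', familyId: 'family_1', newScore: 55 });

console.log(familyReadModel.size, atRiskView.size);
```

Adding a fourth read model under this arrangement means adding one more function to the list; the write side does not change.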

5. Read Model Rebuilding

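// Rebuild read model from events (if corrupted or new model added)
const READ_MODELS = ['family_read_model', 'at_risk_families_view', 'payment_analytics_view'];

async function rebuildReadModel(modelName) {
  // Table names cannot be bound as query parameters, so validate against an allow-list
  if (!READ_MODELS.includes(modelName)) {
    throw new Error(`Unknown read model: ${modelName}`);
  }

  console.log(`Rebuilding ${modelName}...`);

  // Clear existing
  await db.query(`TRUNCATE TABLE ${modelName}`);

  // Replay ALL events (loads the full log into memory; page through it for large stores)
  const events = await eventStore.getAllEvents();

  for (const event of events) {
    await applyEventToReadModel(event, modelName);
  }

  console.log(`${modelName} rebuilt from ${events.length} events`);
}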

Structure

CQRS Tables

-- READ MODEL 1: Family details (denormalized for speed)
CREATE TABLE family_read_model (
  family_id VARCHAR(200) PRIMARY KEY,

  -- Basic info
  family_name VARCHAR(200),
  enrollment_date DATE,
  status VARCHAR(50),

  -- Denormalized child info
  children_count INT DEFAULT 0,
  children_names NVARCHAR(MAX),  -- JSON array
  oldest_child_age INT,

  -- Denormalized payment info
  total_paid DECIMAL(10,2) DEFAULT 0,
  payment_count INT DEFAULT 0,
  last_payment_date DATE,
  avg_payment_days_late INT,

  -- Denormalized engagement
  engagement_score INT DEFAULT 0,
  last_login_date DATE,
  volunteer_hours INT DEFAULT 0,
  event_attendance_rate DECIMAL(5,2),

  -- Denormalized risk
  risk_score INT DEFAULT 0,
  risk_tier VARCHAR(50),
  risk_factors NVARCHAR(MAX),  -- JSON

  -- Meta
  last_updated DATETIME2 DEFAULT GETDATE(),
  event_version BIGINT  -- Which event version is this projection at?
);

-- Indexes for fast queries
CREATE INDEX IX_family_status ON family_read_model(status);
CREATE INDEX IX_family_risk ON family_read_model(risk_tier, risk_score DESC);
CREATE INDEX IX_family_engagement ON family_read_model(engagement_score DESC);

-- READ MODEL 2: At-risk families (optimized for intervention dashboard)
CREATE TABLE at_risk_families_view (
  family_id VARCHAR(200) PRIMARY KEY,
  family_name VARCHAR(200),

  -- Risk
  risk_score INT,
  risk_tier VARCHAR(50),
  risk_factors NVARCHAR(MAX),  -- JSON: detailed risk breakdown

  -- Intervention
  recommended_intervention VARCHAR(200),
  intervention_priority INT,
  days_until_predicted_withdrawal INT,

  -- Contact
  last_contact_date DATE,
  last_contact_method VARCHAR(50),
  days_since_contact INT,
  preferred_contact_channel VARCHAR(50),

  -- Coordinator
  assigned_coordinator VARCHAR(100),

  last_updated DATETIME2 DEFAULT GETDATE()
);

CREATE INDEX IX_atrisk_priority ON at_risk_families_view(intervention_priority DESC, risk_score DESC);

-- READ MODEL 3: Payment analytics (optimized for CFO reports)
CREATE TABLE payment_analytics_view (
  month DATE PRIMARY KEY,

  -- Volume
  families_active INT,
  families_paid_on_time INT,
  families_paid_late INT,
  families_overdue INT,

  -- Revenue
  revenue_collected DECIMAL(10,2),
  revenue_expected DECIMAL(10,2),
  revenue_outstanding DECIMAL(10,2),

  -- Timing
  avg_payment_days_late DECIMAL(5,2),
  median_payment_days_late INT,

  -- Methods
  payment_method_breakdown NVARCHAR(MAX),  -- JSON

  last_updated DATETIME2 DEFAULT GETDATE()
);

-- Projection checkpoints (track which events processed)
CREATE TABLE projection_checkpoints (
  projection_name VARCHAR(200) PRIMARY KEY,

  last_event_id UNIQUEIDENTIFIER,
  last_event_version BIGINT,
  last_event_timestamp DATETIME2,

  events_processed BIGINT,

  status VARCHAR(50) DEFAULT 'running',  -- 'running', 'rebuilding', 'stopped', 'error'
  last_error NVARCHAR(MAX),

  updated_date DATETIME2 DEFAULT GETDATE()
);
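The `event_version` column makes it possible to apply events idempotently, which matters because subscriptions and rebuilds can deliver the same event twice. The sketch below shows the idea on an in-memory row. It assumes `aggregate_version` starts at 1 and increases by exactly 1 per aggregate, which this section does not state, and `applyIdempotent` is a hypothetical helper.

```javascript
// In-memory stand-in for one family_read_model row, including its event_version.
function applyIdempotent(row, event) {
  // Already applied (redelivery or replay overlap): leave the row untouched.
  if (event.aggregate_version <= row.event_version) return row;

  // A gap means an earlier event was missed; fail loudly rather than corrupt totals.
  if (event.aggregate_version !== row.event_version + 1) {
    throw new Error(
      `Version gap: row at ${row.event_version}, event at ${event.aggregate_version}`
    );
  }

  if (event.event_type === 'PaymentReceived') {
    return {
      ...row,
      total_paid: row.total_paid + event.event_data.amount,
      payment_count: row.payment_count + 1,
      event_version: event.aggregate_version
    };
  }

  // Unhandled event types still advance the version so later events line up.
  return { ...row, event_version: event.aggregate_version };
}

let row = { family_id: 'family_123', total_paid: 0, payment_count: 0, event_version: 1 };
const payment = {
  event_type: 'PaymentReceived',
  aggregate_version: 2,
  event_data: { amount: 450 }
};

row = applyIdempotent(row, payment);
row = applyIdempotent(row, payment); // duplicate delivery is a no-op
console.log(row.total_paid, row.payment_count, row.event_version);
```

In SQL the same guard could be expressed by adding an `event_version` comparison to the `WHERE` clause of each projection `UPDATE`, so that a redelivered event matches zero rows.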

Implementation

Command Handler

class CommandHandler {
  constructor(eventStore) {
    this.eventStore = eventStore;
  }

  // Commands modify state via events
  async enrollFamily(command) {
    // Validation
    this.validateEnrollFamily(command);

    // Business logic
    const familyId = command.familyId || this.generateId();

    // Append event (write path)
    await this.eventStore.appendEvent(
      familyId,
      'Family',
      'FamilyEnrolled',
      {
        family_name: command.familyName,
        enrollment_date: command.enrollmentDate,
        parent_name: command.parentName,
        email: command.email,
        phone: command.phone
      },
      {
        user_id: command.userId,
        correlation_id: this.generateId()
      }
    );

    console.log(`Family enrolled: ${familyId}`);

    return {success: true, familyId};
  }

  async receivePayment(command) {
    this.validateReceivePayment(command);

    await this.eventStore.appendEvent(
      command.familyId,
      'Family',
      'PaymentReceived',
      {
        amount: command.amount,
        date: command.date,
        payment_method: command.paymentMethod,
        transaction_id: command.transactionId
      },
      {
        user_id: command.userId || 'system'
      }
    );

    console.log(`Payment received: ${command.familyId}, $${command.amount}`);

    return {success: true};
  }

  async updateEngagementScore(command) {
    await this.eventStore.appendEvent(
      command.familyId,
      'Family',
      'EngagementScoreUpdated',
      {
        old_score: command.oldScore,
        new_score: command.newScore,
        factors: command.factors
      },
      {
        user_id: 'system'
      }
    );

    return {success: true};
  }

  validateEnrollFamily(command) {
    if (!command.familyName) throw new Error('Family name required');
    if (!command.enrollmentDate) throw new Error('Enrollment date required');
  }

  validateReceivePayment(command) {
    if (!command.familyId) throw new Error('Family ID required');
    if (!command.amount || command.amount <= 0) throw new Error('Valid amount required');
  }

  generateId() {
    return require('crypto').randomUUID();
  }
}

module.exports = CommandHandler;

Query Handler

class QueryHandler {
  constructor(db) {
    this.db = db;
  }

  // Queries read from optimized read models
  async getFamily(familyId) {
    const result = await this.db.query(`
      SELECT * FROM family_read_model WHERE family_id = ?
    `, [familyId]);

    if (!result.length) {
      throw new Error('Family not found');
    }

    return this.deserializeFamily(result[0]);
  }

  async listAtRiskFamilies(minRiskScore = 70) {
    const result = await this.db.query(`
      SELECT *
      FROM at_risk_families_view
      WHERE risk_score >= ?
      ORDER BY intervention_priority DESC, risk_score DESC
    `, [minRiskScore]);

    return result;
  }

  async getFamiliesByRiskTier() {
    const result = await this.db.query(`
      SELECT 
        risk_tier,
        COUNT(*) as count,
        AVG(engagement_score) as avg_engagement,
        AVG(risk_score) as avg_risk
      FROM family_read_model
      WHERE status = 'enrolled'
      GROUP BY risk_tier
      ORDER BY 
        CASE risk_tier
          WHEN 'critical' THEN 1
          WHEN 'high' THEN 2
          WHEN 'medium' THEN 3
          WHEN 'low' THEN 4
        END
    `);

    return result;
  }

  async getPaymentAnalytics(startMonth, endMonth) {
    const result = await this.db.query(`
      SELECT *
      FROM payment_analytics_view
      WHERE month >= ? AND month <= ?
      ORDER BY month DESC
    `, [startMonth, endMonth]);

    return result;
  }

  async searchFamilies(searchTerm) {
    const result = await this.db.query(`
      SELECT TOP 50 *
      FROM family_read_model
      WHERE family_name LIKE ?
        OR children_names LIKE ?
      ORDER BY family_name
    `, [`%${searchTerm}%`, `%${searchTerm}%`]);

    return result;
  }

  deserializeFamily(row) {
    return {
      ...row,
      children_names: JSON.parse(row.children_names || '[]'),
      risk_factors: JSON.parse(row.risk_factors || '{}')
    };
  }
}

module.exports = QueryHandler;

Projection Builder

class ProjectionBuilder {
  constructor(db, eventStore) {
    this.db = db;
    this.eventStore = eventStore;
    this.handlers = new Map();

    this.registerHandlers();
  }

  registerHandlers() {
    // Register event handlers for each read model
    this.handlers.set('FamilyEnrolled', this.handleFamilyEnrolled.bind(this));
    this.handlers.set('ChildAdded', this.handleChildAdded.bind(this));
    this.handlers.set('PaymentReceived', this.handlePaymentReceived.bind(this));
    this.handlers.set('EngagementScoreUpdated', this.handleEngagementScoreUpdated.bind(this));
    this.handlers.set('RiskScoreUpdated', this.handleRiskScoreUpdated.bind(this));
  }

  async start() {
    console.log('Projection Builder starting...');

    // Subscribe to events
    await this.eventStore.subscribe('*', async (event) => {
      await this.processEvent(event);
    });
  }

  async processEvent(event) {
    const handler = this.handlers.get(event.event_type);

    if (handler) {
      try {
        await handler(event);

        // Update checkpoint
        await this.updateCheckpoint('family_read_model', event);

      } catch (error) {
        console.error(`Error processing event ${event.event_type}:`, error);

        // Log error but don't stop processing
        await this.logProjectionError('family_read_model', event, error);
      }
    }
  }

  async handleFamilyEnrolled(event) {
    await this.db.query(`
      INSERT INTO family_read_model (
        family_id,
        family_name,
        enrollment_date,
        status,
        event_version
      ) VALUES (?, ?, ?, 'enrolled', ?)
    `, [
      event.aggregate_id,
      event.event_data.family_name,
      event.event_data.enrollment_date,
      event.aggregate_version
    ]);

    console.log(`Projection updated: FamilyEnrolled ${event.aggregate_id}`);
  }

  async handleChildAdded(event) {
    // Get current children
    const current = await this.db.query(`
      SELECT children_names FROM family_read_model
      WHERE family_id = ?
    `, [event.aggregate_id]);

    if (current.length === 0) return;

    const children = JSON.parse(current[0].children_names || '[]');
    children.push(event.event_data.child_name);

    // Keep the maximum age seen so far instead of overwriting it with the newest child's age
    await this.db.query(`
      UPDATE family_read_model
      SET 
        children_count = children_count + 1,
        children_names = ?,
        oldest_child_age = CASE
          WHEN oldest_child_age IS NULL OR oldest_child_age < ? THEN ?
          ELSE oldest_child_age
        END,
        event_version = ?
      WHERE family_id = ?
    `, [
      JSON.stringify(children),
      event.event_data.child_age,
      event.event_data.child_age,
      event.aggregate_version,
      event.aggregate_id
    ]);
  }

  async handlePaymentReceived(event) {
    await this.db.query(`
      UPDATE family_read_model
      SET 
        total_paid = total_paid + ?,
        payment_count = payment_count + 1,
        last_payment_date = ?,
        event_version = ?
      WHERE family_id = ?
    `, [
      event.event_data.amount,
      event.event_data.date,
      event.aggregate_version,
      event.aggregate_id
    ]);

    // Also update payment analytics view
    await this.updatePaymentAnalytics(event);
  }

  async handleEngagementScoreUpdated(event) {
    await this.db.query(`
      UPDATE family_read_model
      SET 
        engagement_score = ?,
        event_version = ?
      WHERE family_id = ?
    `, [
      event.event_data.new_score,
      event.aggregate_version,
      event.aggregate_id
    ]);
  }

  async handleRiskScoreUpdated(event) {
    await this.db.query(`
      UPDATE family_read_model
      SET 
        risk_score = ?,
        risk_tier = ?,
        risk_factors = ?,
        event_version = ?
      WHERE family_id = ?
    `, [
      event.event_data.new_score,
      event.event_data.risk_tier,
      JSON.stringify(event.event_data.risk_factors || {}),
      event.aggregate_version,
      event.aggregate_id
    ]);

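    // Keep the at-risk view in step with the score: add or refresh high-risk
    // families, and remove families whose score has dropped below the threshold
    if (event.event_data.new_score >= 70) {
      await this.updateAtRiskView(event);
    } else {
      await this.db.query(`
        DELETE FROM at_risk_families_view WHERE family_id = ?
      `, [event.aggregate_id]);
    }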
  }

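  async updatePaymentAnalytics(event) {
    // Normalize to the first day of the month in UTC so the key matches the DATE column
    const paid = new Date(event.event_data.date);
    const month = new Date(Date.UTC(paid.getUTCFullYear(), paid.getUTCMonth(), 1));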

    // Upsert monthly analytics
    await this.db.query(`
      MERGE INTO payment_analytics_view AS target
      USING (SELECT ? AS month) AS source
      ON target.month = source.month
      WHEN MATCHED THEN
        UPDATE SET
          families_paid_on_time = families_paid_on_time + 1,
          revenue_collected = revenue_collected + ?
      WHEN NOT MATCHED THEN
        INSERT (month, families_paid_on_time, revenue_collected)
        VALUES (?, 1, ?);
    `, [month, event.event_data.amount, month, event.event_data.amount]);
  }

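  async updateAtRiskView(event) {
    // Complex logic to maintain at-risk view
    // (simplified for example)
    // Uses MERGE, like the other upserts here, rather than MySQL's ON DUPLICATE KEY UPDATE
    await this.db.query(`
      MERGE INTO at_risk_families_view AS target
      USING (
        SELECT family_id, family_name, risk_score, risk_tier, risk_factors
        FROM family_read_model
        WHERE family_id = ?
      ) AS source
      ON target.family_id = source.family_id
      WHEN MATCHED THEN
        UPDATE SET
          family_name = source.family_name,
          risk_score = source.risk_score,
          risk_tier = source.risk_tier,
          risk_factors = source.risk_factors,
          last_updated = GETDATE()
      WHEN NOT MATCHED THEN
        INSERT (family_id, family_name, risk_score, risk_tier, risk_factors)
        VALUES (source.family_id, source.family_name, source.risk_score, source.risk_tier, source.risk_factors);
    `, [event.aggregate_id]);
  }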

  async updateCheckpoint(projectionName, event) {
    await this.db.query(`
      MERGE INTO projection_checkpoints AS target
      USING (SELECT ? AS projection_name) AS source
      ON target.projection_name = source.projection_name
      WHEN MATCHED THEN
        UPDATE SET
          last_event_id = ?,
          last_event_version = ?,
          last_event_timestamp = ?,
          events_processed = events_processed + 1,
          updated_date = GETDATE()
      WHEN NOT MATCHED THEN
        INSERT (projection_name, last_event_id, last_event_version, last_event_timestamp, events_processed)
        VALUES (?, ?, ?, ?, 1);
    `, [
      projectionName,
      event.event_id,
      event.aggregate_version,
      event.event_timestamp,
      projectionName,
      event.event_id,
      event.aggregate_version,
      event.event_timestamp
    ]);
  }

  async logProjectionError(projectionName, event, error) {
    await this.db.query(`
      UPDATE projection_checkpoints
      SET 
        status = 'error',
        last_error = ?
      WHERE projection_name = ?
    `, [error.message, projectionName]);
  }

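  // Rebuild the read models from events
  async rebuildProjection(projectionName) {
    // Table names cannot be bound as query parameters, so validate against an allow-list
    const readModels = ['family_read_model', 'at_risk_families_view', 'payment_analytics_view'];
    if (!readModels.includes(projectionName)) {
      throw new Error(`Unknown projection: ${projectionName}`);
    }

    console.log(`Rebuilding projection: ${projectionName}...`);

    // Mark as rebuilding
    await this.db.query(`
      UPDATE projection_checkpoints
      SET status = 'rebuilding'
      WHERE projection_name = ?
    `, [projectionName]);

    // Clear every read model: the handlers write to all three tables, so replaying
    // into tables that were not cleared would double-count (e.g. payment totals)
    for (const table of readModels) {
      await this.db.query(`TRUNCATE TABLE ${table}`);
    }

    // Replay all events
    const events = await this.eventStore.getAllEvents();

    for (const event of events) {
      await this.processEvent(event);
    }

    // Mark as complete
    await this.db.query(`
      UPDATE projection_checkpoints
      SET status = 'running'
      WHERE projection_name = ?
    `, [projectionName]);

    console.log(`Projection rebuilt: ${projectionName} (${events.length} events processed)`);
  }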
}

module.exports = ProjectionBuilder;

Usage Example

const eventStore = new EventStore(db);
const commandHandler = new CommandHandler(eventStore);
const queryHandler = new QueryHandler(db);
const projectionBuilder = new ProjectionBuilder(db, eventStore);

// Start projection builder
await projectionBuilder.start();

// WRITE: Enroll family (command)
const { familyId } = await commandHandler.enrollFamily({
  familyName: 'Martinez Family',
  enrollmentDate: '2024-09-01',
  parentName: 'Maria Martinez',
  email: 'maria@example.com',
  userId: 'sarah'
});
// Time: 1ms (just append event!)

// Event published async → Projection updated in background (10ms later)
// A read issued before the projection catches up would throw 'Family not found'

// READ: Get family (query), using the ID returned by the command
const family = await queryHandler.getFamily(familyId);
// Time: 2ms (query denormalized table!)
console.log(family.family_name, family.total_paid, family.engagement_score);

// READ: List at-risk families (complex query)
const atRisk = await queryHandler.listAtRiskFamilies(70);
// Time: 5ms (optimized view with indexes!)
console.log(`${atRisk.length} families at risk`);

// READ: Complex analytics
const analytics = await queryHandler.getFamiliesByRiskTier();
// Time: 8ms (GROUP BY over the denormalized table; nothing is replayed)
analytics.forEach(tier => {
  console.log(`${tier.risk_tier}: ${tier.count} families, avg engagement: ${tier.avg_engagement}`);
});

Variations

By Consistency Model

Eventual Consistency (Recommended):

  • Commands return immediately
  • Projections update async (milliseconds later)
  • Fastest, most scalable

Strong Consistency:

  • Commands wait for projection update
  • Slower but guarantees read-your-writes
  • Rarely needed
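A middle ground is to stay asynchronous by default and wait only in the few places that need read-your-writes. The sketch below polls until the projection has reached the version just written. `waitForProjection` and its `getProjectedVersion` callback are hypothetical; in this design the callback might read `event_version` from `family_read_model` for the family in question.

```javascript
// Poll until the projection has applied at least `targetVersion`, or give up.
// `getProjectedVersion` is a hypothetical async callback supplied by the caller.
async function waitForProjection(
  getProjectedVersion,
  targetVersion,
  { timeoutMs = 1000, intervalMs = 10 } = {}
) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const current = await getProjectedVersion();
    if (current >= targetVersion) return current;
    if (Date.now() >= deadline) {
      throw new Error(`Projection still at version ${current}, wanted ${targetVersion}`);
    }
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
}

// Simulated projection that catches up 30ms after the write.
let projectedVersion = 1;
setTimeout(() => { projectedVersion = 2; }, 30);

const caughtUp = waitForProjection(async () => projectedVersion, 2);
caughtUp.then(version => console.log('read model caught up to version', version));
```

The timeout matters: if the projection is stopped or in an error state, the caller gets a clear failure instead of waiting forever.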

By Read Model Count

Single Read Model:

  • One denormalized view
  • Simple, covers most queries
  • Good starting point

Multiple Read Models:

  • Optimized view per use case
  • Dashboard view, report view, search view
  • Maximum performance, more complexity

Materialized Views:

  • Database-managed projections
  • Less code, but less control

By Rebuild Strategy

Event Replay:

  • Rebuild from event store
  • Complete, accurate
  • Slow for large systems

Snapshot + Events:

  • Restore from snapshot, replay recent events
  • Much faster
  • Requires snapshot management
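The difference between the two strategies is only where the fold starts. In the sketch below, `rebuild` with no snapshot replays everything, while `rebuild` with a snapshot skips events at or before the snapshot's position. The `position` field (a global, increasing log offset) and the event shape are assumptions for this example; the event store in this section exposes `getAllEvents()` and does not show such a field.

```javascript
// Fold one event into a read-model state object (family id -> total paid).
function applyEvent(state, event) {
  if (event.type === 'PaymentReceived') {
    const total = (state.totals[event.familyId] || 0) + event.amount;
    return { totals: { ...state.totals, [event.familyId]: total }, position: event.position };
  }
  return { ...state, position: event.position };
}

// Full replay starts from an empty state; snapshot restore starts from saved
// state and replays only events whose log position is past the snapshot.
function rebuild(events, snapshot = { totals: {}, position: 0 }) {
  return events
    .filter(e => e.position > snapshot.position)
    .reduce(applyEvent, snapshot);
}

const log = [
  { position: 1, type: 'PaymentReceived', familyId: 'family_1', amount: 100 },
  { position: 2, type: 'PaymentReceived', familyId: 'family_2', amount: 200 },
  { position: 3, type: 'PaymentReceived', familyId: 'family_1', amount: 50 }
];

const fromScratch = rebuild(log);
const snapshotAt2 = rebuild(log.slice(0, 2));   // pretend this was saved earlier
const fromSnapshot = rebuild(log, snapshotAt2); // replays only position 3
console.log(fromScratch, fromSnapshot);
```

Both paths should produce the same state; comparing them on a sample of aggregates is a cheap way to check that snapshots have not drifted from the event log.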

Consequences

Benefits

1. Blazing read speed: Queries take 2-10ms (vs seconds with event sourcing only).

2. Complex queries easy: Denormalized = no JOINs, pre-computed aggregations.

3. Scale independently: Reads and writes scale separately.

4. Multiple views: Different read models for different use cases.

5. Write integrity: Event sourcing on write side (perfect audit).

6. Query flexibility: Add new read models without changing write side.

Costs

1. Eventual consistency: Reads lag writes by milliseconds (usually acceptable).

2. Complexity: More moving parts than single model.

3. Projection management: Must keep read models in sync with events.

4. Storage duplication: Events + read models = more storage.

5. Rebuild capability required: Must be able to rebuild projections from events.
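Since the first and third costs both come down to how far a projection trails the event store, it can help to measure that lag explicitly. The sketch below derives it from a `projection_checkpoints` row. The `storeHead` object (total event count and newest event timestamp) is hypothetical, and `eventsBehind` is only meaningful if the checkpoint counts every event in the store, not just the handled ones.

```javascript
// Field names mirror projection_checkpoints; `storeHead` is a hypothetical summary
// of the event store: how many events exist and when the newest one was written.
function projectionLag(checkpoint, storeHead) {
  const eventsBehind = storeHead.total_events - checkpoint.events_processed;
  const msBehind =
    storeHead.last_event_timestamp.getTime() - checkpoint.last_event_timestamp.getTime();
  return {
    eventsBehind: Math.max(0, eventsBehind),
    msBehind: Math.max(0, msBehind)
  };
}

// Flag a projection as unhealthy if it is not running or is too far behind.
function isHealthy(checkpoint, storeHead, maxLagMs = 1000) {
  if (checkpoint.status !== 'running') return false;
  return projectionLag(checkpoint, storeHead).msBehind <= maxLagMs;
}

const checkpoint = {
  projection_name: 'family_read_model',
  status: 'running',
  events_processed: 9990,
  last_event_timestamp: new Date('2024-09-01T12:00:00.000Z')
};
const storeHead = {
  total_events: 10000,
  last_event_timestamp: new Date('2024-09-01T12:00:00.250Z')
};

const lag = projectionLag(checkpoint, storeHead);
console.log(lag, isHealthy(checkpoint, storeHead));
```

A check like this could run on a timer and alert when a projection falls behind, so that "milliseconds of lag" remains a measured property and not an assumption.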

Sample Code

Performance comparison:

// Event Sourcing Only (SLOW)
console.time('event-sourcing-read');
const stateFromEvents = await eventStore.reconstructAggregate('family_123');
console.timeEnd('event-sourcing-read');
// event-sourcing-read: 2847ms (replayed 10,000 events!)

// CQRS Read Model (FAST)
console.time('cqrs-read');
const stateFromProjection = await queryHandler.getFamily('family_123');
console.timeEnd('cqrs-read');
// cqrs-read: 2ms (single query!)

// 1,423× FASTER! 🚀

Known Uses

Homeschool Co-Op Platform

  • Event-sourced writes (audit trail)
  • Denormalized read models (fast dashboards)
  • 2ms average query time

E-Commerce

  • Order commands append events
  • Product catalog read from denormalized views
  • Inventory projections for availability

Banking

  • Transaction events (immutable)
  • Account balance projections (fast lookup)
  • Regulatory reporting views

Social Media

  • Post/like/comment commands
  • Feed projections (denormalized)
  • Analytics views

Requires:

  • Pattern 27: Event Sourcing - write model

Enables:

  • Pattern 29: Real-Time Processing - process event streams
  • Pattern 30: Scalability - scale reads independently

Works With:

  • Pattern 26: Feedback Loop - projections feed learning

References

Eventual Consistency

  • BASE: Pritchett, D. (2008). "BASE: An Acid Alternative." ACM Queue 6(3): 48-55. https://queue.acm.org/detail.cfm?id=1394128
  • CAP Theorem: Brewer, E. (2012). "CAP Twelve Years Later: How the 'Rules' Have Changed." IEEE Computer 45(2): 23-29.
  • Consistency Patterns: Kleppmann, M. (2017). Designing Data-Intensive Applications. O'Reilly. ISBN: 978-1449373320
  • Pattern 1: Universal Event Log - Commands create events
  • Pattern 27: Event Sourcing - CQRS often paired with event sourcing
  • Pattern 28: CQRS - Separate read and write models
  • Pattern 29: Real-Time Processing - Update read models in real-time
  • Pattern 30: Scalability Patterns - CQRS enables independent scaling
  • Volume 3, Pattern 17: State-Aware Behavior - Commands update state
