Volume 2: Organizational Intelligence Platforms

Pattern 29: Real-Time Processing

Intent

Process continuous streams of events with sub-second latency to enable live dashboards, instant alerts, real-time analytics, and immediate pattern detection. Treat data as continuously flowing streams rather than static batches, creating systems that pulse with the heartbeat of organizational activity.

Also Known As

  • Stream Processing
  • Real-Time Analytics
  • Event Stream Processing
  • Continuous Processing
  • Live Data Processing

Problem

Batch processing creates blind spots. By the time you analyze yesterday's data, critical moments have already passed.

The "batch blindness" problem:

Traditional batch approach:

// Midnight: Run daily analytics
cron.schedule('0 0 * * *', async () => {
  console.log('Running daily engagement analysis...');

  const families = await getAllFamilies();
  for (const family of families) {
    const engagement = calculateEngagementScore(family);
    await saveEngagementScore(family.id, engagement);
  }

  console.log('Analysis complete!');
});

Timeline:

9:00am Monday: Chen family engagement starts dropping (from 85 to 65)
3:00pm Monday: Continues dropping (now 45 - CRITICAL!)
11:59pm Monday: Still dropping (now 30 - emergency!)
12:00am Tuesday: Batch job FINALLY detects low engagement
9:00am Tuesday: Sarah sees alert, reaches out - Chen family has already decided to withdraw 😱

Problem: 15 hours of critical decline went UNNOTICED! By the time batch processing ran, it was too late!

The "stock market" analogy:

Imagine trading stocks with only YESTERDAY's prices:

  • Monday: Stock closes at $100
  • Tuesday morning: Stock is actually at $120
  • But you're looking at Monday's close: $100
  • You make decisions on OLD DATA
  • You're always one day behind! 📉😱

Real stock traders need REAL-TIME:

  • See price changes SECOND by SECOND
  • React INSTANTLY to market movements
  • Make decisions on CURRENT data, not yesterday's! 📈⚡

Same for organizational intelligence!

The "medical monitoring" problem:

Batch approach (checking vitals once per day):

6am: Patient's heart rate normal (72 bpm)
8am: Heart rate spikes (145 bpm - DANGER!)
10am: Still elevated (152 bpm - CRITICAL!)
12pm: Patient in distress
6pm: Nurse checks vitals - discovers crisis 10 HOURS LATE!

Real-time approach (continuous EKG monitoring):

6:00am: Heart rate 72 bpm ✓
8:01am: Heart rate 145 bpm - ALERT! 🚨
8:02am: Nurse alerted, responds immediately
8:05am: Crisis averted!

The EKG doesn't wait for scheduled checks - it MONITORS CONTINUOUSLY! 🫀⚡

The "engagement cliff" scenario:

Martinez family engagement timeline (Monday):

9:00am: Engagement score 78 (healthy)
10:30am: Drops to 68 (concerning)
12:00pm: Drops to 52 (at-risk)
2:30pm: Drops to 38 (critical!)
4:00pm: Drops to 25 (emergency!)

Batch processing (runs at midnight):

  • Detects problem at 12:00am Tuesday
  • Sarah acts at 9am Tuesday - nearly a full day after the decline began! 😱

Real-time processing:

  • Detects drop at 12:05pm Monday (5 min after crossing 50)
  • Sarah alerted immediately
  • Reaches out at 12:30pm Monday
  • Crisis caught early, family saved!

The "payment fraud" problem:

Martinez family's credit card stolen:

9:00am: Normal purchase $15
9:05am: Normal purchase $32
9:10am: FRAUDULENT charge $450 (duplicate payment!)
9:11am: FRAUDULENT charge $450 (another!)
9:12am: FRAUDULENT charge $450 (third!)

Batch processing (midnight):

  • Detects fraud at midnight
  • 3 fraudulent charges already processed
  • $1,350 stolen! 😱

Real-time processing:

  • Detects duplicate at 9:10:30am (30 seconds after the first fraudulent charge)
  • Blocks subsequent charges
  • Alerts family immediately
  • Only $450 exposed, $900 prevented!

What we need: Real-time stream processing

Continuous processing:

Events flow → Process immediately → Act within seconds
(not batch at midnight → act hours/days later)

Real-time characteristics:

1. Sub-second latency:
  • Event arrives at 9:10:00am
  • Processed by 9:10:00.5am (500ms)
  • Alert fired by 9:10:01am (1 second total)

2. Continuous monitoring:
  • Not once per day (batch) - EVERY event, as it happens
  • Like an EKG: continuous heartbeat monitoring

3. Stateful processing:
  • Track state across events
  • "Engagement dropped from 78 to 38" (knows previous value)
  • "Third duplicate payment in 2 minutes" (counts occurrences)

4. Windowed aggregations:
  • "5 payments in last 10 minutes" (sliding window)
  • "Hourly engagement average" (tumbling window)
  • "Session: time between login and logout" (session window)

5. Pattern detection:
  • "Engagement dropping rapidly" (trend detection)
  • "Unusual payment pattern" (anomaly detection)
  • "Three emails bounced in a row" (sequence detection)

Without real-time processing:

  • Blind to current state (only see yesterday)
  • React hours/days late (crisis already happened)
  • Miss critical patterns (detected after damage done)
  • Can't prevent problems (only clean up after)

With real-time processing:

  • See current state (live data, like a stock ticker)
  • React within seconds (catch problems early)
  • Detect patterns immediately (as they emerge)
  • Prevent problems (intervene before damage)
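The windowed-aggregation idea above can be sketched with a plain array acting as a sliding window. This is a minimal illustration only (the `SlidingWindowCounter` name is ours, not part of any framework):

```javascript
// Minimal sliding-window counter: answers "how many events in the last N ms?"
// A sketch only - real stream engines manage windows like this for you.
class SlidingWindowCounter {
  constructor(windowMs) {
    this.windowMs = windowMs;
    this.timestamps = [];
  }

  // Record an event at the given time (defaults to now)
  record(ts = Date.now()) {
    this.timestamps.push(ts);
  }

  // Count events inside the window ending at `now`, evicting expired ones
  count(now = Date.now()) {
    const cutoff = now - this.windowMs;
    this.timestamps = this.timestamps.filter(t => t >= cutoff);
    return this.timestamps.length;
  }
}

// Usage: "payments in the last 10 minutes"
const counter = new SlidingWindowCounter(10 * 60 * 1000);
const t0 = Date.now();
counter.record(t0 - 11 * 60 * 1000); // too old - falls outside the window
counter.record(t0 - 5 * 60 * 1000);
counter.record(t0 - 1 * 60 * 1000);
console.log(counter.count(t0)); // → 2
```

The eviction happens lazily on read, which keeps memory bounded to the events actually inside the window.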

Context

When this pattern applies:

  • Time-sensitive decisions (minutes/seconds matter)
  • Need to detect patterns as they emerge
  • High event volume (thousands per second)
  • Critical to prevent problems (not just react after)
  • Real-time dashboards/monitoring needed
  • Value of information decays rapidly

When this pattern may not be needed:

  • Batch acceptable (daily/weekly fine)
  • Low event volume (< 100/day)
  • Historical analysis only (not real-time action)
  • No time-sensitive decisions

Forces

Competing concerns:

1. Real-Time vs Accuracy
  • Real-time = process immediately (may be incomplete)
  • Accurate = wait for all data (delayed)
  • Balance: Accept eventual consistency

2. Latency vs Throughput
  • Low latency = process each event immediately
  • High throughput = batch events for efficiency
  • Balance: Micro-batching (100ms windows)

3. Complexity vs Capability
  • Simple = easy to understand but limited
  • Complex = powerful but hard to maintain
  • Balance: Start simple, add sophistication as needed

4. Stateless vs Stateful
  • Stateless = simple, scalable
  • Stateful = powerful but harder to scale
  • Balance: Stateful where needed, stateless elsewhere

5. Memory vs Persistence
  • In-memory = fast but volatile
  • Persistent = durable but slower
  • Balance: Memory for hot data, persist for durability
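The latency/throughput balance in force 2 can be sketched as a micro-batcher that flushes every 100ms or as soon as a batch fills, whichever comes first (names and defaults here are illustrative, not a framework API):

```javascript
// Micro-batching sketch: buffer events and flush every `intervalMs`
// or as soon as `maxSize` events accumulate - whichever comes first.
class MicroBatcher {
  constructor({intervalMs = 100, maxSize = 50, onFlush}) {
    this.intervalMs = intervalMs;
    this.maxSize = maxSize;
    this.onFlush = onFlush;
    this.buffer = [];
    this.timer = null;
  }

  push(event) {
    this.buffer.push(event);
    if (this.buffer.length >= this.maxSize) {
      this.flush(); // size limit hit: flush immediately
    } else if (!this.timer) {
      // first event of a new batch: schedule a time-based flush
      this.timer = setTimeout(() => this.flush(), this.intervalMs);
    }
  }

  flush() {
    if (this.timer) { clearTimeout(this.timer); this.timer = null; }
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.onFlush(batch);
  }
}

// Usage: one write per batch instead of one write per event
const flushed = [];
const batcher = new MicroBatcher({intervalMs: 100, maxSize: 3, onFlush: b => flushed.push(b)});
batcher.push({type: 'PaymentReceived'});
batcher.push({type: 'UserLogin'});
batcher.push({type: 'EmailBounced'}); // third event triggers an immediate flush
console.log(flushed.length);          // → 1
console.log(flushed[0].length);       // → 3
```

Worst-case added latency is `intervalMs`, while downstream writes drop by up to a factor of `maxSize`.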

Solution

Build real-time stream processing with:

1. Event Streams

Events flow continuously (like stock ticker):

EventStream = {
  topic: 'family_events',
  events: [
    {timestamp: '9:00:00.000', type: 'PaymentReceived', family_id: 123, amount: 450},
    {timestamp: '9:00:01.234', type: 'EngagementUpdated', family_id: 456, score: 72},
    {timestamp: '9:00:02.567', type: 'EmailBounced', family_id: 789, email: '...'},
    // ... continuous stream, never ends
  ]
}

2. Stream Processing Pipeline

Events → Filter → Transform → Aggregate → Detect Patterns → Trigger Actions

Example: Engagement monitoring

eventStream
  .filter(e => e.type === 'EngagementUpdated')
  .keyBy(e => e.family_id)            // partition per family so windows don't mix families
  .window(sliding(5, 'minutes'))
  .map(events => ({
    family_id: events[0].family_id,
    current_score: events[events.length - 1].score,
    previous_score: events[0].score,
    drop: events[0].score - events[events.length - 1].score
  }))
  .filter(result => result.drop > 30)  // Dropped 30+ points in 5 min
  .forEach(alert => {
    console.log(`ALERT: Family ${alert.family_id} engagement dropped ${alert.drop} points!`);
    triggerIntervention(alert.family_id);
  });

3. Window Types

Tumbling Window (non-overlapping):

[---- 5 min ----][---- 5 min ----][---- 5 min ----]
9:00-9:05        9:05-9:10        9:10-9:15

Use for: Hourly summaries, daily counts

Sliding Window (overlapping):

[---- 5 min ----]
   [---- 5 min ----]
      [---- 5 min ----]

Use for: Moving averages, trend detection

Session Window (activity-based):

[-- events --] gap [-- events --] gap [-- events --]
(session 1)         (session 2)        (session 3)

Use for: User sessions, conversation tracking
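One way to see the difference between the first two window types: a tumbling window assigns each event to exactly one bucket, while a sliding window assigns it to several. A sketch (function names are illustrative):

```javascript
// Tumbling windows: each timestamp maps to exactly one bucket
function tumblingWindowStart(ts, sizeMs) {
  return Math.floor(ts / sizeMs) * sizeMs;
}

// Sliding windows (size, slide): each timestamp belongs to roughly
// size/slide overlapping windows
function slidingWindowStarts(ts, sizeMs, slideMs) {
  const starts = [];
  const first = Math.floor(ts / slideMs) * slideMs;
  // walk backwards over every window whose range [start, start + size) covers ts
  for (let start = first; start > ts - sizeMs; start -= slideMs) {
    starts.unshift(start);
  }
  return starts;
}

const MIN = 60 * 1000;
const ts = 7 * MIN; // an event at minute 7

// Tumbling 5-minute windows: minute 7 falls in the 5:00-10:00 bucket
console.log(tumblingWindowStart(ts, 5 * MIN) / MIN); // → 5

// Sliding 5-minute windows advancing every minute: the event at minute 7
// belongs to the windows starting at minutes 3, 4, 5, 6, and 7
console.log(slidingWindowStarts(ts, 5 * MIN, MIN).map(s => s / MIN)); // → [ 3, 4, 5, 6, 7 ]
```

This is why sliding windows cost more: every event updates several aggregates instead of one.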

4. Stateful Stream Processing

Track state across events:

// Stateful: Count events per family
const familyEventCounts = new Map();

eventStream.forEach(event => {
  const count = familyEventCounts.get(event.family_id) || 0;
  familyEventCounts.set(event.family_id, count + 1);

  // Fire only once, the moment the count first reaches 5
  if (count + 1 === 5) {
    console.log(`Family ${event.family_id} hit 5 events!`);
  }
});

5. Complex Event Processing (CEP)

Detect patterns across multiple events:

// Pattern: "Payment overdue → Reminder sent → No response in 48h"
pattern([
  {type: 'PaymentOverdue', alias: 'overdue'},
  {type: 'ReminderSent', alias: 'reminder', within: '1 day', after: 'overdue'},
  {type: 'PaymentReceived', NOT: true, within: '48 hours', after: 'reminder'}
])
.forEach(match => {
  console.log(`Pattern matched for family ${match.overdue.family_id}`);
  escalateIntervention(match.overdue.family_id);
});

6. Real-Time Dashboards

Push updates to live dashboards:

// Every time engagement updates, push to dashboard
eventStream
  .filter(e => e.type === 'EngagementUpdated')
  .forEach(event => {
    // Push to websocket clients
    dashboardSocket.emit('engagement-update', {
      family_id: event.family_id,
      score: event.score,
      timestamp: event.timestamp
    });
  });

// Dashboard updates in REAL-TIME! Like stock ticker! 📈

Structure

Stream Processing Tables

-- Real-time aggregations (updated continuously)
CREATE TABLE realtime_family_metrics (
  family_id VARCHAR(200) PRIMARY KEY,

  -- Last values (point-in-time)
  current_engagement_score INT,
  current_risk_score INT,

  -- Sliding window metrics (last 7 days)
  events_last_7_days INT,
  logins_last_7_days INT,
  payments_last_7_days INT,

  -- Moving averages
  engagement_7_day_avg DECIMAL(5,2),
  engagement_30_day_avg DECIMAL(5,2),

  -- Trends
  engagement_trend VARCHAR(50),  -- 'rising', 'stable', 'declining'
  engagement_change_7_days INT,

  -- Alerts
  alert_level VARCHAR(50),  -- 'none', 'watch', 'warning', 'critical'
  alert_reason NVARCHAR(500),
  alert_since DATETIME2,

  -- Metadata
  last_updated DATETIME2 DEFAULT GETDATE(),
  events_processed BIGINT
);

-- Real-time event counters (windowed)
CREATE TABLE realtime_event_counters (
  counter_id INT PRIMARY KEY IDENTITY(1,1),

  family_id VARCHAR(200),
  event_type VARCHAR(100),

  -- Window
  window_start DATETIME2,
  window_end DATETIME2,
  window_type VARCHAR(50),  -- 'tumbling', 'sliding', 'session'

  -- Counts
  event_count INT,

  -- Aggregations
  sum_value DECIMAL(10,2),
  avg_value DECIMAL(10,2),
  min_value DECIMAL(10,2),
  max_value DECIMAL(10,2),

  last_updated DATETIME2 DEFAULT GETDATE()
);

-- Real-time pattern detections
CREATE TABLE realtime_pattern_detections (
  detection_id INT PRIMARY KEY IDENTITY(1,1),

  pattern_name VARCHAR(200),
  family_id VARCHAR(200),

  -- Pattern details
  pattern_events NVARCHAR(MAX),  -- JSON: events that matched pattern
  confidence DECIMAL(5,2),

  detected_at DATETIME2 DEFAULT GETDATE(),

  -- Response
  action_taken BIT DEFAULT 0,
  action_type VARCHAR(100),
  action_result NVARCHAR(1000)
);

-- Stream processing checkpoints
CREATE TABLE stream_checkpoints (
  stream_name VARCHAR(200) PRIMARY KEY,

  last_offset BIGINT,
  last_event_id UNIQUEIDENTIFIER,
  last_event_timestamp DATETIME2,

  events_processed_total BIGINT,

  processing_lag_seconds INT,  -- How far behind real-time?

  status VARCHAR(50) DEFAULT 'running',  -- 'running', 'stopped', 'error'
  last_error NVARCHAR(MAX),

  updated_date DATETIME2 DEFAULT GETDATE()
);
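A consumer would update the checkpoint table after each event (or batch); the `processing_lag_seconds` column is just wall-clock time minus the event's timestamp. A sketch, assuming the same `db.query(sql, params)` placeholder interface used by the other examples in this pattern:

```javascript
// Sketch: persist a stream checkpoint and its processing lag.
// `db` is assumed to expose the query(sql, params) interface
// used throughout this pattern's examples.
async function saveCheckpoint(db, streamName, event, offset) {
  // How far behind real-time is this processor right now?
  const lagSeconds = Math.max(
    0,
    Math.round((Date.now() - new Date(event.event_timestamp).getTime()) / 1000)
  );

  await db.query(`
    UPDATE stream_checkpoints
    SET last_offset = ?,
        last_event_id = ?,
        last_event_timestamp = ?,
        events_processed_total = events_processed_total + 1,
        processing_lag_seconds = ?,
        updated_date = GETDATE()
    WHERE stream_name = ?
  `, [offset, event.event_id, event.event_timestamp, lagSeconds, streamName]);

  // If lagSeconds keeps growing, the processor is falling behind
  // real-time and needs attention (scale out, or find the slow step).
  if (lagSeconds > 60) {
    console.warn(`Stream ${streamName} is ${lagSeconds}s behind real-time!`);
  }
  return lagSeconds;
}
```

Checkpointing also makes restarts safe: on startup, resume from `last_offset` instead of reprocessing the whole stream.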

Implementation

Stream Processor

class StreamProcessor {
  constructor(eventStore, db) {
    this.eventStore = eventStore;
    this.db = db;
    this.processors = [];
    this.state = new Map();
  }

  async start() {
    console.log('Stream Processor starting...');

    // Subscribe to event stream
    await this.eventStore.subscribe('*', async (event) => {
      await this.processEvent(event);
    });

    console.log('Stream Processor running - processing events in real-time...');
  }

  async processEvent(event) {
    const startTime = Date.now();

    // Process through all registered processors
    for (const processor of this.processors) {
      if (processor.eventTypes.includes('*') || processor.eventTypes.includes(event.event_type)) {
        try {
          await processor.process(event, this.state);
        } catch (error) {
          console.error(`Processor ${processor.name} failed:`, error);
        }
      }
    }

    // Track processing latency
    const latency = Date.now() - startTime;

    if (latency > 100) {
      console.warn(`High latency: ${latency}ms for ${event.event_type}`);
    }
  }

  registerProcessor(processor) {
    this.processors.push(processor);
    console.log(`Registered processor: ${processor.name}`);
  }
}

// Engagement Trend Processor
class EngagementTrendProcessor {
  constructor(db) {
    this.db = db;
    this.name = 'EngagementTrendProcessor';
    this.eventTypes = ['EngagementScoreUpdated'];
  }

  async process(event, state) {
    const familyId = event.aggregate_id;
    const newScore = event.event_data.new_score;
    const oldScore = event.event_data.old_score;

    // Get recent history for trend analysis
    const history = await this.getRecentScores(familyId);
    history.push({score: newScore, timestamp: event.event_timestamp});

    // Calculate trend
    const trend = this.calculateTrend(history);
    const change7Days = this.calculateChange(history, 7);

    // Determine alert level
    const alertLevel = this.determineAlertLevel(newScore, trend, change7Days);

    // Update real-time metrics
    await this.db.query(`
      MERGE INTO realtime_family_metrics AS target
      USING (SELECT ? AS family_id) AS source
      ON target.family_id = source.family_id
      WHEN MATCHED THEN
        UPDATE SET
          current_engagement_score = ?,
          engagement_trend = ?,
          engagement_change_7_days = ?,
          alert_level = ?,
          last_updated = GETDATE(),
          events_processed = events_processed + 1
      WHEN NOT MATCHED THEN
        INSERT (family_id, current_engagement_score, engagement_trend, engagement_change_7_days, alert_level, events_processed)
        VALUES (?, ?, ?, ?, ?, 1);
    `, [
      familyId, newScore, trend, change7Days, alertLevel,
      familyId, newScore, trend, change7Days, alertLevel
    ]);

    // Fire alert if critical
    if (alertLevel === 'critical') {
      await this.fireAlert(familyId, newScore, trend, change7Days);
    }

    console.log(`Engagement trend updated: ${familyId} - Score: ${newScore}, Trend: ${trend}, Alert: ${alertLevel}`);
  }

  async getRecentScores(familyId) {
    // Get last 30 days of scores
    const results = await this.db.query(`
      SELECT 
        event_data,
        event_timestamp
      FROM events
      WHERE aggregate_id = ?
        AND event_type = 'EngagementScoreUpdated'
        AND event_timestamp >= DATEADD(DAY, -30, GETDATE())
      ORDER BY event_timestamp ASC
    `, [familyId]);

    return results.map(r => ({
      score: JSON.parse(r.event_data).new_score,
      timestamp: r.event_timestamp
    }));
  }

  calculateTrend(history) {
    if (history.length < 2) return 'stable';

    // Simple linear regression
    const recent = history.slice(-7);  // Last 7 data points
    const slope = this.linearRegression(recent);

    if (slope > 2) return 'rising';
    if (slope < -2) return 'declining';
    return 'stable';
  }

  linearRegression(data) {
    const n = data.length;
    let sumX = 0, sumY = 0, sumXY = 0, sumXX = 0;

    data.forEach((point, i) => {
      sumX += i;
      sumY += point.score;
      sumXY += i * point.score;
      sumXX += i * i;
    });

    return (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
  }

  calculateChange(history, days) {
    if (history.length < 2) return 0;

    const cutoff = new Date(Date.now() - days * 24 * 60 * 60 * 1000);
    const recent = history.filter(h => new Date(h.timestamp) >= cutoff);

    if (recent.length < 2) return 0;

    return recent[recent.length - 1].score - recent[0].score;
  }

  determineAlertLevel(score, trend, change7Days) {
    if (score < 30 || change7Days < -40) return 'critical';
    if (score < 50 || change7Days < -25) return 'warning';
    if (score < 70 || change7Days < -10) return 'watch';
    return 'none';
  }

  async fireAlert(familyId, score, trend, change) {
    // Integration with Pattern 23 (Triggers)
    console.log(`🚨 CRITICAL ALERT: Family ${familyId} - Score: ${score}, Trend: ${trend}, Change: ${change}`);

    // Trigger immediate intervention
    const TriggerEngine = require('./trigger-engine');
    const triggers = new TriggerEngine(this.db);

    await triggers.handleEvent('engagement_critical', {
      family_id: familyId,
      engagement_score: score,
      trend: trend,
      change_7_days: change
    });
  }
}

// Payment Pattern Processor (detect fraud, duplicates)
class PaymentPatternProcessor {
  constructor(db) {
    this.db = db;
    this.name = 'PaymentPatternProcessor';
    this.eventTypes = ['PaymentReceived'];
    this.recentPayments = new Map();  // In-memory state
  }

  async process(event, state) {
    const familyId = event.aggregate_id;
    const amount = event.event_data.amount;
    const timestamp = event.event_timestamp;

    // Get recent payments for this family (from in-memory state)
    if (!this.recentPayments.has(familyId)) {
      this.recentPayments.set(familyId, []);
    }

    const payments = this.recentPayments.get(familyId);
    payments.push({amount, timestamp});

    // Keep only last 10 minutes
    const tenMinutesAgo = new Date(Date.now() - 10 * 60 * 1000);
    const recent = payments.filter(p => new Date(p.timestamp) >= tenMinutesAgo);
    this.recentPayments.set(familyId, recent);

    // Detect patterns

    // Pattern 1: Duplicate payments
    const duplicates = recent.filter(p => p.amount === amount);
    if (duplicates.length > 1) {
      await this.detectDuplicatePayment(familyId, amount, duplicates.length);
    }

    // Pattern 2: Rapid succession
    if (recent.length >= 3) {
      await this.detectRapidPayments(familyId, recent.length);
    }

    // Pattern 3: Unusual amount
    const avgAmount = await this.getAveragePayment(familyId);
    if (amount > avgAmount * 3) {
      await this.detectUnusualAmount(familyId, amount, avgAmount);
    }
  }

  async detectDuplicatePayment(familyId, amount, count) {
    console.log(`⚠️ Duplicate payment detected: Family ${familyId}, $${amount} × ${count}`);

    await this.db.query(`
      INSERT INTO realtime_pattern_detections (
        pattern_name,
        family_id,
        pattern_events,
        confidence
      ) VALUES ('duplicate_payment', ?, ?, 0.95)
    `, [
      familyId,
      JSON.stringify({amount, count, within_minutes: 10})
    ]);

    // Alert coordinator
    await this.alertCoordinator(familyId, 'Possible duplicate payment', 'high');
  }

  async detectRapidPayments(familyId, count) {
    console.log(`⚠️ Rapid payments detected: Family ${familyId}, ${count} payments in 10 minutes`);
  }

  async detectUnusualAmount(familyId, amount, avgAmount) {
    console.log(`⚠️ Unusual payment amount: Family ${familyId}, $${amount} (avg: $${avgAmount})`);
  }

  async getAveragePayment(familyId) {
    const result = await this.db.query(`
      SELECT AVG(CAST(JSON_VALUE(event_data, '$.amount') AS DECIMAL(10,2))) as avg_amount
      FROM events
      WHERE aggregate_id = ?
        AND event_type = 'PaymentReceived'
    `, [familyId]);

    return result[0]?.avg_amount || 450;  // fall back to a typical payment amount when no history exists
  }

  async alertCoordinator(familyId, message, priority) {
    // Send real-time alert
    console.log(`📧 Coordinator alert: ${message} (${priority})`);
  }
}

// Windowed Aggregation Processor
class WindowedAggregationProcessor {
  constructor(db) {
    this.db = db;
    this.name = 'WindowedAggregationProcessor';
    this.eventTypes = ['*'];
    this.windows = new Map();
  }

  async process(event, state) {
    const familyId = event.aggregate_id;
    const windowKey = `${familyId}_7days`;

    // Sliding 7-day window
    if (!this.windows.has(windowKey)) {
      this.windows.set(windowKey, []);
    }

    const window = this.windows.get(windowKey);
    window.push(event);

    // Remove events older than 7 days
    const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
    const current = window.filter(e => new Date(e.event_timestamp) >= sevenDaysAgo);
    this.windows.set(windowKey, current);

    // Aggregate
    const eventCount = current.length;
    const loginCount = current.filter(e => e.event_type === 'UserLogin').length;
    const paymentCount = current.filter(e => e.event_type === 'PaymentReceived').length;

    // Update real-time metrics
    await this.db.query(`
      UPDATE realtime_family_metrics
      SET 
        events_last_7_days = ?,
        logins_last_7_days = ?,
        payments_last_7_days = ?,
        last_updated = GETDATE()
      WHERE family_id = ?
    `, [eventCount, loginCount, paymentCount, familyId]);
  }
}

module.exports = {
  StreamProcessor,
  EngagementTrendProcessor,
  PaymentPatternProcessor,
  WindowedAggregationProcessor
};

Real-Time Dashboard Integration

// WebSocket server for real-time dashboard
const WebSocket = require('ws');

class RealtimeDashboard {
  constructor(streamProcessor) {
    this.wss = new WebSocket.Server({port: 8080});
    this.streamProcessor = streamProcessor;
    this.clients = new Set();

    this.setupWebSocket();
    this.setupStreamHandlers();
  }

  setupWebSocket() {
    this.wss.on('connection', (ws) => {
      console.log('Dashboard client connected');
      this.clients.add(ws);

      ws.on('close', () => {
        this.clients.delete(ws);
      });
    });
  }

  setupStreamHandlers() {
    // Register processor to push updates to dashboard
    const dashboardProcessor = {
      name: 'DashboardProcessor',
      eventTypes: ['*'],
      process: async (event) => {
        // Push to all connected clients
        this.broadcast({
          type: 'event',
          event_type: event.event_type,
          family_id: event.aggregate_id,
          timestamp: event.event_timestamp,
          data: event.event_data
        });
      }
    };

    this.streamProcessor.registerProcessor(dashboardProcessor);
  }

  broadcast(message) {
    const data = JSON.stringify(message);

    this.clients.forEach(client => {
      if (client.readyState === WebSocket.OPEN) {
        client.send(data);
      }
    });
  }
}

// Usage
const streamProcessor = new StreamProcessor(eventStore, db);
const dashboard = new RealtimeDashboard(streamProcessor);

// Dashboard receives updates in REAL-TIME! Like stock ticker! 📈

Usage Example

// Set up stream processing
const processor = new StreamProcessor(eventStore, db);

// Register processors
processor.registerProcessor(new EngagementTrendProcessor(db));
processor.registerProcessor(new PaymentPatternProcessor(db));
processor.registerProcessor(new WindowedAggregationProcessor(db));

// Start processing
await processor.start();

// Now events are processed in REAL-TIME!

// Example event flow:
// 9:00:00.000 - PaymentReceived event arrives
// 9:00:00.025 - PaymentPatternProcessor detects duplicate (25ms later!)
// 9:00:00.050 - Alert fired to coordinator (50ms total!)
// 9:00:00.100 - Dashboard updated via WebSocket (100ms total!)

// SUB-SECOND END-TO-END LATENCY! ⚡

Variations

By Processing Model

Stream Processing:
  • Process events one-by-one
  • Immediate results
  • Complex stateful operations

Micro-Batching:
  • Collect events for 100-500ms
  • Process in small batches
  • Balance latency and throughput

Hybrid:
  • Critical paths: stream
  • Analytics: micro-batch

By State Management

Stateless:
  • Each event processed independently
  • Easy to scale
  • Limited capability

Stateful:
  • Track state across events
  • Powerful (trends, patterns)
  • Harder to scale

Externalized State:
  • Store state in Redis/DB
  • Scalable and fault-tolerant
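Externalizing state means the processor itself stays stateless: every read and update goes through a shared store, so any replica can handle any event. A sketch using a `Map` as a stand-in for Redis (with a real Redis client you would use an atomic `INCR` instead of get-then-set):

```javascript
// Externalized-state sketch: the processor keeps no local state;
// counts live in a shared store, so any replica can handle any event.
// `store` here is a Map standing in for Redis.
function makeCounterProcessor(store) {
  return {
    name: 'ExternalizedCounter',
    eventTypes: ['*'],
    async process(event) {
      const key = `events:${event.aggregate_id}`;
      const next = (store.get(key) || 0) + 1; // with Redis: await redis.incr(key)
      store.set(key, next);
      return next;
    }
  };
}

// Two "replicas" sharing one store still agree on the count
const shared = new Map();
const replicaA = makeCounterProcessor(shared);
const replicaB = makeCounterProcessor(shared);

(async () => {
  await replicaA.process({aggregate_id: 'family-123'});
  await replicaB.process({aggregate_id: 'family-123'});
  const total = await replicaA.process({aggregate_id: 'family-123'});
  console.log(total); // → 3
})();
```

Note the get-then-set here is only safe single-threaded; the atomic increment a real store provides is what makes this pattern scale across replicas.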

By Windowing

Time Windows:
  • Last 5 minutes, last hour
  • Fixed duration

Count Windows:
  • Last 100 events
  • Fixed count

Session Windows:
  • Activity-based
  • Ends after inactivity gap
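A count window is the simplest of the three to implement: keep the last N values and aggregate over them. A sketch (a ring buffer would avoid the shift cost; this version favors clarity):

```javascript
// Count window: always aggregate over the last `size` values.
class CountWindow {
  constructor(size) {
    this.size = size;
    this.values = [];
  }

  push(value) {
    this.values.push(value);
    if (this.values.length > this.size) this.values.shift(); // evict oldest
  }

  average() {
    if (this.values.length === 0) return 0;
    return this.values.reduce((a, b) => a + b, 0) / this.values.length;
  }
}

// Moving average over the last 3 engagement scores
const scoreWindow = new CountWindow(3);
[78, 68, 52, 38].forEach(score => scoreWindow.push(score));
console.log(scoreWindow.average()); // → (68 + 52 + 38) / 3 ≈ 52.67
```

Unlike a time window, a count window never goes empty during quiet periods, which makes it handy for "compared to the last N readings" logic.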

Consequences

Benefits

1. Immediate awareness Know what's happening NOW, not yesterday.

2. Prevent problems Catch issues early (before damage).

3. Real-time dashboards Live updates like stock ticker.

4. Pattern detection Detect trends as they emerge.

5. Sub-second response Alert within seconds of event.

6. Always current No stale data.

Costs

1. Infrastructure complexity More complex than batch.

2. State management Stateful processing harder to scale.

3. Resource intensive Continuous processing uses more resources.

4. Eventual consistency May see partial/incomplete data.

5. Monitoring required Need to track stream health.

Sample Code

Measure processing latency:

function measureLatency(event) {
  const eventTime = new Date(event.event_timestamp);
  const processTime = new Date();
  const latency = processTime - eventTime;

  console.log(`Latency: ${latency}ms`);

  if (latency > 1000) {
    console.warn(`High latency! ${latency}ms > 1 second`);
  }
}

Known Uses

Stock Trading:
  • Real-time price updates
  • Pattern detection (technical analysis)
  • Sub-millisecond alerts

Medical Monitoring:
  • Continuous EKG/vitals
  • Immediate anomaly detection
  • Life-critical alerts

Fraud Detection:
  • Real-time transaction monitoring
  • Pattern recognition
  • Instant blocking

Social Media:
  • Live feeds
  • Trending topics
  • Real-time engagement metrics

Builds On:
  • Pattern 27: Event Sourcing - supplies the event streams to process
  • Pattern 28: CQRS - projections to update in real-time

Enables:
  • Pattern 23: Triggered Interventions - instant triggers
  • Pattern 26: Feedback Loop - real-time learning

Requires:
  • High-performance infrastructure
  • Scalable event store

References

Academic Foundations

  • Akidau, Tyler, et al. (2018). Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing. O'Reilly. ISBN: 978-1491983874
  • Akidau, Tyler, et al. (2015). "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing." VLDB 2015. http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
  • Marz, Nathan, and James Warren (2015). Big Data: Principles and Best Practices of Scalable Realtime Data Systems. Manning. ISBN: 978-1617290343 - Lambda architecture
  • Kreps, Jay (2014). "Questioning the Lambda Architecture." https://www.oreilly.com/radar/questioning-the-lambda-architecture/ - Kappa architecture

Related Patterns

  • Pattern 1: Universal Event Log - Real-time event ingestion
  • Pattern 27: Event Sourcing - Process event streams in real-time
  • Pattern 28: CQRS - Update read models in real-time
  • Pattern 29: Real-Time Processing - Stream processing architecture
  • Pattern 30: Scalability Patterns - Scale stream processing
  • Volume 3, Pattern 22: Real-Time Lookup & Validation - Real-time data access

Practical Implementation

Tools & Services