Volume 2: Organizational Intelligence Platforms

Pattern 21: Automated Workflow Execution

Intent

Enable systematic, reliable execution of multi-step intervention workflows triggered automatically by predictions, events, or conditions, with built-in error handling, retry logic, state management, and human oversight capabilities, ensuring actions happen at exactly the right time without requiring manual intervention.

Also Known As

  • Workflow Automation
  • Process Orchestration
  • Task Scheduling Engine
  • Intervention Automation
  • Action Pipeline

Problem

Predictions are useless if we don't act on them. But manual execution doesn't scale.

Sarah's co-op has built amazing intelligence: - Pattern 12 predicts Martinez family has 87% withdrawal risk in 32 days - Pattern 13 confirms high confidence (82/100) - Pattern 15 recommends: "Call family, discuss concerns, offer payment plan"

But then what?

Manual execution fails: - Sarah sees the recommendation Monday morning - Gets busy with parent calls, event planning - Forgets to call Martinez family - Remembers Friday afternoon (too late, family already decided to leave) - Same thing happens with 5 other families this month - Result: Great predictions, zero action, families still leave

The manual execution problems: 1. Forgotten tasks - Humans forget, especially when busy 2. Delayed action - "I'll do it later" becomes "I forgot to do it" 3. Inconsistent execution - Sarah handles it one way, Mike handles differently 4. No follow-up - First action happens, but follow-up steps forgotten 5. Scale failure - Can't personally manage 50+ at-risk families 6. Timing errors - Miss optimal intervention windows 7. No audit trail - Can't prove intervention was attempted 8. Context loss - By Friday, forgot why Martinez needed call

What we need: - Automatic trigger when prediction + confidence meet threshold - Multi-step workflow: Day 1 (email), Day 3 (text if no response), Day 7 (call) - State tracking: Which step are we on? What happened? - Error handling: Email bounced? Try phone instead - Retry logic: Call went to voicemail? Try again tomorrow - Human oversight: Flag for review if automation fails - Audit trail: Prove we attempted contact - Timing precision: Execute at optimal moments (not 2am!)

Without automated workflows: - Predictions gather dust - Manual execution inconsistent - Families slip through cracks - Can't scale beyond 20-30 families - No systematic follow-up - Miss intervention windows

With automated workflows: - Predictions trigger actions automatically - Consistent execution every time - Nothing forgotten - Scales to 500+ families - Multi-step sequences with perfect timing - Complete audit trail - Human oversight when needed

Context

When this pattern applies:

  • Have predictions/triggers that should cause actions
  • Actions follow predictable sequences (workflows)
  • Manual execution fails to scale or is inconsistent
  • Timing of actions matters
  • Need audit trail of what was done
  • Want to ensure nothing falls through cracks

When this pattern may not be needed:

  • Very small scale (<10 entities)
  • Actions are one-off, not repeatable
  • Human judgment required for every action
  • No predictable workflows

Forces

Competing concerns:

1. Automation vs Human Judgment - Full automation = fast, consistent, scalable - Human-in-loop = judgment, empathy, flexibility - Balance: Automate routine, flag complex for human

2. Rigid vs Flexible Workflows - Rigid = predictable, simple to implement - Flexible = handles exceptions, complex to build - Balance: Core path automated, escape hatches for exceptions

3. Immediate vs Scheduled Execution - Immediate = fast response - Scheduled = optimal timing, batching - Balance: Urgent actions immediate, routine scheduled

4. Simple vs Sophisticated State Management - Simple = easy to understand, hard to debug - Sophisticated = handles edge cases, complex - Balance: State machine for core logic, simple for add-ons

5. Fail-Fast vs Retry-Forever - Fail-fast = don't waste resources - Retry-forever = ensure delivery - Balance: Smart retries with exponential backoff + max attempts

Solution

Build comprehensive workflow execution engine with:

1. Workflow Definition

Workflow = {
  trigger: { event/condition that starts workflow },
  steps: [ ordered sequence of actions ],
  timing: { when to execute each step },
  retry_logic: { what to do if step fails },
  escalation: { when to involve human },
  completion_criteria: { when workflow is done }
}

2. State Machine

States: pending → running → paused → completed → failed → cancelled
Transitions: Based on step outcomes
Persistence: Track state in database

3. Execution Engine - Poll for workflows ready to execute - Execute steps in order - Handle errors gracefully - Track progress - Log everything

4. Timing Control - Respect user time zones - Avoid night/weekend (unless urgent) - Optimal send times per channel - Batch similar actions

5. Integration Points - Pattern 13 (Confidence) → Only automate high-confidence predictions - Pattern 14 (Time Windows) → Execute at optimal time - Pattern 15 (Recommendations) → Recommendations become workflow steps - Pattern 24 (Templates) → Use templates for communication steps

Structure

Workflow Management Tables

-- Define workflow templates
CREATE TABLE workflow_templates (
  template_id INT PRIMARY KEY IDENTITY(1,1),

  template_name VARCHAR(200) NOT NULL,
  description NVARCHAR(1000),

  -- Triggering
  trigger_type VARCHAR(100),  -- 'prediction', 'event', 'schedule', 'manual'
  trigger_conditions NVARCHAR(MAX),  -- JSON: conditions to start this workflow

  -- Execution
  steps NVARCHAR(MAX) NOT NULL,  -- JSON: ordered array of steps
  default_timing NVARCHAR(MAX),  -- JSON: timing for each step

  -- Control
  max_concurrent_instances INT DEFAULT 1,  -- Per family
  priority INT DEFAULT 5,  -- 1-10, higher = more urgent

  -- Retry/Error
  retry_strategy NVARCHAR(MAX),  -- JSON
  escalation_rules NVARCHAR(MAX),  -- JSON

  -- Metadata
  active BIT DEFAULT 1,
  created_date DATETIME2 DEFAULT GETDATE(),
  updated_date DATETIME2 DEFAULT GETDATE(),

  CONSTRAINT UQ_workflow_name UNIQUE (template_name)
);

-- Track workflow instances (actual executions)
CREATE TABLE workflow_instances (
  instance_id INT PRIMARY KEY IDENTITY(1,1),
  template_id INT NOT NULL,
  family_id INT NOT NULL,

  -- Context
  triggered_by VARCHAR(100),  -- 'prediction_id:12345', 'event_id:67890'
  trigger_date DATETIME2 DEFAULT GETDATE(),

  -- State
  status VARCHAR(50) DEFAULT 'pending',  -- 'pending', 'running', 'paused', 'completed', 'failed', 'cancelled'
  current_step INT DEFAULT 0,  -- Which step (0-based index)

  -- Timing
  started_at DATETIME2,
  completed_at DATETIME2,
  next_execution_at DATETIME2,  -- When to run next step

  -- Results
  outcome VARCHAR(50),  -- 'success', 'partial_success', 'failed', 'cancelled'
  outcome_notes NVARCHAR(MAX),

  -- Error handling
  error_count INT DEFAULT 0,
  last_error NVARCHAR(MAX),

  -- Metadata
  created_date DATETIME2 DEFAULT GETDATE(),

  CONSTRAINT FK_instance_template FOREIGN KEY (template_id)
    REFERENCES workflow_templates(template_id),
  CONSTRAINT FK_instance_family FOREIGN KEY (family_id)
    REFERENCES families(family_id)
);

-- Index for polling ready workflows
CREATE INDEX IX_workflow_ready ON workflow_instances(status, next_execution_at)
  WHERE status IN ('pending', 'running') AND next_execution_at IS NOT NULL;

-- Track individual step executions
CREATE TABLE workflow_step_executions (
  execution_id INT PRIMARY KEY IDENTITY(1,1),
  instance_id INT NOT NULL,

  step_number INT NOT NULL,
  step_name VARCHAR(200),
  step_type VARCHAR(100),  -- 'email', 'sms', 'call', 'task', 'wait', 'condition'

  -- Execution
  status VARCHAR(50) DEFAULT 'pending',  -- 'pending', 'running', 'completed', 'failed', 'skipped'

  started_at DATETIME2,
  completed_at DATETIME2,

  -- Input/Output
  input_data NVARCHAR(MAX),  -- JSON: what was sent to step
  output_data NVARCHAR(MAX),  -- JSON: result from step

  -- Results
  success BIT,
  error_message NVARCHAR(MAX),
  retry_count INT DEFAULT 0,

  created_date DATETIME2 DEFAULT GETDATE(),

  CONSTRAINT FK_step_instance FOREIGN KEY (instance_id)
    REFERENCES workflow_instances(instance_id)
);

-- Queue for workflow execution (decouples triggering from execution)
CREATE TABLE workflow_queue (
  queue_id INT PRIMARY KEY IDENTITY(1,1),
  instance_id INT NOT NULL,

  scheduled_for DATETIME2 NOT NULL,
  priority INT DEFAULT 5,

  status VARCHAR(50) DEFAULT 'queued',  -- 'queued', 'processing', 'completed', 'failed'

  attempts INT DEFAULT 0,
  max_attempts INT DEFAULT 3,
  last_attempt_at DATETIME2,

  created_date DATETIME2 DEFAULT GETDATE(),

  CONSTRAINT FK_queue_instance FOREIGN KEY (instance_id)
    REFERENCES workflow_instances(instance_id)
);

CREATE INDEX IX_queue_ready ON workflow_queue(status, scheduled_for, priority)
  WHERE status = 'queued';

Implementation

Workflow Engine

class WorkflowEngine {
  constructor(db, config = {}) {
    this.db = db;
    this.pollingInterval = config.pollingInterval || 60000;  // 1 minute
    this.maxConcurrent = config.maxConcurrent || 10;
    this.running = false;
    this.activeExecutions = new Set();
  }

  async start() {
    console.log('Workflow Engine starting...');
    this.running = true;
    this.poll();
  }

  async stop() {
    console.log('Workflow Engine stopping...');
    this.running = false;
  }

  async poll() {
    while (this.running) {
      try {
        // Don't overload system
        if (this.activeExecutions.size < this.maxConcurrent) {
          await this.executeReadyWorkflows();
        }

        await this.sleep(this.pollingInterval);
      } catch (error) {
        console.error('Polling error:', error);
        await this.sleep(this.pollingInterval);
      }
    }
  }

  async executeReadyWorkflows() {
    // Get workflows ready to execute
    const ready = await this.db.query(`
      SELECT TOP (?) 
        wi.instance_id,
        wi.template_id,
        wi.family_id,
        wi.current_step,
        wt.steps,
        wt.retry_strategy
      FROM workflow_instances wi
      JOIN workflow_templates wt ON wi.template_id = wt.template_id
      WHERE wi.status IN ('pending', 'running')
        AND wi.next_execution_at <= GETDATE()
      ORDER BY wi.next_execution_at ASC
    `, [this.maxConcurrent - this.activeExecutions.size]);

    for (const workflow of ready) {
      // Execute async without blocking
      this.executeWorkflow(workflow).catch(err => {
        console.error(`Workflow ${workflow.instance_id} failed:`, err);
      });
    }
  }

  async executeWorkflow(workflow) {
    const instanceId = workflow.instance_id;

    // Mark as executing
    this.activeExecutions.add(instanceId);

    try {
      // Update status
      await this.db.query(`
        UPDATE workflow_instances
        SET status = 'running', started_at = CASE WHEN started_at IS NULL THEN GETDATE() ELSE started_at END
        WHERE instance_id = ?
      `, [instanceId]);

      // Parse steps
      const steps = JSON.parse(workflow.steps);
      const currentStepIndex = workflow.current_step;

      if (currentStepIndex >= steps.length) {
        // Workflow complete
        await this.completeWorkflow(instanceId, 'success');
        return;
      }

      const step = steps[currentStepIndex];

      // Execute step
      const result = await this.executeStep(instanceId, currentStepIndex, step, workflow);

      if (result.success) {
        // Move to next step
        const nextStep = currentStepIndex + 1;
        const nextExecutionAt = this.calculateNextExecutionTime(steps, nextStep, workflow);

        await this.db.query(`
          UPDATE workflow_instances
          SET 
            current_step = ?,
            next_execution_at = ?,
            error_count = 0
          WHERE instance_id = ?
        `, [nextStep, nextExecutionAt, instanceId]);

      } else {
        // Step failed - apply retry logic
        await this.handleStepFailure(instanceId, step, result.error, workflow);
      }

    } catch (error) {
      console.error(`Workflow ${instanceId} execution error:`, error);
      await this.handleWorkflowError(instanceId, error);

    } finally {
      this.activeExecutions.delete(instanceId);
    }
  }

  async executeStep(instanceId, stepIndex, step, workflow) {
    console.log(`Executing step ${stepIndex} of workflow ${instanceId}: ${step.type}`);

    // Create step execution record
    const execResult = await this.db.query(`
      INSERT INTO workflow_step_executions (
        instance_id, step_number, step_name, step_type, status, started_at
      ) 
      OUTPUT INSERTED.execution_id
      VALUES (?, ?, ?, ?, 'running', GETDATE())
    `, [instanceId, stepIndex, step.name, step.type]);

    const executionId = execResult[0].execution_id;

    try {
      // Get context (family data, etc.)
      const context = await this.getExecutionContext(workflow.family_id);

      // Execute based on step type
      let result;
      switch (step.type) {
        case 'email':
          result = await this.executeEmailStep(step, context);
          break;
        case 'sms':
          result = await this.executeSmsStep(step, context);
          break;
        case 'call':
          result = await this.executeCallStep(step, context);
          break;
        case 'task':
          result = await this.executeTaskStep(step, context);
          break;
        case 'wait':
          result = await this.executeWaitStep(step, context);
          break;
        case 'condition':
          result = await this.executeConditionStep(step, context);
          break;
        default:
          throw new Error(`Unknown step type: ${step.type}`);
      }

      // Update step execution
      await this.db.query(`
        UPDATE workflow_step_executions
        SET 
          status = 'completed',
          completed_at = GETDATE(),
          success = 1,
          output_data = ?
        WHERE execution_id = ?
      `, [JSON.stringify(result), executionId]);

      return { success: true, result };

    } catch (error) {
      // Update step execution with error
      await this.db.query(`
        UPDATE workflow_step_executions
        SET 
          status = 'failed',
          completed_at = GETDATE(),
          success = 0,
          error_message = ?
        WHERE execution_id = ?
      `, [error.message, executionId]);

      return { success: false, error: error.message };
    }
  }

  async executeEmailStep(step, context) {
    // Send email using Pattern 24 (Template-Based Communication)
    const emailService = require('./email-service');

    const result = await emailService.send({
      to: context.family.email,
      template: step.template_id,
      variables: {
        family_name: context.family.family_name,
        coordinator_name: 'Sarah',
        ...step.template_vars
      }
    });

    return result;
  }

  async executeSmsStep(step, context) {
    const smsService = require('./sms-service');

    return await smsService.send({
      to: context.family.phone,
      message: this.renderTemplate(step.message, context)
    });
  }

  async executeCallStep(step, context) {
    // Create task for coordinator to call
    await this.db.query(`
      INSERT INTO coordinator_tasks (
        family_id,
        task_type,
        description,
        due_date,
        priority
      ) VALUES (?, 'phone_call', ?, DATEADD(DAY, 1, GETDATE()), 'high')
    `, [context.family.family_id, `Call ${context.family.family_name} - ${step.reason}`]);

    return { task_created: true };
  }

  async executeTaskStep(step, context) {
    // Generic task creation
    await this.db.query(`
      INSERT INTO coordinator_tasks (
        family_id,
        task_type,
        description,
        due_date
      ) VALUES (?, ?, ?, ?)
    `, [context.family.family_id, step.task_type, step.description, step.due_date]);

    return { task_created: true };
  }

  async executeWaitStep(step, context) {
    // Just wait - next execution time will be set by calculateNextExecutionTime
    return { waited: step.duration_days };
  }

  async executeConditionStep(step, context) {
    // Evaluate condition to determine next step
    const condition = step.condition;
    const result = this.evaluateCondition(condition, context);

    return { 
      condition_met: result,
      next_step: result ? step.if_true_step : step.if_false_step
    };
  }

  evaluateCondition(condition, context) {
    // Simple condition evaluation (could be more sophisticated)
    // Example: { field: 'engagement_score', operator: '>', value: 50 }

    const value = context[condition.field];

    switch (condition.operator) {
      case '>': return value > condition.value;
      case '<': return value < condition.value;
      case '==': return value == condition.value;
      case '>=': return value >= condition.value;
      case '<=': return value <= condition.value;
      default: return false;
    }
  }

  calculateNextExecutionTime(steps, nextStepIndex, workflow) {
    if (nextStepIndex >= steps.length) {
      return null;  // Workflow complete
    }

    const nextStep = steps[nextStepIndex];

    // If step has explicit delay
    if (nextStep.delay_days) {
      const delay = new Date();
      delay.setDate(delay.getDate() + nextStep.delay_days);
      return this.adjustToBusinessHours(delay);
    }

    // If step has specific time
    if (nextStep.scheduled_time) {
      return new Date(nextStep.scheduled_time);
    }

    // Default: next available business hour
    return this.getNextBusinessHour();
  }

  adjustToBusinessHours(date) {
    // Don't send at night or weekends
    const hour = date.getHours();
    const day = date.getDay();

    // If weekend, move to Monday
    if (day === 0) date.setDate(date.getDate() + 1);  // Sunday -> Monday
    if (day === 6) date.setDate(date.getDate() + 2);  // Saturday -> Monday

    // If night (before 8am or after 6pm), move to 9am
    if (hour < 8 || hour >= 18) {
      date.setHours(9, 0, 0, 0);
      if (hour >= 18) {
        date.setDate(date.getDate() + 1);  // Move to tomorrow
      }
    }

    return date;
  }

  getNextBusinessHour() {
    const now = new Date();
    now.setMinutes(now.getMinutes() + 5);  // 5 minute buffer
    return this.adjustToBusinessHours(now);
  }

  async handleStepFailure(instanceId, step, error, workflow) {
    // Get retry strategy
    const retryStrategy = JSON.parse(workflow.retry_strategy || '{}');
    const maxRetries = retryStrategy.max_retries || 3;

    // Increment error count
    const result = await this.db.query(`
      UPDATE workflow_instances
      SET 
        error_count = error_count + 1,
        last_error = ?
      OUTPUT INSERTED.error_count
      WHERE instance_id = ?
    `, [error, instanceId]);

    const errorCount = result[0].error_count;

    if (errorCount >= maxRetries) {
      // Max retries exceeded - escalate or fail
      await this.escalateWorkflow(instanceId, `Step failed after ${errorCount} attempts: ${error}`);
    } else {
      // Schedule retry with exponential backoff
      const backoffMinutes = Math.pow(2, errorCount) * 5;  // 5, 10, 20, 40 minutes
      const retryAt = new Date();
      retryAt.setMinutes(retryAt.getMinutes() + backoffMinutes);

      await this.db.query(`
        UPDATE workflow_instances
        SET next_execution_at = ?
        WHERE instance_id = ?
      `, [retryAt, instanceId]);
    }
  }

  async escalateWorkflow(instanceId, reason) {
    // Flag for human review
    await this.db.query(`
      UPDATE workflow_instances
      SET 
        status = 'paused',
        outcome_notes = ?
      WHERE instance_id = ?
    `, [reason, instanceId]);

    // Create task for coordinator
    const workflow = await this.db.query(`
      SELECT family_id FROM workflow_instances WHERE instance_id = ?
    `, [instanceId]);

    await this.db.query(`
      INSERT INTO coordinator_tasks (
        family_id,
        task_type,
        description,
        priority
      ) VALUES (?, 'workflow_escalation', ?, 'high')
    `, [workflow[0].family_id, `Workflow ${instanceId} needs attention: ${reason}`]);
  }

  async completeWorkflow(instanceId, outcome) {
    await this.db.query(`
      UPDATE workflow_instances
      SET 
        status = 'completed',
        completed_at = GETDATE(),
        outcome = ?
      WHERE instance_id = ?
    `, [outcome, instanceId]);

    console.log(`Workflow ${instanceId} completed: ${outcome}`);
  }

  async handleWorkflowError(instanceId, error) {
    await this.db.query(`
      UPDATE workflow_instances
      SET 
        status = 'failed',
        completed_at = GETDATE(),
        outcome = 'error',
        outcome_notes = ?
      WHERE instance_id = ?
    `, [error.message, instanceId]);
  }

  async getExecutionContext(familyId) {
    const family = await this.db.query(`
      SELECT 
        f.*,
        fem.engagement_score,
        ra.withdrawal_risk
      FROM families f
      LEFT JOIN family_engagement_metrics fem ON f.family_id = fem.family_id
      LEFT JOIN risk_assessments ra ON f.family_id = ra.family_id
      WHERE f.family_id = ?
    `, [familyId]);

    return {
      family: family[0],
      engagement_score: family[0].engagement_score,
      withdrawal_risk: family[0].withdrawal_risk
    };
  }

  renderTemplate(template, context) {
    // Simple template rendering (replace {{variables}})
    return template.replace(/\{\{(\w+)\}\}/g, (match, key) => {
      return context[key] || match;
    });
  }

  sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

module.exports = WorkflowEngine;

Workflow Trigger

class WorkflowTrigger {
  constructor(db) {
    this.db = db;
  }

  async triggerFromPrediction(predictionId) {
    // Get prediction details
    const prediction = await this.db.query(`
      SELECT 
        p.family_id,
        p.predicted_probability,
        p.confidence_score,
        p.prediction_window_days
      FROM ml_predictions p
      WHERE p.prediction_id = ?
    `, [predictionId]);

    if (!prediction.length) return;

    const pred = prediction[0];

    // Determine workflow based on risk + confidence
    let templateName;

    if (pred.predicted_probability > 0.80 && pred.confidence_score > 80) {
      templateName = 'high_risk_intervention';
    } else if (pred.predicted_probability > 0.60 && pred.confidence_score > 70) {
      templateName = 'medium_risk_intervention';
    } else {
      return;  // Don't trigger workflow for low risk
    }

    // Create workflow instance
    await this.createWorkflowInstance(pred.family_id, templateName, `prediction_id:${predictionId}`);
  }

  async createWorkflowInstance(familyId, templateName, triggeredBy) {
    // Get template
    const template = await this.db.query(`
      SELECT * FROM workflow_templates WHERE template_name = ? AND active = 1
    `, [templateName]);

    if (!template.length) {
      console.error(`Template not found: ${templateName}`);
      return;
    }

    const tmpl = template[0];

    // Check if already running this workflow for this family
    const existing = await this.db.query(`
      SELECT instance_id FROM workflow_instances
      WHERE family_id = ? 
        AND template_id = ?
        AND status IN ('pending', 'running')
    `, [familyId, tmpl.template_id]);

    if (existing.length > 0) {
      console.log(`Workflow already running for family ${familyId}`);
      return;
    }

    // Create instance
    const result = await this.db.query(`
      INSERT INTO workflow_instances (
        template_id,
        family_id,
        triggered_by,
        status,
        current_step,
        next_execution_at
      )
      OUTPUT INSERTED.instance_id
      VALUES (?, ?, ?, 'pending', 0, GETDATE())
    `, [tmpl.template_id, familyId, triggeredBy]);

    console.log(`Created workflow instance ${result[0].instance_id} for family ${familyId}`);

    return result[0].instance_id;
  }
}

module.exports = WorkflowTrigger;

Usage Example

// Define workflow template
await db.query(`
  INSERT INTO workflow_templates (
    template_name,
    description,
    trigger_type,
    steps,
    retry_strategy
  ) VALUES (
    'high_risk_intervention',
    'Multi-step intervention for high-risk families',
    'prediction',
    ?,
    ?
  )
`, [
  JSON.stringify([
    {
      name: 'Send Initial Email',
      type: 'email',
      template_id: 'concern_outreach',
      delay_days: 0
    },
    {
      name: 'Wait for Response',
      type: 'wait',
      duration_days: 3
    },
    {
      name: 'Check Response',
      type: 'condition',
      condition: { field: 'email_response_received', operator: '==', value: true },
      if_true_step: 5,  // Skip to completion
      if_false_step: 3   // Continue to SMS
    },
    {
      name: 'Send SMS Follow-up',
      type: 'sms',
      message: 'Hi {{family_name}}, checking in - saw some concerning patterns. Can we chat?',
      delay_days: 0
    },
    {
      name: 'Schedule Phone Call',
      type: 'call',
      reason: 'High withdrawal risk - no email/SMS response',
      delay_days: 2
    },
    {
      name: 'Mark Complete',
      type: 'task',
      task_type: 'workflow_complete',
      description: 'High-risk intervention workflow completed'
    }
  ]),
  JSON.stringify({
    max_retries: 3,
    backoff_multiplier: 2
  })
]);

// Start workflow engine
const engine = new WorkflowEngine(db, {
  pollingInterval: 30000,  // 30 seconds
  maxConcurrent: 5
});

await engine.start();

// Trigger workflow from prediction
const trigger = new WorkflowTrigger(db);
await trigger.triggerFromPrediction(12345);

// Workflow executes automatically:
// Day 1: Email sent
// Day 4: SMS sent (if no email response)
// Day 6: Call task created (if no SMS response)
// Day 7: Workflow complete

Variations

By Execution Model

Polling (Shown Above): - Engine polls database for ready workflows - Simple, reliable - 30-60 second latency acceptable

Event-Driven: - Pub/sub triggers workflows instantly - Complex, requires message queue - Near-zero latency

Scheduled (Cron): - Run workflows at specific times - Simple for batch operations - Not real-time

By Complexity

Simple Sequential: - Steps execute in order - No branching or conditions - Easy to understand

Conditional Branching: - Steps can skip based on conditions - If/then logic - Moderate complexity

Full State Machine: - Complex routing logic - Parallel execution - Advanced error handling

By Integration

Standalone: - Workflow engine is separate service - API-based execution - Scales independently

Embedded: - Workflows run in main application - Simpler deployment - Shares resources

Consequences

Benefits

1. Nothing forgotten Every high-risk prediction triggers intervention automatically.

2. Perfect timing Execute at 9am Tuesday, not 2am Saturday.

3. Consistent execution Every family gets same quality intervention.

4. Scales effortlessly Handle 500+ families with same effort as 50.

5. Multi-step sequences Day 1: email, Day 3: SMS, Day 7: call - automatically.

6. Audit trail Prove every intervention was attempted.

7. Retry logic Email bounced? System tries SMS instead.

8. Human oversight System escalates when automation fails.

Costs

1. Development complexity Workflow engine is non-trivial to build.

2. Debugging challenges Async execution harder to debug than synchronous.

3. State management Must track workflow state carefully.

4. Over-automation risk May automate things that need human judgment.

5. Infrastructure requirements Needs reliable background processing.

6. Maintenance burden Workflows need updating as business changes.

Sample Code

Monitor workflow health:

async function getWorkflowStats() {
  const stats = await db.query(`
    SELECT 
      status,
      COUNT(*) as count,
      AVG(DATEDIFF(SECOND, trigger_date, completed_at)) as avg_duration_seconds
    FROM workflow_instances
    WHERE trigger_date >= DATE_SUB(NOW(), INTERVAL 30 DAY)
    GROUP BY status
  `);

  return stats;
}

Known Uses

Homeschool Co-op Intelligence Platform - High-risk intervention workflows (3-step sequences) - Payment reminder workflows (auto-escalating) - Onboarding workflows (8 steps over 4 weeks) - 95% completion rate, 12pp retention improvement

Salesforce/HubSpot - Marketing automation workflows - Lead nurturing sequences - Customer onboarding

Zapier/Make - No-code workflow automation - Integration between apps - Trigger-action sequences

AWS Step Functions - Orchestrate microservices - Complex workflows with branching - Retry and error handling built-in

Requires: - Pattern 1: Universal Event Log - triggers based on events - Pattern 13: Confidence Scoring - only automate high-confidence predictions - Pattern 15: Intervention Recommendation - recommendations become workflow steps

Enables: - Pattern 22: Progressive Escalation - multi-step workflows - Pattern 23: Triggered Interventions - event-driven execution - Pattern 24: Template-Based Communication - workflow steps use templates

Enhanced by: - Pattern 14: Predictive Time Windows - optimal timing - Pattern 25: Multi-Channel Orchestration - coordinate across channels

References

Academic Foundations

  • van der Aalst, Wil M.P. (2016). Process Mining: Data Science in Action (2nd ed.). Springer. ISBN: 978-3662498507
  • Dumas, Marlon, et al. (2018). Fundamentals of Business Process Management (2nd ed.). Springer. ISBN: 978-3662565087
  • Hohpe, Gregor, and Bobby Woolf (2003). Enterprise Integration Patterns. Addison-Wesley. ISBN: 978-0321200686 - https://www.enterpriseintegrationpatterns.com/

Workflow Engines & Platforms

Standards & Specifications

  • Pattern 15: Intervention Recommendation Engine - Recommendations trigger workflows
  • Pattern 23: Triggered Interventions - Event-triggered workflow execution
  • Pattern 26: Feedback Loop Implementation - Workflows incorporate feedback
  • Volume 3, Pattern 17: State-Aware Behavior - Workflow state management
  • Volume 3, Pattern 25: Cross-System Workflows - Complex multi-system workflows

Practical Implementation

Tools & Resources