
Deployment Log Streamer

Oct 9, 2025 | 8 min read

tl;dr: Real-time log streaming during deployments: architecture patterns, log aggregation, filtering and routing, multi-tenant isolation, and production-ready observability workflows.

During deployments, you need to see what’s happening in real-time. Waiting for logs to appear in your logging system is too slow. You need streaming logs, filtered by deployment, service, and environment.

Here’s how to build a deployment log streamer that works at scale.

The Problem

Traditional logging workflows:

  1. Application writes logs to files/stdout
  2. Log agent collects logs (periodic polling)
  3. Logs sent to central storage
  4. You query storage to see logs

Problems:

  • High latency (logs appear minutes later)
  • Hard to filter by deployment
  • No real-time visibility
  • Difficult to debug deployment issues

What you need:

  • Real-time log streaming
  • Filter by deployment ID, service, environment
  • Tail-like functionality in UI
  • Multi-tenant isolation

Architecture Overview

A deployment log streamer has several components:

Log Streaming Architecture

graph TB
    App1[App 1<br/>stdout/stderr] --> Aggregator[Log Aggregator<br/>Collect, Parse, Enrich]
    App2[App 2<br/>stdout/stderr] --> Aggregator
    
    Aggregator --> Processor[Stream Processor<br/>Filter, Route, Transform]
    Processor --> Storage[(Storage<br/>Elasticsearch<br/>S3/Archive)]
    Processor --> Client[Client<br/>Web UI, CLI, API]
    
    Note[Features:<br/>Real-time streaming<br/>Filtering & routing<br/>Multi-tenant isolation<br/>High-volume handling]

Components:

  1. Log Sources: Applications writing logs (stdout, stderr, files)
  2. Log Aggregator: Collects, parses, and enriches logs
  3. Stream Processor: Filters, routes, and transforms logs
  4. Storage: Durable storage for search and analytics
  5. Client: UI/CLI/API for viewing logs

Deployment Pipeline Integration

Log streaming should be integrated into your deployment pipeline:

Deployment Pipeline with Log Aggregation

flowchart LR
    Build[Build<br/>Compile, Test, Package] --> Deploy[Deploy<br/>Rollout, Health Check]
    Deploy --> Monitor[Monitor<br/>Metrics, Logs, Alerts]
    Monitor --> Stream[Stream<br/>Collect, Process, Deliver]
    Stream --> Display[Display<br/>UI, CLI, API]
    
    Stream --> LogFlow[Log Flow:<br/>App Logs → Agent →<br/>Stream Buffer → Consumer →<br/>Storage/UI]

Pipeline Stages:

  1. Build: Compile, test, package
  2. Deploy: Rollout, health checks, verification
  3. Monitor: Metrics, logs, alerts
  4. Stream: Real-time log collection and delivery
  5. Display: UI/CLI showing live logs

Integration points:

  • Start streaming when deployment starts
  • Tag logs with deployment ID
  • Filter UI by deployment ID
  • Stop streaming after deployment completes, or keep the stream for a TTL (a minimal hook sketch follows this list)
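
A minimal hook sketch, assuming hypothetical startLogStream/stopLogStream helpers built on the components described below, plus a rollout() step that stands in for the deploy and health checks:

// Hedged sketch: startLogStream/stopLogStream and rollout() are illustrative,
// not a specific tool's API.
async function deployWithLogs(deployment) {
  const stream = await startLogStream({
    deploymentId: deployment.id,
    service: deployment.service,
    environment: deployment.environment,
    version: deployment.version
  });
  
  try {
    await rollout(deployment); // rollout + health checks
  } finally {
    // Keep the stream alive briefly so trailing logs still arrive, then stop it
    setTimeout(() => stopLogStream(stream), 5 * 60 * 1000);
  }
}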

Log Processing Flow

Raw logs need processing before streaming:

Log Processing Flow

Raw Logs → Parser (Extract timestamp, level, message) → Enricher (Add deployment ID, service, environment) → Structured Log Entry

Processing Features:

  • Pattern matching (regex, JSON, structured)
  • Field extraction & normalization
  • Metadata enrichment (deployment, service, env)
  • Filtering & routing based on content

1. Collection

Collect logs from multiple sources:

// Assumes Node's fs module and a Docker client (e.g. dockerode) bound to `docker`
const fs = require('fs');

class LogCollector {
  constructor() {
    this.sources = [];
  }
  
  addSource(source) {
    this.sources.push(source);
  }
  
  async collect(containerId) {
    // Collect logs piped into the collector (e.g. `app | collector`); a process
    // cannot listen for 'data' events on its own stdout
    process.stdin.on('data', (chunk) => {
      this.handleLog(chunk, { source: 'stdin' });
    });
    
    // Collect from files
    const logFile = '/var/log/app.log';
    fs.watch(logFile, (event) => {
      if (event === 'change') {
        this.readNewLines(logFile);
      }
    });
    
    // Collect from Docker containers (dockerode-style streaming logs API)
    docker.getContainer(containerId).logs({
      follow: true,
      stdout: true,
      stderr: true
    }, (err, stream) => {
      if (err) return;
      stream.on('data', (chunk) => {
        this.handleLog(chunk, { source: 'docker' });
      });
    });
  }
}

2. Parsing

Parse unstructured logs into structured format:

class LogParser {
  parse(rawLog) {
    const line = rawLog.toString().trim();
    
    // JSON logs
    if (line.startsWith('{') && line.endsWith('}')) {
      try {
        return JSON.parse(line);
      } catch (e) {
        // Not valid JSON; fall through to the other formats
      }
    }
    
    // Structured: "timestamp level message"
    const structured = line.match(
      /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+(\w+)\s+(.*)$/
    );
    if (structured) {
      return {
        timestamp: this.parseTimestamp(structured[1]),
        level: structured[2],
        message: structured[3]
      };
    }
    
    // Syslog format: "<pri>MMM dd HH:mm:ss host message"
    const syslog = line.match(
      /^<(\d+)>(\w{3}\s+\d+\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s+(.*)$/
    );
    if (syslog) {
      return {
        timestamp: this.parseTimestamp(syslog[2]),
        level: 'INFO',
        host: syslog[3],
        message: syslog[4]
      };
    }
    
    // Fallback: treat as plain text
    return {
      message: line,
      timestamp: new Date().toISOString(),
      level: 'INFO'
    };
  }
}

3. Enrichment

Add metadata to logs:

class LogEnricher {
  enrich(log, context) {
    return {
      ...log,
      // Deployment context
      deploymentId: context.deploymentId,
      service: context.service,
      environment: context.environment,
      version: context.version,
      
      // Infrastructure context
      host: context.host,
      containerId: context.containerId,
      podName: context.podName,
      
      // Request context (if available)
      requestId: context.requestId,
      userId: context.userId,
      
      // Timestamps
      receivedAt: new Date().toISOString(),
      processedAt: new Date().toISOString()
    };
  }
}

4. Streaming

Stream processed logs to clients:

class LogStreamer {
  constructor() {
    this.subscribers = new Map(); // deploymentId -> Set of { ws, filters } subscriptions
  }
  
  subscribe(deploymentId, filters, ws) {
    if (!this.subscribers.has(deploymentId)) {
      this.subscribers.set(deploymentId, new Set());
    }
    
    this.subscribers.get(deploymentId).add({
      ws,
      filters // { level, service, search }
    });
  }
  
  unsubscribe(deploymentId, ws) {
    const subscribers = this.subscribers.get(deploymentId) || new Set();
    
    for (const subscriber of subscribers) {
      if (subscriber.ws === ws) {
        subscribers.delete(subscriber);
      }
    }
  }
  
  stream(log) {
    const subscribers = this.subscribers.get(log.deploymentId) || new Set();
    
    for (const subscriber of subscribers) {
      if (this.matchesFilters(log, subscriber.filters)) {
        subscriber.ws.send(JSON.stringify(log));
      }
    }
  }
  
  matchesFilters(log, filters) {
    if (filters.level && log.level !== filters.level) {
      return false;
    }
    
    if (filters.service && log.service !== filters.service) {
      return false;
    }
    
    if (filters.search && !log.message.includes(filters.search)) {
      return false;
    }
    
    return true;
  }
}

Real-Time Delivery

For real-time log streaming, use WebSocket or Server-Sent Events (SSE):

WebSocket Implementation

// Server (using the ws package)
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws, req) => {
  const params = new URL(req.url, 'http://localhost').searchParams;
  const deploymentId = params.get('deploymentId');
  const filters = parseFilters(params);
  
  // Subscribe to log stream
  logStreamer.subscribe(deploymentId, filters, ws);
  
  ws.on('close', () => {
    logStreamer.unsubscribe(deploymentId, ws);
  });
});

// Client
const ws = new WebSocket('ws://logs.example.com?deploymentId=deploy-123&level=ERROR');

ws.onmessage = (event) => {
  const log = JSON.parse(event.data);
  displayLog(log);
};

SSE Implementation

// Server
app.get('/api/logs/stream', (req, res) => {
  const deploymentId = req.query.deploymentId;
  const filters = parseFilters(req.query);
  
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  
  const subscriber = {
    send: (log) => {
      res.write(`data: ${JSON.stringify(log)}\n\n`);
    }
  };
  
  logStreamer.subscribe(deploymentId, filters, subscriber);
  
  req.on('close', () => {
    logStreamer.unsubscribe(deploymentId, subscriber);
  });
});

// Client
const eventSource = new EventSource('/api/logs/stream?deploymentId=deploy-123');

eventSource.onmessage = (event) => {
  const log = JSON.parse(event.data);
  displayLog(log);
};
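
Both servers above call a parseFilters helper; a minimal sketch, assuming query parameters named level, service, and search, that accepts either a URLSearchParams (WebSocket path) or an Express-style query object (SSE path):

// Assumed helper: normalize query parameters into the filter shape LogStreamer expects
function parseFilters(query) {
  const get = (key) =>
    typeof query.get === 'function' ? query.get(key) : query[key];
  
  return {
    level: get('level') || undefined,
    service: get('service') || undefined,
    search: get('search') || undefined
  };
}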

Log Routing and Filtering

Route logs to different destinations based on content:

Log Routing and Filtering

Routing Strategy:

  • All logs → Storage (for search/analytics)
  • ERROR level → Alerts (PagerDuty/Slack)
  • Specific service → Real-time UI
  • By deployment ID → Filtered stream

class LogRouter {
  constructor() {
    this.routes = [];
  }
  
  addRoute(condition, destination) {
    this.routes.push({ condition, destination });
  }
  
  route(log) {
    for (const { condition, destination } of this.routes) {
      if (condition(log)) {
        destination.send(log);
      }
    }
  }
}

// Configure routes
const router = new LogRouter();

router.addRoute(
  (log) => log.level === 'ERROR',
  alertDestination // Send to PagerDuty/Slack
);

router.addRoute(
  (log) => log.service === 'api-service',
  realTimeUIDestination // Stream to UI
);

router.addRoute(
  () => true, // All logs
  storageDestination // Store in Elasticsearch
);

Filtering capabilities (environment and time-range checks are sketched after this list):

  • By log level (INFO, WARN, ERROR, DEBUG)
  • By service name
  • By deployment ID
  • By environment
  • By search term (full-text search)
  • By time range
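
matchesFilters above only covers level, service, and search term; a sketch of the environment and time-range checks, assuming filters.timeRange has the shape { start, end }:

// Sketch: extra filter checks to combine with matchesFilters above
function matchesExtendedFilters(log, filters) {
  if (filters.environment && log.environment !== filters.environment) {
    return false;
  }
  
  if (filters.timeRange) {
    const ts = new Date(log.timestamp).getTime();
    if (ts < new Date(filters.timeRange.start).getTime()) return false;
    if (ts > new Date(filters.timeRange.end).getTime()) return false;
  }
  
  return true;
}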

Multi-Tenant Isolation

In multi-tenant systems, logs must be isolated:

Multi-Tenant Log Isolation

Isolation Strategies:

  1. Separate Topics/Streams: Each tenant gets its own Kafka topic
  2. Index Prefixes: Elasticsearch index per tenant
  3. Access Control: Filter by tenant in queries
  4. Resource Quotas: Limit logs per tenant per minute

Benefits: Security, performance isolation, compliance, resource quotas

Isolation strategies:

1. Separate Topics/Streams

// Each tenant gets its own Kafka topic (kafkajs-style producer assumed)
const topic = `logs-${tenantId}-${environment}`;
await producer.send({ topic, messages: [{ value: JSON.stringify(log) }] });

2. Index Prefixes

// Elasticsearch index per tenant
const index = `logs-${tenantId}-${date}`;
await elasticsearch.index({ index, body: log });

3. Access Control

// Filter by tenant in queries
const logs = await elasticsearch.search({
  index: 'logs-*',
  body: {
    query: {
      bool: {
        must: [
          { term: { tenantId } },
          { term: { deploymentId } }
        ]
      }
    }
  }
});

4. Resource Quotas

class TenantQuota {
  constructor(tenantId) {
    this.tenantId = tenantId;
    this.logsPerMinute = 0;
    this.maxLogsPerMinute = 10000;
    
    // Reset the counter every minute
    setInterval(() => { this.logsPerMinute = 0; }, 60 * 1000);
  }
  
  canAccept() {
    return this.logsPerMinute < this.maxLogsPerMinute;
  }
  
  recordLog() {
    this.logsPerMinute++;
  }
}
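
One possible way to apply the quota in the aggregator (the quotas map and acceptLog helper are illustrative):

// Illustrative helper: check a tenant's quota before forwarding a log
function acceptLog(log, quotas) {
  const quota = quotas.get(log.tenantId);
  if (quota && !quota.canAccept()) {
    return false; // over quota: drop (or sample) instead of forwarding
  }
  if (quota) quota.recordLog();
  return true;
}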

Handling High Volume

At scale, log streams can be massive:

1. Batching

class LogBatcher {
  constructor(batchSize = 100, flushInterval = 1000) {
    this.batch = [];
    this.batchSize = batchSize;
    this.flushInterval = flushInterval;
    this.startFlushTimer();
  }
  
  add(log) {
    this.batch.push(log);
    
    if (this.batch.length >= this.batchSize) {
      this.flush();
    }
  }
  
  flush() {
    if (this.batch.length > 0) {
      this.sendBatch(this.batch);
      this.batch = [];
    }
  }
  
  startFlushTimer() {
    setInterval(() => this.flush(), this.flushInterval);
  }
}

2. Backpressure

class LogStreamerWithBackpressure {
  constructor(maxQueueSize = 10000) {
    this.queue = [];
    this.maxQueueSize = maxQueueSize;
  }
  
  stream(log) {
    if (this.queue.length >= this.maxQueueSize) {
      // Drop oldest logs or reject new ones
      this.queue.shift(); // Drop oldest
      // Or: throw new Error('Queue full'); // Reject
    }
    
    this.queue.push(log);
    this.processQueue();
  }
  
  processQueue() {
    // Placeholder drain: deliver a bounded batch per tick so slow consumers
    // shed load into the queue instead of blocking the collector
    setImmediate(() => {
      const batch = this.queue.splice(0, 100);
      for (const log of batch) {
        this.deliver(log); // deliver() fans out to subscribers, as in LogStreamer
      }
    });
  }
}

3. Sampling

// Sample logs when volume is high
class LogSampler {
  constructor(sampleRate = 0.1) {
    this.sampleRate = sampleRate;
  }
  
  shouldSample(log) {
    // Sample ERROR logs always, INFO logs at sampleRate
    if (log.level === 'ERROR') return true;
    return Math.random() < this.sampleRate;
  }
}

Storage Strategy

Store logs for search and analytics:

1. Hot Storage (Recent)

  • Elasticsearch, OpenSearch
  • Fast search, recent logs (last 7-30 days)
  • Indexed for full-text search

2. Warm Storage (Older)

  • Compressed Elasticsearch indices
  • Older logs (30-90 days)
  • Slower but still searchable

3. Cold Storage (Archive)

  • S3, Glacier
  • Very old logs (90+ days)
  • Cheap, slow retrieval

class LogStorage {
  async store(log) {
    // Always store in hot storage
    await elasticsearch.index({
      index: `logs-${this.getDate()}`,
      body: log
    });
    
    // Archive old indices (in practice, run on a schedule rather than per write)
    await this.archiveOldIndices();
  }
  
  async archiveOldIndices() {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - 30);
    
    const oldIndices = await this.getIndicesOlderThan(cutoffDate);
    
    for (const index of oldIndices) {
      await this.moveToColdStorage(index);
    }
  }
}

Observability and Debugging

Make logs useful for debugging:

1. Structured Logging

// Use structured logs
logger.info('Deployment started', {
  deploymentId: 'deploy-123',
  service: 'api-service',
  version: '1.2.3',
  environment: 'production',
  userId: 'user-456'
});

2. Correlation IDs

// Add correlation ID to track requests across services
const correlationId = req.headers['x-correlation-id'] || generateId();
logger.info('Processing request', { correlationId, ... });

3. Log Levels

// Use appropriate log levels
logger.debug('Internal state', { state }); // Development
logger.info('User action', { action });    // Normal operation
logger.warn('Deprecated API used', { api }); // Warning
logger.error('Request failed', { error }); // Error

4. Search and Query

// Enable powerful search
const logs = await searchLogs({
  deploymentId: 'deploy-123',
  level: 'ERROR',
  timeRange: { start: '2024-12-15T10:00:00Z', end: '2024-12-15T11:00:00Z' },
  search: 'connection failed',
  service: 'api-service'
});

Key Takeaways

  1. Real-time streaming: WebSocket/SSE for live log delivery
  2. Deployment context: Tag logs with deployment ID, service, environment
  3. Filtering: Filter by level, service, deployment, search terms
  4. Multi-tenant: Isolate logs by tenant (topics, indices, access control)
  5. High volume: Batching, backpressure, sampling
  6. Storage tiers: Hot (fast), warm (compressed), cold (archive)
  7. Structured logs: JSON format for better search and analysis

A good deployment log streamer gives you real-time visibility into what’s happening during deployments. That visibility is critical for debugging issues and ensuring successful deployments.

Start simple: stream logs to a WebSocket, filter by deployment ID. Add complexity (routing, multi-tenant, storage) as you scale.
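
As a rough sketch of that starting point, wiring the LogParser, LogEnricher, and LogStreamer from above behind a single WebSocket endpoint (handleRawLine is illustrative glue; the ws package is assumed):

const { WebSocketServer } = require('ws');

const streamer = new LogStreamer();
const wss = new WebSocketServer({ port: 8080 });

// Clients connect with ?deploymentId=... and receive matching logs
wss.on('connection', (ws, req) => {
  const params = new URL(req.url, 'http://localhost').searchParams;
  streamer.subscribe(params.get('deploymentId'), {}, ws);
});

// Illustrative glue: parse, enrich, and fan out each raw log line
function handleRawLine(rawLine, deploymentContext) {
  const parsed = new LogParser().parse(rawLine);
  const enriched = new LogEnricher().enrich(parsed, deploymentContext);
  streamer.stream(enriched);
}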

Building observability systems? I provide architecture reviews, log streaming design, and production-ready patterns for deployment visibility. Let's discuss your implementation.

P.S. Follow me on Twitter where I share engineering insights, system design patterns, and technical leadership perspectives.
