Deployment Log Streamer
8 min read
tl;dr: Real-time log streaming during deployments: architecture patterns, log aggregation, filtering and routing, multi-tenant isolation, and production-ready observability workflows.
During deployments, you need to see what’s happening in real time. Waiting for logs to show up in your logging system is too slow. You need streaming logs, filtered by deployment, service, and environment.
Here’s how to build a deployment log streamer that works at scale.
The Problem
Traditional logging workflows:
- Application writes logs to files/stdout
- Log agent collects logs (periodic polling)
- Logs sent to central storage
- You query storage to see logs
Problems:
- High latency (logs appear minutes later)
- Hard to filter by deployment
- No real-time visibility
- Difficult to debug deployment issues
What you need:
- Real-time log streaming
- Filter by deployment ID, service, environment
- Tail-like functionality in UI
- Multi-tenant isolation
Architecture Overview
A deployment log streamer has several components:
Log Streaming Architecture
graph TB
App1[App 1<br/>stdout/stderr] --> Aggregator[Log Aggregator<br/>Collect, Parse, Enrich]
App2[App 2<br/>stdout/stderr] --> Aggregator
Aggregator --> Processor[Stream Processor<br/>Filter, Route, Transform]
Processor --> Storage[(Storage<br/>Elasticsearch<br/>S3/Archive)]
Processor --> Client[Client<br/>Web UI, CLI, API]
Note[Features:<br/>Real-time streaming<br/>Filtering & routing<br/>Multi-tenant isolation<br/>High-volume handling]
Components:
- Log Sources: Applications writing logs (stdout, stderr, files)
- Log Aggregator: Collects, parses, and enriches logs
- Stream Processor: Filters, routes, and transforms logs
- Storage: Durable storage for search and analytics
- Client: UI/CLI/API for viewing logs
Deployment Pipeline Integration
Log streaming should be integrated into your deployment pipeline:
Deployment Pipeline with Log Aggregation
flowchart LR
Build[Build<br/>Compile, Test, Package] --> Deploy[Deploy<br/>Rollout, Health Check]
Deploy --> Monitor[Monitor<br/>Metrics, Logs, Alerts]
Monitor --> Stream[Stream<br/>Collect, Process, Deliver]
Stream --> Display[Display<br/>UI, CLI, API]
Stream --> LogFlow[Log Flow:<br/>App Logs → Agent →<br/>Stream Buffer → Consumer →<br/>Storage/UI]
Pipeline Stages:
- Build: Compile, test, package
- Deploy: Rollout, health checks, verification
- Monitor: Metrics, logs, alerts
- Stream: Real-time log collection and delivery
- Display: UI/CLI showing live logs
Integration points:
- Start streaming when deployment starts
- Tag logs with deployment ID
- Filter UI by deployment ID
- Stop streaming after the deployment completes (or keep the stream alive for a TTL)
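Here’s a sketch of what that integration could look like. The logStreamer.openStream, rollout, and waitForHealthChecks calls are hypothetical stand-ins for your own deploy tooling:
// Hypothetical pipeline hook: open a deployment-scoped stream before the
// rollout, tag every log with the deployment context, and keep the stream
// alive briefly after the rollout so late logs are still visible.
async function deployWithLogStream(deployment, logStreamer) {
  const context = {
    deploymentId: deployment.id,
    service: deployment.service,
    environment: deployment.environment,
    version: deployment.version
  };

  const stream = await logStreamer.openStream(context); // assumed API

  try {
    await rollout(deployment);              // your existing deploy step
    await waitForHealthChecks(deployment);  // your existing verification step
  } finally {
    // Keep the stream for a short TTL instead of closing it immediately
    setTimeout(() => stream.close(), 5 * 60 * 1000);
  }
}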
Log Processing Flow
Raw logs need processing before streaming:
Log Processing Flow
Raw Logs → Parser (Extract timestamp, level, message) → Enricher (Add deployment ID, service, environment) → Structured Log Entry
Processing Features:
- Pattern matching (regex, JSON, structured)
- Field extraction & normalization
- Metadata enrichment (deployment, service, env)
- Filtering & routing based on content
1. Collection
Collect logs from multiple sources:
const fs = require('fs');
const { spawn } = require('child_process');
const Docker = require('dockerode'); // assumed Docker API client

const docker = new Docker();

class LogCollector {
  constructor(containerId) {
    this.containerId = containerId;
    this.sources = [];
  }

  addSource(source) {
    this.sources.push(source);
  }

  async collect() {
    // Collect the application's stdout/stderr by running it as a child
    // process (the collector's own process.stdout is write-only)
    const app = spawn('node', ['app.js']);
    app.stdout.on('data', (chunk) => {
      this.handleLog(chunk, { source: 'stdout' });
    });
    app.stderr.on('data', (chunk) => {
      this.handleLog(chunk, { source: 'stderr' });
    });

    // Collect from files
    const logFile = '/var/log/app.log';
    fs.watch(logFile, (event) => {
      if (event === 'change') {
        this.readNewLines(logFile);
      }
    });

    // Collect from Docker containers (dockerode)
    docker.getContainer(this.containerId).logs({
      follow: true,
      stdout: true,
      stderr: true
    }, (err, stream) => {
      if (err || !stream) return;
      stream.on('data', (chunk) => {
        this.handleLog(chunk, { source: 'docker' });
      });
    });
  }
}
2. Parsing
Parse unstructured logs into structured format:
class LogParser {
  parse(rawLog) {
    // JSON logs: parse directly rather than pattern matching
    if (rawLog.trim().startsWith('{')) {
      try {
        return JSON.parse(rawLog);
      } catch (err) {
        // Not valid JSON; fall through to the regex patterns
      }
    }

    // Each pattern carries its own mapping from capture groups to fields
    const patterns = [
      {
        // Structured: timestamp level message
        regex: /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s+(\w+)\s+(.*)$/,
        map: (m) => ({ timestamp: this.parseTimestamp(m[1]), level: m[2], message: m[3] })
      },
      {
        // Syslog: <priority>MMM dd HH:mm:ss host message
        regex: /^<(\d+)>(\w{3}\s+\d+\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s+(.*)$/,
        map: (m) => ({ timestamp: this.parseTimestamp(m[2]), host: m[3], message: m[4], level: 'INFO' })
      }
    ];

    for (const { regex, map } of patterns) {
      const match = rawLog.match(regex);
      if (match) {
        return map(match);
      }
    }

    // Fallback: treat as plain text
    return {
      message: rawLog,
      timestamp: new Date().toISOString(),
      level: 'INFO'
    };
  }
}
3. Enrichment
Add metadata to logs:
class LogEnricher {
enrich(log, context) {
return {
...log,
// Deployment context
deploymentId: context.deploymentId,
service: context.service,
environment: context.environment,
version: context.version,
// Infrastructure context
host: context.host,
containerId: context.containerId,
podName: context.podName,
// Request context (if available)
requestId: context.requestId,
userId: context.userId,
// Timestamps
receivedAt: new Date().toISOString(),
processedAt: new Date().toISOString()
};
}
}
4. Streaming
Stream processed logs to clients:
class LogStreamer {
  constructor() {
    this.subscribers = new Map(); // deploymentId -> Set of { ws, filters } subscriptions
  }

  subscribe(deploymentId, filters, ws) {
    if (!this.subscribers.has(deploymentId)) {
      this.subscribers.set(deploymentId, new Set());
    }
    this.subscribers.get(deploymentId).add({
      ws,
      filters // { level, service, search }
    });
  }

  unsubscribe(deploymentId, ws) {
    const subscribers = this.subscribers.get(deploymentId);
    if (!subscribers) return;
    for (const subscriber of subscribers) {
      if (subscriber.ws === ws) {
        subscribers.delete(subscriber);
      }
    }
  }

  stream(log) {
    const subscribers = this.subscribers.get(log.deploymentId) || new Set();
    for (const subscriber of subscribers) {
      if (this.matchesFilters(log, subscriber.filters)) {
        subscriber.ws.send(JSON.stringify(log));
      }
    }
  }

  matchesFilters(log, filters) {
    if (filters.level && log.level !== filters.level) {
      return false;
    }
    if (filters.service && log.service !== filters.service) {
      return false;
    }
    if (filters.search && !log.message.includes(filters.search)) {
      return false;
    }
    return true;
  }
}
Real-Time Delivery
For real-time log streaming, use WebSocket or Server-Sent Events (SSE):
WebSocket Implementation
// Server
const { WebSocketServer } = require('ws');

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws, req) => {
  const url = new URL(req.url, 'http://localhost');
  const deploymentId = url.searchParams.get('deploymentId');
  const filters = parseFilters(url.searchParams);

  // Subscribe this connection to the deployment's log stream
  logStreamer.subscribe(deploymentId, filters, ws);

  ws.on('close', () => {
    logStreamer.unsubscribe(deploymentId, ws);
  });
});

// Client
const ws = new WebSocket('ws://logs.example.com?deploymentId=deploy-123&level=ERROR');

ws.onmessage = (event) => {
  const log = JSON.parse(event.data);
  displayLog(log);
};
SSE Implementation
// Server
app.get('/api/logs/stream', (req, res) => {
const deploymentId = req.query.deploymentId;
const filters = parseFilters(req.query);
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const subscriber = {
send: (log) => {
res.write(`data: ${JSON.stringify(log)}\n\n`);
}
};
logStreamer.subscribe(deploymentId, filters, subscriber);
req.on('close', () => {
logStreamer.unsubscribe(deploymentId, subscriber);
});
});
// Client
const eventSource = new EventSource('/api/logs/stream?deploymentId=deploy-123');
eventSource.onmessage = (event) => {
const log = JSON.parse(event.data);
displayLog(log);
};
Log Routing and Filtering
Route logs to different destinations based on content:
Log Routing and Filtering
Routing Strategy:
- All logs → Storage (for search/analytics)
- ERROR level → Alerts (PagerDuty/Slack)
- Specific service → Real-time UI
- By deployment ID → Filtered stream
class LogRouter {
constructor() {
this.routes = [];
}
addRoute(condition, destination) {
this.routes.push({ condition, destination });
}
route(log) {
for (const { condition, destination } of this.routes) {
if (condition(log)) {
destination.send(log);
}
}
}
}
// Configure routes
const router = new LogRouter();
router.addRoute(
(log) => log.level === 'ERROR',
alertDestination // Send to PagerDuty/Slack
);
router.addRoute(
(log) => log.service === 'api-service',
realTimeUIDestination // Stream to UI
);
router.addRoute(
() => true, // All logs
storageDestination // Store in Elasticsearch
);
Filtering capabilities:
- By log level (INFO, WARN, ERROR, DEBUG)
- By service name
- By deployment ID
- By environment
- By search term (full-text search)
- By time range
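The parseFilters helper referenced by the WebSocket and SSE servers isn’t defined above. A minimal sketch, assuming filters arrive as query parameters, could look like this:
// Sketch: turn query parameters into the filter object consumed by
// LogStreamer.matchesFilters. Only level, service, and search are
// recognized here; anything else is ignored.
function parseFilters(query) {
  // Accept a URLSearchParams instance or a plain query object (e.g. req.query)
  const params = query instanceof URLSearchParams ? query : new URLSearchParams(query);
  const filters = {};
  if (params.get('level')) filters.level = params.get('level');
  if (params.get('service')) filters.service = params.get('service');
  if (params.get('search')) filters.search = params.get('search');
  return filters;
}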
Multi-Tenant Isolation
In multi-tenant systems, logs must be isolated:
Multi-Tenant Log Isolation
Isolation Strategies:
- Separate Topics/Streams: Each tenant gets its own Kafka topic
- Index Prefixes: Elasticsearch index per tenant
- Access Control: Filter by tenant in queries
- Resource Quotas: Limit logs per tenant per minute
Benefits: Security, performance isolation, compliance, resource quotas
Isolation strategies:
1. Separate Topics/Streams
// Each tenant gets its own Kafka topic (KafkaJS-style producer)
const topic = `logs-${tenantId}-${environment}`;
await producer.send({ topic, messages: [{ value: JSON.stringify(log) }] });
2. Index Prefixes
// Elasticsearch index per tenant
const index = `logs-${tenantId}-${date}`;
await elasticsearch.index({ index, body: log });
3. Access Control
// Filter by tenant in queries
const logs = await elasticsearch.search({
index: 'logs-*',
body: {
query: {
bool: {
must: [
{ term: { tenantId } },
{ term: { deploymentId } }
]
}
}
}
});
4. Resource Quotas
class TenantQuota {
  constructor(tenantId, maxLogsPerMinute = 10000) {
    this.tenantId = tenantId;
    this.maxLogsPerMinute = maxLogsPerMinute;
    this.logsThisMinute = 0;
    this.windowStart = Date.now();
  }

  canAccept() {
    this.maybeResetWindow();
    return this.logsThisMinute < this.maxLogsPerMinute;
  }

  recordLog() {
    this.maybeResetWindow();
    this.logsThisMinute++;
  }

  maybeResetWindow() {
    // Reset the counter at the start of each one-minute window
    if (Date.now() - this.windowStart >= 60000) {
      this.logsThisMinute = 0;
      this.windowStart = Date.now();
    }
  }
}
Handling High Volume
At scale, log streams can be massive:
1. Batching
class LogBatcher {
constructor(batchSize = 100, flushInterval = 1000) {
this.batch = [];
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.startFlushTimer();
}
add(log) {
this.batch.push(log);
if (this.batch.length >= this.batchSize) {
this.flush();
}
}
flush() {
if (this.batch.length > 0) {
this.sendBatch(this.batch);
this.batch = [];
}
}
startFlushTimer() {
setInterval(() => this.flush(), this.flushInterval);
}
}
2. Backpressure
class LogStreamerWithBackpressure {
  constructor(maxQueueSize = 10000) {
    this.queue = [];
    this.maxQueueSize = maxQueueSize;
    this.draining = false;
  }

  stream(log) {
    if (this.queue.length >= this.maxQueueSize) {
      // Apply backpressure: drop the oldest log to make room
      this.queue.shift();
      // Or reject new logs instead: throw new Error('Queue full');
    }
    this.queue.push(log);
    this.processQueue();
  }

  async processQueue() {
    if (this.draining) return; // avoid concurrent drains
    this.draining = true;
    while (this.queue.length > 0) {
      const log = this.queue.shift();
      await this.deliver(log); // deliver() forwards to subscribers/storage
    }
    this.draining = false;
  }
}
3. Sampling
// Sample logs when volume is high
class LogSampler {
constructor(sampleRate = 0.1) {
this.sampleRate = sampleRate;
}
shouldSample(log) {
// Sample ERROR logs always, INFO logs at sampleRate
if (log.level === 'ERROR') return true;
return Math.random() < this.sampleRate;
}
}
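Putting the three together: sample first, then batch what survives, and let the batcher hand groups of logs to the downstream destination. The storageDestination sink here is hypothetical:
const sampler = new LogSampler(0.1);
const batcher = new LogBatcher(100, 1000);

// Wire the batcher to a downstream sink (storage, stream processor, ...)
batcher.sendBatch = async (logs) => {
  await storageDestination.sendMany(logs); // hypothetical bulk-write API
};

function ingest(log) {
  if (!sampler.shouldSample(log)) return; // drop a share of low-severity logs
  batcher.add(log);
}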
Storage Strategy
Store logs for search and analytics:
1. Hot Storage (Recent)
- Elasticsearch, OpenSearch
- Fast search, recent logs (last 7-30 days)
- Indexed for full-text search
2. Warm Storage (Older)
- Compressed Elasticsearch indices
- Older logs (30-90 days)
- Slower but still searchable
3. Cold Storage (Archive)
- S3, Glacier
- Very old logs (90+ days)
- Cheap, slow retrieval
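Much of this tiering can be delegated to Elasticsearch index lifecycle management instead of hand-rolled archiving. A sketch, assuming the Elasticsearch 8.x JavaScript client’s ilm.putLifecycle API (ages and actions are illustrative); the S3 archive step is handled separately by the storage layer below:
// Illustrative ILM policy: roll over daily indices, shrink and merge them
// after 30 days, and delete them after 90 days.
await elasticsearch.ilm.putLifecycle({
  name: 'deployment-logs-policy',
  policy: {
    phases: {
      hot: { actions: { rollover: { max_age: '1d' } } },
      warm: {
        min_age: '30d',
        actions: { shrink: { number_of_shards: 1 }, forcemerge: { max_num_segments: 1 } }
      },
      delete: { min_age: '90d', actions: { delete: {} } }
    }
  }
});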
class LogStorage {
  async store(log) {
    // Always write to hot storage (daily index)
    await elasticsearch.index({
      index: `logs-${this.getDate()}`,
      body: log
    });
  }

  // Run from a scheduled job (e.g. daily), not on every store()
  async archiveOldIndices() {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - 30);
    const oldIndices = await this.getIndicesOlderThan(cutoffDate);
    for (const index of oldIndices) {
      await this.moveToColdStorage(index);
    }
  }

  getDate() {
    return new Date().toISOString().slice(0, 10); // e.g. 2024-12-15
  }
}
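The moveToColdStorage step is left abstract above. A minimal sketch using the AWS SDK v3 S3 client (the bucket name and export format are assumptions, and the response shape assumes the Elasticsearch 8.x client):
const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3');

const s3 = new S3Client({});

// Sketch: export an index's documents as NDJSON, write them to S3, then
// delete the index from hot storage. A real implementation would paginate
// with scroll/search_after and compress the output.
async function moveToColdStorage(index) {
  const result = await elasticsearch.search({ index, size: 10000 }); // illustration only
  const ndjson = result.hits.hits
    .map((hit) => JSON.stringify(hit._source))
    .join('\n');

  await s3.send(new PutObjectCommand({
    Bucket: 'deployment-logs-archive', // assumed bucket
    Key: `archive/${index}.ndjson`,
    Body: ndjson
  }));

  await elasticsearch.indices.delete({ index });
}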
Observability and Debugging
Make logs useful for debugging:
1. Structured Logging
// Use structured logs
logger.info('Deployment started', {
deploymentId: 'deploy-123',
service: 'api-service',
version: '1.2.3',
environment: 'production',
userId: 'user-456'
});
2. Correlation IDs
// Add correlation ID to track requests across services
const correlationId = req.headers['x-correlation-id'] || generateId();
logger.info('Processing request', { correlationId, ... });
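As Express middleware, the same idea looks roughly like this (crypto.randomUUID stands in for generateId, and req.log is a made-up convenience):
const crypto = require('crypto');

// Attach a correlation ID to every request and expose a request-scoped
// logger, so downstream code doesn't have to pass the ID around manually.
app.use((req, res, next) => {
  const correlationId = req.headers['x-correlation-id'] || crypto.randomUUID();
  req.correlationId = correlationId;
  res.setHeader('x-correlation-id', correlationId); // propagate back to the caller
  req.log = (message, fields = {}) =>
    logger.info(message, { correlationId, ...fields });
  next();
});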
3. Log Levels
// Use appropriate log levels
logger.debug('Internal state', { state }); // Development
logger.info('User action', { action }); // Normal operation
logger.warn('Deprecated API used', { api }); // Warning
logger.error('Request failed', { error }); // Error
4. Search and Query
// Enable powerful search
const logs = await searchLogs({
deploymentId: 'deploy-123',
level: 'ERROR',
timeRange: { start: '2024-12-15T10:00:00Z', end: '2024-12-15T11:00:00Z' },
search: 'connection failed',
service: 'api-service'
});
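The searchLogs helper above is left undefined. A sketch backed by the Elasticsearch query DSL, assuming the field names from the enrichment step (deploymentId, level, service, message, timestamp):
// Sketch: translate the filter object into a bool query against the
// daily log indices. Returns raw hits; shape them for your UI as needed.
async function searchLogs({ deploymentId, level, service, search, timeRange }) {
  const must = [];
  if (deploymentId) must.push({ term: { deploymentId } });
  if (level) must.push({ term: { level } });
  if (service) must.push({ term: { service } });
  if (search) must.push({ match: { message: search } });
  if (timeRange) {
    must.push({ range: { timestamp: { gte: timeRange.start, lte: timeRange.end } } });
  }

  const result = await elasticsearch.search({
    index: 'logs-*',
    body: {
      query: { bool: { must } },
      sort: [{ timestamp: 'desc' }],
      size: 500
    }
  });

  // 7.x clients wrap the response in .body; 8.x returns it directly
  return result.body ? result.body.hits.hits : result.hits.hits;
}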
Key Takeaways
- Real-time streaming: WebSocket/SSE for live log delivery
- Deployment context: Tag logs with deployment ID, service, environment
- Filtering: Filter by level, service, deployment, search terms
- Multi-tenant: Isolate logs by tenant (topics, indices, access control)
- High volume: Batching, backpressure, sampling
- Storage tiers: Hot (fast), warm (compressed), cold (archive)
- Structured logs: JSON format for better search and analysis
A good deployment log streamer gives you real-time visibility into what’s happening during deployments. That visibility is critical for debugging issues and ensuring successful deployments.
Start simple: stream logs to a WebSocket, filter by deployment ID. Add complexity (routing, multi-tenant, storage) as you scale.
Building observability systems? I provide architecture reviews, log streaming design, and production-ready patterns for deployment visibility. Let's discuss your implementation.
P.S. Follow me on Twitter where I share engineering insights, system design patterns, and technical leadership perspectives.