Advanced Stream Redirection: Building Data Processing Pipelines

In the previous article, we discovered how standard streams (stdin, stdout, stderr) let processes communicate. Now we're ready for the exciting part: learning how to orchestrate multiple programs working together through sophisticated stream redirection.

Have you ever wondered how developers build command-line tools that seamlessly work together, passing data through chains of transformations? Or how to redirect input and output in complex ways to build automated data processing workflows? Let's explore these powerful techniques.

Quick Reference

When to use: Building data pipelines, automating workflows, processing large datasets, creating production-ready CLI tools

Basic patterns:

# Combine stdout and stderr
command > output.txt 2>&1

# Split outputs
command > output.txt 2> errors.txt

# Chain multiple processes
cat data.txt | filter | transform | output

Common use cases:

  • Processing log files through multiple stages
  • Automated data transformation workflows
  • Server-side data pipelines
  • Development and deployment automation

What You Need to Know First

Required reading:

  • The previous article in this series on standard streams (stdin, stdout, stderr)

Helpful background:

  • Basic terminal/command-line usage
  • Node.js fundamentals (running scripts, installing packages)
  • File system operations (reading/writing files)

What We'll Cover in This Article

By the end of this guide, you'll master:

  • Complex stream redirection patterns and operators
  • Building multi-stage data processing pipelines
  • Programmatic stream redirection in Node.js
  • Handling edge cases and errors in pipelines
  • Real-world pipeline architectures
  • Performance considerations for large-scale processing

What We'll Explain Along the Way

We'll explore these concepts with detailed examples:

  • File descriptor manipulation (what 2>&1 really means)
  • Process substitution (advanced bash technique)
  • Backpressure in stream processing
  • Error propagation in pipelines
  • Resource cleanup and memory management

Understanding Redirection Operators in Depth

Let's dig deeper into redirection operators and discover how they really work at the operating system level.

The > Operator: Redirecting stdout

When you write:

$ node script.ts > output.txt

Here's what happens behind the scenes:

  1. The shell prepares to launch your Node.js process with its three standard streams
  2. Before your program starts, the shell closes stdout (file descriptor 1)
  3. It opens output.txt and assigns it to file descriptor 1
  4. Now stdout points to the file instead of the terminal
  5. Your program runs normally, writing to stdout
  6. All stdout output flows into the file

Your program doesn't know it's writing to a file—it still just writes to stdout. The redirection happens transparently.

Visual representation:

Before: process → stdout (fd 1) → terminal
After: process → stdout (fd 1) → output.txt
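
You can reproduce this descriptor swap from Node.js itself by handing spawn an already-opened file descriptor for the child's stdout. A minimal sketch (redirect-child.ts is a hypothetical name; it assumes a script.ts sitting next to it):

// redirect-child.ts
// Sketch: reproduce `node script.ts > output.txt` programmatically.

import fs from "fs";
import { spawn } from "child_process";

// Open the target file and keep its file descriptor
const outFd = fs.openSync("output.txt", "w");

// Hand that descriptor to the child as its stdout (fd 1).
// stdin stays attached to ours; stderr still goes to the terminal.
const child = spawn("node", ["script.ts"], {
  stdio: ["inherit", outFd, "inherit"],
});

child.on("close", () => {
  fs.closeSync(outFd); // clean up the descriptor we opened
});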

The >> Operator: Append Instead of Overwrite

$ node script.ts >> output.txt

This works exactly like >, except the file is opened in append mode. New output adds to the end rather than replacing the file's contents.

When to use each:

  • Use > when starting fresh (log rotation, new analysis)
  • Use >> when accumulating data (continuous logging, aggregating results)

# Day 1: Start fresh log
$ node monitor.ts > system.log

# Day 2-30: Append to existing log
$ node monitor.ts >> system.log
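
In Node.js, the same distinction shows up as the flags option on a write stream: "w" truncates like >, while "a" appends like >>. A small sketch (append-log.ts is an illustrative name):

// append-log.ts
// Sketch: ">" vs ">>" expressed with fs flags.

import fs from "fs";

// Like `node monitor.ts > system.log`: flag "w" truncates the file first
const fresh = fs.createWriteStream("system.log", { flags: "w" });
fresh.end("Day 1: starting a new log\n");

fresh.on("finish", () => {
  // Like `node monitor.ts >> system.log`: flag "a" appends to what is there
  const appended = fs.createWriteStream("system.log", { flags: "a" });
  appended.end("Day 2: appended entry\n");
});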

The 2> Operator: Redirecting stderr Specifically

Remember, 2 is the file descriptor for stderr. So 2> says "redirect file descriptor 2."

$ node script.ts 2> errors.txt

This redirects only stderr to the file. stdout still goes to the terminal.

Practical example:

// mixed-output.ts
// Demonstrates: Separate stdout and stderr streams

// Normal output
console.log("Processing record 1");
console.log("Processing record 2");

// Error output
process.stderr.write("Warning: Low memory\n");

// More normal output
console.log("Processing record 3");

Run it with separation:

$ node mixed-output.ts > records.txt 2> warnings.txt

Now check the files:

$ cat records.txt
Processing record 1
Processing record 2
Processing record 3

$ cat warnings.txt
Warning: Low memory

Perfect separation! This pattern is incredibly useful for production systems where you want to:

  • Save results in one location
  • Save errors/warnings in another
  • Monitor each independently

The 2>&1 Pattern: Merging stderr into stdout

This is one of the most powerful—and initially confusing—redirection patterns. Let's decode it step by step.

$ node script.ts > output.txt 2>&1

What 2>&1 means literally:

  • 2> - redirect stderr (file descriptor 2)
  • &1 - to wherever file descriptor 1 is currently pointing

The order matters! Let's see why:

# ✅ Correct: stdout redirected first, then stderr follows
$ node script.ts > output.txt 2>&1

Step 1: > output.txt # stdout (fd 1) now points to output.txt
Step 2: 2>&1 # stderr (fd 2) now points to where fd 1 points (output.txt)
Result: Both go to output.txt

# ❌ Different behavior: stderr redirected first
$ node script.ts 2>&1 > output.txt

Step 1: 2>&1 # stderr (fd 2) now points to where fd 1 points (terminal)
Step 2: > output.txt # stdout (fd 1) now points to output.txt
Result: stdout to file, stderr to terminal (not what we wanted!)

Visual comparison:

Correct order (> file 2>&1):
stdout → output.txt
stderr → output.txt

Wrong order (2>&1 > file):
stdout → output.txt
stderr → terminal

Why this pattern is useful:

// production-script.ts
// In production, you want complete logs with timestamps

console.log("[INFO] Starting process");
process.stderr.write("[ERROR] Configuration missing\n");
console.log("[INFO] Using defaults");

Capture everything:

$ node production-script.ts > complete.log 2>&1

Now complete.log contains all output in chronological order:

[INFO] Starting process
[ERROR] Configuration missing
[INFO] Using defaults
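
The same merge can be reproduced when launching a child process from Node: pass one file descriptor for both stdio slots so fd 1 and fd 2 point at the same file. A sketch (capture-all.ts is a hypothetical name, reusing production-script.ts from above):

// capture-all.ts
// Sketch: the programmatic equivalent of `> complete.log 2>&1`.

import fs from "fs";
import { spawn } from "child_process";

const logFd = fs.openSync("complete.log", "w");

// Give the child the same descriptor for fd 1 and fd 2,
// so stdout and stderr interleave in one file, just like 2>&1.
const child = spawn("node", ["production-script.ts"], {
  stdio: ["ignore", logFd, logFd],
});

child.on("close", (code) => {
  fs.closeSync(logFd);
  console.log(`child exited with code ${code}`);
});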

The &> Shortcut (Bash Only)

In Bash, you can use a shortcut:

$ node script.ts &> output.txt

This is equivalent to:

$ node script.ts > output.txt 2>&1

Note: The &> operator doesn't exist in all shells (it's Bash-specific). The > file 2>&1 pattern works everywhere and is more portable.

The < Operator: Input Redirection

We can also redirect input from files:

$ node process.ts < input.txt

This sends the file's contents to your program's stdin.

Example program:

// process.ts
// Purpose: Process data from stdin
// Works with: keyboard input, file redirection, or pipes

let totalLines = 0;

process.stdin.on("data", (chunk: Buffer) => {
const text = chunk.toString();
const lines = text.split("\n").filter((line) => line.trim());
totalLines += lines.length;
});

process.stdin.on("end", () => {
console.log(`Processed ${totalLines} lines`);
});

Now you can use it multiple ways:

# From a file
$ node process.ts < data.txt
Processed 1000 lines

# From keyboard (type Ctrl+D when done)
$ node process.ts
[type your data]
Processed 5 lines

# From another command
$ cat data.txt | node process.ts
Processed 1000 lines

Building Complex Data Pipelines

Now let's explore the real power: chaining multiple processes together through pipes. Each program does one thing well, and together they accomplish complex tasks.

The Pipe Operator: Connecting Processes

The pipe operator | connects the stdout of one process to the stdin of the next:

$ command1 | command2 | command3

Data flow:

command1 → stdout → stdin → command2 → stdout → stdin → command3 → stdout → terminal

Real-World Pipeline Example: Log Analysis

Let's build a practical log analysis pipeline step by step.

Scenario: You have a web server log file and need to:

  1. Extract lines containing errors
  2. Count how many errors occurred
  3. Find the most common error messages
  4. Save results to a report

The data (server.log):

2024-11-05 10:23:45 INFO User logged in
2024-11-05 10:24:12 ERROR Database connection failed
2024-11-05 10:24:15 INFO Request processed
2024-11-05 10:25:03 ERROR Database connection failed
2024-11-05 10:26:18 ERROR File not found: config.json
2024-11-05 10:27:22 INFO User logged out
2024-11-05 10:28:45 ERROR Database connection failed

Step 1: Extract error lines

$ cat server.log | grep "ERROR"
2024-11-05 10:24:12 ERROR Database connection failed
2024-11-05 10:25:03 ERROR Database connection failed
2024-11-05 10:26:18 ERROR File not found: config.json
2024-11-05 10:28:45 ERROR Database connection failed

Step 2: Extract just the error messages

$ cat server.log | grep "ERROR" | cut -d' ' -f4-
Database connection failed
Database connection failed
File not found: config.json
Database connection failed

The cut command splits each line by spaces (-d' ') and takes fields 4 onwards (-f4-).

Step 3: Sort error messages together

$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort
Database connection failed
Database connection failed
Database connection failed
File not found: config.json

Step 4: Count occurrences of each error

$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c
3 Database connection failed
1 File not found: config.json

Step 5: Sort by frequency (most common first)

$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c | sort -rn
3 Database connection failed
1 File not found: config.json

Step 6: Save to a report

$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c | sort -rn > error-report.txt

All in one command:

$ cat server.log | grep "ERROR" | cut -d' ' -f4- | sort | uniq -c | sort -rn > error-report.txt

This is the power of pipes! Each command is simple, but together they perform sophisticated analysis.
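
For comparison, roughly the same analysis can live in a single Node stage that reads the log from stdin. A sketch (error-summary.ts is a hypothetical name):

// error-summary.ts
// Sketch: rough Node equivalent of
//   grep "ERROR" | cut -d' ' -f4- | sort | uniq -c | sort -rn
// Usage: cat server.log | node error-summary.ts > error-report.txt

let input = "";
process.stdin.on("data", (chunk: Buffer) => (input += chunk.toString()));

process.stdin.on("end", () => {
  const counts: Record<string, number> = {};

  for (const line of input.split("\n")) {
    if (!line.includes("ERROR")) continue;               // grep "ERROR"
    const message = line.split(" ").slice(3).join(" ");  // cut -d' ' -f4-
    counts[message] = (counts[message] || 0) + 1;        // uniq -c
  }

  // sort -rn: most frequent first
  const sorted = Object.entries(counts).sort((a, b) => b[1] - a[1]);
  for (const [message, count] of sorted) {
    process.stdout.write(`${count} ${message}\n`);
  }
});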

Building Node.js Pipeline Components

Let's create Node.js programs designed to work in pipelines. The key principles:

  1. Read from stdin (don't hardcode input files)
  2. Write data to stdout (for the next stage)
  3. Write messages to stderr (so they don't contaminate data)
  4. Exit gracefully when input ends

Example 1: JSON Formatter

// json-format.ts
// Purpose: Format JSON data for readability
// Usage: cat data.json | node json-format.ts

let inputData = "";

process.stdin.on("data", (chunk: Buffer) => {
inputData += chunk.toString();
});

process.stdin.on("end", () => {
try {
// Parse JSON
const parsed = JSON.parse(inputData);

// Format with 2-space indentation
const formatted = JSON.stringify(parsed, null, 2);

// Send to stdout for next stage or terminal
process.stdout.write(formatted + "\n");

// Log success to stderr (visible but doesn't affect data)
process.stderr.write(`✓ Formatted ${Object.keys(parsed).length} keys\n`);
} catch (error) {
// Errors go to stderr
process.stderr.write(`✗ Invalid JSON: ${error.message}\n`);
process.exit(1);
}
});

Example 2: CSV to JSON Converter

// csv-to-json.ts
// Purpose: Convert CSV to JSON format
// Usage: cat data.csv | node csv-to-json.ts

let headers: string[] = [];
const rows: Record<string, string>[] = [];
let isFirstLine = true;

process.stdin.on("data", (chunk: Buffer) => {
const lines = chunk.toString().split("\n");

lines.forEach((line) => {
if (!line.trim()) return;

if (isFirstLine) {
// First line is headers
headers = line.split(",").map((h) => h.trim());
isFirstLine = false;
process.stderr.write(`Found headers: ${headers.join(", ")}\n`);
} else {
// Data rows
const values = line.split(",").map((v) => v.trim());
const row: Record<string, string> = {};

headers.forEach((header, index) => {
row[header] = values[index] || "";
});

rows.push(row);
}
});
});

process.stdin.on("end", () => {
// Output JSON to stdout
process.stdout.write(JSON.stringify(rows, null, 2) + "\n");

// Stats to stderr
process.stderr.write(`✓ Converted ${rows.length} rows\n`);
});

Example 3: Data Filter

// filter-large.ts
// Purpose: Filter JSON objects by size
// Usage: cat data.csv | node csv-to-json.ts | node filter-large.ts

let inputData = "";

process.stdin.on("data", (chunk: Buffer) => {
inputData += chunk.toString();
});

process.stdin.on("end", () => {
try {
const data = JSON.parse(inputData);

// Filter: keep only items with size > 100
const filtered = data.filter((item: any) => {
return item.size && parseInt(item.size) > 100;
});

// Output filtered data
process.stdout.write(JSON.stringify(filtered, null, 2) + "\n");

// Stats to stderr
process.stderr.write(
`Filtered ${data.length}${filtered.length} items\n`
);
} catch (error) {
process.stderr.write(`Error: ${error.message}\n`);
process.exit(1);
}
});

Building a complete pipeline:

$ cat data.csv | node csv-to-json.ts | node filter-large.ts | node json-format.ts > results.json
Found headers: name, size, date
✓ Converted 150 rows
Filtered 150 → 23 items
✓ Formatted 23 keys

Notice how status messages appear on your terminal (stderr) while data flows through the pipeline to the output file (stdout).

Advanced Redirection Techniques

Splitting Output to Multiple Destinations

Sometimes you want to see output on screen AND save it to a file. The tee command does this:

$ node script.ts | tee output.txt

tee reads stdin, writes to stdout (so you see it), and also writes to the file.

Real-world example:

# Run tests, see results, and save log
$ npm test | tee test-results.txt

# Watch logs while saving them
$ node server.ts | tee server.log
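
If you want tee-like behavior inside a Node component, you can copy each stdin chunk to stdout and to a file stream at the same time. A minimal sketch that ignores backpressure for brevity (tee.ts and the default file name are illustrative):

// tee.ts
// Sketch: a tiny tee, copying stdin to stdout AND to a file.
// Usage: node server.ts | node tee.ts server.log

import fs from "fs";

const target = process.argv[2] || "tee-output.txt"; // file name is illustrative
const file = fs.createWriteStream(target);

process.stdin.on("data", (chunk: Buffer) => {
  process.stdout.write(chunk); // pass through for the next stage / terminal
  file.write(chunk);           // keep a copy on disk
});

process.stdin.on("end", () => file.end());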

Process Substitution (Advanced Bash)

Process substitution lets you use command output as if it were a file.

$ diff <(command1) <(command2)

The <(command) syntax runs the command and makes its output available as a temporary file.

Example: Compare outputs of two commands

$ diff <(node version1.ts) <(node version2.ts)

This compares the output of two different script versions without creating temporary files manually.

Named Pipes (FIFOs)

Named pipes let processes communicate through the filesystem:

# Create a named pipe
$ mkfifo mypipe

# Terminal 1: Write to the pipe
$ node producer.ts > mypipe

# Terminal 2: Read from the pipe
$ node consumer.ts < mypipe

The producer blocks until the consumer starts reading, creating synchronization between processes.

Use case: Streaming data between long-running processes without filling disk space with intermediate files.
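
From Node's point of view a named pipe is just a path you open for reading or writing. A sketch of the producer side, assuming the FIFO mypipe already exists (created with mkfifo as above):

// producer.ts
// Sketch: write into the named pipe created with `mkfifo mypipe`.
// Opening the FIFO for writing waits until a reader
// (e.g. `node consumer.ts < mypipe`) attaches.

import fs from "fs";

const pipe = fs.createWriteStream("mypipe");

pipe.write("first message\n");
pipe.write("second message\n");
pipe.end();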

Programmatic Stream Redirection in Node.js

Sometimes you need to redirect streams from within your Node.js program. Let's explore how to do this programmatically.

Redirecting stdout to a File

// redirect-stdout.ts
// Purpose: Redirect stdout to a file programmatically

import fs from "fs";

// Create a writable stream to a file
const logFile = fs.createWriteStream("output.log");

// Save original stdout
const originalStdout = process.stdout.write;

// Override stdout.write to go to file
process.stdout.write = function (chunk: any): boolean {
  return logFile.write(chunk);
};

// Now all console.log goes to the file
console.log("This goes to output.log");
console.log("So does this");

// Restore original stdout if needed
process.stdout.write = originalStdout;
console.log("This goes back to terminal");

Capturing Child Process Output

// capture-output.ts
// Purpose: Run a command and capture its output

import { spawn } from "child_process";

function runCommand(command: string, args: string[]): Promise<string> {
  return new Promise((resolve, reject) => {
    const child = spawn(command, args);
    let output = "";
    let errorOutput = "";

    // Capture stdout
    child.stdout.on("data", (chunk: Buffer) => {
      output += chunk.toString();
    });

    // Capture stderr
    child.stderr.on("data", (chunk: Buffer) => {
      errorOutput += chunk.toString();
    });

    // Handle process completion
    child.on("close", (code: number) => {
      if (code === 0) {
        resolve(output);
      } else {
        reject(new Error(`Command failed with code ${code}: ${errorOutput}`));
      }
    });
  });
}

// Usage
async function main() {
  try {
    const output = await runCommand("ls", ["-la"]);
    console.log("Command output:", output);
  } catch (error) {
    console.error("Command failed:", error);
  }
}

main();
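
When you only need the captured output and not chunk-by-chunk control, Node's built-in execFile combined with promisify does the same job in less code (note that it buffers the whole output in memory). A sketch:

// capture-output-simple.ts
// Sketch: same idea as runCommand(), using execFile + promisify.

import { execFile } from "child_process";
import { promisify } from "util";

const execFileAsync = promisify(execFile);

async function main() {
  try {
    const { stdout, stderr } = await execFileAsync("ls", ["-la"]);
    console.log("Command output:", stdout);
    if (stderr) console.error("Command warnings:", stderr);
  } catch (error) {
    // Non-zero exit codes reject the promise
    console.error("Command failed:", error);
  }
}

main();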

Building a Custom Pipeline

// pipeline-builder.ts
// Purpose: Chain Node.js scripts together programmatically

import { spawn } from "child_process";

interface PipelineStage {
  command: string;
  args: string[];
}

function buildPipeline(stages: PipelineStage[]): Promise<string> {
  return new Promise((resolve, reject) => {
    if (stages.length === 0) {
      return resolve("");
    }

    // Spawn all processes
    const processes = stages.map((stage) => spawn(stage.command, stage.args));

    // Connect pipes: stdout of each to stdin of next
    for (let i = 0; i < processes.length - 1; i++) {
      processes[i].stdout.pipe(processes[i + 1].stdin);
    }

    // Capture final output
    let output = "";
    const lastProcess = processes[processes.length - 1];

    lastProcess.stdout.on("data", (chunk: Buffer) => {
      output += chunk.toString();
    });

    // Handle errors
    processes.forEach((proc, index) => {
      proc.on("error", (error) => {
        reject(new Error(`Stage ${index + 1} failed: ${error.message}`));
      });
    });

    // Handle completion
    lastProcess.on("close", (code: number) => {
      if (code === 0) {
        resolve(output);
      } else {
        reject(new Error(`Pipeline failed with code ${code}`));
      }
    });
  });
}

// Usage example
async function analyzeLogs() {
  const pipeline: PipelineStage[] = [
    { command: "cat", args: ["server.log"] },
    { command: "grep", args: ["ERROR"] },
    { command: "sort", args: [] },
    { command: "uniq", args: ["-c"] },
  ];

  try {
    const result = await buildPipeline(pipeline);
    console.log("Analysis complete:");
    console.log(result);
  } catch (error) {
    console.error("Pipeline failed:", error);
  }
}

analyzeLogs();

Performance Considerations

Memory Usage: Streams vs Buffers

When processing large files, stream-based approaches use constant memory while buffer-based approaches scale with file size.

Memory comparison:

// ❌ Bad: Loads entire file into memory
import fs from "fs";

// For a 1GB file, this uses ~1GB of memory
const content = fs.readFileSync("large-file.txt", "utf8");
const lines = content.split("\n");
console.log(`Lines: ${lines.length}`);

// ✅ Good: Streams data in chunks
import fs from "fs";
import readline from "readline";

// For a 1GB file, this uses ~10MB of memory
const stream = fs.createReadStream("large-file.txt");
const rl = readline.createInterface({ input: stream });

let lineCount = 0;
rl.on("line", () => lineCount++);
rl.on("close", () => console.log(`Lines: ${lineCount}`));

Performance table:

File Size | Buffer Approach   | Stream Approach
10 MB     | 10 MB memory      | 5 MB memory
100 MB    | 100 MB memory     | 5 MB memory
1 GB      | 1 GB memory       | 5 MB memory
10 GB     | Out of memory ❌  | 5 MB memory ✅

Backpressure: Handling Flow Control

When piping streams, the consumer might be slower than the producer. This creates backpressure—data accumulating in memory buffers.

Without backpressure handling:

// ❌ Can cause memory issues with fast sources
const source = fs.createReadStream("huge-file.txt");
const destination = fs.createWriteStream("output.txt");

source.on("data", (chunk) => {
destination.write(chunk); // Might not be ready!
});

With proper backpressure handling:

// ✅ Respects flow control
const source = fs.createReadStream("huge-file.txt");
const destination = fs.createWriteStream("output.txt");

source.on("data", (chunk) => {
const canContinue = destination.write(chunk);

if (!canContinue) {
// Destination buffer is full, pause source
source.pause();

// Resume when destination is ready
destination.once("drain", () => {
source.resume();
});
}
});

Best approach: Use pipe():

// ✅ Best: pipe() handles backpressure automatically
const source = fs.createReadStream("huge-file.txt");
const destination = fs.createWriteStream("output.txt");

source.pipe(destination);

The pipe() method automatically:

  • Pauses the source when destination is overwhelmed
  • Resumes when destination is ready
  • Handles errors properly
  • Cleans up resources
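
Node also provides a promise-based pipeline() helper in stream/promises that wires streams together, forwards errors from any stage, and destroys all streams on failure. A sketch of the same copy using it:

// pipeline-copy.ts
// Sketch: pipe() semantics plus built-in error handling via stream.pipeline().

import fs from "fs";
import { pipeline } from "stream/promises";

async function copy() {
  try {
    await pipeline(
      fs.createReadStream("huge-file.txt"),
      fs.createWriteStream("output.txt")
    );
    process.stderr.write("Copy finished\n");
  } catch (error) {
    // Errors from either stream land here, and both streams are cleaned up
    process.stderr.write(`Copy failed: ${error}\n`);
    process.exit(1);
  }
}

copy();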

Common Misconceptions

❌ Misconception: Pipes work like file copying

Reality: Pipes stream data continuously without creating intermediate files.

Why this matters: Pipes can process files larger than your available disk space because they don't store intermediate results.

Example:

# ❌ Thinking: This creates temporary files
$ cat huge.log | grep "ERROR" | sort > results.txt

# ✅ Reality: Data flows through memory only
# No intermediate files created!

If huge.log is 50GB and you only have 20GB free disk space, this still works because data flows through memory in chunks.

❌ Misconception: Redirecting stdout also redirects console.error()

Reality: console.error() writes to stderr, not stdout, so it isn't affected by > redirection.

Example:

// mixed-logs.ts
console.log("Normal message");
console.error("Error message");
console.log("Another normal message");
$ node mixed-logs.ts > output.txt
Error message # Appears on screen (stderr not redirected)

Check the file:

$ cat output.txt
Normal message
Another normal message

❌ Misconception: Pipe failures are silent

Reality: If any stage in a pipeline fails, you need to handle it explicitly or it might go unnoticed.

Example:

// failing-stage.ts
process.stdin.on("data", (chunk) => {
// This stage will crash
throw new Error("Oops!");
});
$ echo "test" | node failing-stage.ts | cat
# The error might not propagate!

Solution: Use proper error handling:

process.stdin.on("data", (chunk) => {
try {
// Your processing
} catch (error) {
process.stderr.write(`Error: ${error.message}\n`);
process.exit(1); // Exit with error code
}
});

Troubleshooting Pipeline Issues

Problem: Pipeline produces no output

Symptoms: Command runs but produces empty output.

Common causes:

  1. A stage in the pipeline filtered everything out (80%)
  2. Output is buffered and not flushed (15%)
  3. Wrong redirection order (5%)

Diagnostic steps:

# Step 1: Test each stage individually
$ cat data.txt # Does source have data?
$ cat data.txt | grep "pattern" # Does filter match anything?
$ cat data.txt | grep "pattern" | sort # Does sort work?

# Step 2: Check for buffering
$ node script.ts | cat # Force flush with cat

# Step 3: Verify redirection order
$ node script.ts > output.txt 2>&1 # Correct order

Solution: Check each pipeline stage independently to find where data disappears.

Problem: "Broken pipe" error

Symptoms: Error message about broken pipe, pipeline terminates early.

Common cause: A process in the pipeline exits before reading all input from the previous stage.

Example:

$ cat huge-file.txt | head -n 10
cat: write error: Broken pipe

This is actually normal! head reads 10 lines and exits, causing cat to get a broken pipe when trying to write more.

Solution: This is usually harmless. To suppress:

$ cat huge-file.txt 2>/dev/null | head -n 10

Problem: Pipeline hangs indefinitely

Symptoms: Command starts but never completes.

Common causes:

  1. Deadlock: process waiting for input that never comes (70%)
  2. Infinite loop in one stage (20%)
  3. Waiting for stdin but should use files (10%)

Diagnostic steps:

// Add timeouts to detect hangs
setTimeout(() => {
  process.stderr.write("Still waiting for input...\n");
}, 5000);

// Log when data arrives
process.stdin.on("data", (chunk) => {
  process.stderr.write(`Received ${chunk.length} bytes\n`);
});

Solution: Check whether your program expects input on stdin. If it might be run interactively, consider adding a timeout or printing a help message when no input arrives.

Production-Ready Pipeline Pattern

Here's a template for building robust, production-ready pipeline components:

// production-pipeline-stage.ts
// Purpose: Template for building reliable pipeline components

import { Transform } from "stream";

class ProductionTransform extends Transform {
  private processed = 0;
  private errors = 0;

  constructor() {
    super({ objectMode: true });
  }

  _transform(
    chunk: any,
    encoding: BufferEncoding,
    callback: (error?: Error | null, data?: any) => void
  ): void {
    try {
      // Your transformation logic here
      const result = this.processChunk(chunk);

      this.processed++;

      // Send result to next stage
      callback(null, result);
    } catch (error) {
      this.errors++;

      // Log error to stderr (doesn't break pipeline)
      process.stderr.write(
        `Error processing chunk ${this.processed}: ${error.message}\n`
      );

      // Skip this chunk and continue
      callback();
    }
  }

  _flush(callback: (error?: Error | null) => void): void {
    // Called when input stream ends
    process.stderr.write(
      `Pipeline stage complete: ${this.processed} processed, ${this.errors} errors\n`
    );
    callback();
  }

  private processChunk(chunk: any): any {
    // Implement your transformation here
    return chunk;
  }
}

// Usage
const transformer = new ProductionTransform();

process.stdin
  .pipe(transformer)
  .pipe(process.stdout)
  .on("error", (error) => {
    process.stderr.write(`Pipeline error: ${error.message}\n`);
    process.exit(1);
  })
  .on("finish", () => {
    process.stderr.write("Pipeline completed successfully\n");
  });

Check Your Understanding

Quick Quiz

  1. What does 2>&1 do and why does order matter?

    Show Answer

    2>&1 redirects stderr (fd 2) to wherever stdout (fd 1) is currently pointing.

    Order matters because redirections are processed left to right:

    # Correct: Both go to file
    command > file.txt 2>&1

    # Wrong: Only stdout to file, stderr to terminal
    command 2>&1 > file.txt

    In the correct version, > redirects stdout to file first, then 2>&1 sends stderr to that same file. In the wrong version, 2>&1 duplicates stdout (which still points to terminal), then > redirects only stdout to the file.

  2. Why would you use tee in a pipeline?

    Show Answer

    tee lets you see output on your terminal AND save it to a file simultaneously.

    # Without tee: Only see results, not saved
    npm test

    # With tee: See results AND save to file
    npm test | tee test-results.txt

    This is invaluable for long-running processes where you want to monitor progress in real-time while also keeping a permanent log.

  3. When should you use pipe() vs manual data handling?

    Show Answer

    Use pipe() when:

    • Simply transferring data from source to destination
    • Need automatic backpressure handling
    • Want error propagation and cleanup

    Use manual handling when:

    • Need to transform or inspect data as it flows
    • Building complex branching logic
    • Aggregating data across chunks

    // Use pipe(): Simple transfer
    source.pipe(destination);

    // Manual handling: Need to transform
    source.on("data", (chunk) => {
      const transformed = processChunk(chunk);
      destination.write(transformed);
    });

Hands-On Exercise

Challenge: Build a pipeline component that counts word frequency from stdin and outputs the top 10 most common words as JSON.

Requirements:

  • Read from stdin
  • Count word occurrences (case-insensitive)
  • Output top 10 as JSON to stdout
  • Log progress to stderr

Starter Code:

// word-frequency.ts
// TODO: Implement word frequency counter

let wordCounts: Record<string, number> = {};

process.stdin.on("data", (chunk: Buffer) => {
// Your code here
});

process.stdin.on("end", () => {
// Output top 10 as JSON
});
Show Solution
// word-frequency.ts
// Solution: Word frequency counter for pipelines

interface WordCount {
  word: string;
  count: number;
}

let wordCounts: Record<string, number> = {};
let totalWords = 0;

process.stdin.on("data", (chunk: Buffer) => {
  const text = chunk.toString();

  // Extract words (alphanumeric only)
  const words = text.toLowerCase().match(/\b[a-z0-9]+\b/g) || [];

  // Count occurrences
  words.forEach((word) => {
    wordCounts[word] = (wordCounts[word] || 0) + 1;
    totalWords++;
  });

  // Log progress to stderr
  process.stderr.write(`Processed ${totalWords} words so far...\n`);
});

process.stdin.on("end", () => {
  // Convert to array and sort by count
  const sorted: WordCount[] = Object.entries(wordCounts)
    .map(([word, count]) => ({ word, count }))
    .sort((a, b) => b.count - a.count)
    .slice(0, 10);

  // Output JSON to stdout
  const result = {
    total_words: totalWords,
    unique_words: Object.keys(wordCounts).length,
    top_10: sorted,
  };

  process.stdout.write(JSON.stringify(result, null, 2) + "\n");

  // Final stats to stderr
  process.stderr.write(
    `✓ Analysis complete: ${totalWords} total, ` +
      `${Object.keys(wordCounts).length} unique\n`
  );
});

Test it:

# From a file
$ cat article.txt | node word-frequency.ts

# From multiple files
$ cat *.txt | node word-frequency.ts

# In a pipeline
$ cat article.txt | node word-frequency.ts | node json-format.ts > results.json

Why this solution works:

  1. Streams data incrementally - doesn't load entire file into memory
  2. Logs to stderr - progress messages don't contaminate JSON output
  3. Outputs valid JSON to stdout - can be piped to other tools
  4. Handles edge cases - filters out non-words, handles empty input
  5. Provides useful metadata - includes total and unique word counts

Real-World Pipeline Architectures

Let's explore how production systems use pipelines to process data at scale.

Architecture 1: ETL (Extract, Transform, Load) Pipeline

Scenario: Processing daily sales data from CSV to database.

# Pipeline stages:
# 1. Extract: Read CSV from S3 or file system
# 2. Transform: Clean, validate, enrich data
# 3. Load: Insert into database

$ cat sales-data.csv \
| node extract-csv.ts \
| node validate-data.ts \
| node enrich-with-metadata.ts \
| node load-to-database.ts \
> processed.log 2> errors.log

Each stage is independent:

// validate-data.ts
// Purpose: Validate and filter invalid records

process.stdin.on("data", (chunk: Buffer) => {
const records = JSON.parse(chunk.toString());

const valid = records.filter((record: any) => {
// Validation rules
if (!record.date || !record.amount) {
process.stderr.write(`Invalid record: ${JSON.stringify(record)}\n`);
return false;
}
return true;
});

process.stdout.write(JSON.stringify(valid) + "\n");
});

Architecture 2: Log Aggregation Pipeline

Scenario: Collecting logs from multiple servers and analyzing patterns.

# Collect from multiple sources
$ (cat server1.log & cat server2.log & cat server3.log) \
| node parse-logs.ts \
| node filter-errors.ts \
| node aggregate-by-hour.ts \
| node generate-report.ts \
> daily-report.html

Parallel input, serial processing:

// aggregate-by-hour.ts
// Purpose: Group log entries by hour

const hourlyBuckets: Record<string, any[]> = {};

process.stdin.on("data", (chunk: Buffer) => {
const entries = JSON.parse(chunk.toString());

entries.forEach((entry: any) => {
const hour = new Date(entry.timestamp).getHours();
const key = `hour-${hour}`;

if (!hourlyBuckets[key]) {
hourlyBuckets[key] = [];
}

hourlyBuckets[key].push(entry);
});
});

process.stdin.on("end", () => {
// Output aggregated data
const summary = Object.entries(hourlyBuckets).map(([hour, entries]) => ({
hour,
count: entries.length,
errors: entries.filter((e: any) => e.level === "ERROR").length,
}));

process.stdout.write(JSON.stringify(summary, null, 2) + "\n");
});

Architecture 3: Real-Time Processing Pipeline

Scenario: Processing streaming data from a message queue or API.

// real-time-pipeline.ts
// Purpose: Process continuous stream of events

import { Transform } from "stream";

// Stage 1: Parse events (bytes in, objects out)
class EventParser extends Transform {
  constructor() {
    super({ readableObjectMode: true });
  }

  _transform(chunk: any, encoding: string, callback: Function) {
    try {
      const events = chunk.toString().split("\n").filter(Boolean);
      events.forEach((event) => {
        this.push(JSON.parse(event));
      });
      callback();
    } catch (error) {
      callback(error);
    }
  }
}

// Stage 2: Enrich with metadata
class EventEnricher extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(event: any, encoding: string, callback: Function) {
    event.processedAt = new Date().toISOString();
    event.serverId = process.env.SERVER_ID || "unknown";
    this.push(event);
    callback();
  }
}

// Stage 3: Filter important events
class EventFilter extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(event: any, encoding: string, callback: Function) {
    if (event.priority === "high" || event.type === "error") {
      this.push(event);
    }
    callback();
  }
}

// Stage 4: Format output (objects in, strings out)
class EventFormatter extends Transform {
  constructor() {
    super({ writableObjectMode: true });
  }

  _transform(event: any, encoding: string, callback: Function) {
    const formatted = JSON.stringify(event) + "\n";
    this.push(formatted);
    callback();
  }
}

// Build the pipeline
process.stdin
  .pipe(new EventParser())
  .pipe(new EventEnricher())
  .pipe(new EventFilter())
  .pipe(new EventFormatter())
  .pipe(process.stdout);

Usage:

# Process from a message queue
$ kafka-consumer --topic events | node real-time-pipeline.ts > filtered-events.log

# Process from API stream
$ curl -N https://api.example.com/stream | node real-time-pipeline.ts

Architecture 4: Distributed Processing

Scenario: Splitting work across multiple processes for parallel processing.

# Split input into chunks, process in parallel, merge results
$ cat huge-dataset.csv \
| split -l 10000 --filter='node process-chunk.ts' \
| node merge-results.ts \
> final-output.json

Process chunk component:

// process-chunk.ts
// Purpose: Process a chunk of data independently

process.stdin.on("data", (chunk: Buffer) => {
const lines = chunk.toString().split("\n");

const processed = lines.map((line) => {
// Heavy processing here
return expensiveOperation(line);
});

process.stdout.write(JSON.stringify(processed) + "\n");
});

function expensiveOperation(data: string): any {
// Simulate CPU-intensive work
// In real world: ML inference, image processing, etc.
return { processed: data.toUpperCase() };
}

Performance Optimization Tips

1. Use Stream Transforms for Heavy Processing

Instead of accumulating data, process it as it streams:

// ❌ Bad: Accumulate then process
let allData: any[] = [];
process.stdin.on("data", (chunk) => {
  allData.push(...JSON.parse(chunk.toString()));
});
process.stdin.on("end", () => {
  const processed = allData.map(heavyOperation);
  console.log(JSON.stringify(processed));
});

// ✅ Good: Stream processing
import { Transform } from "stream";

const processor = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    const result = heavyOperation(chunk);
    callback(null, result);
  },
});

process.stdin.pipe(processor).pipe(process.stdout);

2. Optimize Buffer Sizes

Adjust buffer sizes based on your data:

import fs from "fs";

// Default buffer size for fs read streams: 64KB (generic streams default to 16KB)
const small = fs.createReadStream("file.txt");

// Larger buffer for throughput
const large = fs.createReadStream("file.txt", {
  highWaterMark: 256 * 1024, // 256KB
});

// Smaller buffer for memory constraints
const tiny = fs.createReadStream("file.txt", {
  highWaterMark: 4 * 1024, // 4KB
});

Guidelines:

  • Large files, fast disks: 64KB - 256KB buffers
  • Network streams: 16KB - 32KB buffers
  • Memory constrained: 4KB - 8KB buffers

3. Parallelize Independent Operations

If stages don't depend on each other, run them in parallel:

# Sequential: Slow
$ cat data.txt | process1 | process2 | process3

# Parallel: Fast (if independent)
$ cat data.txt | tee >(process1 > out1) >(process2 > out2) >(process3 > out3)

4. Use Compression for Network Transfers

When piping over networks, compress data:

# Without compression
$ cat large-file.txt | ssh remote-server 'cat > file.txt'

# With compression (much faster over network)
$ cat large-file.txt | gzip | ssh remote-server 'gunzip > file.txt'
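
The compression stage itself can also be written as a Node component using the built-in zlib module. A sketch that gzips stdin to stdout (gzip-stage.ts is a hypothetical name):

// gzip-stage.ts
// Sketch: a gzip pipeline stage, roughly equivalent to the `gzip` command.
// Usage: cat large-file.txt | node gzip-stage.ts | ssh remote-server 'gunzip > file.txt'

import { createGzip } from "zlib";

// pipe() handles backpressure between stdin, the compressor, and stdout
process.stdin.pipe(createGzip()).pipe(process.stdout);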

Summary: Key Takeaways

Let's recap the advanced redirection techniques we've explored:

Redirection Operators:

  • > redirects stdout (overwrites file)
  • >> redirects stdout (appends to file)
  • 2> redirects stderr only
  • 2>&1 merges stderr into stdout (order matters!)
  • < redirects file into stdin
  • | pipes stdout to stdin of next command

Pipeline Principles:

  • Each stage reads from stdin, writes to stdout
  • Status messages go to stderr (not stdout)
  • Exit with proper error codes
  • Handle backpressure in production code
  • Use pipe() for automatic resource management

Performance Guidelines:

  • Stream data instead of buffering when possible
  • Optimize buffer sizes for your use case
  • Parallelize independent operations
  • Monitor memory usage in long-running pipelines

Production Best Practices:

  • Log progress to stderr
  • Handle errors gracefully without breaking pipeline
  • Provide meaningful exit codes
  • Clean up resources properly
  • Test each pipeline stage independently

What's Next?

Now that you've mastered stream redirection and pipelines, you're ready to explore:

Advanced Node.js Streams:

  • Transform streams for custom data processing
  • Duplex streams for bidirectional communication
  • Object mode streams for structured data
  • Stream error handling and recovery strategies

Related Topics:

  • Building CLI tools with libraries like Commander.js
  • Process management and monitoring
  • Worker threads for CPU-intensive pipeline stages
  • Message queues for distributed pipelines

Practical Projects:

  • Build a log analysis toolkit
  • Create an ETL pipeline for data migration
  • Develop a real-time data processing system
  • Design a distributed computation framework

You now understand how to orchestrate complex data flows using stream redirection. These patterns form the foundation of Unix-style programming: small, focused tools that work together through standard interfaces. Whether you're processing gigabytes of logs, building data transformation pipelines, or creating developer tools, mastering streams gives you the power to handle data elegantly and efficiently.

Version Information

Tested with:

  • Node.js: v18.x, v20.x, v22.x
  • Bash: 5.x
  • Operating Systems: Linux, macOS, Windows (WSL)

Platform Notes:

  • Windows users: Use WSL (Windows Subsystem for Linux) for full bash redirection support
  • PowerShell has different redirection syntax - these examples use bash/WSL
  • All Node.js code works cross-platform

Stream redirection embodies the Unix philosophy at its finest: powerful, composable tools that do one thing well and work together seamlessly. Master these patterns, and you'll build systems that are elegant, efficient, and maintainable.