Pipe Error Handling: Building Reliable Stream Applications
Imagine you're building a file upload service. A user starts uploading a 1GB video file—the stream is flowing, everything looks good. Then suddenly: network timeout. What happens next?
Without proper error handling, your application might:
- Leave file handles open (memory leak)
- Keep streams running in the background (wasting resources)
- Corrupt the destination file (data integrity issue)
- Crash entirely (terrible user experience)
With proper error handling, your application:
- Cleans up resources immediately
- Informs the user clearly
- Logs the issue for debugging
- Remains stable and ready for the next request
Today, we're going to discover how to build rock-solid stream applications that handle failures gracefully. By the end of this journey, you'll understand why .pipe() alone isn't enough for production code, and how to use pipeline() to build truly reliable applications.
Quick Reference
When to use: Every production stream application that needs proper error handling and resource cleanup
Basic syntax:
import { pipeline } from "stream/promises";
await pipeline(readableStream, transformStream, writableStream);
Key benefit:
- ✅ Automatic error propagation
- ✅ Guaranteed resource cleanup
- ✅ Memory leak prevention
- ✅ Production-ready error handling
Most important rule: Never use .pipe() alone in production—always use pipeline() instead.
What You Need to Know First
To get the most out of this guide, you should understand:
- Stream piping basics: How .pipe() works and basic stream operations (see our Stream Piping Basics guide)
- JavaScript error handling: try/catch blocks and Error objects
- Promises and async/await: Modern asynchronous JavaScript patterns
- Basic Node.js streams: Readable and writable stream concepts
If you haven't read the Stream Piping Basics guide, please start there—this article builds directly on those concepts.
What We'll Cover in This Article
By the end of this guide, you'll understand:
- Why .pipe() error handling is problematic
- How errors propagate (or don't) through pipes
- The pipeline() function and why it's superior
- How to handle errors at each stage of a pipeline
- Resource cleanup and memory leak prevention
- Production-ready error handling patterns
- Building robust file processing applications
What We'll Explain Along the Way
We'll introduce these concepts with full explanations:
- Error propagation in streams
- Resource cleanup and memory leaks
- The finally block pattern
- Error recovery strategies
- Logging and monitoring patterns
The Problem: Why .pipe() Error Handling Fails
Let's start by understanding what goes wrong with the basic .pipe() approach. This is crucial—many developers think they're handling errors properly when they're actually leaving critical gaps.
The False Sense of Security
Here's what many developers write when they first learn about stream error handling:
import fs from "fs";
const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");
// Attempt at error handling
readable.on("error", (err) => {
console.error("Read error:", err);
});
writable.on("error", (err) => {
console.error("Write error:", err);
});
// Pipe them together
readable.pipe(writable);
console.log("Started copying...");
This looks reasonable, right? We're listening for errors on both streams. But let's see what actually happens when things go wrong.
Experiment: What Really Happens?
Let's create a scenario where an error occurs and watch what happens:
import fs from "fs";
console.log("Creating streams...\n");
const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");
// Track what's happening
let readableErrorFired = false;
let writableErrorFired = false;
let readableClosedFired = false;
let writableFinishedFired = false;
readable.on("error", (err) => {
readableErrorFired = true;
console.log("❌ Readable error:", err.message);
});
writable.on("error", (err) => {
writableErrorFired = true;
console.log("❌ Writable error:", err.message);
});
readable.on("close", () => {
readableClosedFired = true;
console.log("🔒 Readable closed");
});
writable.on("finish", () => {
writableFinishedFired = true;
console.log("✅ Writable finished");
});
// Start the pipe
readable.pipe(writable);
// Simulate an error after 100ms
setTimeout(() => {
console.log("\n💥 Simulating error...\n");
// Force a read error by destroying the stream
readable.destroy(new Error("Simulated read failure"));
// Wait a bit and check state
setTimeout(() => {
console.log("\n📊 Final State:");
console.log("- Readable error fired:", readableErrorFired);
console.log("- Writable error fired:", writableErrorFired);
console.log("- Readable closed:", readableClosedFired);
console.log("- Writable finished:", writableFinishedFired);
}, 100);
}, 100);
Output:
Creating streams...
💥 Simulating error...
❌ Readable error: Simulated read failure
🔒 Readable closed
📊 Final State:
- Readable error fired: true
- Writable error fired: false
- Readable closed: true
- Writable finished: false
What happened? The readable stream errored and closed, but:
- ❌ The writable stream doesn't know anything went wrong
- ❌ The writable stream is still open (potential memory leak!)
- ❌ The file might be partially written and corrupted
- ❌ No cleanup happened automatically
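You can verify this directly with the destroyed flag every stream exposes. Here is a minimal self-contained check (reusing the input.txt/output.txt names from above); the 100ms delay is just an illustrative value:
import fs from "fs";

const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");

readable.on("error", () => {}); // swallow the error so the process keeps running
readable.pipe(writable);

// Force a failure, then inspect both streams once things settle
readable.destroy(new Error("Simulated read failure"));

setTimeout(() => {
  console.log("Readable destroyed?", readable.destroyed); // true
  console.log("Writable destroyed?", writable.destroyed); // false (leaked!)
}, 100);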
The Three Critical Problems with .pipe()
Let's explore each problem in detail:
Problem 1: Errors Don't Propagate
import fs from "fs";
const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");
readable.on("error", (err) => {
console.error("Source failed:", err.message);
// Question: Does writable know about this error?
// Answer: NO! writable keeps running
});
writable.on("error", (err) => {
console.error("Destination failed:", err.message);
// Question: Does readable know about this error?
// Answer: NO! readable keeps running
});
readable.pipe(writable);
// If input.txt doesn't exist:
// Output: "Source failed: ENOENT: no such file or directory"
// But writable stream stays open, waiting for data that will never come
Why this is dangerous:
// Real-world scenario: Processing 1000 files
for (let i = 0; i < 1000; i++) {
const readable = fs.createReadStream(`file${i}.txt`);
const writable = fs.createWriteStream(`output${i}.txt`);
readable.pipe(writable);
// If file500.txt is missing:
// - 500 writable streams are left open
// - Memory leaks accumulate
// - Eventually: "Too many open files" error
// - Server crashes
}
Problem 2: No Automatic Cleanup
When an error occurs, streams don't clean up automatically:
import fs from "fs";
function copyFile(source: string, dest: string) {
const readable = fs.createReadStream(source);
const writable = fs.createWriteStream(dest);
readable.pipe(writable);
// If error occurs:
// - File handles stay open
// - Partial files remain on disk
// - No way to know operation failed
// - Resources leak
}
// Call it 100 times with bad files
for (let i = 0; i < 100; i++) {
copyFile("nonexistent.txt", `output${i}.txt`);
}
// Result: 100 leaked file handles!
// Operating system limit reached
// Application crashes
Problem 3: Partial Data Corruption
import fs from "fs";
const readable = fs.createReadStream("data.txt");
const writable = fs.createWriteStream("output.txt");
// Track roughly how much data has been sent toward the writable
// (writable streams have no "write" event, so we count on the readable side)
let bytesWritten = 0;
readable.on("data", (chunk) => {
bytesWritten += chunk.length;
});
readable.pipe(writable);
// Simulate error halfway through
setTimeout(() => {
readable.destroy(new Error("Connection lost"));
}, 500);
// Result:
// - output.txt contains partial data
// - File looks valid but is corrupted
// - No indication that copy was incomplete
// - Application thinks operation succeeded
Real-world impact:
// User uploads profile picture
const upload = fs.createReadStream(tempFile);
const storage = fs.createWriteStream("profiles/user123.jpg");
upload.pipe(storage);
// Network hiccup occurs halfway through upload
// Result: user123.jpg is corrupted
// User's profile shows broken image
// No error logged
// User has no idea what went wrong
The Manual Approach: Why It's Not Enough
You might think: "I'll just manually clean up when errors occur!" Let's try:
import fs from "fs";
const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");
readable.on("error", (err) => {
console.error("Read error:", err.message);
// Manual cleanup attempt
writable.destroy();
readable.destroy();
});
writable.on("error", (err) => {
console.error("Write error:", err.message);
// Manual cleanup attempt
readable.destroy();
writable.destroy();
});
readable.pipe(writable);
Problems with this approach:
- Code duplication: Cleanup logic repeated in multiple places
- Easy to forget: What if you add a third stream? A fourth?
- Race conditions: What if both streams error simultaneously?
- Incomplete cleanup: What about removing partial files?
- No error propagation: How do you notify the caller?
// What about this scenario?
import zlib from "zlib";
const readable = fs.createReadStream("input.txt");
const gzip = zlib.createGzip();
const writable = fs.createWriteStream("output.txt.gz");
readable.pipe(gzip).pipe(writable);
// Now you need error handlers for THREE streams!
// Each needs to clean up the other two
// Nightmare to maintain!
This is where pipeline() comes to the rescue.
The Solution: The pipeline() Function
Node.js provides a built-in solution that handles all these problems automatically: the pipeline() function.
What pipeline() Does Differently
Let's see the same operation, but with pipeline():
import { pipeline } from "stream/promises";
import fs from "fs";
async function copyFile(source: string, dest: string): Promise<void> {
try {
await pipeline(fs.createReadStream(source), fs.createWriteStream(dest));
console.log("✅ Copy complete!");
} catch (err) {
console.error("❌ Copy failed:", err.message);
// All streams are automatically cleaned up
// All file handles are closed
// Error is properly propagated
}
}
// Usage
await copyFile("input.txt", "output.txt");
What pipeline() does automatically:
- Propagates errors: If any stream errors, all streams are informed
- Cleans up resources: All streams are properly closed/destroyed
- Returns a promise: Easy to use with async/await
- Handles backpressure: Just like .pipe(), but safely
- Works with any number of streams: 2 streams or 20, same code
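For reference, pipeline() also exists in an older callback form exported from the stream module; the rest of this guide uses the promise version from stream/promises because it composes naturally with async/await. A minimal sketch of the callback form:
import { pipeline } from "stream";
import fs from "fs";

// Callback form: the final argument runs exactly once, with an error if any
// stream failed, after all streams have been cleaned up.
pipeline(
  fs.createReadStream("input.txt"),
  fs.createWriteStream("output.txt"),
  (err) => {
    if (err) {
      console.error("❌ Pipeline failed:", err.message);
    } else {
      console.log("✅ Pipeline succeeded");
    }
  }
);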
Comparing .pipe() vs pipeline()
Let's see them side by side:
// ❌ Using .pipe() (problematic)
function copyWithPipe(source: string, dest: string) {
return new Promise<void>((resolve, reject) => {
const readable = fs.createReadStream(source);
const writable = fs.createWriteStream(dest);
// Need to track both streams
let readableError: Error | null = null;
let writableError: Error | null = null;
readable.on("error", (err) => {
readableError = err;
writable.destroy();
reject(err);
});
writable.on("error", (err) => {
writableError = err;
readable.destroy();
reject(err);
});
writable.on("finish", () => {
if (!readableError && !writableError) {
resolve();
}
});
readable.pipe(writable);
});
}
// ✅ Using pipeline() (simple and safe)
async function copyWithPipeline(source: string, dest: string) {
await pipeline(fs.createReadStream(source), fs.createWriteStream(dest));
}
// Same functionality, but:
// - pipeline: 5 lines
// - pipe: 25+ lines
// - pipeline: Handles all edge cases
// - pipe: Easy to miss edge cases
How pipeline() Works Behind the Scenes
Let's understand what happens when you call pipeline():
import { pipeline } from "stream/promises";
import fs from "fs";
await pipeline(
fs.createReadStream("input.txt"), // Stream 1
fs.createWriteStream("output.txt") // Stream 2
);
// What pipeline() does internally (simplified):
// 1. Connect the streams (like .pipe())
stream1.pipe(stream2);
// 2. Set up error listeners on ALL streams
stream1.on("error", (err) => {
// Destroy all other streams
stream2.destroy(err);
// Propagate error
rejectPromise(err);
});
stream2.on("error", (err) => {
// Destroy all other streams
stream1.destroy(err);
// Propagate error
rejectPromise(err);
});
// 3. Set up completion listener
stream2.on("finish", () => {
// All streams completed successfully
resolvePromise();
});
// 4. Handle cleanup
// If promise is rejected or process exits,
// destroy all streams automatically
Visual flow:
// SUCCESS CASE:
// [Stream 1] → reads data → [Stream 2] → writes data
// ↓ ↓
// success finishes
// ↓ ↓
// └───────────────────────────→ resolvePromise()
// ERROR CASE:
// [Stream 1] → error occurs
// ↓
// error event
// ↓
// destroy Stream 2
// ↓
// rejectPromise(error)
// ↓
// cleanup all resources
Your First pipeline() Example
Let's build a complete working example with proper error handling:
import { pipeline } from "stream/promises";
import fs from "fs";
import path from "path";
/**
* Safely copies a file with full error handling
*/
async function safeCopyFile(source: string, dest: string): Promise<void> {
// Validate inputs
if (!source || !dest) {
throw new Error("Source and destination paths are required");
}
// Check source exists
try {
await fs.promises.access(source, fs.constants.R_OK);
} catch (err) {
throw new Error(`Source file not readable: ${source}`);
}
console.log(`📖 Reading: ${path.basename(source)}`);
console.log(`📝 Writing: ${path.basename(dest)}`);
try {
// The magic happens here
await pipeline(fs.createReadStream(source), fs.createWriteStream(dest));
console.log("✅ Copy successful!");
} catch (err) {
console.error("❌ Copy failed:", err.message);
// Clean up partial file if it exists
try {
await fs.promises.unlink(dest);
console.log("🧹 Cleaned up partial file");
} catch {
// File might not exist, that's okay
}
// Re-throw so caller knows operation failed
throw err;
}
}
// Usage example
async function main() {
try {
await safeCopyFile("large-file.txt", "backup.txt");
console.log("🎉 Operation complete!");
} catch (err) {
console.error("💥 Operation failed:", err.message);
process.exit(1);
}
}
main();
What this code demonstrates:
- Input validation before starting streams
- Existence check to fail fast
- Progress logging for visibility
- Automatic cleanup via pipeline()
- Manual partial file cleanup for data integrity
- Proper error propagation to caller
Output on success:
📖 Reading: large-file.txt
📝 Writing: backup.txt
✅ Copy successful!
🎉 Operation complete!
Output on failure (source doesn't exist):
💥 Operation failed: Source file not readable: large-file.txt
Output on failure (disk full mid-copy):
📖 Reading: large-file.txt
📝 Writing: backup.txt
❌ Copy failed: ENOSPC: no space left on device
🧹 Cleaned up partial file
💥 Operation failed: ENOSPC: no space left on device
See how pipeline() makes error handling straightforward and reliable?
Understanding Error Propagation
Now that you've seen pipeline() in action, let's understand exactly how errors flow through a pipeline. This knowledge helps you debug issues and build more sophisticated error handling.
Single Stream Error
Let's trace what happens when one stream in a pipeline fails:
import { pipeline } from "stream/promises";
import fs from "fs";
// Create a function that logs everything
async function copyWithLogging(source: string, dest: string) {
const readable = fs.createReadStream(source);
const writable = fs.createWriteStream(dest);
// Add detailed logging
readable.on("error", (err) => {
console.log("1️⃣ Readable error event:", err.message);
});
readable.on("close", () => {
console.log("2️⃣ Readable closed");
});
writable.on("error", (err) => {
console.log("3️⃣ Writable error event:", err.message);
});
writable.on("close", () => {
console.log("4️⃣ Writable closed");
});
try {
console.log("▶️ Starting pipeline...\n");
await pipeline(readable, writable);
console.log("\n✅ Pipeline completed successfully");
} catch (err) {
console.log("\n5️⃣ Pipeline rejected with:", err.message);
}
}
// Test with non-existent file
await copyWithLogging("does-not-exist.txt", "output.txt");
Output:
▶️ Starting pipeline...
1️⃣ Readable error event: ENOENT: no such file or directory
2️⃣ Readable closed
4️⃣ Writable closed
5️⃣ Pipeline rejected with: ENOENT: no such file or directory
What happened step by step:
// Timeline of events:
// T+0ms: pipeline() called
// - Creates internal promise
// - Sets up error handlers on both streams
// - Pipes readable → writable
// T+1ms: Readable tries to open file
// - File doesn't exist
// - Error thrown internally
// T+2ms: Readable emits 'error' event
// - Our handler logs: "Readable error event"
// - pipeline()'s handler receives same error
// T+3ms: pipeline() calls readable.destroy()
// - Readable closes immediately
// - Our handler logs: "Readable closed"
// T+4ms: pipeline() calls writable.destroy()
// - Writable closes (never got to write anything)
// - Our handler logs: "Writable closed"
// T+5ms: pipeline() rejects promise
// - catch block executes
// - Our handler logs: "Pipeline rejected"
// Result: All streams cleaned up, error propagated
Multiple Stream Pipeline
Now let's see what happens with more than two streams:
import { pipeline } from "stream/promises";
import fs from "fs";
import zlib from "zlib";
import { Transform } from "stream";
// Create a simple transform that converts to uppercase
class UpperCaseTransform extends Transform {
_transform(chunk: Buffer, encoding: string, callback: Function): void {
this.push(chunk.toString().toUpperCase());
callback();
}
}
async function complexPipeline() {
const readable = fs.createReadStream("input.txt");
const uppercase = new UpperCaseTransform();
const gzip = zlib.createGzip();
const writable = fs.createWriteStream("output.txt.gz");
// Log errors from each stream
readable.on("error", () => console.log("❌ Readable failed"));
uppercase.on("error", () => console.log("❌ Transform failed"));
gzip.on("error", () => console.log("❌ Gzip failed"));
writable.on("error", () => console.log("❌ Writable failed"));
// Log cleanup
readable.on("close", () => console.log("🔒 Readable closed"));
uppercase.on("close", () => console.log("🔒 Transform closed"));
gzip.on("close", () => console.log("🔒 Gzip closed"));
writable.on("close", () => console.log("🔒 Writable closed"));
try {
console.log("▶️ Starting 4-stream pipeline...\n");
await pipeline(readable, uppercase, gzip, writable);
console.log("\n✅ All streams completed");
} catch (err) {
console.log("\n💥 Pipeline failed:", err.message);
}
}
await complexPipeline();
If input.txt doesn't exist, output:
▶️ Starting 4-stream pipeline...
❌ Readable failed
🔒 Readable closed
🔒 Transform closed
🔒 Gzip closed
🔒 Writable closed
💥 Pipeline failed: ENOENT: no such file or directory
Key insight: When any stream fails, pipeline() immediately:
- Destroys ALL streams (not just the failed one)
- Closes all resources
- Rejects the promise with the original error
- Prevents partial data corruption
Error at Different Stages
Let's experiment with errors occurring at different points:
import { pipeline } from "stream/promises";
import fs from "fs";
import { Transform } from "stream";
// Transform that fails after processing some data
class FailAfterNTransform extends Transform {
private count = 0;
private failAfter: number;
constructor(failAfter: number) {
super();
this.failAfter = failAfter;
}
_transform(chunk: Buffer, encoding: string, callback: Function): void {
this.count++;
if (this.count > this.failAfter) {
// Simulate processing error
callback(new Error(`Failed at chunk ${this.count}`));
return;
}
console.log(`✅ Processed chunk ${this.count}`);
this.push(chunk);
callback();
}
}
async function testErrorTiming() {
console.log("Test: Error after 3 chunks\n");
try {
await pipeline(
fs.createReadStream("input.txt", { highWaterMark: 100 }),
new FailAfterNTransform(3),
fs.createWriteStream("output.txt")
);
} catch (err) {
console.log("\n💥 Caught error:", err.message);
// Check what got written
const written = await fs.promises.readFile("output.txt", "utf8");
console.log(`📊 Bytes written before error: ${written.length}`);
}
}
await testErrorTiming();
Output:
Test: Error after 3 chunks
✅ Processed chunk 1
✅ Processed chunk 2
✅ Processed chunk 3
💥 Caught error: Failed at chunk 4
📊 Bytes written before error: 300
Important observation: Some data was written before the error occurred. This is why you should:
- Delete partial files in the error handler
- Use transactions for critical operations
- Validate output after pipeline completes
- Log partial progress for debugging
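One way to act on the first two points is to stream into a temporary file and only move it into place once the pipeline succeeds. A minimal sketch of that pattern (the .tmp suffix is an illustrative choice, not a convention used elsewhere in this guide):
import { pipeline } from "stream/promises";
import fs from "fs";

async function copyAtomically(source: string, dest: string): Promise<void> {
  const tempDest = `${dest}.tmp`; // write here first, never to dest directly
  try {
    await pipeline(fs.createReadStream(source), fs.createWriteStream(tempDest));
    // Only expose the file once it is complete; rename is atomic on the same filesystem
    await fs.promises.rename(tempDest, dest);
  } catch (err) {
    // Remove the partial temp file so no half-written data survives
    await fs.promises.unlink(tempDest).catch(() => {});
    throw err;
  }
}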
Handling Errors at Each Stage
Sometimes you need different behavior depending on which stream fails. Let's discover how to handle errors for specific streams while still using pipeline().
Stream-Specific Error Handling
import { pipeline } from "stream/promises";
import fs from "fs";
async function copyWithSpecificHandling(
source: string,
dest: string
): Promise<void> {
const readable = fs.createReadStream(source);
const writable = fs.createWriteStream(dest);
// Track which stream failed
let failedStream: "readable" | "writable" | null = null;
// Listen to errors (but don't handle cleanup)
readable.on("error", (err) => {
failedStream = "readable";
console.log("📖 Source error:", err.message);
});
writable.on("error", (err) => {
failedStream = "writable";
console.log("📝 Destination error:", err.message);
});
try {
// pipeline() still handles cleanup
await pipeline(readable, writable);
console.log("✅ Copy successful");
} catch (err) {
// Now we know which stream failed
if (failedStream === "readable") {
console.error("💥 Could not read source file");
console.error("💡 Tip: Check file exists and is readable");
} else if (failedStream === "writable") {
console.error("💥 Could not write destination file");
console.error("💡 Tip: Check disk space and permissions");
// Clean up partial file
try {
await fs.promises.unlink(dest);
console.log("🧹 Removed partial file");
} catch {
// Already deleted or never created
}
}
throw err; // Re-throw for caller
}
}
// Test with different error scenarios
console.log("Test 1: Missing source file\n");
try {
await copyWithSpecificHandling("missing.txt", "output.txt");
} catch (err) {
console.log("Caught:", err.message);
}
console.log("\n---\n");
console.log("Test 2: Write to read-only location\n");
try {
await copyWithSpecificHandling("input.txt", "/root/output.txt");
} catch (err) {
console.log("Caught:", err.message);
}
Output:
Test 1: Missing source file
📖 Source error: ENOENT: no such file or directory
💥 Could not read source file
💡 Tip: Check file exists and is readable
Caught: ENOENT: no such file or directory
---
Test 2: Write to read-only location
📝 Destination error: EACCES: permission denied
💥 Could not write destination file
💡 Tip: Check disk space and permissions
🧹 Removed partial file
Caught: EACCES: permission denied
Retry Logic for Specific Errors
Sometimes you want to retry an operation if it fails temporarily:
import { pipeline } from "stream/promises";
import fs from "fs";
async function copyWithRetry(
source: string,
dest: string,
maxRetries: number = 3
): Promise<void> {
let attempt = 0;
while (attempt < maxRetries) {
attempt++;
console.log(`📌 Attempt ${attempt}/${maxRetries}`);
try {
await pipeline(fs.createReadStream(source), fs.createWriteStream(dest));
console.log("✅ Success!");
return; // Success, exit function
} catch (err) {
console.error(`❌ Attempt ${attempt} failed:`, err.message);
// Check if error is retryable
const isRetryable =
err.code === "EBUSY" || // File busy
err.code === "EAGAIN" || // Resource temporarily unavailable
err.code === "ETIMEDOUT"; // Timeout
if (!isRetryable) {
console.log("💥 Non-retryable error, giving up");
throw err;
}
if (attempt >= maxRetries) {
console.log("💥 Max retries reached, giving up");
throw err;
}
// Wait before retrying (exponential backoff)
const waitTime = Math.pow(2, attempt) * 1000; // 2s, 4s, 8s...
console.log(`⏳ Waiting ${waitTime}ms before retry...\n`);
await new Promise((resolve) => setTimeout(resolve, waitTime));
}
}
}
// Usage
try {
await copyWithRetry("input.txt", "output.txt", 3);
} catch (err) {
console.error("Final error:", err.message);
}
Output (simulated busy file):
📌 Attempt 1/3
❌ Attempt 1 failed: EBUSY: resource busy or locked
⏳ Waiting 2000ms before retry...
📌 Attempt 2/3
❌ Attempt 2 failed: EBUSY: resource busy or locked
⏳ Waiting 4000ms before retry...
📌 Attempt 3/3
✅ Success!
Graceful Degradation
Sometimes you want to continue processing even if one operation fails:
import { pipeline } from "stream/promises";
import fs from "fs";
async function copyMultipleFiles(
files: Array<{ source: string; dest: string }>
): Promise<{
succeeded: string[];
failed: Array<{ file: string; error: string }>;
}> {
const results = {
succeeded: [] as string[],
failed: [] as Array<{ file: string; error: string }>,
};
console.log(`📋 Processing ${files.length} files...\n`);
for (const { source, dest } of files) {
try {
console.log(`▶️ Copying: ${source}`);
await pipeline(fs.createReadStream(source), fs.createWriteStream(dest));
console.log(`✅ Success: ${dest}\n`);
results.succeeded.push(dest);
} catch (err) {
console.log(`❌ Failed: ${err.message}\n`);
results.failed.push({
file: source,
error: err.message,
});
// Continue with next file instead of stopping
// pipeline() already cleaned up this file's resources
}
}
return results;
}
// Usage
const files = [
{ source: "file1.txt", dest: "backup1.txt" },
{ source: "missing.txt", dest: "backup2.txt" },
{ source: "file3.txt", dest: "backup3.txt" },
];
const results = await copyMultipleFiles(files);
console.log("\n📊 Final Results:");
console.log(`✅ Succeeded: ${results.succeeded.length}`);
console.log(`❌ Failed: ${results.failed.length}`);
if (results.failed.length > 0) {
console.log("\nFailed files:");
results.failed.forEach(({ file, error }) => {
console.log(` - ${file}: ${error}`);
});
}
Output:
📋 Processing 3 files...
▶️ Copying: file1.txt
✅ Success: backup1.txt
▶️ Copying: missing.txt
❌ Failed: ENOENT: no such file or directory
▶️ Copying: file3.txt
✅ Success: backup3.txt
📊 Final Results:
✅ Succeeded: 2
❌ Failed: 1
Failed files:
- missing.txt: ENOENT: no such file or directory
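Because each pipeline() call cleans up its own streams, you can also run independent copies in parallel and collect the outcomes with Promise.allSettled; a minimal sketch:
import { pipeline } from "stream/promises";
import fs from "fs";

const jobs = [
  { source: "file1.txt", dest: "backup1.txt" },
  { source: "missing.txt", dest: "backup2.txt" },
  { source: "file3.txt", dest: "backup3.txt" },
];

// allSettled never rejects: every job reports either "fulfilled" or "rejected",
// and a failed job never leaks resources or affects the others.
const outcomes = await Promise.allSettled(
  jobs.map(({ source, dest }) =>
    pipeline(fs.createReadStream(source), fs.createWriteStream(dest))
  )
);

outcomes.forEach((outcome, i) => {
  if (outcome.status === "rejected") {
    console.log(`❌ ${jobs[i].source}: ${outcome.reason.message}`);
  } else {
    console.log(`✅ ${jobs[i].source} copied`);
  }
});
For very large batches you would still want to cap how many copies run at once, as the BatchProcessor pattern later in this article does.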
Resource Cleanup and Memory Leaks
Understanding resource cleanup is crucial for production applications. Let's explore what can go wrong and how pipeline() prevents it.
What Are Resource Leaks?
A resource leak happens when your program allocates resources (file handles, memory, network connections) but doesn't release them. Let's see this in action:
import fs from "fs";
// ❌ BAD: Creates resource leaks
function leakyFileProcessor() {
for (let i = 0; i < 1000; i++) {
const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream(`output${i}.txt`);
readable.pipe(writable);
// If ANY of these fail:
// - File handles stay open
// - Memory not released
// - Eventually: "Too many open files" error
}
console.log("Started processing 1000 files");
// But we have no way to know which ones failed!
}
// Monitor open file handles
console.log("File handles before:", process._getActiveHandles().length);
leakyFileProcessor();
console.log("File handles after:", process._getActiveHandles().length);
// Leaked handles still open!
What happens:
// Operating system has limits:
// - Max open files per process: ~1024 (Linux)
// - Each stream opens a file handle
// - Leaked handles count against this limit
// After running leaky code:
// Open handles: 2000+ (1000 readable + 1000 writable)
// System limit: 1024
// Result: Error "EMFILE: too many open files"
How pipeline() Prevents Leaks
Let's compare the same operation with proper cleanup:
import { pipeline } from "stream/promises";
import fs from "fs";
// ✅ GOOD: No resource leaks
async function safeFileProcessor() {
const results = { success: 0, failed: 0 };
for (let i = 0; i < 1000; i++) {
try {
await pipeline(
fs.createReadStream("input.txt"),
fs.createWriteStream(`output${i}.txt`)
);
results.success++;
} catch (err) {
results.failed++;
// pipeline() already cleaned up!
// File handles closed automatically
}
}
console.log(`✅ Succeeded: ${results.success}`);
console.log(`❌ Failed: ${results.failed}`);
}
// Monitor resource usage
const before = process._getActiveHandles().length;
await safeFileProcessor();
const after = process._getActiveHandles().length;
console.log(`\nFile handles before: ${before}`);
console.log(`File handles after: ${after}`);
console.log(`Leaked: ${after - before}`); // Should be 0!
Output:
✅ Succeeded: 1000
❌ Failed: 0
File handles before: 8
File handles after: 8
Leaked: 0
Key insight: pipeline() guarantees cleanup even when errors occur!
Demonstrating Leak Prevention
Let's create a scenario that would leak with .pipe() but not with pipeline():
import { pipeline } from "stream/promises";
import fs from "fs";
import { Transform } from "stream";
// Transform that intentionally fails
class FailingTransform extends Transform {
_transform(chunk: Buffer, encoding: string, callback: Function): void {
// Fail immediately
callback(new Error("Transform failed"));
}
}
// Test 1: Using .pipe() (LEAKS)
function testWithPipe() {
console.log("Test 1: Using .pipe()");
const readable = fs.createReadStream("input.txt");
const transform = new FailingTransform();
const writable = fs.createWriteStream("output.txt");
readable.pipe(transform).pipe(writable);
// .pipe() does not forward errors, so a handler on the writable alone would
// never fire; we listen on the failing transform itself, otherwise the
// uncaught 'error' event would crash the process
transform.on("error", (err) => {
console.log("❌ Error caught:", err.message);
});
// Check resources after a delay
setTimeout(() => {
console.log("Readable destroyed?", readable.destroyed); // false!
console.log("Transform destroyed?", transform.destroyed); // false!
console.log("Writable destroyed?", writable.destroyed); // true
console.log("Result: RESOURCE LEAK!\n");
}, 100);
}
// Test 2: Using pipeline() (NO LEAKS)
async function testWithPipeline() {
console.log("Test 2: Using pipeline()");
const readable = fs.createReadStream("input.txt");
const transform = new FailingTransform();
const writable = fs.createWriteStream("output.txt");
try {
await pipeline(readable, transform, writable);
} catch (err) {
console.log("❌ Error caught:", err.message);
}
// Check resources immediately
console.log("Readable destroyed?", readable.destroyed); // true!
console.log("Transform destroyed?", transform.destroyed); // true!
console.log("Writable destroyed?", writable.destroyed); // true!
console.log("Result: NO LEAK!\n");
}
// Run both tests
testWithPipe();
await new Promise((resolve) => setTimeout(resolve, 200));
await testWithPipeline();
Output:
Test 1: Using .pipe()
❌ Error caught: Transform failed
Readable destroyed? false
Transform destroyed? false
Writable destroyed? true
Result: RESOURCE LEAK!
Test 2: Using pipeline()
❌ Error caught: Transform failed
Readable destroyed? true
Transform destroyed? true
Writable destroyed? true
Result: NO LEAK!
Critical difference:
- .pipe(): Only the erroring stream and streams after it get destroyed
- pipeline(): ALL streams get destroyed, regardless of where the error occurred
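If you are ever stuck with .pipe() (for example, code you don't control sets up the pipe), the finished() helper from stream/promises at least tells you when each stream has ended or errored so you can do the cleanup yourself. A minimal sketch; pipeline() remains the better choice whenever you can use it:
import { finished } from "stream/promises";
import fs from "fs";

const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");

readable.pipe(writable);

try {
  // finished() resolves when a stream ends cleanly and rejects when it errors,
  // so waiting on both surfaces a failure on either side.
  await Promise.all([finished(readable), finished(writable)]);
  console.log("✅ Copy complete");
} catch (err) {
  // The cleanup that .pipe() never does for us
  readable.destroy();
  writable.destroy();
  console.error("❌ Copy failed:", err.message);
}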
Memory Leak Detection
Here's how to detect and prevent memory leaks in production:
import { pipeline } from "stream/promises";
import fs from "fs";
class ResourceMonitor {
private initialHandles: number;
private initialMemory: number;
constructor() {
this.initialHandles = process._getActiveHandles().length;
this.initialMemory = process.memoryUsage().heapUsed;
}
checkForLeaks(operationName: string): void {
const currentHandles = process._getActiveHandles().length;
const currentMemory = process.memoryUsage().heapUsed;
const handleLeak = currentHandles - this.initialHandles;
const memoryLeak = currentMemory - this.initialMemory;
console.log(`\n📊 Resource Check: ${operationName}`);
console.log(
`File handles: ${currentHandles} (${
handleLeak >= 0 ? "+" : ""
}${handleLeak})`
);
console.log(
`Memory: ${(currentMemory / 1024 / 1024).toFixed(2)} MB (${(
memoryLeak /
1024 /
1024
).toFixed(2)} MB)`
);
if (handleLeak > 0) {
console.log("⚠️ WARNING: Possible file handle leak!");
} else {
console.log("✅ No file handle leaks detected");
}
}
}
// Test a file operation
async function processFiles(count: number) {
const monitor = new ResourceMonitor();
for (let i = 0; i < count; i++) {
try {
await pipeline(
fs.createReadStream("input.txt"),
fs.createWriteStream(`output${i}.txt`)
);
} catch (err) {
// Errors are expected, pipeline() handles cleanup
}
}
// Force garbage collection if available
if (global.gc) {
global.gc();
}
monitor.checkForLeaks(`Processing ${count} files`);
}
// Run test
await processFiles(100);
Output:
📊 Resource Check: Processing 100 files
File handles: 8 (+0)
Memory: 15.23 MB (+2.34 MB)
✅ No file handle leaks detected
Production-Ready Error Handling Patterns
Now let's build real-world patterns you can use in production applications. These patterns combine everything we've learned into reusable, battle-tested solutions.
Pattern 1: File Processing with Comprehensive Error Handling
import { pipeline } from "stream/promises";
import fs from "fs";
import path from "path";
interface FileProcessorOptions {
onProgress?: (bytesProcessed: number) => void;
onError?: (error: Error, context: string) => void;
cleanupOnError?: boolean;
}
class FileProcessor {
private options: FileProcessorOptions;
constructor(options: FileProcessorOptions = {}) {
this.options = {
cleanupOnError: true,
...options,
};
}
async copy(source: string, dest: string): Promise<void> {
// Validate inputs
await this.validateInputs(source, dest);
// Track progress
let bytesProcessed = 0;
const readable = fs.createReadStream(source);
const writable = fs.createWriteStream(dest);
// Monitor progress
readable.on("data", (chunk) => {
bytesProcessed += chunk.length;
this.options.onProgress?.(bytesProcessed);
});
try {
// Execute pipeline
await pipeline(readable, writable);
// Verify output
await this.verifyOutput(source, dest);
} catch (err) {
// Log error with context
this.options.onError?.(err, `Copying ${source} to ${dest}`);
// Cleanup partial file
if (this.options.cleanupOnError) {
await this.cleanupPartialFile(dest);
}
// Re-throw for caller
throw new Error(`Failed to copy ${source}: ${err.message}`);
}
}
private async validateInputs(source: string, dest: string): Promise<void> {
// Check source exists
try {
await fs.promises.access(source, fs.constants.R_OK);
} catch {
throw new Error(`Source file not found or not readable: ${source}`);
}
// Check destination directory exists
const destDir = path.dirname(dest);
try {
await fs.promises.access(destDir, fs.constants.W_OK);
} catch {
throw new Error(`Destination directory not writable: ${destDir}`);
}
// Check we're not overwriting source
if (path.resolve(source) === path.resolve(dest)) {
throw new Error("Source and destination cannot be the same file");
}
}
private async verifyOutput(source: string, dest: string): Promise<void> {
const sourceStats = await fs.promises.stat(source);
const destStats = await fs.promises.stat(dest);
if (sourceStats.size !== destStats.size) {
throw new Error(
`File size mismatch: expected ${sourceStats.size}, got ${destStats.size}`
);
}
}
private async cleanupPartialFile(filePath: string): Promise<void> {
try {
await fs.promises.unlink(filePath);
console.log(`🧹 Cleaned up partial file: ${filePath}`);
} catch (err) {
// File might not exist, that's okay
if (err.code !== "ENOENT") {
console.warn(`Warning: Could not cleanup ${filePath}:`, err.message);
}
}
}
}
// Usage example
async function main() {
const processor = new FileProcessor({
onProgress: (bytes) => {
process.stdout.write(`\r📊 Processed: ${(bytes / 1024).toFixed(2)} KB`);
},
onError: (err, context) => {
console.error(`\n💥 Error during ${context}:`, err.message);
},
cleanupOnError: true,
});
try {
await processor.copy("large-file.txt", "backup.txt");
console.log("\n✅ File copied successfully!");
} catch (err) {
console.error("Operation failed:", err.message);
process.exit(1);
}
}
main();
Output on success:
📊 Processed: 1024.50 KB
✅ File copied successfully!
Output on error:
📊 Processed: 512.25 KB
💥 Error during Copying large-file.txt to backup.txt: ENOSPC: no space left on device
🧹 Cleaned up partial file: backup.txt
Operation failed: Failed to copy large-file.txt: ENOSPC: no space left on device
Pattern 2: Batch Processing with Error Recovery
import { pipeline } from "stream/promises";
import fs from "fs";
interface BatchResult {
file: string;
status: "success" | "failed" | "skipped";
error?: string;
duration?: number;
}
class BatchProcessor {
private maxRetries: number;
private concurrency: number;
constructor(maxRetries = 3, concurrency = 5) {
this.maxRetries = maxRetries;
this.concurrency = concurrency;
}
async processFiles(
files: Array<{ source: string; dest: string }>
): Promise<BatchResult[]> {
const results: BatchResult[] = [];
const queue = [...files];
const active: Promise<void>[] = [];
console.log(
`📋 Processing ${files.length} files with concurrency ${this.concurrency}\n`
);
while (queue.length > 0 || active.length > 0) {
// Start new tasks up to concurrency limit
while (active.length < this.concurrency && queue.length > 0) {
const file = queue.shift()!;
const task = this.processWithRetry(file, results);
active.push(task);
}
// Wait for at least one task to complete
if (active.length > 0) {
await Promise.race(active);
// Remove completed tasks
for (let i = active.length - 1; i >= 0; i--) {
if (await this.isSettled(active[i])) {
active.splice(i, 1);
}
}
}
}
return results;
}
private async processWithRetry(
file: { source: string; dest: string },
results: BatchResult[]
): Promise<void> {
const startTime = Date.now();
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
console.log(
`▶️ [${file.source}] Attempt ${attempt}/${this.maxRetries}`
);
await pipeline(
fs.createReadStream(file.source),
fs.createWriteStream(file.dest)
);
const duration = Date.now() - startTime;
console.log(`✅ [${file.source}] Success (${duration}ms)\n`);
results.push({
file: file.source,
status: "success",
duration,
});
return; // Success!
} catch (err) {
console.log(
`❌ [${file.source}] Attempt ${attempt} failed: ${err.message}`
);
if (attempt === this.maxRetries) {
// Final attempt failed
console.log(
`💥 [${file.source}] Giving up after ${this.maxRetries} attempts\n`
);
results.push({
file: file.source,
status: "failed",
error: err.message,
duration: Date.now() - startTime,
});
} else {
// Wait before retry
const waitTime = Math.pow(2, attempt) * 100;
console.log(
`⏳ [${file.source}] Waiting ${waitTime}ms before retry...\n`
);
await new Promise((resolve) => setTimeout(resolve, waitTime));
}
}
}
}
private async isSettled(promise: Promise<void>): Promise<boolean> {
try {
await Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error("timeout")), 0)
),
]);
return true;
} catch {
return false;
}
}
printSummary(results: BatchResult[]): void {
const succeeded = results.filter((r) => r.status === "success").length;
const failed = results.filter((r) => r.status === "failed").length;
const totalDuration = results.reduce(
(sum, r) => sum + (r.duration || 0),
0
);
console.log("═".repeat(50));
console.log("📊 BATCH PROCESSING SUMMARY");
console.log("═".repeat(50));
console.log(`Total files: ${results.length}`);
console.log(`✅ Succeeded: ${succeeded}`);
console.log(`❌ Failed: ${failed}`);
console.log(`⏱️ Total time: ${totalDuration}ms`);
console.log(
`📈 Success rate: ${((succeeded / results.length) * 100).toFixed(1)}%`
);
if (failed > 0) {
console.log("\n❌ Failed files:");
results
.filter((r) => r.status === "failed")
.forEach((r) => console.log(` - ${r.file}: ${r.error}`));
}
console.log("═".repeat(50));
}
}
// Usage
async function main() {
const files = [
{ source: "file1.txt", dest: "backup/file1.txt" },
{ source: "file2.txt", dest: "backup/file2.txt" },
{ source: "missing.txt", dest: "backup/missing.txt" },
{ source: "file3.txt", dest: "backup/file3.txt" },
];
const processor = new BatchProcessor(3, 2); // 3 retries, 2 concurrent
const results = await processor.processFiles(files);
processor.printSummary(results);
}
main();
Output:
📋 Processing 4 files with concurrency 2
▶️ [file1.txt] Attempt 1/3
▶️ [file2.txt] Attempt 1/3
✅ [file1.txt] Success (45ms)
▶️ [missing.txt] Attempt 1/3
✅ [file2.txt] Success (52ms)
▶️ [file3.txt] Attempt 1/3
❌ [missing.txt] Attempt 1 failed: ENOENT: no such file or directory
⏳ [missing.txt] Waiting 200ms before retry...
✅ [file3.txt] Success (48ms)
▶️ [missing.txt] Attempt 2/3
❌ [missing.txt] Attempt 2 failed: ENOENT: no such file or directory
⏳ [missing.txt] Waiting 400ms before retry...
▶️ [missing.txt] Attempt 3/3
❌ [missing.txt] Attempt 3 failed: ENOENT: no such file or directory
💥 [missing.txt] Giving up after 3 attempts
══════════════════════════════════════════════════
📊 BATCH PROCESSING SUMMARY
══════════════════════════════════════════════════
Total files: 4
✅ Succeeded: 3
❌ Failed: 1
⏱️ Total time: 945ms
📈 Success rate: 75.0%
❌ Failed files:
- missing.txt: ENOENT: no such file or directory
══════════════════════════════════════════════════
Pattern 3: Logging and Monitoring
import { pipeline } from "stream/promises";
import fs from "fs";
interface LogEntry {
timestamp: string;
level: "info" | "warn" | "error";
operation: string;
details: any;
}
class PipelineLogger {
private logs: LogEntry[] = [];
log(level: LogEntry["level"], operation: string, details: any): void {
const entry: LogEntry = {
timestamp: new Date().toISOString(),
level,
operation,
details,
};
this.logs.push(entry);
// Console output with colors
const icon = level === "error" ? "❌" : level === "warn" ? "⚠️" : "ℹ️";
console.log(`${icon} [${entry.timestamp}] ${operation}:`, details);
}
async saveLogs(filepath: string): Promise<void> {
const content = JSON.stringify(this.logs, null, 2);
await fs.promises.writeFile(filepath, content);
}
getErrorCount(): number {
return this.logs.filter((l) => l.level === "error").length;
}
getWarningCount(): number {
return this.logs.filter((l) => l.level === "warn").length;
}
}
async function monitoredCopy(
source: string,
dest: string,
logger: PipelineLogger
): Promise<void> {
const operationId = `copy-${Date.now()}`;
logger.log("info", "Operation started", {
id: operationId,
source,
dest,
});
const startTime = Date.now();
let bytesProcessed = 0;
const readable = fs.createReadStream(source);
const writable = fs.createWriteStream(dest);
// Monitor events
readable.on("open", (fd) => {
logger.log("info", "Source opened", { operationId, fd });
});
readable.on("data", (chunk) => {
bytesProcessed += chunk.length;
});
writable.on("open", (fd) => {
logger.log("info", "Destination opened", { operationId, fd });
});
try {
await pipeline(readable, writable);
const duration = Date.now() - startTime;
logger.log("info", "Operation completed", {
operationId,
bytesProcessed,
duration,
throughput: `${(bytesProcessed / duration).toFixed(2)} bytes/ms`,
});
} catch (err) {
const duration = Date.now() - startTime;
logger.log("error", "Operation failed", {
operationId,
error: err.message,
code: err.code,
bytesProcessed,
duration,
});
throw err;
}
}
// Usage
async function main() {
const logger = new PipelineLogger();
try {
await monitoredCopy("input.txt", "output.txt", logger);
await monitoredCopy("file2.txt", "output2.txt", logger);
} catch (err) {
// Errors already logged
} finally {
// Save logs to file
await logger.saveLogs("pipeline-logs.json");
console.log(`\n📊 Session summary:`);
console.log(` Errors: ${logger.getErrorCount()}`);
console.log(` Warnings: ${logger.getWarningCount()}`);
console.log(` Logs saved to: pipeline-logs.json`);
}
}
main();
Output:
ℹ️ [2024-10-15T10:30:00.123Z] Operation started: { id: 'copy-1697365800123', source: 'input.txt', dest: 'output.txt' }
ℹ️ [2024-10-15T10:30:00.125Z] Source opened: { operationId: 'copy-1697365800123', fd: 20 }
ℹ️ [2024-10-15T10:30:00.127Z] Destination opened: { operationId: 'copy-1697365800123', fd: 21 }
ℹ️ [2024-10-15T10:30:00.250Z] Operation completed: { operationId: 'copy-1697365800123', bytesProcessed: 1048576, duration: 127, throughput: '8256.79 bytes/ms' }
ℹ️ [2024-10-15T10:30:00.251Z] Operation started: { id: 'copy-1697365800251', source: 'file2.txt', dest: 'output2.txt' }
❌ [2024-10-15T10:30:00.253Z] Operation failed: { operationId: 'copy-1697365800251', error: 'ENOENT: no such file or directory', code: 'ENOENT', bytesProcessed: 0, duration: 2 }
📊 Session summary:
Errors: 1
Warnings: 0
Logs saved to: pipeline-logs.json
Common Misconceptions
Let's address some false beliefs that trip up developers.
❌ Misconception: "Adding error listeners to both streams is enough"
Reality: Error listeners don't clean up the other stream or stop data flow.
Why this matters: Without proper cleanup, you get resource leaks and corrupted data.
Example of the problem:
// ❌ Seems safe but isn't
const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");
readable.on("error", (err) => console.error("Read error:", err));
writable.on("error", (err) => console.error("Write error:", err));
readable.pipe(writable);
// If readable errors:
// - writable keeps waiting for data (resource leak)
// - writable's file handle stays open (memory leak)
// - No indication that operation failed
Correct approach:
// ✅ Use pipeline() instead
await pipeline(
fs.createReadStream("input.txt"),
fs.createWriteStream("output.txt")
);
// All cleanup happens automatically
❌ Misconception: "pipeline() makes streams synchronous"
Reality: pipeline() is still asynchronous—it just returns a promise.
Why this matters: You still need to await or .then() the result.
Example:
// ❌ Wrong: Not waiting for completion
pipeline(fs.createReadStream("input.txt"), fs.createWriteStream("output.txt"));
console.log("Done!"); // This prints IMMEDIATELY, not when done!
// ✅ Correct: Wait for completion
await pipeline(
fs.createReadStream("input.txt"),
fs.createWriteStream("output.txt")
);
console.log("Done!"); // This prints when actually done
❌ Misconception: "pipeline() is only for error handling"
Reality: pipeline() also handles backpressure, cleanup, and proper stream chaining.
Why this matters: It's the complete solution for production stream pipelines.
What pipeline() does:
// pipeline() handles:
// 1. ✅ Error propagation
// 2. ✅ Resource cleanup
// 3. ✅ Backpressure management
// 4. ✅ Proper stream chaining
// 5. ✅ Promise-based API
// 6. ✅ Works with async/await
// .pipe() only handles:
// 1. ✅ Backpressure management
// 2. ❌ No automatic error handling
// 3. ❌ No automatic cleanup
// 4. ❌ No promise support
❌ Misconception: "I can mix .pipe() and pipeline() in the same chain"
Reality: Pick one approach—mixing them defeats the purpose of pipeline().
Why this matters: Mixing approaches creates gaps in error handling.
Example of the problem:
// ❌ Wrong: Mixed approach
const readable = fs.createReadStream("input.txt");
const transform = createSomeTransform();
const writable = fs.createWriteStream("output.txt");
readable.pipe(transform); // Using .pipe()
await pipeline(transform, writable); // Using pipeline()
// Problem: If readable errors, pipeline() doesn't know!
// Readable isn't cleaned up by pipeline()
Correct approach:
// ✅ Use pipeline() for entire chain
await pipeline(
fs.createReadStream("input.txt"),
createSomeTransform(),
fs.createWriteStream("output.txt")
);
Check Your Understanding
Let's test what you've learned!
Quick Quiz
1. What's wrong with this code?
const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");
readable.on("error", (err) => console.error(err));
writable.on("error", (err) => console.error(err));
readable.pipe(writable);
Show Answer
Multiple problems:
- No cleanup: When readable errors, writable stays open (resource leak)
- No error propagation: Errors don't notify the other stream
- No completion handling: No way to know when operation finishes
- Partial data: If error occurs mid-stream, partial file remains
Better approach:
try {
await pipeline(
fs.createReadStream("input.txt"),
fs.createWriteStream("output.txt")
);
console.log("Success!");
} catch (err) {
console.error("Failed:", err);
// All streams automatically cleaned up
}
2. True or False: pipeline() only works with two streams
Show Answer
False!
pipeline() works with any number of streams:
// 2 streams
await pipeline(readable, writable);
// 3 streams
await pipeline(readable, transform, writable);
// 5 streams
await pipeline(readable, transform1, transform2, transform3, writable);
// All get proper error handling and cleanup!
The number doesn't matter—pipeline() handles all of them correctly.
3. When would you use .pipe() instead of pipeline()?
Show Answer
Almost never in production code!
The only valid use cases for .pipe():
- Learning/educational purposes - Understanding how piping works
- Quick one-off scripts - Where reliability doesn't matter
- Piping to response objects - In HTTP servers (though pipeline() works here too)
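For example, pipeline() works just as well when the destination is an HTTP response; a minimal sketch, assuming a plain Node.js http server serving large-file.txt:
import { pipeline } from "stream/promises";
import fs from "fs";
import http from "http";

http
  .createServer(async (req, res) => {
    try {
      // Stream the file to the client; if the read fails or the client
      // disconnects, pipeline() destroys both streams for us.
      await pipeline(fs.createReadStream("large-file.txt"), res);
    } catch (err) {
      // The response stream has already been torn down; just record the failure.
      console.error("Download failed:", err.message);
    }
  })
  .listen(3000);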
For production code, always use pipeline():
// ❌ Don't do this in production
readable.pipe(writable);
// ✅ Do this instead
await pipeline(readable, writable);
The slight extra typing is worth it for:
- Automatic error handling
- Resource cleanup
- Promise-based API
- Production reliability
4. What happens if a stream in the middle of a pipeline errors?
Show Answer
All streams are destroyed and cleaned up automatically:
await pipeline(
stream1, // ✅ Will be destroyed
stream2, // 💥 This one errors
stream3, // ✅ Will be destroyed
stream4 // ✅ Will be destroyed
);
Timeline:
- stream2 encounters error
- pipeline() catches the error
- Calls .destroy() on ALL streams (1, 2, 3, 4)
- All file handles/resources released
- Promise rejected with the error
- Catch block executes
Why this matters: No matter where the error occurs, nothing leaks!
Hands-On Exercise
Challenge: Create a robust file copy function that:
- Uses pipeline() for safety
- Validates inputs before starting
- Shows progress during copy
- Cleans up partial files on error
- Returns detailed results
Starter Code:
import { pipeline } from "stream/promises";
import fs from "fs";
interface CopyResult {
success: boolean;
bytesProcessed: number;
duration: number;
error?: string;
}
async function robustCopy(source: string, dest: string): Promise<CopyResult> {
// TODO: Implement this function
throw new Error("Not implemented");
}
// Test it:
const result = await robustCopy("test.txt", "test-copy.txt");
console.log(result);
Solution:
Show Solution
import { pipeline } from "stream/promises";
import fs from "fs";
import path from "path";
interface CopyResult {
success: boolean;
bytesProcessed: number;
duration: number;
error?: string;
}
async function robustCopy(source: string, dest: string): Promise<CopyResult> {
const startTime = Date.now();
let bytesProcessed = 0;
// Step 1: Validate inputs
try {
// Check source exists and is readable
await fs.promises.access(source, fs.constants.R_OK);
// Check destination directory exists
const destDir = path.dirname(dest);
await fs.promises.access(destDir, fs.constants.W_OK);
// Prevent overwriting source
if (path.resolve(source) === path.resolve(dest)) {
throw new Error("Source and destination cannot be the same");
}
} catch (err) {
return {
success: false,
bytesProcessed: 0,
duration: Date.now() - startTime,
error: `Validation failed: ${err.message}`,
};
}
// Step 2: Create streams
const readable = fs.createReadStream(source);
const writable = fs.createWriteStream(dest);
// Step 3: Track progress
readable.on("data", (chunk) => {
bytesProcessed += chunk.length;
const sizeMB = (bytesProcessed / 1024 / 1024).toFixed(2);
process.stdout.write(`\r📊 Copied: ${sizeMB} MB`);
});
try {
// Step 4: Execute pipeline
await pipeline(readable, writable);
const duration = Date.now() - startTime;
console.log(`\n✅ Copy complete!`);
// Step 5: Verify output size matches
const sourceStats = await fs.promises.stat(source);
const destStats = await fs.promises.stat(dest);
if (sourceStats.size !== destStats.size) {
throw new Error(
`Size mismatch: expected ${sourceStats.size}, got ${destStats.size}`
);
}
return {
success: true,
bytesProcessed,
duration,
};
} catch (err) {
const duration = Date.now() - startTime;
console.log(`\n❌ Copy failed: ${err.message}`);
// Step 6: Clean up partial file
try {
await fs.promises.unlink(dest);
console.log("🧹 Cleaned up partial file");
} catch {
// File might not exist
}
return {
success: false,
bytesProcessed,
duration,
error: err.message,
};
}
}
// Test the function
async function test() {
// Create a test file
await fs.promises.writeFile("test.txt", "Hello, World!\n".repeat(10000));
console.log("Test 1: Successful copy\n");
const result1 = await robustCopy("test.txt", "test-copy.txt");
console.log("Result:", result1);
console.log("\n---\n");
console.log("Test 2: Non-existent source\n");
const result2 = await robustCopy("missing.txt", "output.txt");
console.log("Result:", result2);
console.log("\n---\n");
console.log("Test 3: Same source and destination\n");
const result3 = await robustCopy("test.txt", "test.txt");
console.log("Result:", result3);
}
test();
Output:
Test 1: Successful copy
📊 Copied: 0.13 MB
✅ Copy complete!
Result: {
success: true,
bytesProcessed: 140000,
duration: 45
}
---
Test 2: Non-existent source
Result: {
success: false,
bytesProcessed: 0,
duration: 2,
error: 'Validation failed: ENOENT: no such file or directory'
}
---
Test 3: Same source and destination
Result: {
success: false,
bytesProcessed: 0,
duration: 1,
error: 'Validation failed: Source and destination cannot be the same'
}
Key points in the solution:
- Early validation - Fail fast before creating streams
- Progress tracking - User feedback during operation
- pipeline() for safety - Automatic error handling and cleanup
- Size verification - Ensure complete copy
- Partial file cleanup - Maintain data integrity
- Detailed results - Caller knows exactly what happened
- Multiple error scenarios - Handles all failure cases gracefully
Summary: Key Takeaways
Let's review what we've discovered about error handling in stream pipes:
The Problem with .pipe():
- ❌ Errors don't propagate between streams
- ❌ No automatic resource cleanup
- ❌ Easy to create memory leaks
- ❌ Partial data corruption possible
- ❌ Requires complex manual error handling
The Solution: pipeline():
- ✅ Automatic error propagation to all streams
- ✅ Guaranteed resource cleanup on errors
- ✅ Prevents memory leaks automatically
- ✅ Promise-based API (works with async/await)
- ✅ Works with any number of streams
- ✅ Production-ready reliability
Essential Syntax:
import { pipeline } from 'stream/promises';
// Basic usage
await pipeline(
readableStream,
writableStream
);
// With transforms
await pipeline(
readableStream,
transformStream1,
transformStream2,
writableStream
);
// With error handling
try {
await pipeline(readableStream, writableStream);
console.log('Success!');
} catch (err) {
console.error('Failed:', err);
// All streams automatically cleaned up
}
Production Patterns:
- Always validate inputs before creating streams
- Track progress for user feedback
- Clean up partial files on errors
- Log operations for debugging
- Use retry logic for transient failures
- Implement graceful degradation for batch operations
Remember:
- Use pipeline() for ALL production stream operations
- Never mix .pipe() and pipeline() in the same chain
- Always handle the promise (await or .then/.catch)
- Clean up partial output on errors
- Validate inputs before starting streams
Additional Resources
Official Documentation:
- Node.js Stream Pipeline API - Official pipeline documentation
- Error Handling in Node.js - General error handling patterns
Community Resources:
- Stream Error Handling Best Practices - Node.js official guide
- pump module - Alternative pipeline implementation
Debugging Tools:
- node --trace-warnings - Show full stack traces for warnings
- process.on('uncaughtException') - Catch unhandled stream errors
- Memory profiling tools - Detect resource leaks
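As a last-resort safety net (never a replacement for try/catch around pipeline()), you can log failures that escaped local handling at the process level; a minimal sketch:
// Last-resort logging for errors nothing else caught.
process.on("uncaughtException", (err) => {
  console.error("Uncaught exception:", err);
  process.exit(1); // state may be inconsistent; restart rather than limp on
});

process.on("unhandledRejection", (reason) => {
  console.error("Unhandled rejection:", reason);
});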
Version Information
Tested with:
- Node.js: v18.x, v20.x, v22.x
- Works in: Node.js environments only (Node.js streams are not available in browsers)
API Compatibility:
- stream.pipeline(): Available since Node.js v10.0.0
- stream/promises: Available since Node.js v15.0.0
- Promise-based pipeline: Stable and recommended for all new code
Migration Guide:
// Old approach (Node.js < 15)
const { promisify } = require("util");
const pipeline = promisify(require("stream").pipeline);
// New approach (Node.js >= 15)
import { pipeline } from "stream/promises";