Handling Backpressure: Building Reliable Stream Processing
Remember learning to drive? Understanding what the brake pedal does is one thing. Actually knowing when and how to use it smoothly in traffic: that's where the real skill comes in.
The same is true for backpressure. In the previous article, we covered what backpressure is and why it matters. Today, we're going to master the practical skills: when to pause, when to resume, how to monitor buffer state, and how to build production-ready stream processing that stays stable no matter how large the input gets.
Let's roll up our sleeves and build some reliable code together.
Quick Reference
The Complete Backpressure Pattern:
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(destination);
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) readStream.pause();
});
writeStream.on("drain", () => {
readStream.resume();
});
readStream.on("end", () => writeStream.end());
When to use this pattern:
- Processing files larger than available memory
- Network streaming with slow connections
- Any time data arrives faster than you can process it
Key insight:
🎯 The drain event is your green light to resume: it means the buffer has cleared enough space
What You Need to Know First
Required reading:
- Writable Streams: Managing Data Flow with Backpressure - You must understand what backpressure is and why it exists
You should be comfortable with:
- Node.js event handling (on, once, emit)
- Async patterns in JavaScript
- Basic error handling with try-catch
If you haven't read the foundation article, please start there. This article builds directly on those concepts.
What We'll Cover in This Article
By the end of this guide, you'll be able to:
- Implement the complete backpressure pattern correctly
- Use the drain event to resume data flow
- Monitor buffer state in real-time
- Build production-ready stream processors
- Debug backpressure issues effectively
- Compare memory usage with and without backpressure
- Handle edge cases and errors gracefully
What We'll Explain Along the Way
These concepts will be explained with detailed examples:
- The pause/resume mechanism (with visual timeline)
- Buffer state monitoring techniques
- Memory profiling during stream processing
- Error propagation between streams
- Resource cleanup patterns
The Drain Event: Your Signal to Resume
Let's start by deeply understanding the most important event in backpressure handling: the drain event.
What the Drain Event Tells You
When a writable stream's buffer fills up and write() returns false, you need to wait. But wait for what? The drain event!
Think of it like a parking lot:
- Buffer full (write returns false): "Sorry, parking lot is full!"
- Drain event fires: "Good news! Cars have left, spots available again!"
import fs from "fs";
const writeStream = fs.createWriteStream("output.txt", {
highWaterMark: 10, // Small buffer for demonstration
});
// Write until buffer is full
console.log("Writing data...");
writeStream.write("aaaaa"); // 5 bytes
writeStream.write("bbbbb"); // 5 bytes, now at 10 (full!)
const canContinue = writeStream.write("ccccc"); // 5 more bytes
console.log("Can write more?", canContinue); // false - buffer exceeded highWaterMark
// Now let's listen for when space becomes available
writeStream.on("drain", () => {
console.log("✅ Drain event fired!");
console.log("Buffer has been processed, safe to write more");
console.log("Current buffer size:", writeStream.writableLength);
});
// The drain event will fire automatically when the stream
// has processed enough data to go below highWaterMark
When Exactly Does Drain Fire?
Let's visualize the timeline of events:
import fs from "fs";
const writeStream = fs.createWriteStream("output.txt", {
highWaterMark: 5,
});
console.log("=== Timeline of Events ===\n");
// Time 0: Empty buffer
console.log("Time 0: Buffer empty");
console.log("Buffer size:", writeStream.writableLength); // 0
// Time 1: Write some data
writeStream.write("abc"); // 3 bytes
console.log('\nTime 1: After writing "abc"');
console.log("Buffer size:", writeStream.writableLength); // 3
// Time 2: Fill buffer to highWaterMark
const result = writeStream.write("de"); // 2 more bytes = 5 total
console.log('\nTime 2: After writing "de"');
console.log("Buffer size:", writeStream.writableLength); // 5
console.log("Can continue?", result); // false - at highWaterMark!
// Time 3: Stream processes data
console.log("\nTime 3: Stream is processing data...");
console.log("(Writing to disk happens in background)");
// Time 4: Drain event fires
writeStream.on("drain", () => {
console.log("\nTime 4: DRAIN EVENT FIRED! 🎉");
console.log("Buffer size:", writeStream.writableLength); // Less than 5
console.log("Safe to write more data now");
});
// Close stream when done
setTimeout(() => {
writeStream.end();
}, 100);
Output:
=== Timeline of Events ===
Time 0: Buffer empty
Buffer size: 0
Time 1: After writing "abc"
Buffer size: 3
Time 2: After writing "de"
Buffer size: 5
Can continue? false
Time 3: Stream is processing data...
(Writing to disk happens in background)
Time 4: DRAIN EVENT FIRED! 🎉
Buffer size: 0
Safe to write more data now
The key insight: The drain event fires when:
- The buffer was full (write() returned false)
- The stream has written enough data to disk
- The buffer size drops below highWaterMark
- It's now safe to write more data
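By the way, if you ever want to check this state directly instead of tracking write()'s return value yourself, Node.js (v15.2+) exposes a writableNeedDrain property. A minimal sketch:
import fs from "fs";

const writeStream = fs.createWriteStream("output.txt", {
  highWaterMark: 10, // tiny buffer so one write overfills it
});

writeStream.write("aaaaaaaaaaaa"); // 12 bytes, over the 10-byte highWaterMark

// true while the buffer is over highWaterMark and a 'drain' is pending
console.log("Needs drain?", writeStream.writableNeedDrain); // true

writeStream.on("drain", () => {
  console.log("Needs drain?", writeStream.writableNeedDrain); // false again
  writeStream.end();
});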
Drain Event Best Practices
Here are some important things I learned about using the drain event:
✅ DO: Register the listener before you start writing
const writeStream = fs.createWriteStream("output.txt");
// ✅ Register drain listener early
writeStream.on("drain", () => {
console.log("Ready for more data");
readStream.resume();
});
// Now start writing
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) readStream.pause();
});
❌ DON'T: Register the listener only after pause
// ❌ Bad: Registering too late
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause();
// Registering here might miss the drain event!
writeStream.on("drain", () => readStream.resume());
}
});
✅ DO: Use once if you only need to resume once
// If you pause once and only need to resume once
writeStream.once("drain", () => {
console.log("Resuming after single pause");
readStream.resume();
});
❌ DON'T: Register multiple drain listeners accidentally
// ❌ Bad: Creates new listener on every pause
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause();
// This creates a NEW listener each time!
writeStream.on("drain", () => readStream.resume());
}
});
// After 10 pauses, you have 10 drain listeners!
// Each one will call resume(), causing issues
Implementing Complete Backpressure Handling
Now let's build a complete, production-ready implementation step by step. We'll start simple and add features progressively.
Step 1: Basic Implementation
Let's begin with the minimal correct pattern:
import fs from "fs";
import path from "path";
// File paths
const sourceFile = path.join(__dirname, "large-input.txt");
const destFile = path.join(__dirname, "output.txt");
// Create streams
const readStream = fs.createReadStream(sourceFile);
const writeStream = fs.createWriteStream(destFile);
console.log("Starting file copy with backpressure handling...\n");
// Step 1: Handle incoming data
readStream.on("data", (chunk: Buffer) => {
// Try to write the chunk
const canContinue = writeStream.write(chunk);
// If buffer is full, pause reading
if (!canContinue) {
console.log("⏸️ Buffer full, pausing read stream");
readStream.pause();
}
});
// Step 2: Resume when buffer drains
writeStream.on("drain", () => {
console.log("✅ Buffer drained, resuming read stream");
readStream.resume();
});
// Step 3: Close write stream when reading is done
readStream.on("end", () => {
console.log("📖 Reading complete, closing write stream");
writeStream.end();
});
// Step 4: Confirm completion
writeStream.on("finish", () => {
console.log("✔️ Writing complete!");
});
This is the foundation. It works, but let's make it production-ready.
Step 2: Adding Progress Tracking
Let's add monitoring so we can see what's happening:
import fs from "fs";
import path from "path";
const sourceFile = path.join(__dirname, "large-input.txt");
const destFile = path.join(__dirname, "output.txt");
const readStream = fs.createReadStream(sourceFile);
const writeStream = fs.createWriteStream(destFile);
// Track statistics
let bytesRead = 0;
let bytesWritten = 0;
let chunkCount = 0;
let pauseCount = 0;
let drainCount = 0;
console.log("Starting monitored file copy...\n");
readStream.on("data", (chunk: Buffer) => {
chunkCount++;
bytesRead += chunk.length;
// Log every 10 chunks
if (chunkCount % 10 === 0) {
console.log(`📊 Read ${chunkCount} chunks (${bytesRead} bytes)`);
}
// Write with backpressure check
const canContinue = writeStream.write(chunk);
bytesWritten += chunk.length;
if (!canContinue) {
pauseCount++;
console.log(`⏸️ Pause #${pauseCount} at ${bytesRead} bytes`);
console.log(` Buffer size: ${writeStream.writableLength} bytes`);
readStream.pause();
}
});
writeStream.on("drain", () => {
drainCount++;
console.log(`✅ Drain #${drainCount} - Buffer cleared`);
console.log(` Buffer size: ${writeStream.writableLength} bytes`);
readStream.resume();
});
readStream.on("end", () => {
console.log("\n=== Final Statistics ===");
console.log(`Total bytes: ${bytesRead}`);
console.log(`Total chunks: ${chunkCount}`);
console.log(`Times paused: ${pauseCount}`);
console.log(`Times drained: ${drainCount}`);
writeStream.end();
});
writeStream.on("finish", () => {
console.log("\n✅ Copy completed successfully!");
});
Example output:
Starting monitored file copy...
📊 Read 10 chunks (655360 bytes)
📊 Read 20 chunks (1310720 bytes)
⏸️ Pause #1 at 1310720 bytes
   Buffer size: 65536 bytes
✅ Drain #1 - Buffer cleared
   Buffer size: 0 bytes
📊 Read 30 chunks (1966080 bytes)
⏸️ Pause #2 at 1966080 bytes
   Buffer size: 65536 bytes
✅ Drain #2 - Buffer cleared
   Buffer size: 0 bytes
=== Final Statistics ===
Total bytes: 5242880
Total chunks: 80
Times paused: 15
Times drained: 15
✅ Copy completed successfully!
Notice how the pauses and drains are balanced? That's proper backpressure at work!
Step 3: Adding Error Handling
A production-ready implementation must handle errors gracefully:
import fs from "fs";
import path from "path";
function copyFileWithBackpressure(
source: string,
destination: string
): Promise<{ bytesRead: number; pauseCount: number }> {
return new Promise((resolve, reject) => {
// Validate inputs
if (!source || !destination) {
return reject(new Error("Source and destination paths required"));
}
// Check if source exists
if (!fs.existsSync(source)) {
return reject(new Error(`Source file not found: ${source}`));
}
// Create streams
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(destination);
// Track progress
let bytesRead = 0;
let pauseCount = 0;
let hasError = false;
// Cleanup function
const cleanup = () => {
readStream.destroy();
writeStream.destroy();
};
// Handle data with backpressure
readStream.on("data", (chunk: Buffer) => {
bytesRead += chunk.length;
const canContinue = writeStream.write(chunk);
if (!canContinue) {
pauseCount++;
readStream.pause();
}
});
// Resume on drain
writeStream.on("drain", () => {
if (!hasError) {
readStream.resume();
}
});
// Complete successfully
readStream.on("end", () => {
if (!hasError) {
writeStream.end();
}
});
writeStream.on("finish", () => {
if (!hasError) {
resolve({ bytesRead, pauseCount });
}
});
// Error handling - READ STREAM
readStream.on("error", (err) => {
hasError = true;
console.error("❌ Read error:", err.message);
cleanup();
reject(new Error(`Failed to read ${source}: ${err.message}`));
});
// Error handling - WRITE STREAM
writeStream.on("error", (err) => {
hasError = true;
console.error("❌ Write error:", err.message);
cleanup();
reject(new Error(`Failed to write ${destination}: ${err.message}`));
});
});
}
// Usage example with error handling
async function main() {
try {
console.log("Starting file copy...");
const stats = await copyFileWithBackpressure(
"large-input.txt",
"output.txt"
);
console.log("✅ Success!");
console.log(` Copied ${stats.bytesRead} bytes`);
console.log(` Paused ${stats.pauseCount} times`);
} catch (error) {
console.error("❌ Copy failed:", error.message);
process.exit(1);
}
}
main();
Why this implementation is production-ready:
- Input validation - Checks for valid paths and file existence
- Error handling - Catches errors from both streams
- Cleanup - Destroys streams on error to prevent leaks
- Error flag - Prevents operations after error occurs
- Promise-based - Easy to use with async/await
- Statistics - Returns useful information about the operation
- Clear error messages - Helps with debugging
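Worth noting: when you don't need the custom statistics, Node's built-in pipeline helper from stream/promises gives you much of this for free. It wires up backpressure between the streams, forwards errors from either side, and destroys both streams on failure. A minimal sketch:
import fs from "fs";
import { pipeline } from "stream/promises";

async function copyFile(source: string, destination: string): Promise<void> {
  // pipeline handles backpressure, error propagation, and cleanup
  await pipeline(
    fs.createReadStream(source),
    fs.createWriteStream(destination)
  );
}

copyFile("large-input.txt", "output.txt")
  .then(() => console.log("✅ Copy complete"))
  .catch((err) => console.error("❌ Copy failed:", err.message));
The manual pattern is still worth mastering, though: it's what you need whenever you want per-chunk logic, which is exactly what the remaining steps add.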
Step 4: Adding Timeout Protection
Sometimes streams can hang. Let's add timeout protection:
import fs from "fs";
interface CopyOptions {
timeout?: number; // Milliseconds
highWaterMark?: number; // Buffer size
}
function copyFileWithTimeout(
source: string,
destination: string,
options: CopyOptions = {}
): Promise<{ bytesRead: number; duration: number }> {
const { timeout = 30000, highWaterMark = 64 * 1024 } = options;
return new Promise((resolve, reject) => {
const startTime = Date.now();
let bytesRead = 0;
let timeoutId: NodeJS.Timeout;
let hasCompleted = false;
// Set timeout
timeoutId = setTimeout(() => {
if (!hasCompleted) {
hasCompleted = true;
cleanup();
reject(new Error(`Operation timed out after ${timeout}ms`));
}
}, timeout);
// Create streams with custom buffer size
const readStream = fs.createReadStream(source, { highWaterMark });
const writeStream = fs.createWriteStream(destination, { highWaterMark });
const cleanup = () => {
clearTimeout(timeoutId);
readStream.destroy();
writeStream.destroy();
};
// Backpressure handling
readStream.on("data", (chunk: Buffer) => {
bytesRead += chunk.length;
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause();
}
});
writeStream.on("drain", () => {
if (!hasCompleted) {
readStream.resume();
}
});
// Success path
readStream.on("end", () => {
if (!hasCompleted) {
writeStream.end();
}
});
writeStream.on("finish", () => {
if (!hasCompleted) {
hasCompleted = true;
clearTimeout(timeoutId);
const duration = Date.now() - startTime;
resolve({ bytesRead, duration });
}
});
// Error handling
const handleError = (err: Error, context: string) => {
if (!hasCompleted) {
hasCompleted = true;
cleanup();
reject(new Error(`${context}: ${err.message}`));
}
};
readStream.on("error", (err) => handleError(err, "Read error"));
writeStream.on("error", (err) => handleError(err, "Write error"));
});
}
// Usage
async function main() {
try {
const result = await copyFileWithTimeout("large-file.txt", "output.txt", {
timeout: 60000, // 60 seconds
highWaterMark: 128 * 1024, // 128 KB buffer
});
console.log(`✅ Copied ${result.bytesRead} bytes in ${result.duration}ms`);
console.log(
` Speed: ${(result.bytesRead / 1024 / 1024 / (result.duration / 1000)).toFixed(2)} MB/s`
);
} catch (error) {
console.error("❌ Failed:", error.message);
}
}
main();
Monitoring Buffer State in Real-Time
Understanding what's happening inside the buffer helps you debug issues and optimize performance. Let's explore monitoring techniques.
The writableLength Property
Every writable stream has a writableLength property that tells you how much data is currently buffered:
import fs from "fs";
const writeStream = fs.createWriteStream("output.txt", {
highWaterMark: 16384, // 16 KB
});
console.log("Initial buffer:", writeStream.writableLength); // 0
// Write some data
writeStream.write("Hello"); // 5 bytes
console.log('After "Hello":', writeStream.writableLength); // 5
writeStream.write(" World"); // 6 bytes
console.log('After " World":', writeStream.writableLength); // 11
// Write a large chunk
const largeData = Buffer.alloc(20000); // 20 KB
const canContinue = writeStream.write(largeData);
console.log("After large write:", writeStream.writableLength); // 20011
console.log("Can continue?", canContinue); // false (exceeded highWaterMark)
console.log("highWaterMark:", writeStream.writableHighWaterMark); // 16384
writeStream.end();
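A few other documented stream properties are handy when you're inspecting backpressure by hand. Here's a small helper (the logState name and wiring are mine, but every property it reads is part of the standard stream API):
import fs from "fs";

// Hypothetical helper: print the state most relevant to backpressure
function logState(read: fs.ReadStream, write: fs.WriteStream) {
  console.log({
    readPaused: read.isPaused(), // true after read.pause()
    readFlowing: read.readableFlowing, // null, true, or false
    writeBuffered: write.writableLength, // bytes queued in the buffer
    writeHighWaterMark: write.writableHighWaterMark,
  });
}

const readStream = fs.createReadStream("large-file.txt");
const writeStream = fs.createWriteStream("output.txt");

readStream.on("data", (chunk) => {
  if (!writeStream.write(chunk)) readStream.pause();
  logState(readStream, writeStream); // noisy, but useful while debugging
});
writeStream.on("drain", () => readStream.resume());
readStream.on("end", () => writeStream.end());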
Building a Real-Time Monitor
Let's create a visual monitor that shows buffer state during copying:
import fs from "fs";
function createBufferMonitor(writeStream: fs.WriteStream) {
const highWaterMark = writeStream.writableHighWaterMark;
  const check = () => {
    const current = writeStream.writableLength;
    const percentage = (current / highWaterMark) * 100;
    const bars = Math.min(20, Math.floor(percentage / 5)); // 20 bars total, capped if the buffer overshoots
    const visual = "█".repeat(bars) + "░".repeat(20 - bars);
    return {
      bytes: current,
      percentage: percentage.toFixed(1),
      visual: `[${visual}] ${percentage.toFixed(1)}%`,
      isFull: current >= highWaterMark,
    };
  };
  return {
    check,
    log: () => {
      const state = check(); // use the local helper, not the caller's variable name
      console.log(
        `Buffer: ${state.visual} (${state.bytes}/${highWaterMark} bytes)`
      );
      return state;
    },
  };
}
// Usage example
const readStream = fs.createReadStream("large-file.txt");
const writeStream = fs.createWriteStream("output.txt", {
highWaterMark: 65536, // 64 KB
});
const monitor = createBufferMonitor(writeStream);
let chunkCount = 0;
readStream.on("data", (chunk: Buffer) => {
chunkCount++;
const canContinue = writeStream.write(chunk);
// Log buffer state every 5 chunks
if (chunkCount % 5 === 0) {
const state = monitor.log();
if (!canContinue) {
console.log("⚠️ Buffer full! Pausing...\n");
readStream.pause();
}
}
if (!canContinue && chunkCount % 5 !== 0) {
readStream.pause();
}
});
writeStream.on("drain", () => {
console.log("✅ Drain event:");
monitor.log();
console.log("");
readStream.resume();
});
readStream.on("end", () => {
writeStream.end();
});
Example output:
Buffer: [████░░░░░░░░░░░░░░░░] 20.5% (13421/65536 bytes)
Buffer: [████████░░░░░░░░░░░░] 40.2% (26344/65536 bytes)
Buffer: [████████████░░░░░░░░] 60.8% (39856/65536 bytes)
Buffer: [████████████████░░░░] 81.3% (53293/65536 bytes)
Buffer: [████████████████████] 100.0% (65536/65536 bytes)
⚠️ Buffer full! Pausing...
✅ Drain event:
Buffer: [░░░░░░░░░░░░░░░░░░░░] 0.0% (0/65536 bytes)
Buffer: [███░░░░░░░░░░░░░░░░░] 19.2% (12583/65536 bytes)
This visualization makes it crystal clear when backpressure kicks in!
Performance Comparison: Buffer Size Impact
Different buffer sizes affect both performance and memory usage. Let's compare:
import fs from "fs";
async function testBufferSize(
filePath: string,
highWaterMark: number
): Promise<{ duration: number; pauseCount: number }> {
return new Promise((resolve) => {
const startTime = Date.now();
let pauseCount = 0;
const readStream = fs.createReadStream(filePath, { highWaterMark });
const writeStream = fs.createWriteStream("/dev/null", { highWaterMark });
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
pauseCount++;
readStream.pause();
}
});
writeStream.on("drain", () => {
readStream.resume();
});
readStream.on("end", () => {
writeStream.end();
});
writeStream.on("finish", () => {
const duration = Date.now() - startTime;
resolve({ duration, pauseCount });
});
});
}
// Test different buffer sizes
async function runComparison() {
const testFile = "large-test-file.txt"; // 100 MB file
const bufferSizes = [
{ size: 16 * 1024, label: "16 KB (default)" },
{ size: 64 * 1024, label: "64 KB" },
{ size: 256 * 1024, label: "256 KB" },
{ size: 1024 * 1024, label: "1 MB" },
];
console.log("Testing different buffer sizes with 100 MB file\n");
for (const { size, label } of bufferSizes) {
const result = await testBufferSize(testFile, size);
console.log(`${label}:`);
console.log(` Duration: ${result.duration}ms`);
console.log(` Pauses: ${result.pauseCount}`);
console.log(` Memory: ~${(size / 1024).toFixed(0)} KB\n`);
}
}
runComparison();
Typical results:
Testing different buffer sizes with 100 MB file
16 KB (default):
Duration: 1523ms
Pauses: 3244
Memory: ~16 KB
64 KB:
Duration: 1102ms
Pauses: 811
Memory: ~64 KB
256 KB:
Duration: 945ms
Pauses: 203
Memory: ~256 KB
1 MB:
Duration: 891ms
Pauses: 51
Memory: ~1024 KB
Key insights:
- 📈 Larger buffers = fewer pauses but more memory usage
- ⚡ Larger buffers = faster processing due to fewer pause/resume cycles
- 💾 Smaller buffers = more stable memory but more overhead
- 🎯 Sweet spot: Usually 64-256 KB for most use cases
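If you'd rather confirm what your Node.js version actually uses as defaults than trust the labels above, every stream exposes its highWaterMark as a property; a quick check (the file names here are just placeholders):
import fs from "fs";

const r = fs.createReadStream("large-test-file.txt");
const w = fs.createWriteStream("defaults-check.txt");

// Defaults differ by stream type, so print what you actually got
console.log("Read highWaterMark: ", r.readableHighWaterMark, "bytes");
console.log("Write highWaterMark:", w.writableHighWaterMark, "bytes");

r.destroy();
w.destroy();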
Memory Usage Comparison
Let's see the dramatic difference backpressure makes in real memory usage.
Measuring Memory Without Backpressure
First, let's see what happens when we ignore backpressure:
import fs from "fs";
function copyWithoutBackpressure(
source: string,
dest: string
): Promise<number[]> {
return new Promise((resolve, reject) => {
const memorySnapshots: number[] = [];
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(dest);
// Take memory snapshots every 100ms
const interval = setInterval(() => {
const usedMB = process.memoryUsage().heapUsed / 1024 / 1024;
memorySnapshots.push(Math.round(usedMB));
}, 100);
// ❌ NO backpressure handling
readStream.on("data", (chunk) => {
writeStream.write(chunk); // Ignore return value!
});
readStream.on("end", () => {
writeStream.end();
});
writeStream.on("finish", () => {
clearInterval(interval);
resolve(memorySnapshots);
});
readStream.on("error", reject);
writeStream.on("error", reject);
});
}
// Test it
async function testMemoryUsage() {
console.log("Testing WITHOUT backpressure handling...\n");
const snapshots = await copyWithoutBackpressure(
"large-file.txt",
"output1.txt"
);
console.log("Memory usage over time (MB):");
console.log(snapshots.join(", "));
console.log(`\nPeak memory: ${Math.max(...snapshots)} MB`);
console.log(
`Average memory: ${Math.round(
snapshots.reduce((a, b) => a + b) / snapshots.length
)} MB`
);
}
testMemoryUsage();
Example output (processing 500 MB file):
Testing WITHOUT backpressure handling...
Memory usage over time (MB):
52, 98, 187, 301, 445, 612, 789, 945, 1123, 1287, 1401
⬆️ Keeps climbing!
Peak memory: 1401 MB
Average memory: 749 MB
Measuring Memory With Backpressure
Now let's see the difference with proper backpressure handling:
import fs from "fs";
function copyWithBackpressure(source: string, dest: string): Promise<number[]> {
return new Promise((resolve, reject) => {
const memorySnapshots: number[] = [];
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(dest);
// Take memory snapshots every 100ms
const interval = setInterval(() => {
const usedMB = process.memoryUsage().heapUsed / 1024 / 1024;
memorySnapshots.push(Math.round(usedMB));
}, 100);
// ✅ WITH backpressure handling
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause(); // Pause when buffer is full
}
});
writeStream.on("drain", () => {
readStream.resume(); // Resume when buffer clears
});
readStream.on("end", () => {
writeStream.end();
});
writeStream.on("finish", () => {
clearInterval(interval);
resolve(memorySnapshots);
});
readStream.on("error", reject);
writeStream.on("error", reject);
});
}
// Test it
async function testMemoryUsage() {
console.log("Testing WITH backpressure handling...\n");
const snapshots = await copyWithBackpressure("large-file.txt", "output2.txt");
console.log("Memory usage over time (MB):");
console.log(snapshots.join(", "));
console.log(`\nPeak memory: ${Math.max(...snapshots)} MB`);
console.log(
`Average memory: ${Math.round(
snapshots.reduce((a, b) => a + b) / snapshots.length
)} MB`
);
}
testMemoryUsage();
Example output (same 500 MB file):
Testing WITH backpressure handling...
Memory usage over time (MB):
52, 58, 61, 59, 62, 58, 61, 59, 60, 58, 61
⬇️ Stays stable!
Peak memory: 62 MB
Average memory: 59 MB
Side-by-Side Visual Comparison
Memory Usage: WITHOUT Backpressure (500 MB file)
────────────────────────────────────────────
1400 MB |                            ████
1200 MB |                        ████████
1000 MB |                    ████████████
 800 MB |                ████████████████
 600 MB |            ████████████████████
 400 MB |        ████████████████████████
 200 MB |    ████████████████████████████
   0 MB |████████████████████████████████
────────────────────────────────────────────
⚠️ Memory grows uncontrollably!
🔴 High crash risk
Memory Usage: WITH Backpressure (500 MB file)
────────────────────────────────────────────
1400 MB |
1200 MB |
1000 MB |
 800 MB |
 600 MB |
 400 MB |
 200 MB |
  60 MB |████████████████████████████████
   0 MB |
────────────────────────────────────────────
✅ Memory stays stable!
🟢 Production-safe
The difference is dramatic:
- Without backpressure: 1401 MB peak (💥 crash risk!)
- With backpressure: 62 MB peak (✅ safe!)
- Memory savings: 95.6%
Common Patterns and Real-World Scenarios
Let's explore practical patterns you'll use in production applications.
Pattern 1: Processing Multiple Files Sequentially
When you need to process multiple files, handle them one at a time, each with backpressure:
import fs from "fs";
import path from "path";
async function processFileWithBackpressure(
inputFile: string,
outputFile: string
): Promise<void> {
return new Promise((resolve, reject) => {
const readStream = fs.createReadStream(inputFile);
const writeStream = fs.createWriteStream(outputFile);
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) readStream.pause();
});
writeStream.on("drain", () => readStream.resume());
readStream.on("end", () => writeStream.end());
writeStream.on("finish", () => resolve());
readStream.on("error", reject);
writeStream.on("error", reject);
});
}
async function processMultipleFiles(files: string[], outputDir: string) {
console.log(`Processing ${files.length} files sequentially...\n`);
for (let i = 0; i < files.length; i++) {
const inputFile = files[i];
const outputFile = path.join(
outputDir,
`processed-${path.basename(inputFile)}`
);
console.log(`[${i + 1}/${files.length}] Processing: ${inputFile}`);
try {
await processFileWithBackpressure(inputFile, outputFile);
console.log(`✅ Completed: ${outputFile}\n`);
} catch (error) {
console.error(`❌ Failed: ${error.message}\n`);
}
}
console.log("All files processed!");
}
// Usage
const files = ["file1.txt", "file2.txt", "file3.txt"];
processMultipleFiles(files, "output/");
Pattern 2: Processing with Transformation
Transforming data while respecting backpressure:
import fs from "fs";
function processWithTransform(
inputFile: string,
outputFile: string,
transform: (chunk: Buffer) => Buffer
): Promise<void> {
return new Promise((resolve, reject) => {
const readStream = fs.createReadStream(inputFile);
const writeStream = fs.createWriteStream(outputFile);
readStream.on("data", (chunk: Buffer) => {
// Transform the data
const transformed = transform(chunk);
// Write with backpressure check
const canContinue = writeStream.write(transformed);
if (!canContinue) {
readStream.pause();
}
});
writeStream.on("drain", () => {
readStream.resume();
});
readStream.on("end", () => writeStream.end());
writeStream.on("finish", () => resolve());
readStream.on("error", reject);
writeStream.on("error", reject);
});
}
// Example: Convert text to uppercase
async function convertToUppercase(input: string, output: string) {
await processWithTransform(input, output, (chunk) => {
const text = chunk.toString("utf8");
const uppercase = text.toUpperCase();
return Buffer.from(uppercase, "utf8");
});
console.log("✅ Conversion complete!");
}
convertToUppercase("input.txt", "output-uppercase.txt");
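One caveat with transforming raw chunks like this: a chunk boundary can land in the middle of a multi-byte UTF-8 character, so byte-level transforms can corrupt such characters (uppercasing plain ASCII is safe). For general text work, a Transform stream combined with pipeline is a more robust sketch, and it still handles backpressure for you:
import fs from "fs";
import { Transform } from "stream";
import { pipeline } from "stream/promises";

async function uppercaseFile(input: string, output: string): Promise<void> {
  // Reading with an encoding makes the stream emit complete characters,
  // never half of a multi-byte sequence
  const source = fs.createReadStream(input, { encoding: "utf8" });

  const upper = new Transform({
    transform(chunk, _encoding, callback) {
      // chunk contains whole characters, so this transform is safe
      callback(null, chunk.toString("utf8").toUpperCase());
    },
  });

  await pipeline(source, upper, fs.createWriteStream(output));
}

uppercaseFile("input.txt", "output-uppercase.txt").then(() =>
  console.log("✅ Conversion complete")
);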
Pattern 3: Rate-Limited Processing
Sometimes you need to limit processing speed (e.g., API rate limits):
import fs from "fs";
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function processWithRateLimit(
inputFile: string,
outputFile: string,
bytesPerSecond: number
): Promise<void> {
return new Promise((resolve, reject) => {
const readStream = fs.createReadStream(inputFile, {
highWaterMark: 64 * 1024, // 64 KB chunks
});
const writeStream = fs.createWriteStream(outputFile);
let bytesThisSecond = 0;
let lastReset = Date.now();
readStream.on("data", async (chunk: Buffer) => {
// Check if we've exceeded rate limit this second
const now = Date.now();
if (now - lastReset >= 1000) {
// Reset counter every second
bytesThisSecond = 0;
lastReset = now;
}
    // If we've hit the limit, pause the source while we wait
    // (an async handler by itself doesn't stop further 'data' events)
    if (bytesThisSecond + chunk.length > bytesPerSecond) {
      const waitTime = 1000 - (now - lastReset);
      console.log(`⏳ Rate limit reached, waiting ${waitTime}ms...`);
      readStream.pause();
      await sleep(waitTime);
      bytesThisSecond = 0;
      lastReset = Date.now();
      readStream.resume();
    }
// Write with backpressure
const canContinue = writeStream.write(chunk);
bytesThisSecond += chunk.length;
if (!canContinue) {
readStream.pause();
}
});
writeStream.on("drain", () => {
readStream.resume();
});
readStream.on("end", () => writeStream.end());
writeStream.on("finish", () => resolve());
readStream.on("error", reject);
writeStream.on("error", reject);
});
}
// Process at 1 MB/second
processWithRateLimit("input.txt", "output.txt", 1024 * 1024).then(() =>
console.log("✅ Complete")
);
Debugging Backpressure Issues
Let's learn how to identify and fix common backpressure problems.
Debug Tool: Comprehensive Stream Logger
Create a logging utility to track everything:
import fs from "fs";
interface StreamLogger {
log: (event: string, data?: any) => void;
getReport: () => string;
}
function createStreamLogger(name: string): StreamLogger {
const events: Array<{ time: number; event: string; data?: any }> = [];
const startTime = Date.now();
return {
log(event: string, data?: any) {
const elapsed = Date.now() - startTime;
events.push({ time: elapsed, event, data });
console.log(`[${elapsed}ms] ${name}: ${event}`, data || "");
},
getReport() {
let report = `\n=== Stream Report: ${name} ===\n`;
report += `Total events: ${events.length}\n`;
report += `Duration: ${Date.now() - startTime}ms\n\n`;
report += "Event Timeline:\n";
events.forEach(({ time, event, data }) => {
report += ` ${time}ms: ${event}`;
if (data) report += ` - ${JSON.stringify(data)}`;
report += "\n";
});
return report;
},
};
}
// Usage
function copyWithLogging(source: string, dest: string) {
const readLogger = createStreamLogger("ReadStream");
const writeLogger = createStreamLogger("WriteStream");
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(dest);
// Log all events
readStream.on("data", (chunk) => {
readLogger.log("data", { bytes: chunk.length });
const canContinue = writeStream.write(chunk);
writeLogger.log("write", {
bytes: chunk.length,
canContinue,
bufferSize: writeStream.writableLength,
});
if (!canContinue) {
readLogger.log("pause");
readStream.pause();
}
});
writeStream.on("drain", () => {
writeLogger.log("drain", { bufferSize: writeStream.writableLength });
readLogger.log("resume");
readStream.resume();
});
readStream.on("end", () => {
readLogger.log("end");
writeStream.end();
});
writeStream.on("finish", () => {
writeLogger.log("finish");
// Print reports
console.log(readLogger.getReport());
console.log(writeLogger.getReport());
});
}
copyWithLogging("test.txt", "output.txt");
Example debug output:
[0ms] ReadStream: data { bytes: 65536 }
[1ms] WriteStream: write { bytes: 65536, canContinue: true, bufferSize: 65536 }
[15ms] ReadStream: data { bytes: 65536 }
[16ms] WriteStream: write { bytes: 65536, canContinue: false, bufferSize: 131072 }
[16ms] ReadStream: pause
[45ms] WriteStream: drain { bufferSize: 0 }
[45ms] ReadStream: resume
[50ms] ReadStream: data { bytes: 65536 }
...
=== Stream Report: ReadStream ===
Total events: 24
Duration: 523ms
Event Timeline:
0ms: data - {"bytes":65536}
15ms: data - {"bytes":65536}
16ms: pause
45ms: resume
...
Identifying Stalled Streams
Sometimes streams get stuck. Here's how to detect it:
import fs from "fs";
function copyWithStallDetection(
source: string,
dest: string,
stallTimeout: number = 5000
): Promise<void> {
return new Promise((resolve, reject) => {
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(dest);
let lastActivity = Date.now();
let isStalled = false;
// Check for stalls every second
const stallChecker = setInterval(() => {
const timeSinceActivity = Date.now() - lastActivity;
if (timeSinceActivity > stallTimeout && !isStalled) {
isStalled = true;
      console.error(
        `⚠️ Stream stalled! No activity for ${timeSinceActivity}ms`
      );
console.error(` Read stream paused: ${readStream.isPaused()}`);
console.error(` Write buffer size: ${writeStream.writableLength}`);
clearInterval(stallChecker);
readStream.destroy();
writeStream.destroy();
reject(new Error("Stream stalled"));
}
}, 1000);
// Update activity timestamp on any event
const updateActivity = () => {
lastActivity = Date.now();
};
readStream.on("data", (chunk) => {
updateActivity();
const canContinue = writeStream.write(chunk);
if (!canContinue) readStream.pause();
});
writeStream.on("drain", () => {
updateActivity();
readStream.resume();
});
readStream.on("end", () => {
updateActivity();
writeStream.end();
});
writeStream.on("finish", () => {
clearInterval(stallChecker);
resolve();
});
readStream.on("error", (err) => {
clearInterval(stallChecker);
reject(err);
});
writeStream.on("error", (err) => {
clearInterval(stallChecker);
reject(err);
});
});
}
// Usage
copyWithStallDetection("input.txt", "output.txt", 5000)
  .then(() => console.log("✅ Success"))
  .catch((err) => console.error("❌ Failed:", err.message));
Common Misconceptions
❌ Misconception: "I should always use the largest possible buffer"
Reality: Larger buffers use more memory and can cause issues with many concurrent streams.
Why this matters: If you process 100 files concurrently, each with a 10 MB buffer, that's 1 GB of memory just for buffers!
Example:
// ❌ Bad: Huge buffer for every stream
const writeStream = fs.createWriteStream("output.txt", {
highWaterMark: 10 * 1024 * 1024, // 10 MB!
});
// If processing 100 files:
// 100 files × 10 MB = 1 GB of buffer memory!
// ✅ Good: Reasonable buffer size
const writeStream = fs.createWriteStream("output.txt", {
highWaterMark: 64 * 1024, // 64 KB
});
// If processing 100 files:
// 100 files × 64 KB = 6.4 MB of buffer memory
❌ Misconception: "Backpressure only matters for huge files"
Reality: Even small files can cause issues if you process many concurrently or the destination is slow.
Why this matters: Network streams, slow disks, or API rate limits can make even small writes slow.
Example:
// Scenario: Uploading 1000 small files to a slow API
for (let i = 0; i < 1000; i++) {
const readStream = fs.createReadStream(`file${i}.txt`); // Each file: 100 KB
readStream.on("data", (chunk) => {
// ❌ Slow API takes 100ms per chunk
// Without backpressure, all 1000 files load into memory at once!
slowAPI.write(chunk); // Ignore backpressure
});
}
// Result: 1000 × 100 KB = 100 MB in memory!
// With backpressure: Only active chunks in memory
❌ Misconception: "pipe() and manual backpressure are the same"
Reality: pipe() handles backpressure automatically, but gives you less control.
Why this matters: Sometimes you need custom logic during streaming.
Example:
// Using pipe() - automatic backpressure
readStream.pipe(writeStream);
// ✅ Backpressure handled automatically
// ❌ Can't transform or monitor data easily
// Manual backpressure - more control
readStream.on("data", (chunk) => {
  // ✅ Can transform data
  const processed = processChunk(chunk);
  // ✅ Can monitor progress
  console.log(`Processed ${chunk.length} bytes`);
  // ✅ Still handle backpressure
const canContinue = writeStream.write(processed);
if (!canContinue) readStream.pause();
});
writeStream.on("drain", () => readStream.resume());
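One more difference worth knowing: pipe() propagates backpressure but not errors. If the readable side fails, the writable destination is not closed automatically, so you still have to wire up error handling and cleanup yourself (or reach for pipeline, shown earlier, which does both). A minimal sketch of the extra work pipe() leaves to you:
import fs from "fs";

const src = fs.createReadStream("input.txt");
const dst = fs.createWriteStream("output.txt");

src.pipe(dst);

// pipe() forwards data and backpressure, but NOT errors
src.on("error", (err) => {
  console.error("❌ Read failed:", err.message);
  dst.destroy(); // without this, dst stays open after a read error
});
dst.on("error", (err) => {
  console.error("❌ Write failed:", err.message);
  src.destroy();
});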
Troubleshooting Common Issues
Problem: Stream completes but output file is incomplete
Symptoms:
- No errors thrown
- Output file is smaller than input
- Some data is missing
Common Causes:
- Called end() before all data was written (80% of cases)
- Drain event handler not waiting properly
- Stream destroyed prematurely
Diagnostic Steps:
let bytesRead = 0;
let bytesWritten = 0;
readStream.on("data", (chunk) => {
bytesRead += chunk.length;
console.log(`Read total: ${bytesRead}`);
writeStream.write(chunk, () => {
bytesWritten += chunk.length;
console.log(`Written total: ${bytesWritten}`);
});
});
readStream.on("end", () => {
console.log(`End: Read ${bytesRead}, Written ${bytesWritten}`);
// Are they equal?
});
Solution:
// ✅ Correct: Wait for 'end' before calling end()
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) readStream.pause();
});
writeStream.on("drain", () => readStream.resume());
// Important: Only end write stream AFTER reading is complete
readStream.on("end", () => {
console.log("All data read, now ending write stream");
writeStream.end(); // Call this AFTER 'end' event
});
// Verify completion
writeStream.on("finish", () => {
console.log("All data written successfully");
});
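If you'd rather not hand-roll this bookkeeping, the finished helper from stream/promises resolves once a stream has fully finished (and rejects if it errors), which makes the completeness check easy to await; a small sketch:
import fs from "fs";
import { finished } from "stream/promises";

async function copyAndVerify(source: string, destination: string) {
  const readStream = fs.createReadStream(source);
  const writeStream = fs.createWriteStream(destination);

  readStream.on("data", (chunk) => {
    if (!writeStream.write(chunk)) readStream.pause();
  });
  writeStream.on("drain", () => readStream.resume());
  readStream.on("end", () => writeStream.end());

  // Resolves after 'finish' (all buffered data flushed), rejects on 'error'
  await finished(writeStream);
  console.log(`✅ Output size: ${fs.statSync(destination).size} bytes`);
}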
Problem: Memory still grows even with backpressure handling
Symptoms:
- Implemented pause/resume correctly
- Memory usage still increases
- Eventually runs out of memory
Common Causes:
- Processing multiple streams without limiting concurrency
- Storing references to chunks
- Event listeners accumulating
Diagnostic Steps:
// Check if you're storing chunk references
const chunks: Buffer[] = []; // ❌ This accumulates memory!
readStream.on("data", (chunk) => {
chunks.push(chunk); // Don't do this!
writeStream.write(chunk);
});
// Check event listener count
console.log("Drain listeners:", writeStream.listenerCount("drain"));
// If this number keeps growing, you're leaking listeners!
Solution:
// ✅ Don't store chunk references
readStream.on("data", (chunk) => {
// Process immediately, don't store
const canContinue = writeStream.write(chunk);
if (!canContinue) readStream.pause();
// chunk will be garbage collected after this
});
// ✅ Use 'once' to prevent listener accumulation
writeStream.once("drain", function handleDrain() {
readStream.resume();
// Re-register for next drain if still reading
if (!readStream.destroyed) {
writeStream.once("drain", handleDrain);
}
});
// ✅ Limit concurrent streams
async function processFilesWithLimit(files: string[], concurrency: number) {
const queue = [...files];
const active = new Set();
while (queue.length > 0 || active.size > 0) {
while (active.size < concurrency && queue.length > 0) {
const file = queue.shift()!;
const promise = processFile(file).finally(() => active.delete(promise));
active.add(promise);
}
await Promise.race(active);
}
}
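Here, processFile stands in for any per-file helper that returns a promise. A hypothetical wiring using Pattern 1's helper (the processed- prefix is just an example):
// Hypothetical adapter: give processFilesWithLimit the single-argument
// processFile(file) shape it expects
const processFile = (file: string) =>
  processFileWithBackpressure(file, `processed-${file}`);

processFilesWithLimit(["a.txt", "b.txt", "c.txt", "d.txt"], 2);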
Problem: Pause/resume causes stream to never complete
Symptoms:
- Stream pauses but never resumes
- Process doesn't exit
- No errors, just hangs
Common Causes:
- Drain event never fires
- Forgot to register drain listener
- Registered drain listener too late
Diagnostic Steps:
// Add comprehensive logging
readStream.on("data", (chunk) => {
console.log("Data received, isPaused:", readStream.isPaused());
const canContinue = writeStream.write(chunk);
console.log("Write returned:", canContinue);
if (!canContinue) {
console.log("Pausing read stream");
readStream.pause();
console.log("Is paused now?", readStream.isPaused());
}
});
writeStream.on("drain", () => {
console.log("Drain event fired!");
console.log("Resuming, isPaused:", readStream.isPaused());
readStream.resume();
console.log("Resumed, isPaused:", readStream.isPaused());
});
// If you see "Pausing" but never see "Drain event",
// the drain listener isn't registered or working
Solution:
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(dest);
// ✅ Register drain listener BEFORE any writes
writeStream.on("drain", () => {
console.log("Draining, resuming read");
readStream.resume();
});
// Now start reading
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause();
}
});
Check Your Understanding
Question 1: When does the drain event fire?
Answer: The drain event fires when the write buffer was full (write() returned false) and has now been processed enough to go below the highWaterMark threshold.
Explanation: The drain event is Node.js telling you: "Remember when I said I was full? Well, I've processed enough data now and there's space available again. You can safely write more."
Important: The drain event only fires if write() previously returned false. If the buffer never fills up, you'll never see a drain event (and that's normal!).
Example:
writeStream.write("data1"); // returns true
writeStream.write("data2"); // returns true
writeStream.write("data3"); // returns false - buffer full!
// Stream processes data...
// Drain event fires when buffer clears
Question 2: What's wrong with this code?
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause();
// Wait for drain then resume
writeStream.on("drain", () => {
readStream.resume();
});
}
});
Answer: The drain listener is registered inside the data handler, which means a new listener gets added every time backpressure occurs. After 10 pauses, you have 10 drain listeners all calling resume()!
Why this is bad:
- Memory leak from accumulating listeners
- resume() gets called multiple times per drain
- Can cause unpredictable behavior
Fixed version:
// ✅ Register drain listener ONCE, outside data handler
writeStream.on("drain", () => {
readStream.resume();
});
readStream.on("data", (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
readStream.pause();
// Don't register listener here!
}
});
Question 3: Why might you choose a smaller highWaterMark?
Answer: Smaller buffers use less memory, which is important when:
- Processing many streams concurrently
- Running in memory-constrained environments
- You want more responsive backpressure (pause sooner)
Trade-offs:
Smaller buffer (e.g., 16 KB):
- ✅ Lower memory usage
- ✅ More responsive to slow consumers
- ❌ More pause/resume cycles (slight overhead)
- ❌ Slightly slower overall
Larger buffer (e.g., 256 KB):
- ✅ Fewer pause/resume cycles
- ✅ Slightly faster overall
- ❌ Higher memory usage
- ❌ Less responsive to slow consumers
Example:
// Processing 100 files concurrently
// Small buffers: 100 × 16 KB = 1.6 MB total
const stream1 = fs.createWriteStream("out.txt", { highWaterMark: 16 * 1024 });
// Large buffers: 100 × 1 MB = 100 MB total!
const stream2 = fs.createWriteStream("out.txt", { highWaterMark: 1024 * 1024 });
Hands-On Challenge
Challenge: Build a file copy utility with progress reporting, rate limiting, and proper backpressure handling.
Requirements:
- Copy file with backpressure
- Report progress every 5%
- Limit speed to specific MB/s
- Handle errors gracefully
- Show final statistics
Starter Code:
import fs from "fs";
async function copyWithProgress(
source: string,
dest: string,
maxSpeedMBps: number
) {
// Your implementation here
}
// Test it
copyWithProgress("large-file.txt", "output.txt", 5); // 5 MB/s limit
import fs from "fs";
interface CopyStats {
totalBytes: number;
duration: number;
pauseCount: number;
avgSpeed: number;
}
async function copyWithProgress(
source: string,
dest: string,
maxSpeedMBps: number
): Promise<CopyStats> {
return new Promise((resolve, reject) => {
// Get file size for progress calculation
const fileSize = fs.statSync(source).size;
const maxBytesPerSecond = maxSpeedMBps * 1024 * 1024;
const readStream = fs.createReadStream(source);
const writeStream = fs.createWriteStream(dest);
// Statistics
const startTime = Date.now();
let bytesProcessed = 0;
let pauseCount = 0;
let lastReportedProgress = 0;
// Rate limiting
let bytesThisSecond = 0;
let secondStart = Date.now();
// Helper: Calculate and report progress
const reportProgress = () => {
const progress = (bytesProcessed / fileSize) * 100;
const currentSpeed =
bytesProcessed / ((Date.now() - startTime) / 1000) / 1024 / 1024;
console.log(
`Progress: ${progress.toFixed(1)}% ` +
`(${(bytesProcessed / 1024 / 1024).toFixed(2)} MB / ` +
`${(fileSize / 1024 / 1024).toFixed(2)} MB) ` +
`- Speed: ${currentSpeed.toFixed(2)} MB/s`
);
};
// Helper: Rate limiting check
const checkRateLimit = async (chunkSize: number): Promise<void> => {
const now = Date.now();
const elapsedThisSecond = now - secondStart;
// Reset counter every second
if (elapsedThisSecond >= 1000) {
bytesThisSecond = 0;
secondStart = now;
return;
}
// Check if adding this chunk would exceed limit
if (bytesThisSecond + chunkSize > maxBytesPerSecond) {
const waitTime = 1000 - elapsedThisSecond;
await new Promise((resolve) => setTimeout(resolve, waitTime));
bytesThisSecond = 0;
secondStart = Date.now();
}
};
// Handle data with backpressure
readStream.on("data", async (chunk: Buffer) => {
      // Rate limiting: pause the source while we wait, since an async
      // handler by itself doesn't stop further 'data' events from arriving
      readStream.pause();
      await checkRateLimit(chunk.length);
      readStream.resume();
// Write with backpressure check
const canContinue = writeStream.write(chunk);
bytesProcessed += chunk.length;
bytesThisSecond += chunk.length;
// Report progress every 5%
const currentProgress = (bytesProcessed / fileSize) * 100;
if (currentProgress - lastReportedProgress >= 5) {
reportProgress();
lastReportedProgress = currentProgress;
}
// Handle backpressure
if (!canContinue) {
pauseCount++;
readStream.pause();
}
});
// Resume on drain
writeStream.on("drain", () => {
readStream.resume();
});
// Complete
readStream.on("end", () => {
reportProgress(); // Final progress report
writeStream.end();
});
writeStream.on("finish", () => {
const duration = Date.now() - startTime;
const avgSpeed = bytesProcessed / (duration / 1000) / 1024 / 1024;
console.log("\n=== Copy Complete ===");
console.log(`Total: ${(bytesProcessed / 1024 / 1024).toFixed(2)} MB`);
console.log(`Duration: ${(duration / 1000).toFixed(2)}s`);
console.log(`Avg Speed: ${avgSpeed.toFixed(2)} MB/s`);
console.log(`Pause Count: ${pauseCount}`);
resolve({
totalBytes: bytesProcessed,
duration,
pauseCount,
avgSpeed,
});
});
// Error handling
const handleError = (err: Error) => {
readStream.destroy();
writeStream.destroy();
reject(err);
};
readStream.on("error", handleError);
writeStream.on("error", handleError);
});
}
// Test it
async function main() {
try {
console.log("Starting copy with 5 MB/s limit...\n");
const stats = await copyWithProgress(
"large-file.txt",
"output.txt",
5 // 5 MB/s
);
console.log("\n✅ Success!");
} catch (error) {
console.error("\n❌ Failed:", error.message);
process.exit(1);
}
}
main();
Example output:
Starting copy with 5 MB/s limit...
Progress: 5.0% (5.24 MB / 104.86 MB) - Speed: 5.01 MB/s
Progress: 10.0% (10.49 MB / 104.86 MB) - Speed: 5.00 MB/s
Progress: 15.0% (15.73 MB / 104.86 MB) - Speed: 5.02 MB/s
...
Progress: 100.0% (104.86 MB / 104.86 MB) - Speed: 5.00 MB/s
=== Copy Complete ===
Total: 104.86 MB
Duration: 20.97s
Avg Speed: 5.00 MB/s
Pause Count: 78
✅ Success!
What makes this solution production-ready:
- Progress reporting - User sees what's happening
- Rate limiting - Respects bandwidth constraints
- Backpressure - Prevents memory overflow
- Error handling - Graceful failure
- Statistics - Provides useful metrics
- Clean async/await - Easy to use
- Resource cleanup - Destroys streams on error
Summary: Key Takeaways
Let's recap what we've mastered about practical backpressure handling:
Core Implementation Pattern:
- 🎯 Check the return value of every write() call
- 🎯 Call pause() when write() returns false
- 🎯 Register a drain event listener to know when to resume
- 🎯 Call resume() in the drain handler
- 🎯 Always call end() after reading completes
The Complete Pattern:
readStream.on("data", (chunk) => {
if (!writeStream.write(chunk)) {
readStream.pause();
}
});
writeStream.on("drain", () => readStream.resume());
readStream.on("end", () => writeStream.end());
Critical Best Practices:
- ✅ Register the drain listener before starting to write
- ✅ Use once() for single-use handlers to prevent accumulation
- ✅ Always handle errors on both streams
- ✅ Clean up resources (destroy streams) on errors
- ✅ Monitor buffer state with writableLength for debugging
Performance Insights:
- 📊 Proper backpressure keeps memory stable (typically 50-100 MB vs 1+ GB without)
- ⚡ Larger buffers = faster but more memory
- 💾 Smaller buffers = more stable memory but slight overhead
- 🎯 Sweet spot: 64-256 KB for most use cases
Common Pitfalls to Avoid:
- ❌ Don't register drain listeners inside the data handler
- ❌ Don't store references to chunks
- ❌ Don't call end() before reading completes
- ❌ Don't ignore error events
- ❌ Don't process many large files concurrently without limiting
Why This Matters: With proper backpressure handling:
- ✅ Memory usage stays stable regardless of file size
- ✅ Applications don't crash in production
- ✅ Performance is predictable and reliable
- ✅ You can process files larger than available memory
Practice exercises:
- Build a file compression utility with progress reporting
- Create a network proxy with bandwidth limiting
- Implement a log file processor with rotation
Version Information
Tested with:
- Node.js: v18.x, v20.x, v22.x
- TypeScript: v5.x
- File System: Works identically on Windows, macOS, and Linux
Compatibility Notes:
- Backpressure APIs are stable since Node.js v10
- All patterns work with both CommonJS and ES Modules
- Stream behavior is consistent across all platforms
Future-Proof:
- These patterns are fundamental to Node.js streams
- No breaking changes expected in future versions
- New Node.js versions may add features but won't break these patterns
Additional Resources
Debugging Tools:
- Node.js Inspector
- Clinic.js - Performance profiling
- 0x - Flamegraph profiler