Streams: Node.js

Harsh Mishra - Oct 7 - Dev Community

Complete Guide to Streams in Node.js

Streams in Node.js let you read and write data incrementally, chunk by chunk, instead of loading everything into memory at once. That makes them a good fit for large files, network traffic, and other I/O-heavy work. This guide covers the types of streams, how to create and connect them, and the events they emit, with practical examples along the way.

What Are Streams?

Streams are objects that allow you to read data from a source or write data to a destination in a continuous manner. They are ideal for processing data piece-by-piece rather than reading or writing entire files or buffers at once. This is especially useful when working with large datasets, as it can significantly reduce memory usage.

Key Concepts

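  • Flowing Mode: A readable stream pulls data from the source automatically and hands it to your code through 'data' events as soon as it is available.
  • Paused Mode: You pull data yourself by calling stream.read(). Readable streams start in this mode and switch to flowing mode when you attach a 'data' listener, call resume(), or call pipe().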
  • Readable Streams: Streams from which you can read data.
  • Writable Streams: Streams to which you can write data.
  • Duplex Streams: Streams that can both read and write data.
  • Transform Streams: Streams that modify or transform the data as it is read or written.
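
To make the difference between paused and flowing mode concrete, here is a minimal sketch that reads the same three chunks both ways. The helper names readPaused and readFlowing are made up for this illustration, and Readable.from() is used only as a convenient in-memory source.

```javascript
const { Readable } = require('stream');

// Paused mode: wait for 'readable', then pull chunks explicitly with read().
function readPaused(source) {
    return new Promise((resolve, reject) => {
        const chunks = [];
        source.on('readable', () => {
            let chunk;
            // read() returns null once the internal buffer is empty.
            while ((chunk = source.read()) !== null) {
                chunks.push(chunk);
            }
        });
        source.on('end', () => resolve(chunks));
        source.on('error', reject);
    });
}

// Flowing mode: attaching a 'data' listener makes chunks arrive on their own.
function readFlowing(source) {
    return new Promise((resolve, reject) => {
        const chunks = [];
        source.on('data', (chunk) => chunks.push(chunk));
        source.on('end', () => resolve(chunks));
        source.on('error', reject);
    });
}

async function main() {
    const paused = await readPaused(Readable.from(['a', 'b', 'c']));
    const flowing = await readFlowing(Readable.from(['a', 'b', 'c']));
    console.log('Paused mode chunks:', paused);
    console.log('Flowing mode chunks:', flowing);
    return { paused, flowing };
}

const modesDemo = main();
```

Both helpers end up with the same data; the difference is who drives the reading. In paused mode your code decides when to call read(), while in flowing mode the stream pushes chunks at you.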

Types of Streams

  1. Readable Streams: These streams allow you to read data. Examples include fs.createReadStream() and http.IncomingMessage.

  2. Writable Streams: These streams allow you to write data. Examples include fs.createWriteStream() and http.ServerResponse.

  3. Duplex Streams: These streams can read and write data. Examples include TCP sockets (net.Socket) and custom streams built on stream.Duplex.

  4. Transform Streams: These are a type of duplex stream that can modify the data as it is being read or written. Examples include zlib.createGzip() for compression.
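
One way to see how these types relate is to check a few built-in streams against the base classes exported by the stream module. This is only a quick sanity check, not something you would normally do in application code; the checks object is a name chosen for this sketch.

```javascript
const { Readable, Writable, Duplex, Transform } = require('stream');
const fs = require('fs');
const net = require('net');
const zlib = require('zlib');

const socket = new net.Socket(); // an unconnected TCP socket
const gzip = zlib.createGzip();

const checks = {
    // File streams extend the Readable and Writable base classes.
    fileReadStreamIsReadable: fs.ReadStream.prototype instanceof Readable,
    fileWriteStreamIsWritable: fs.WriteStream.prototype instanceof Writable,
    // A TCP socket is a duplex stream.
    socketIsDuplex: socket instanceof Duplex,
    // A gzip stream is a transform stream, and every transform is a duplex.
    gzipIsTransform: gzip instanceof Transform,
    transformIsDuplex: Transform.prototype instanceof Duplex,
};
console.log(checks);

// Release the resources held by the two demo streams.
socket.destroy();
gzip.destroy();
```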

Creating Readable Streams

You can create a readable stream using the built-in fs module to read files or using stream.Readable to create custom readable streams.

Example: Reading a File with a Readable Stream

const fs = require('fs');

// Create a readable stream
const readableStream = fs.createReadStream('example.txt', { encoding: 'utf8' });

// Handling the 'data' event
readableStream.on('data', (chunk) => {
    console.log('New chunk received:', chunk);
});

// Handling the 'end' event
readableStream.on('end', () => {
    console.log('No more data to read.');
});

Example: Custom Readable Stream

const { Readable } = require('stream');

class MyReadableStream extends Readable {
    constructor(options) {
        super(options);
        this.current = 0;
    }

    _read(size) {
        if (this.current < 5) {
            this.push(`Chunk ${this.current++}\n`);
        } else {
            this.push(null); // No more data
        }
    }
}

const myReadableStream = new MyReadableStream();
myReadableStream.on('data', (chunk) => {
    console.log('Received:', chunk.toString());
});
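
Readable streams are also async iterables, so instead of wiring up 'data' and 'end' listeners you can consume one with for await...of. The sketch below repeats the custom stream from the example above under a different name (CounterStream) so that it runs on its own; collect is a hypothetical helper.

```javascript
const { Readable } = require('stream');

// Same idea as MyReadableStream: push five chunks, then signal the end.
class CounterStream extends Readable {
    constructor(options) {
        super(options);
        this.current = 0;
    }

    _read() {
        if (this.current < 5) {
            this.push(`Chunk ${this.current++}\n`);
        } else {
            this.push(null); // No more data
        }
    }
}

// Gather the whole stream into a single string.
async function collect(readable) {
    let text = '';
    for await (const chunk of readable) {
        // Chunk boundaries are not guaranteed: several pushes may arrive merged.
        text += chunk.toString();
    }
    return text;
}

const collected = collect(new CounterStream());
collected.then((text) => console.log(text));
```

If the stream emits an error, the loop throws, so a regular try/catch around it is enough for error handling.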

Creating Writable Streams

You can create writable streams using the fs module or by extending the stream.Writable class.

Example: Writing to a File with a Writable Stream

const fs = require('fs');

// Create a writable stream
const writableStream = fs.createWriteStream('output.txt');

// Write data to the stream
writableStream.write('Hello, World!\n');
writableStream.write('Writing to a file using streams.\n');

// End the stream
writableStream.end(() => {
    console.log('Finished writing to file.');
});

Example: Custom Writable Stream

const { Writable } = require('stream');

class MyWritableStream extends Writable {
    _write(chunk, encoding, callback) {
        console.log('Writing:', chunk.toString());
        callback(); // Call when done
    }
}

const myWritableStream = new MyWritableStream();
myWritableStream.write('Hello, World!\n');
myWritableStream.write('Writing to custom writable stream.\n');
myWritableStream.end();

Using Duplex Streams

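Duplex streams implement both the Readable and Writable interfaces, and the two sides work independently of each other: what you write does not have to be related to what you read. A common use case is TCP sockets.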

Example: Creating a Duplex Stream

const { Duplex } = require('stream');

class MyDuplexStream extends Duplex {
    _read(size) {
        this.push('Data from duplex stream\n');
        this.push(null); // No more data
    }

    _write(chunk, encoding, callback) {
        console.log('Received:', chunk.toString());
        callback();
    }
}

const myDuplexStream = new MyDuplexStream();
myDuplexStream.on('data', (chunk) => {
    console.log('Reading:', chunk.toString());
});

// Write to the duplex stream
myDuplexStream.write('Hello, Duplex!\n');
myDuplexStream.end();

Using Transform Streams

Transform streams are useful for modifying data as it flows through the stream. For example, you might use a transform stream to compress data.

Example: Creating a Transform Stream

const { Transform } = require('stream');

class MyTransformStream extends Transform {
    _transform(chunk, encoding, callback) {
        const upperChunk = chunk.toString().toUpperCase();
        this.push(upperChunk);
        callback();
    }
}

const myTransformStream = new MyTransformStream();

// Pipe stdin through the transform stream and on to stdout.
// (Adding a 'data' listener here as well would print every chunk twice.)
process.stdin.pipe(myTransformStream).pipe(process.stdout);
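
Since compression was mentioned as a typical job for a transform stream, here is a rough sketch of a gzip round trip using the built-in zlib transforms. It assumes Node.js 15 or later for the stream/promises module. BufferSink and gzipRoundTrip are helper names invented for this example, not part of any library.

```javascript
const { Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Hypothetical helper: a writable that collects everything into one Buffer.
class BufferSink extends Writable {
    constructor(options) {
        super(options);
        this.parts = [];
    }

    _write(chunk, encoding, callback) {
        this.parts.push(chunk);
        callback();
    }

    toBuffer() {
        return Buffer.concat(this.parts);
    }
}

async function gzipRoundTrip(text) {
    // Compress: source -> gzip transform -> sink
    const compressed = new BufferSink();
    await pipeline(Readable.from([Buffer.from(text)]), zlib.createGzip(), compressed);

    // Decompress: compressed bytes -> gunzip transform -> sink
    const restored = new BufferSink();
    await pipeline(Readable.from([compressed.toBuffer()]), zlib.createGunzip(), restored);

    return {
        originalBytes: Buffer.byteLength(text),
        compressedBytes: compressed.toBuffer().length,
        restored: restored.toBuffer().toString(),
    };
}

const sampleText = 'streams are useful '.repeat(500);
const roundTrip = gzipRoundTrip(sampleText);
roundTrip.then(({ originalBytes, compressedBytes }) => {
    console.log(`Compressed ${originalBytes} bytes down to ${compressedBytes} bytes`);
});
```

The gzip and gunzip streams slot into the chain exactly like the uppercase transform above: data goes in one side and a modified version comes out the other.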

Piping Streams

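One of the most useful features of streams is the ability to pipe them together. The pipe() method connects a readable stream to a writable stream and manages the flow for you, pausing the source whenever the destination cannot keep up, which makes it easy to transfer data without buffering it all in memory.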

Example: Piping Streams

const fs = require('fs');

// Create a readable stream
const readableStream = fs.createReadStream('input.txt');

// Create a writable stream
const writableStream = fs.createWriteStream('output.txt');

// Pipe the readable stream to the writable stream
readableStream.pipe(writableStream);

writableStream.on('finish', () => {
    console.log('Data has been written to output.txt');
});
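
One caveat with pipe() is that it does not forward errors: if the readable side fails, the writable side is not closed automatically. The stream module also provides pipeline(), which reports a failure from any stream in the chain and cleans the others up. Below is a minimal sketch using the promise-based version, assuming Node.js 15 or later; copyFile and the temporary file names are made up for the demo.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { pipeline } = require('stream/promises');

// Hypothetical helper: copy a file by streaming it from source to destination.
async function copyFile(sourcePath, destinationPath) {
    await pipeline(
        fs.createReadStream(sourcePath),
        fs.createWriteStream(destinationPath)
    );
}

async function main() {
    // Work in a fresh temporary directory so the demo is self-contained.
    // It is left in place afterwards so you can inspect the result.
    const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'streams-demo-'));
    const input = path.join(dir, 'input.txt');
    const output = path.join(dir, 'output.txt');
    fs.writeFileSync(input, 'Hello from input.txt\n');

    await copyFile(input, output);
    const copied = fs.readFileSync(output, 'utf8');

    // A missing source file rejects the promise instead of failing silently.
    let missingFileCode = null;
    try {
        await copyFile(path.join(dir, 'missing.txt'), path.join(dir, 'unused.txt'));
    } catch (err) {
        missingFileCode = err.code;
    }

    return { copied, missingFileCode };
}

const copyDemo = main();
copyDemo.then((result) => console.log(result));
```

For quick scripts pipe() is fine, but for anything long-running the single try/catch around pipeline() tends to be easier to get right than attaching 'error' listeners to every stream.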

Stream Events in Node.js

1. Readable Streams Events

Readable streams emit several important events that help you manage data flow:

  • data: Emitted when a chunk of data is available to be read.
  • end: Emitted when there is no more data to read.
  • error: Emitted when an error occurs during reading.
  • close: Emitted when the stream and any underlying resources (like file descriptors) have been closed.

Example: Readable Stream Events

const fs = require('fs');

const readableStream = fs.createReadStream('example.txt');

readableStream.on('data', (chunk) => {
    console.log('Received chunk:', chunk.toString());
});

readableStream.on('end', () => {
    console.log('No more data to read.');
});

readableStream.on('error', (err) => {
    console.error('Error occurred:', err);
});

readableStream.on('close', () => {
    console.log('Stream closed.');
});
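
To see what happens on the failure path, the sketch below records which of these events fire when the file does not exist. The recordEvents helper and the file name are assumptions for this demo; the path is simply expected not to exist on your machine.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: record the lifecycle events of a file read stream.
function recordEvents(filePath) {
    return new Promise((resolve) => {
        const events = [];
        let errorCode = null;
        const stream = fs.createReadStream(filePath);

        stream.on('data', () => {
            // Log 'data' once, however many chunks arrive.
            if (!events.includes('data')) events.push('data');
        });
        stream.on('end', () => events.push('end'));
        stream.on('error', (err) => {
            errorCode = err.code;
            events.push('error');
        });
        // With default options a file stream emits 'close' last, so resolve here.
        stream.on('close', () => {
            events.push('close');
            resolve({ events, errorCode });
        });
    });
}

// Assumption: nothing exists at this path.
const missingPath = path.join(os.tmpdir(), `streams-demo-missing-${process.pid}.txt`);
const eventsDemo = recordEvents(missingPath);
eventsDemo.then((result) => console.log(result));
```

When the read fails, 'error' fires and 'end' never does, so cleanup logic that must always run belongs in the 'close' handler rather than in 'end'.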

2. Writable Streams Events

Writable streams also emit several events:

  • drain: Emitted when the internal buffer has emptied after a call to write() returned false, signalling that it is safe to resume writing.
  • finish: Emitted after end() has been called and all data has been flushed to the underlying system.
  • error: Emitted when an error occurs during writing.
  • close: Emitted when the stream and any underlying resources have been closed.

Example: Writable Stream Events

const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

writableStream.on('finish', () => {
    console.log('All data has been written to output.txt');
});

writableStream.on('error', (err) => {
    console.error('Error occurred:', err);
});

// Writing data
writableStream.write('Hello, World!\n');
writableStream.write('Writing to a file using streams.\n');
writableStream.end(); // Call end to finish the writing process
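
The drain event matters when you write faster than the destination can absorb. Here is a rough sketch of the usual pattern: check the return value of write() and wait for 'drain' before continuing. SlowSink and writeAll are hypothetical names, and the tiny highWaterMark is chosen only to trigger backpressure quickly.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// A deliberately slow sink with a tiny buffer so backpressure kicks in early.
class SlowSink extends Writable {
    constructor() {
        super({ highWaterMark: 16 }); // in bytes; small on purpose
        this.received = [];
    }

    _write(chunk, encoding, callback) {
        this.received.push(chunk.toString());
        setImmediate(callback); // simulate slow I/O
    }
}

// Write every line, pausing whenever the stream asks us to.
async function writeAll(writable, lines) {
    let drainWaits = 0;
    for (const line of lines) {
        // write() returns false once the buffer reaches the highWaterMark.
        if (!writable.write(line)) {
            drainWaits++;
            await once(writable, 'drain');
        }
    }
    writable.end();
    await once(writable, 'finish');
    return drainWaits;
}

const sink = new SlowSink();
const lines = Array.from({ length: 10 }, (_, i) => `line ${i} of some output\n`);
const backpressureDemo = writeAll(sink, lines);
backpressureDemo.then((drainWaits) => {
    console.log(`Waited for 'drain' ${drainWaits} time(s)`);
});
```

Ignoring the return value of write() still works, but the unwritten data piles up in memory, which defeats much of the point of streaming.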

3. Transform Streams Events

Transform streams inherit events from both readable and writable streams, and they emit:

  • data: Emitted when a transformed chunk is available to read.
  • end: Emitted on the readable side once all transformed output has been consumed.
  • error: Emitted when an error occurs during transformation.
  • finish: Emitted on the writable side after end() has been called and all input has been processed.

Example: Transform Stream Events

const { Transform } = require('stream');

class MyTransformStream extends Transform {
    _transform(chunk, encoding, callback) {
        const upperChunk = chunk.toString().toUpperCase();
        this.push(upperChunk);
        callback();
    }
}

const myTransformStream = new MyTransformStream();

myTransformStream.on('data', (chunk) => {
    console.log('Transformed chunk:', chunk.toString());
});

myTransformStream.on('end', () => {
    console.log('No more data to transform.');
});

myTransformStream.on('error', (err) => {
    console.error('Error occurred:', err);
});

// Write data to the transform stream
myTransformStream.write('Hello, World!\n');
myTransformStream.write('Transforming this text.\n');
myTransformStream.end(); // End the stream

Summary of Events

  • Readable Streams: data, end, error, close
  • Writable Streams: drain, finish, error, close
  • Transform Streams: data, end, error, finish (inherited from the readable and writable sides)

Conclusion

Streams in Node.js provide a powerful and efficient way to handle data in a continuous manner. They allow you to read and write data piece-by-piece, making them particularly useful for I/O operations and working with large datasets. Understanding how to create and use different types of streams, as well as how to handle events, will help you build more efficient and scalable applications in Node.js.

Whether you're creating readable, writable, duplex, or transform streams, the flexibility of the stream API allows you to handle data processing in a way that best suits your application's needs.
