§Namespaces

  • promises

§Classes

  • Duplex

    Duplex streams are streams that implement both the Readable and Writable interfaces.

    Examples of Duplex streams include:

    • TCP sockets
    • zlib streams
    • crypto streams
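
    Beyond the built-in examples, a custom Duplex can be created by subclassing stream.Duplex and implementing both _read() and _write(). A minimal sketch (the CounterLogger class and its chunk contents are illustrative, not part of the API):

    const { Duplex } = require('stream');
    
    // Illustrative class: the readable and writable sides of a Duplex
    // operate independently of one another.
    class CounterLogger extends Duplex {
      #count = 0;
    
      // Readable side: push three numbered chunks, then end the stream.
      _read() {
        this.push(this.#count < 3 ? String(this.#count++) : null);
      }
    
      // Writable side: log each incoming chunk and acknowledge it.
      _write(chunk, encoding, callback) {
        console.log('received:', chunk.toString());
        callback();
      }
    }
    
    const duplex = new CounterLogger();
    duplex.on('data', (chunk) => console.log('emitted:', chunk.toString()));
    duplex.write('hello');
    duplex.end();
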
  • PassThrough

    The stream.PassThrough class is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. Its purpose is primarily for examples and testing, but there are some use cases where stream.PassThrough is useful as a building block for novel sorts of streams.
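
    As a minimal sketch of such a building block (the file name and byte-counting logic are illustrative), a PassThrough can act as a tap that observes data flowing through a pipeline without altering it:

    const { PassThrough } = require('stream');
    const fs = require('fs');
    
    // Count the bytes flowing through without modifying them.
    const meter = new PassThrough();
    let bytes = 0;
    meter.on('data', (chunk) => { bytes += chunk.length; });
    meter.on('end', () => console.error(`${bytes} bytes passed through`));
    
    fs.createReadStream('example.txt').pipe(meter).pipe(process.stdout);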

  • Readable

  • Stream

  • Transform

    Transform streams are Duplex streams where the output is in some way related to the input. Like all Duplex streams, Transform streams implement both the Readable and Writable interfaces.

    Examples of Transform streams include:

    • zlib streams
    • crypto streams
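
    A custom Transform can be created by passing a transform function to the constructor. A minimal sketch (the upper-casing logic is illustrative):

    const { Transform } = require('stream');
    
    // The output (upper-cased text) is derived from the input, which is
    // what makes this stream a Transform.
    const upperCase = new Transform({
      transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase());
      },
    });
    
    process.stdin.pipe(upperCase).pipe(process.stdout);
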
  • Writable

§Functions

  • _isUint8Array

  • _uint8ArrayToBuffer

  • addAbortSignal

    Attaches an AbortSignal to a readable or writable stream. This lets code control stream destruction using an AbortController.

    Calling abort on the AbortController corresponding to the passed AbortSignal will behave the same way as calling .destroy(new AbortError()) on the stream.

    const fs = require('fs');
    const { addAbortSignal } = require('stream');
    
    const controller = new AbortController();
    const read = addAbortSignal(
      controller.signal,
      fs.createReadStream('object.json')
    );
    // Later, abort the operation closing the stream
    controller.abort();
    

    Or using an AbortSignal with a readable stream as an async iterable:

    const fs = require('fs');
    const { addAbortSignal } = require('stream');
    
    const controller = new AbortController();
    setTimeout(() => controller.abort(), 10_000); // set a timeout
    const stream = addAbortSignal(
      controller.signal,
      fs.createReadStream('object.json')
    );
    (async () => {
      try {
        for await (const chunk of stream) {
          await process(chunk);
        }
      } catch (e) {
        if (e.name === 'AbortError') {
          // The operation was cancelled
        } else {
          throw e;
        }
      }
    })();
    
  • finished

    A function to get notified when a stream is no longer readable or writable, or has experienced an error or a premature close event.

    const { finished } = require('stream');
    const fs = require('fs');
    
    const rs = fs.createReadStream('archive.tar');
    
    finished(rs, (err) => {
      if (err) {
        console.error('Stream failed.', err);
      } else {
        console.log('Stream is done reading.');
      }
    });
    
    rs.resume(); // Drain the stream.
    

    Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit 'end' or 'finish'.

    The finished API also provides a promise version:

    const { finished } = require('stream/promises');
    const fs = require('fs');
    
    const rs = fs.createReadStream('archive.tar');
    
    async function run() {
      await finished(rs);
      console.log('Stream is done reading.');
    }
    
    run().catch(console.error);
    rs.resume(); // Drain the stream.
    

    stream.finished() leaves dangling event listeners (in particular 'error', 'end', 'finish' and 'close') after the callback has been invoked. The reason for this is so that unexpected 'error' events (due to incorrect stream implementations) do not cause unexpected crashes. If this is unwanted behavior then the returned cleanup function needs to be invoked in the callback:

    const cleanup = finished(rs, (err) => {
      cleanup();
      // ...
    });
    
  • pipeline

    A module method to pipe between streams and generators, forwarding errors, properly cleaning up, and providing a callback when the pipeline is complete.

    const { pipeline } = require('stream');
    const fs = require('fs');
    const zlib = require('zlib');
    
    // Use the pipeline API to easily pipe a series of streams
    // together and get notified when the pipeline is fully done.
    
    // A pipeline to gzip a potentially huge tar file efficiently:
    
    pipeline(
      fs.createReadStream('archive.tar'),
      zlib.createGzip(),
      fs.createWriteStream('archive.tar.gz'),
      (err) => {
        if (err) {
          console.error('Pipeline failed.', err);
        } else {
          console.log('Pipeline succeeded.');
        }
      }
    );
    

    The pipeline API provides a promise version, which can also receive an options argument as the last parameter with a signal AbortSignal property. When the signal is aborted, destroy will be called on the underlying pipeline, with an AbortError.

    const { pipeline } = require('stream/promises');
    const fs = require('fs');
    const zlib = require('zlib');
    
    async function run() {
      await pipeline(
        fs.createReadStream('archive.tar'),
        zlib.createGzip(),
        fs.createWriteStream('archive.tar.gz')
      );
      console.log('Pipeline succeeded.');
    }
    
    run().catch(console.error);
    

    To use an AbortSignal, pass it inside an options object, as the last argument:

    const { pipeline } = require('stream/promises');
    const fs = require('fs');
    const zlib = require('zlib');
    
    async function run() {
      const ac = new AbortController();
      const signal = ac.signal;
    
      setTimeout(() => ac.abort(), 1);
      await pipeline(
        fs.createReadStream('archive.tar'),
        zlib.createGzip(),
        fs.createWriteStream('archive.tar.gz'),
        { signal },
      );
    }
    
    run().catch(console.error); // AbortError
    

    The pipeline API also supports async generators:

    const { pipeline } = require('stream/promises');
    const fs = require('fs');
    
    async function run() {
      await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source, { signal }) {
          source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
          for await (const chunk of source) {
            yield await processChunk(chunk, { signal });
          }
        },
        fs.createWriteStream('uppercase.txt')
      );
      console.log('Pipeline succeeded.');
    }
    
    run().catch(console.error);
    

    Remember to handle the signal argument passed into the async generator, especially when the async generator is the source for the pipeline (i.e. the first argument); otherwise the pipeline will never complete.

    const { pipeline } = require('stream/promises');
    const fs = require('fs');
    
    async function run() {
      await pipeline(
        async function* ({ signal }) {
          await someLongRunningfn({ signal });
          yield 'asd';
        },
        fs.createWriteStream('uppercase.txt')
      );
      console.log('Pipeline succeeded.');
    }
    
    run().catch(console.error);
    

    stream.pipeline() will call stream.destroy(err) on all streams except:

    • Readable streams which have emitted 'end' or 'close'.
    • Writable streams which have emitted 'finish' or 'close'.

    stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. If streams are reused after a failure, this can cause event listener leaks and swallowed errors.
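
    One defensive pattern, sketched here under the assumption that the streams are simply discarded after a failure: keep a reference to each stage and destroy them all in the callback instead of reusing them.

    const { pipeline } = require('stream');
    const fs = require('fs');
    const zlib = require('zlib');
    
    const source = fs.createReadStream('archive.tar');
    const transform = zlib.createGzip();
    const destination = fs.createWriteStream('archive.tar.gz');
    
    pipeline(source, transform, destination, (err) => {
      if (err) {
        // Streams that had already emitted 'end'/'finish'/'close' were
        // not destroyed by pipeline() and still carry its listeners;
        // destroy them explicitly rather than reusing them.
        for (const stream of [source, transform, destination]) {
          stream.destroy();
        }
      }
    });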