数据流 - 权威指南

了解如何通过 Streams API 使用可读、可写和转换数据流。

借助 Streams API,您可以以编程方式访问通过网络接收或以任何方式在本地创建的数据流,并使用 JavaScript 处理这些数据流。流式传输是指将您要接收、发送或转换的资源分解为小块,然后逐块处理这些小块。虽然浏览器在接收要在网页上显示的 HTML 或视频等资源时会进行流式传输,但 JavaScript 在 2015 年引入流之前从未具备此功能。fetch

以前,如果您想处理某种资源(无论是视频还是文本文件等),都必须下载整个文件,等待它反序列化为合适的格式,然后才能进行处理。随着 JavaScript 可使用流,这一切都发生了变化。现在,您可以在原始数据在客户端上可用时立即使用 JavaScript 逐步处理这些数据,而无需生成缓冲区、字符串或 Blob。这可实现多种使用场景,下面列出了一些:

  • 视频效果:通过转换流传输可读的视频流,从而实时应用效果。
  • 数据(解)压缩:通过选择性地(解)压缩文件流的转换流来传输文件流。
  • 图片解码:通过转换流将 HTTP 响应流传输到解码字节的位图数据,然后再通过另一个将位图转换为 PNG 的转换流。如果安装在 service worker 的 fetch 处理程序中,则可以透明地对 AVIF 等新图片格式进行 Polyfill 处理。

浏览器支持

ReadableStream 和 WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

核心概念

在详细介绍各种类型的流之前,我先介绍一些核心概念。

数据块是写入数据流或从数据流中读取的一段数据。它可以是任何类型;数据流甚至可以包含不同类型的数据块。在大多数情况下,块并不是给定流的最原子数据单位。例如,字节流可能包含由 16 KiB Uint8Array 单位组成的块,而不是单个字节。

可读数据流

可读数据流表示您可以从中读取数据的数据源。换句话说,数据来自可读的流。具体来说,可读的流是 ReadableStream 类的实例。

可写数据流

可写入的流表示您可以写入数据的目标位置。换句话说,数据进入可写入的流。具体来说,可写入的流是 WritableStream 类的实例。

转换流

转换流由一对流组成:一个可写流(称为其可写端)和一个可读流(称为其可读端)。 一个现实世界的比喻是同声传译员,他们可以实时将一种语言翻译成另一种语言。以特定于转换流的方式,写入可写端会导致新数据可从可读端读取。具体而言,任何具有 writable 属性和 readable 属性的对象都可以用作转换流。不过,标准 TransformStream 类可让您更轻松地创建适当纠缠的此类配对。

管道链

流主要通过相互管道传输来使用。可读流可以直接通过管道传输到可写流(使用可读流的 pipeTo() 方法),也可以先通过一个或多个转换流进行管道传输(使用可读流的 pipeThrough() 方法)。以这种方式将一组流通过管道连接在一起称为管道链。

背压

构建管道链后,它会传播有关块应以多快速度流经它的信号。如果链中的任何步骤尚无法接受块,它会通过管道链向后传播信号,直到最终告知原始来源停止如此快速地生成块。这种归一化流的过程称为反压。

发球

可读的流可以使用其 tee() 方法进行分叉(以大写字母“T”的形状命名)。 这将锁定该流,也就是说,该流将无法再直接使用;不过,系统会创建两个新流(称为分支),这两个新流可以单独使用。分流也很重要,因为流无法回放或重新开始,稍后会详细介绍。

管道链的示意图,该管道链包含来自对 fetch API 的调用的可读数据流,该数据流随后通过转换数据流进行管道传输,转换数据流的输出被分叉,然后发送到浏览器(对于第一个生成的可读数据流)和服务工作线程缓存(对于第二个生成的可读数据流)。
管道链。

可读数据流的机制

可读数据流是一种数据源,在 JavaScript 中由 ReadableStream 对象表示,该对象从底层来源流出。ReadableStream() 构造函数会根据给定的处理程序创建并返回可读的数据流对象。底层来源有两种类型:

  • 推送源会在您访问时不断向您推送数据,您可以自行决定开始、暂停或取消对该数据流的访问。例如,实时视频流、服务器发送的事件或 WebSocket。
  • 拉取来源要求您在连接到来源后明确请求数据。例如,通过 fetch()XMLHttpRequest 调用进行的 HTTP 操作。

流式数据以称为的小块形式按顺序读取。放置在流中的块称为已入队。这意味着它们正在队列中等待读取。内部队列会跟踪尚未读取的块。

排队策略是一个对象,用于根据流的内部队列状态确定流应如何发出背压信号。排队策略会为每个块分配一个大小,并将队列中所有块的总大小与一个指定数字(称为高水位标记)进行比较。

流中的块由读取器读取。此读取器一次检索一个数据块,让您可以对数据执行任何想要的操作。读取器及其附带的其他处理代码称为使用方

此上下文中的下一个结构称为控制器。每个可读数据流都有一个关联的控制器,顾名思义,该控制器可让您控制数据流。

一次只能有一个读取器读取一个流;当读取器创建并开始读取流(即成为活跃读取器)时,它会被锁定到该流。如果您希望另一个读取器接管您的流的读取,通常需要先释放第一个读取器,然后才能执行其他操作(不过您可以分叉流)。

创建可读的数据流

您可以通过调用其构造函数 ReadableStream() 来创建可读的流。构造函数具有一个可选实参 underlyingSource,该实参表示一个对象,其中包含用于定义所构造的流实例的行为的方法和属性。

underlyingSource

这可以使用以下可选的开发者定义的方法:

  • start(controller):在构建对象时立即调用。该方法可以访问流来源,并执行设置流功能所需的任何其他操作。如果此过程以异步方式完成,该方法可以返回一个 promise 来指示成功或失败。传递给此方法的 controller 参数是 ReadableStreamDefaultController
  • pull(controller):可用于在提取更多块时控制流。只要数据流的内部块队列未满,系统就会重复调用该函数,直到队列达到其高水位。如果调用 pull() 的结果是一个 promise,则在 promise 实现之前不会再次调用 pull()。如果 promise 拒绝,则流将变为错误状态。
  • cancel(reason):在流的使用方取消流时调用。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支持以下方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 构造函数的第二个实参(同样是可选实参)是 queuingStrategy。这是一个可选择性地为流定义排队策略的对象,它接受两个参数:

  • highWaterMark:一个非负数,用于指示使用此排队策略的流的高水位标记。
  • size(chunk):一种用于计算并返回指定块值的有限非负大小的函数。 该结果用于确定反压,并通过相应的 ReadableStreamDefaultController.desiredSize 属性体现出来。它还控制何时调用底层来源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如需从可读数据流中读取数据,您需要一个读取器,即 ReadableStreamDefaultReaderReadableStream 接口的 getReader() 方法会创建一个读取器并将流锁定到该读取器。在流被锁定时,其他读取器无法获取该流,直到此读取器释放该流为止。

ReadableStreamDefaultReader 接口的 read() 方法返回一个 promise,用于提供对流的内部队列中下一个块的访问权限。它会根据流的状态完成或拒绝,并返回相应的结果。可能的情况如下:

  • 如果有可用的块,系统将使用
    { value: chunk, done: false } 形式的对象兑现相应 promise。
  • 如果流变为关闭状态,系统将使用
    形式的对象{ value: undefined, done: true }兑现 promise。
  • 如果视频流出错,promise 将被拒绝并返回相关错误。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 属性

您可以通过访问可读流的 ReadableStream.locked 属性来检查该流是否已锁定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可读数据流代码示例

以下代码示例展示了所有步骤的实际操作。您首先创建一个 ReadableStream,该 ReadableStream 在其 underlyingSource 实参(即 TimestampSource 类)中定义了一个 start() 方法。此方法指示流的 controller 在 10 秒内每秒 enqueue() 一次时间戳。最后,它会告知控制器 close() 该视频流。您可以通过 getReader() 方法创建一个读取器,并通过调用 read() 来使用此数据流,直到数据流变为 done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

异步迭代

在每次 read() 循环迭代时检查流是否为 done 可能不是最方便的 API。幸运的是,很快就会有一种更好的方法来实现这一点:异步迭代。

for await (const chunk of stream) {
  console.log(chunk);
}

目前,使用异步迭代的解决方法是通过填充实现该行为。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

对可读数据流进行分流

ReadableStream 接口的 tee() 方法会分流当前可读的流,并返回一个包含两个结果分支(作为新的 ReadableStream 实例)的双元素数组。这样,两个读取器就可以同时读取一个流。例如,如果您想从服务器获取响应并将其流式传输到浏览器,但同时也想将其流式传输到服务工作线程缓存,则可以在服务工作线程中执行此操作。由于响应正文只能使用一次,因此您需要两个副本才能实现此目的。如需取消该流,您需要取消这两个分支。对流进行分流通常会锁定该流,防止其他读取器锁定该流。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可读字节流

对于表示字节的流,系统会提供可读流的扩展版本,以高效处理字节,尤其是通过最大限度地减少复制操作。字节流允许获取自带缓冲区 (BYOB) 读取器。默认实现可以提供一系列不同的输出,例如 WebSocket 的字符串或数组缓冲区,而字节流则保证字节输出。此外,自带设备阅读器还具有稳定性优势。这是因为,如果缓冲区分离,则可以保证不会将内容写入同一缓冲区两次,从而避免出现竞态条件。BYOB 阅读器可以重复使用缓冲区,因此可以减少浏览器需要运行垃圾回收的次数。

创建可读的字节流

您可以通过向 ReadableStream() 构造函数传递额外的 type 参数来创建可读的字节流。

new ReadableStream({ type: 'bytes' });

underlyingSource

可读字节流的底层来源会获得一个 ReadableByteStreamController 以供操作。其 ReadableByteStreamController.enqueue() 方法采用一个 chunk 实参,该实参的值为 ArrayBufferView。属性 ReadableByteStreamController.byobRequest 会返回当前的 BYOB 拉取请求,如果没有,则返回 null。最后,ReadableByteStreamController.desiredSize 属性会返回填充受控流的内部队列所需的预期大小。

queuingStrategy

ReadableStream() 构造函数的第二个实参(同样是可选实参)是 queuingStrategy。这是一个可选择性地为流定义排队策略的对象,它接受一个参数:

  • highWaterMark:一个非负字节数,用于指示使用此排队策略的流的高水位标记。 这用于确定反压,通过相应的 ReadableByteStreamController.desiredSize 属性体现出来。它还控制何时调用底层来源的 pull() 方法。

getReader()read() 方法

然后,您可以通过相应地设置 mode 参数来获取对 ReadableStreamBYOBReader 的访问权限:ReadableStream.getReader({ mode: "byob" })。这样可以更精确地控制缓冲区分配,从而避免复制。如需从字节流中读取数据,您需要调用 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可读字节流代码示例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

以下函数返回可读的字节流,从而可以高效地对随机生成的数组进行零复制读取。它不会使用预定的 1,024 块大小,而是尝试填充开发者提供的缓冲区,从而实现完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可写数据流的机制

可写数据流是您可以写入数据的目标,在 JavaScript 中由 WritableStream 对象表示。它充当底层接收器(原始数据写入其中的较低级 I/O 接收器)之上的抽象层。

数据通过写入器一次一个数据块地写入到流中。块可以采用多种形式,就像阅读器中的块一样。您可以使用任何喜欢的代码来生成可供写入的块;写入器和关联的代码称为生产者

当创建写入器并开始向流写入数据时(即活跃写入器),该写入器即被锁定到该流。一次只能有一个写入器写入可写入的流。如果您希望其他写入器开始写入您的流,通常需要先释放该流,然后再将另一个写入器附加到该流。

内部队列用于跟踪已写入流但尚未由底层接收器处理的块。

排队策略是一个对象,用于根据流的内部队列状态确定流应如何发出背压信号。排队策略会为每个块分配一个大小,并将队列中所有块的总大小与一个指定数字(称为高水位标记)进行比较。

最终的构造称为控制器。每个可写入的流都有一个关联的控制器,可用于控制该流(例如,中止该流)。

创建可写入的流

Streams API 的 WritableStream 接口提供了一种标准抽象,用于将流式数据写入目的地(称为接收器)。此对象自带内置反压和排队功能。您可以通过调用其构造函数 WritableStream() 来创建可写入的流。 它有一个可选的 underlyingSink 参数,表示一个包含方法和属性的对象,用于定义所构建的流实例的行为方式。

underlyingSink

underlyingSink 可以包含以下可选的开发者定义的方法。传递给某些方法的 controller 参数是 WritableStreamDefaultController

  • start(controller):此方法会在对象构建时立即调用。此方法的内容应旨在获取对底层接收器的访问权限。如果此过程要异步完成,它可以返回一个 promise 来指示成功或失败。
  • write(chunk, controller):当新数据块(在 chunk 参数中指定)准备好写入底层接收器时,系统会调用此方法。它可以返回一个 promise 来指示写入操作的成功或失败。此方法仅在之前的写入操作成功完成后调用,绝不会在流关闭或中止后调用。
  • close(controller):如果应用发出信号表示已完成向流写入块,系统将调用此方法。内容应执行完成对底层接收器的写入操作并释放对该接收器的访问权限所需的一切操作。如果此过程是异步的,它可以返回一个 promise 来指示成功或失败。只有在所有排队的写入操作都成功完成后,才会调用此方法。
  • abort(reason):如果应用发出信号,表明其希望突然关闭流并将其置于错误状态,系统将调用此方法。它可以清理所有已保留的资源,与 close() 非常相似,但即使写入操作已排队,也会调用 abort()。这些块将被舍弃。如果此过程是异步的,则可以返回一个 promise 来指示成功或失败。reason 参数包含一个 DOMString,用于说明数据流中止的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 接口表示一个控制器,用于在设置期间、提交更多块以进行写入时或写入结束时控制 WritableStream 的状态。构建 WritableStream 时,系统会为底层 sink 提供相应的 WritableStreamDefaultController 实例以供操作。WritableStreamDefaultController 只有一个方法:WritableStreamDefaultController.error(),该方法会导致与关联数据流的任何未来互动出现错误。WritableStreamDefaultController 还支持 signal 属性,该属性会返回 AbortSignal 的实例,从而在需要时停止 WritableStream 操作。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 构造函数的第二个实参(同样是可选实参)是 queuingStrategy。这是一个可选择性地为流定义排队策略的对象,它接受两个参数:

  • highWaterMark:一个非负数,用于指示使用此排队策略的流的高水位标记。
  • size(chunk):一种用于计算并返回指定块值的有限非负大小的函数。 结果用于确定反压力,并通过相应的 WritableStreamDefaultWriter.desiredSize 属性体现出来。

getWriter()write() 方法

如需写入可写出流,您需要一个写入器,该写入器将是一个 WritableStreamDefaultWriterWritableStream 接口的 getWriter() 方法会返回 WritableStreamDefaultWriter 的新实例,并将流锁定到该实例。在流被锁定时,在当前写入器释放之前,无法获取其他写入器。

WritableStreamDefaultWriter 接口的 write() 方法会将传递的数据块写入 WritableStream 及其底层接收器,然后返回一个 promise,该 promise 会解析以指示写入操作成功还是失败。请注意,“成功”的含义取决于底层接收器;它可能表示块已被接受,但不一定表示块已安全地保存到其最终目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 属性

您可以通过访问可写入流的 WritableStream.locked 属性来检查该流是否已锁定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可写入的流代码示例

以下代码示例展示了所有步骤的实际操作。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

将可读数据流通过管道传输到可写数据流

可读数据流可以通过其 pipeTo() 方法传输到可写数据流。 ReadableStream.pipeTo() 将当前 ReadableStream 通过管道传输到给定的 WritableStream,并返回一个 promise,该 promise 在管道传输过程成功完成时兑现,或在遇到任何错误时拒绝。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

创建转换流

Streams API 的 TransformStream 接口表示一组可转换的数据。您可以通过调用其构造函数 TransformStream() 来创建转换流,该构造函数会根据给定的处理程序创建并返回转换流对象。TransformStream() 构造函数接受一个可选的 JavaScript 对象作为其第一个实参,该对象表示 transformer。此类对象可以包含以下任一方法:

transformer

  • start(controller):此方法会在对象构建时立即调用。通常,这用于使用 controller.enqueue() 将前缀块加入队列。这些块将从可读侧读取,但不依赖于对可写侧的任何写入。如果此初始过程是异步的,例如,因为获取前缀块需要一些时间,则该函数可以返回一个 promise 来表示成功或失败;被拒绝的 promise 会导致流出错。所有抛出的异常都将由 TransformStream() 构造函数重新抛出。
  • transform(chunk, controller):当最初写入可写端的新块准备好进行转换时,系统会调用此方法。流实现保证此函数仅在之前的转换成功后调用,并且绝不会在 start() 完成之前或 flush() 调用之后调用。此函数执行转换数据流的实际转换工作。它可以使用 controller.enqueue() 将结果加入队列。这样一来,写入到可写侧的单个块可能会在可读侧产生零个或多个块,具体取决于 controller.enqueue() 的调用次数。如果转换过程是异步的,此函数可以返回一个 promise 来指示转换成功或失败。被拒绝的 promise 会导致转换流的可读端和可写端都出错。如果未提供任何 transform() 方法,则使用身份转换,该转换会将可写侧的块原封不动地加入可读侧的队列。
  • flush(controller):在写入到可写侧的所有块都已成功通过 transform() 进行转换,并且可写侧即将关闭后,系统会调用此方法。通常,这用于在后缀块也变为关闭状态之前,将其排队到可读的一侧。如果刷新过程是异步的,该函数可以返回一个 promise 来指示成功或失败;结果将传达给 stream.writable.write() 的调用方。此外,被拒绝的 promise 会导致可读端和可写端都出现错误。抛出异常与返回被拒绝的 promise 的处理方式相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 排队策略

TransformStream() 构造函数的第二和第三个可选参数是可选的 writableStrategyreadableStrategy 排队策略。它们分别定义为可读可写部分中所述。

转换流代码示例

以下代码示例展示了一个简单的转换流的实际应用。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

通过转换流传输可读数据流

ReadableStream 接口的 pipeThrough() 方法提供了一种可链接的方式,用于通过转换流或任何其他可写/可读对来传输当前流。管道传输流通常会在管道传输期间锁定该流,从而阻止其他读取器锁定该流。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一个代码示例(有点牵强)展示了如何实现 fetch() 的“大喊”版本,该版本通过将返回的响应 promise 作为流使用并逐块转换为大写,将所有文本转换为大写。这种方法的优势在于,您无需等待整个文档下载完毕,这在处理大型文件时会带来巨大差异。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

演示

以下演示展示了可读、可写和转换数据流的实际应用。它还包含 pipeThrough()pipeTo() 管道链的示例,并演示了 tee()。您可以选择在自己的窗口中运行演示,也可以查看源代码

浏览器中可用的实用信息流

浏览器中内置了许多实用的信息流。您可以轻松地从 blob 创建 ReadableStreamBlob 接口的 stream() 方法会返回一个 ReadableStream,该 ReadableStream 在读取时会返回 blob 中包含的数据。另请注意,File 对象是一种特殊的 Blob,可在任何可以使用 blob 的情境中使用。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的流式变体分别称为 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

使用 CompressionStreamDecompressionStream 转换流可以轻松压缩或解压缩文件。以下代码示例展示了如何在浏览器中直接下载 Streams 规范、对其进行压缩 (gzip),并将压缩后的文件直接写入磁盘。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和实验性 fetch() 请求流 是实际应用中的可写入流示例。

Serial API 大量使用可读写的数据流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最后,WebSocketStream API 将流与 WebSocket API 集成。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

实用资源

致谢

本文已由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 审核。Jake Archibald 的博文帮助我更好地了解了数据流。部分代码示例的灵感来自 GitHub 用户 @bellbind 的探索,部分散文内容在很大程度上借鉴了 MDN Web 文档中的 StreamsStreams Standard作者在撰写此规范方面做得非常出色。英雄图片由 Ryan LaraUnsplash 上提供。