ストリーム - 決定版ガイド

Streams API でストリームの読み取り、書き込み、変換を行う方法について説明します。

Streams API を使用すると、ネットワーク経由で受信したデータ ストリームやローカルでなんらかの手段で作成されたデータのストリームに、プログラムからアクセスし、JavaScript で処理できます。ストリーミングでは、受信、送信、変換するリソースを小さなチャンクに分割し、これらのチャンクをビット単位で処理します。ストリーミングは、ウェブページに表示する HTML や動画などのアセットを受信する際にブラウザが行う操作ですが、2015 年にストリーミングを含む fetch が導入されるまでは、JavaScript でこの機能を利用できませんでした。

以前は、なんらかのリソース(動画やテキスト ファイルなど)を処理する場合、ファイル全体をダウンロードし、適切な形式にシリアル化されるのを待ってから処理する必要がありました。ストリームが JavaScript で利用可能になると、すべてが変わります。バッファ、文字列、BLOB を生成することなく、クライアントで利用可能になり次第、JavaScript で未加工データを段階的に処理できるようになりました。これにより、次のようなさまざまなユースケースが可能になります。

  • 動画エフェクト: 読み取り可能な動画ストリームを変換ストリームにパイプし、エフェクトをリアルタイムで適用します。
  • データの圧縮と解凍: ファイル ストリームを変換ストリームにパイプし、選択的に圧縮と解凍を行います。
  • 画像のデコード: HTTP レスポンス ストリームを、バイトをビットマップ データにデコードする変換ストリームにパイプし、次にビットマップを PNG に変換する別の変換ストリームにパイプします。Service Worker の fetch ハンドラ内にインストールすると、AVIF などの新しい画像形式を透過的にポリフィルできます。

ブラウザ サポート

ReadableStream と WritableStream

対応ブラウザ

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65。
  • Safari: 10.1。

ソース

TransformStream

対応ブラウザ

  • Chrome: 67。
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1。

ソース

基本コンセプト

さまざまなタイプのストリームについて詳しく説明する前に、いくつかのコアコンセプトについて説明します。

チャンク

チャンクは、ストリームに書き込まれるかストリームから読み取られる単一のデータです。任意のタイプにできます。ストリームには、異なるタイプのチャンクを含めることもできます。ほとんどの場合、チャンクは特定のストリームの最もアトミックなデータ単位ではありません。たとえば、バイト ストリームには、1 バイトではなく 16 KiB の Uint8Array ユニットで構成されるチャンクが含まれている場合があります。

読み取り可能なストリーム

読み取り可能なストリームは、読み取り可能なデータソースを表します。つまり、読み取り可能なストリームからデータが出力されます。具体的には、読み取り可能なストリームは ReadableStream クラスのインスタンスです。

書き込み可能なストリーム

書き込み可能なストリームは、書き込み可能なデータの宛先を表します。つまり、データは書き込み可能なストリームに書き込まれます。具体的には、書き込み可能なストリームは WritableStream クラスのインスタンスです。

ストリームを変換する

変換ストリームは、書き込み可能なストリーム(書き込み側)と読み取り可能なストリーム(読み取り側)のストリームペアで構成されます。これを現実世界に例えると、ある言語から別の言語に即時に翻訳する同時通訳者です。変換ストリームに固有の方法で、書き込み可能な側に書き込むと、読み取り可能な側から読み取ることができる新しいデータが作成されます。具体的には、writable プロパティと readable プロパティを持つオブジェクトは変換ストリームとして機能します。ただし、標準の TransformStream クラスを使用すると、適切にエンタングルされたこのようなペアを簡単に作成できます。

パイプチェーン

ストリームは主に、ストリームを相互にパイプするために使用されます。読み取り可能なストリームは、読み取り可能なストリームの pipeTo() メソッドを使用して書き込み可能なストリームに直接パイプできます。または、読み取り可能なストリームの pipeThrough() メソッドを使用して、1 つ以上の変換ストリームを介してパイプすることもできます。この方法でパイプで接続された一連のストリームは、パイプチェーンと呼ばれます。

バックプレッシャー

パイプチェーンが構築されると、チャンクがパイプチェーン内をどのくらいの速さで流すべきかに関するシグナルが伝播されます。チェーンのいずれかのステップでチャンクを受け入れられない場合、パイプチェーンを通じてシグナルが後方に伝播され、最終的に元のソースにチャンクの生成を停止するよう指示されます。フローを正規化するこのプロセスは、バックプレッシャーと呼ばれます。

ティーアップ

読み取り可能なストリームは、tee() メソッドを使用してテイク(大文字の「T」の形にちなんで命名)できます。これにより、ストリームがロックされます。つまり、ストリームを直接使用できなくなります。ただし、ブランチと呼ばれる2 つの新しいストリームが作成され、個別に使用できます。ストリーミングは巻き戻しや再開できないため、ティーイングも重要です。詳しくは後述します。

フェッチ API の呼び出しからの読み取り可能なストリームで構成されるパイプ チェーンの図。このストリームは変換ストリームにパイプされ、出力が T アウトされ、ブラウザに送られる最初の読み取り可能なストリーム、2 番目の読み取り可能なストリームは Service Worker のキャッシュに送信されます。
パイプチェーン。

読み取り可能なストリームの仕組み

読み取り可能なストリームは、基盤となるソースから流れる ReadableStream オブジェクトによって JavaScript で表されるデータソースです。ReadableStream() コンストラクタは、指定されたハンドラから読み取り可能なストリーム オブジェクトを作成して返します。基盤となるソースには次の 2 種類があります。

  • push ソースは、アクセスした時点で常にデータを push します。ストリームへのアクセスの開始、一時停止、キャンセルはユーザーが自由に行えます。例としては、ライブ動画ストリーム、サーバー送信イベント、WebSocket があります。
  • プルソースでは、接続後に明示的にデータをリクエストする必要があります。たとえば、fetch() 呼び出しまたは XMLHttpRequest 呼び出しを介した HTTP オペレーションがあります。

ストリームデータは、チャンクと呼ばれる小さな部分で順番に読み取られます。ストリームに配置されたチャンクは「エンキューされる」と呼ばれます。つまり、読み取りの準備ができているキュー内で待機中です。内部キューは、まだ読み込まれていないチャンクを記録します。

キュー戦略は、内部キューの状態に基づいてストリームがバックプレッシャーを通知する方法を決定するオブジェクトです。キューイング戦略では、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを指定された数(ハイ ウォーターマーク)と比較します。

ストリーム内のチャンクは、リーダーによって読み取られます。このリーダーはデータを一度に 1 つずつ取得するため、どのようなオペレーションでも実行できます。リーダーとそれに付随する他の処理コードは、コンシューマと呼ばれます。

このコンテキストの次のコンストラクトはコントローラと呼ばれます。読み取り可能なストリームには、名前が示すようにストリームを制御できるコントローラが関連付けられています。

ストリームを読み取ることができるリーダーは一度に 1 つのみです。リーダーが作成されてストリームの読み取りを開始する(つまり、アクティブなリーダーになる)と、そのリーダーはストリームにロックされます。ストリームの読み取りを別のリーダーに引き継ぐ場合は、通常、他の操作を行う前に、最初のリーダーを「解放」する必要があります(ストリームを「ティー」することは可能です)。

読み取り可能なストリームの作成

読み取り可能なストリームを作成するには、コンストラクタ ReadableStream() を呼び出します。コンストラクタにはオプションの引数 underlyingSource があります。これは、作成されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表します。

underlyingSource

これには、次のデベロッパー定義のメソッド(省略可)を使用できます。

  • start(controller): オブジェクトが作成されるとすぐに呼び出されます。このメソッドはストリームソースにアクセスし、ストリーム機能の設定に必要なその他の処理を行うことができます。このプロセスを非同期で行う場合は、メソッドが Promise を返して成功または失敗を通知できます。このメソッドに渡される controller パラメータは ReadableStreamDefaultController です。
  • pull(controller): チャンクの取得が進むにつれてストリームを制御するために使用できます。ストリームの内部のチャンクのキューがいっぱいでなければ、キューがハイ ウォーターマークに達するまで繰り返し呼び出されます。pull() の呼び出しの結果がプロミスの場合、そのプロミスが完了するまで pull() は再び呼び出されません。Promise が拒否されると、ストリームはエラーになります。
  • cancel(reason): ストリーム コンシューマがストリームをキャンセルしたときに呼び出されます。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController は次のメソッドをサポートしています。

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() コンストラクタの 2 つ目の引数(こちらも省略可)は queuingStrategy です。このオブジェクトはオプションで、ストリームのキューイング戦略を定義するオブジェクトで、次の 2 つのパラメータを取ります。

  • highWaterMark: このキュー戦略を使用するストリームのハイウォーターマークを示す正の整数。
  • size(chunk): 指定されたチャンク値の有限の正のサイズを計算して返す関数。結果はバックプレッシャーの決定に使用され、適切な ReadableStreamDefaultController.desiredSize プロパティを介して表示されます。また、基盤となるソースの pull() メソッドが呼び出されるタイミングも制御します。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() メソッドと read() メソッド

読み取り可能なストリームから読み取るには、読み取りツール(ReadableStreamDefaultReader)が必要です。ReadableStream インターフェースの getReader() メソッドはリーダーを作成し、そのリーダーにストリームをロックします。ストリームがロックされている間は、このリーダーが解放されるまで他のリーダーを取得できません。

ReadableStreamDefaultReader インターフェースの read() メソッドは、ストリームの内部キュー内の次のチャンクにアクセスするためのプロミスを返します。ストリームの状態に応じて、結果を満たすか拒否します。次のようなさまざまな可能性があります。

  • チャンクが利用可能な場合、Promise は
    { value: chunk, done: false } という形式のオブジェクトで履行されます。
  • ストリームが閉じられると、Promise は
    { value: undefined, done: true } 形式のオブジェクトで解決されます。
  • ストリームでエラーが発生すると、Promise は関連するエラーで拒否されます。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked プロパティ

読み取り可能なストリームがロックされているかどうかを確認するには、その ReadableStream.locked プロパティにアクセスします。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

読み取り可能なストリームのコードサンプル

次のコードサンプルは、すべての手順を示しています。まず、underlyingSource 引数(TimestampSource クラス)で start() メソッドを定義する ReadableStream を作成します。このメソッドは、10 秒間、1 秒ごとにタイムスタンプを enqueue() するようにストリームの controller に指示します。最後に、ストリームを close() するようにコントローラに指示します。このストリームを消費するには、getReader() メソッドを使用してリーダーを作成し、ストリームが done になるまで read() を呼び出します。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

非同期反復処理

read() ループの反復処理ごとにストリームが done かどうかを確認することは、最も便利な API ではない場合があります。幸い、この問題を解決するより優れた方法がまもなく登場します。それが非同期反復処理です。

for await (const chunk of stream) {
  console.log(chunk);
}

非同期イテレーションを使用するための回避策として、ポリフィルを使用して動作を実装します。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

読み取り可能なストリームのティーイン

ReadableStream インターフェースの tee() メソッドは、現在読み取り可能なストリームを T 型化し、生成された 2 つのブランチを新しい ReadableStream インスタンスとして含む 2 要素の配列を返します。これにより、2 つのリーダーがストリームを同時に読み取ることができます。たとえば、サーバーからレスポンスを取得してブラウザにストリーミングするだけでなく、サービス ワーカー キャッシュにもストリーミングする場合は、サービス ワーカーでこれを行うことができます。レスポンスの本文は複数回使用できないため、2 つのコピーが必要です。ストリームをキャンセルするには、生成された両方のブランチをキャンセルする必要があります。通常、ストリームをティーリングすると、その間はロックされ、他のリーダーがストリームをロックできなくなります。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

読み取り可能なバイト ストリーム

バイトを表すストリームの場合、読み取り可能なストリームの拡張バージョンが提供され、特にコピーを最小限に抑えることで、バイトを効率的に処理できます。バイト ストリームを使用すると、BYOB リーダーを取得できます。デフォルトの実装では、WebSocket の場合は文字列や配列バッファなど、さまざまな出力を提供できますが、バイト ストリームではバイト出力が保証されます。また、BYOB リーダーは安定性にも優れています。これは、バッファが切断されると、同じバッファに 2 回書き込まれることが保証され、競合状態を回避できるためです。BYOB リーダーはバッファを再利用できるため、ブラウザでガベージ コレクションを実行する回数を減らすことができます。

読み取り可能なバイト ストリームを作成する

読み取り可能なバイト ストリームを作成するには、ReadableStream() コンストラクタに追加の type パラメータを渡します。

new ReadableStream({ type: 'bytes' });

underlyingSource

読み取り可能なバイト ストリームの基になるソースには、操作する ReadableByteStreamController が渡されます。ReadableByteStreamController.enqueue() メソッドは、値が ArrayBufferViewchunk 引数を取ります。プロパティ ReadableByteStreamController.byobRequest は、現在の BYOB プルリクエストを返します。リクエストが存在しない場合は null を返します。最後に、ReadableByteStreamController.desiredSize プロパティは、制御対象のストリームの内部キューを埋めるために必要なサイズを返します。

queuingStrategy

同様にオプションの ReadableStream() コンストラクタの 2 番目の引数は queuingStrategy です。これは、ストリームのキュー戦略を必要に応じて定義するオブジェクトで、1 つのパラメータを受け取ります。

  • highWaterMark: このキューイング ストラテジーを使用するストリームのハイウォーターマークを示す正のバイト数。これはバックプレッシャーを判断するために使用され、適切な ReadableByteStreamController.desiredSize プロパティによって表されます。また、基盤となるソースの pull() メソッドが呼び出されるタイミングも制御します。

getReader() メソッドと read() メソッド

次に、mode パラメータを ReadableStream.getReader({ mode: "byob" }) に設定して、ReadableStreamBYOBReader にアクセスできるようにします。これにより、コピーを回避するためにバッファ割り当てをより正確に制御できます。バイト ストリームから読み取るには、ReadableStreamBYOBReader.read(view) を呼び出す必要があります。ここで、viewArrayBufferView です。

読み取り可能なバイト ストリームのコードサンプル

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

次の関数は、ランダムに生成された配列を効率的にゼロコピーで読み取ることができる、読み取り可能なバイト ストリームを返します。事前定義された 1,024 のチャンクサイズを使用する代わりに、デベロッパーが指定したバッファを埋めようとします。これにより、完全な制御が可能になります。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

書き込み可能なストリームの仕組み

書き込み可能なストリームは、データを書き込むことができる宛先です。JavaScript では WritableStream オブジェクトで表されます。これは、基盤となるシンク(未加工データが書き込まれる下位レベルの I/O シンク)の上に位置する抽象化として機能します。

データは、ライターによって一度に 1 つのチャンクずつストリームに書き込まれます。リーダーのチャンクと同様に、チャンクにはさまざまな形態があります。書き込み可能なチャンクを生成するために、任意のコードを使用できます。書き込みツールと関連コードはプロデューサーと呼ばれます。

書き込みが作成され、ストリームへの書き込みを開始すると(アクティブな書き込み)、そのストリームにロックされます。書き込み可能なストリームに一度に書き込めるのは、1 つのライターだけです。別の書き込み元がストリームへの書き込みを開始するには、通常、ストリームを解放してから、別の書き込み元をストリームに接続する必要があります。

内部キューは、ストリームに書き込まれたが、基盤となるシンクによってまだ処理されていないチャンクを保持します。

キュー戦略は、内部キューの状態に基づいてストリームがバックプレッシャーを通知する方法を決定するオブジェクトです。キュー戦略は、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを、ハイウォーターマークと呼ばれる指定された数と比較します。

最後のコンストラクトはコントローラと呼ばれます。書き込み可能なストリームには、ストリームを制御(中止など)できるコントローラが関連付けられています。

書き込み可能なストリームを作成する

Streams API の WritableStream インターフェースは、シンクと呼ばれる宛先にストリーミング データを書き込むための標準的な抽象化機能を提供します。このオブジェクトには、バックプレッシャーとキューイングが組み込まれています。書き込み可能なストリームを作成するには、コンストラクタ WritableStream() を呼び出します。オプションの underlyingSink パラメータがあります。これは、作成されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表します。

underlyingSink

underlyingSink には、デベロッパーが定義した次のオプション メソッドを含めることができます。一部のメソッドに渡される controller パラメータは WritableStreamDefaultController です。

  • start(controller): このメソッドは、オブジェクトが作成されるとすぐに呼び出されます。このメソッドの内容は、基盤となるシンクにアクセスできるようにすることを目的としています。このプロセスを非同期で行う場合は、成功または失敗を通知する Promise を返すことができます。
  • write(chunk, controller): このメソッドは、新しいデータのチャンク(chunk パラメータで指定)を基盤となるシンクに書き込む準備が整うと呼び出されます。書き込みオペレーションの成功または失敗を通知する Promise を返すことができます。このメソッドは、前の書き込みが成功した後にのみ呼び出され、ストリームが閉じられたり中断されたりした後に呼び出されることはありません。
  • close(controller): アプリがストリームへのチャンクの書き込みが完了したことを通知すると、このメソッドが呼び出されます。コンテンツは、基盤となるシンクへの書き込みを確定させ、そのアクセス権を解放するために必要なことをすべて行う必要があります。このプロセスが非同期の場合、成功または失敗を通知する Promise を返すことができます。このメソッドは、キューに格納されたすべての書き込みが成功した場合にのみ呼び出されます。
  • abort(reason): このメソッドは、ストリームを突然閉じてエラー状態にするようにアプリがシグナルを送信すると呼び出されます。close() と同様に、保持されているリソースをクリーンアップできますが、書き込みがキューに追加されている場合でも abort() が呼び出されます。これらのチャンクは破棄されます。このプロセスが非同期の場合は、Promise を返して成功または失敗を知らせることができます。reason パラメータには、ストリームが中断された理由を説明する DOMString が含まれます。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API の WritableStreamDefaultController インターフェースは、書き込み用により多くのチャンクが送信されたとき、または書き込み終了時に設定中、WritableStream の状態を制御できるコントローラを表します。WritableStream を作成すると、基になるシンクに、操作する対応する WritableStreamDefaultController インスタンスが渡されます。WritableStreamDefaultController には WritableStreamDefaultController.error() というメソッドが 1 つだけあります。このメソッドを使用すると、関連するストリームとの今後のやり取りでエラーが発生します。WritableStreamDefaultController は、AbortSignal のインスタンスを返す signal プロパティもサポートしています。これにより、必要に応じて WritableStream オペレーションを停止できます。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() コンストラクタの 2 つ目の引数(こちらも省略可)は queuingStrategy です。ストリームのキューイング戦略を必要に応じて定義するオブジェクトです。このオブジェクトは次の 2 つのパラメータを取ります。

  • highWaterMark: このキュー戦略を使用するストリームのハイウォーターマークを示す正の整数。
  • size(chunk): 指定されたチャンク値の有限の正のサイズを計算して返す関数。結果はバックプレッシャーを判断するために使用され、適切な WritableStreamDefaultWriter.desiredSize プロパティによって示されます。

getWriter() メソッドと write() メソッド

書き込み可能なストリームに書き込むには、WritableStreamDefaultWriter という書き込みツールが必要です。WritableStream インターフェースの getWriter() メソッドは、WritableStreamDefaultWriter の新しいインスタンスを返して、そのインスタンスにストリームをロックします。ストリームがロックされている間、現在の書き込み元が解放されるまで、他の書き込み元を取得することはできません。

WritableStreamDefaultWriter インターフェースの write() メソッドは、渡されたデータのチャンクを WritableStream とその基盤となるシンクに書き込み、書き込みオペレーションの成功または失敗を示す Promise を返します。「成功」の意味は基盤となるシンクによって異なります。チャンクが受け入れられたことを示す場合もありますが、最終的な宛先に安全に保存されたことを示すとは限りません。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked プロパティ

書き込み可能なストリームがロックされているかどうかは、WritableStream.locked プロパティにアクセスすることで確認できます。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

書き込み可能なストリームのコードサンプル

次のコードサンプルは、すべての手順を示しています。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

読み取り可能なストリームを書き込み可能なストリームにパイプする

読み取り可能なストリームは、読み取り可能なストリームの pipeTo() メソッドを使用して、書き込み可能なストリームにパイプできます。ReadableStream.pipeTo() は、現在の ReadableStream を指定された WritableStream にパイプし、パイプ処理が正常に完了すると解決される Promise を返します。エラーが発生した場合は拒否します。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

変換ストリームの作成

Streams API の TransformStream インターフェースは、変換可能なデータのセットを表します。変換ストリームを作成するには、コンストラクタ TransformStream() を呼び出します。このコンストラクタは、指定されたハンドラから変換ストリーム オブジェクトを作成して返します。TransformStream() コンストラクタは、最初の引数として transformer を表す JavaScript オブジェクトを受け取ります(省略可)。このようなオブジェクトには、次のいずれかの方法を含めることができます。

transformer

  • start(controller): このメソッドは、オブジェクトが作成されるとすぐに呼び出されます。通常、これは controller.enqueue() を使用してプレフィックス チャンクをキューに追加するために使用されます。これらのチャンクは読み取り側から読み取られますが、書き込み可能側への書き込みには依存しません。この初期プロセスが非同期の場合(接頭辞チャンクの取得に時間がかかるためなど)、関数は成功または失敗を通知するプロミスを返すことができます。拒否されたプロミスはストリームにエラーを返します。スローされた例外は、TransformStream() コンストラクタによって再スローされます。
  • transform(chunk, controller): このメソッドは、書き込み可能な側に最初に書き込まれた新しいチャンクの変換が準備できたときに呼び出されます。ストリームの実装により、この関数は前の変換が成功した場合にのみ呼び出されることが保証され、start() が完了する前や flush() が呼び出された後に呼び出されることはありません。この関数は、変換ストリームの実際の変換作業を行います。controller.enqueue() を使用して結果をキューに追加できます。これにより、書き込み可能側に書き込まれる 1 つのチャンクが、controller.enqueue() の呼び出し回数に応じて読み取り側でゼロまたは複数のチャンクになることが許可されます。変換プロセスが非同期の場合、この関数は変換の成功または失敗を通知する Promise を返すことができます。拒否された Promise は、変換ストリームの読み取り側と書き込み側の両方でエラーになります。transform() メソッドが指定されていない場合は、ID 変換が使用されます。この変換では、書き込み側から読み取り側に変更されていないチャンクがキューに追加されます。
  • flush(controller): このメソッドは、書き込み可能な側に書き込まれたすべてのチャンクが transform() を正常に通過して変換され、書き込み可能な側が閉じられようとしているときに呼び出されます。通常、これは、読み取り側が閉じられてしまう前に、接尾辞チャンクを読み取り側にエンキューに追加するために使用されます。フラッシュ プロセスが非同期の場合、関数は成功または失敗を通知する Promise を返すことができます。結果は stream.writable.write() の呼び出し元に通知されます。また、Promise が拒否された場合、ストリームの読み取り側と書き込み可能側の両方でエラーが発生します。例外をスローすることは、拒否された Promise を返すことと同じように扱われます。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategy キューイング戦略と readableStrategy キューイング戦略

TransformStream() コンストラクタの 2 番目と 3 番目のオプション パラメータは、オプションの writableStrategy キュー戦略と readableStrategy キュー戦略です。これらは、読み取り可能ストリーム セクションと書き込み可能ストリーム セクションで説明されているように定義されます。

変換ストリームのコードサンプル

次のコードサンプルは、単純な変換ストリームの動作を示しています。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

読み取り可能なストリームを変換ストリームにパイプする

ReadableStream インターフェースの pipeThrough() メソッドは、変換ストリームまたは他の書き込み / 読み取りペアで現在のストリームをパイプする連結可能な方法を提供します。通常、ストリームをパイプすると、パイプ中はロックされ、他の読み取り側がロックできなくなります。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

次のコードサンプル(少し不自然ですが)は、返されたレスポンス プロミスをストリームとして消費し、チャンクごとに大文字に変換して、すべてのテキストを大文字に変換する fetch() の「大声」バージョンを実装する方法を示しています。この方法の利点は、ドキュメント全体がダウンロードされるのを待つ必要がないことです。これは、サイズの大きいファイルを扱う場合に大きな違いを生む可能性があります。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

デモ

次のデモは、読み取り可能ストリーム、書き込み可能ストリーム、変換ストリームの動作を示しています。また、pipeThrough()pipeTo() のパイプ チェーンの例を示し、tee() についても説明します。必要に応じて、デモを独自のウィンドウで実行したり、ソースコードを表示したりできます。

ブラウザで便利なストリームを利用可能

ブラウザには、便利なストリームが組み込まれています。ReadableStream は、Blob から簡単に作成できます。Blob インターフェースの stream() メソッドは ReadableStream を返します。この ReadableStream は、読み取り時に blob に含まれるデータを返します。また、File オブジェクトは特定の種類の Blob であり、blob が使用できるすべてのコンテキストで使用できることも覚えておいてください。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() のストリーミング バリアントは、それぞれ TextDecoderStreamTextEncoderStream と呼ばれます。

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

ファイルの圧縮と解凍は、それぞれ CompressionStream 変換ストリームと DecompressionStream 変換ストリームを使用して簡単に行えます。次のコードサンプルは、Streams 仕様をダウンロードし、ブラウザ内で圧縮(gzip)して、圧縮ファイルをディスクに直接書き込む方法を示しています。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream と試験運用版の fetch() リクエスト ストリームは、実環境で書き込み可能なストリームの例です。

Serial API は、読み取り可能ストリームと書き込み可能ストリームの両方を多用します。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最後に、WebSocketStream API はストリームを WebSocket API と統合します。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

関連リソース

謝辞

この記事は、Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice によるレビューを経て公開されました。Jake Archibald のブログ投稿は、ストリームを理解するのに大いに役立ちました。コードサンプルの一部は、GitHub ユーザー @bellbind の調査に基づいています。文章の一部は、MDN ウェブドキュメントのストリームをベースにしています。Streams Standard作成者は、この仕様の記述に多大な貢献をしています。ヒーロー画像(Ryan LaraUnsplash に投稿)。