ストリーム - 決定版ガイド

Streams API を使用して、読み取り可能なストリーム、書き込み可能なストリーム、変換ストリームを使用する方法について学びます。

Streams API を使用すると、ネットワーク経由で受信されたデータのストリームにプログラムでアクセスしたり、ローカルで任意の方法で作成されたデータのストリームをプログラムでアクセスしたりして、JavaScript で処理できます。ストリーミングでは、受信、送信、変換するリソースを小さなチャンクに分割し、これらのチャンクをビット単位で処理します。ストリーミングは、ウェブページに表示する HTML や動画などのアセットを受信する際にブラウザが行う処理ですが、2015 年にストリーミングを含む fetch が導入されるまでは、JavaScript でこの機能を利用できませんでした。

以前は、なんらかのリソース(動画やテキスト ファイルなど)を処理する場合、ファイル全体をダウンロードし、適切な形式にシリアル化されるのを待ってから処理する必要がありました。ストリームが JavaScript で利用可能になると、すべてが変わります。バッファ、文字列、BLOB を生成することなく、クライアントで利用可能になり次第、JavaScript で未加工データを段階的に処理できるようになりました。これにより、次のようなさまざまなユースケースが可能になります。

  • 動画エフェクト: 読み取り可能な動画ストリームを変換ストリームにパイプし、エフェクトをリアルタイムで適用します。
  • データの圧縮と解凍: ファイル ストリームを変換ストリームにパイプし、選択的に圧縮と解凍を行います。
  • 画像のデコード: HTTP レスポンス ストリームを、バイトをビットマップ データにデコードする変換ストリームにパイプし、次にビットマップを PNG に変換する別の変換ストリームにパイプします。Service Worker の fetch ハンドラ内にインストールすると、AVIF などの新しい画像形式を透過的にポリフィルできます。

ブラウザ サポート

ReadableStream と WritableStream

対応ブラウザ

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65。
  • Safari: 10.1。

ソース

TransformStream

対応ブラウザ

  • Chrome: 67。
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1。

ソース

基本コンセプト

さまざまなタイプのストリームについて詳しく説明する前に、いくつかのコアコンセプトについて説明します。

チャンク

チャンクは、ストリームに書き込まれるかストリームから読み取られる単一のデータです。任意のタイプにできます。ストリームには、異なるタイプのチャンクを含めることもできます。ほとんどの場合、チャンクは特定のストリームの最もアトミックなデータ単位ではありません。たとえば、バイト ストリームには、単一バイトではなく、16 KiB の Uint8Array 単位で構成されるチャンクが含まれている場合があります。

読み取り可能なストリーム

読み取り可能なストリームは、読み取り可能なデータソースを表します。つまり、データは読み取り可能なストリームから出力されます。具体的には、読み取り可能なストリームは ReadableStream クラスのインスタンスです。

書き込み可能なストリーム

書き込み可能なストリームは、書き込み可能なデータの宛先を表します。つまり、データは書き込み可能なストリームに書き込まれます。具体的には、書き込み可能なストリームは WritableStream クラスのインスタンスです。

ストリームを変換する

変換ストリームは、書き込み可能なストリーム(書き込み側)と読み取り可能なストリーム(読み取り側)のストリームペアで構成されます。これを現実世界で例えるなら、ある言語から別の言語に即時に翻訳する同時通訳者です。変換ストリームに固有の方法で、書き込み可能な側に書き込むと、読み取り可能な側から読み取ることができる新しいデータが作成されます。具体的には、writable プロパティと readable プロパティを持つオブジェクトは、変換ストリームとして機能できます。ただし、標準の TransformStream クラスを使用すると、適切にエンタングルされたこのようなペアを簡単に作成できます。

パイプチェーン

ストリームは主に、ストリームを相互にパイプするために使用されます。読み取り可能なストリームは、読み取り可能なストリームの pipeTo() メソッドを使用して書き込み可能なストリームに直接パイプできます。または、読み取り可能なストリームの pipeThrough() メソッドを使用して、1 つ以上の変換ストリームを介してパイプすることもできます。この方法でパイプで接続された一連のストリームは、パイプチェーンと呼ばれます。

バックプレッシャー

パイプチェーンが構築されると、チャンクがパイプチェーン内をどのくらいの速さで流すべきかに関するシグナルが伝播されます。チェーン内のいずれかのステップがまだチャンクを受け入れられない場合、パイプチェーンを通じてシグナルが後方に伝播し、最終的に元のソースにチャンクの生成を停止するよう指示されます。このフロー正規化のプロセスをバックプレッシャーと呼びます。

ティーアップ

読み取り可能なストリームは、tee() メソッドを使用してテイク(大文字の「T」の形にちなんで命名)できます。これにより、ストリームがロックされます。つまり、ストリームを直接使用できなくなります。ただし、ブランチと呼ばれる2 つの新しいストリームが作成され、個別に使用できます。ストリーミングは巻き戻しや再開できないため、ティーイングも重要です。これについては後で説明します。

fetch API の呼び出しから取得した読み取り可能なストリームで構成されるパイプチェーンの図。このストリームは、出力が分岐され、最初の読み取り可能なストリームの場合はブラウザに、2 番目の読み取り可能なストリームの場合は Service Worker キャッシュに送信される変換ストリームにパイプされます。
パイプチェーン。

読み取り可能なストリームの仕組み

読み取り可能なストリームは、基盤となるソースから流れてくる ReadableStream オブジェクトによって JavaScript で表されるデータソースです。ReadableStream() コンストラクタは、指定されたハンドラから読み取り可能なストリーム オブジェクトを作成して返します。基盤となるソースには次の 2 種類があります。

  • プッシュ ソースは、アクセスすると常にデータをプッシュします。ストリームへのアクセスを開始、一時停止、キャンセルするのはユーザーの判断です。たとえば、ライブ動画ストリーミング、サーバー送信イベント、WebSocket などです。
  • プルソースでは、接続後に明示的にデータをリクエストする必要があります。たとえば、fetch() 呼び出しまたは XMLHttpRequest 呼び出しを介した HTTP オペレーションがあります。

ストリームデータは、チャンクと呼ばれる小さな部分で順番に読み取られます。ストリームに配置されたチャンクは、キューに追加されたと見なされます。つまり、読み取りの準備ができているキュー内で待機中です。内部キューは、まだ読み込まれていないチャンクを記録します。

キュー戦略は、内部キューの状態に基づいてストリームがバックプレッシャーを通知する方法を決定するオブジェクトです。キュー戦略は、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを、ハイウォーターマークと呼ばれる指定された数と比較します。

ストリーム内のチャンクは、リーダーによって読み取られます。このリーダーはデータを 1 チャンクずつ取得するため、任意のオペレーションを実行できます。リーダーとそれに付随する他の処理コードは、コンシューマと呼ばれます。

このコンテキストの次のコンストラクトはコントローラと呼ばれます。読み取り可能なストリームには、名前が示すようにストリームを制御できるコントローラが関連付けられています。

一度にストリームを読み取ることができるリーダーは 1 つだけです。リーダーが作成されてストリームの読み取りを開始すると(つまり、アクティブなリーダーになると)、そのストリームにロックされます。別のリーダーにストリームの読み取りを譲渡する場合は、通常、他の操作を行う前に最初のリーダーを解放する必要があります(ただし、ストリームをtee することは可能です)。

読み取り可能なストリームを作成する

読み取り可能なストリームを作成するには、コンストラクタ ReadableStream() を呼び出します。コンストラクタにはオプションの引数 underlyingSource があります。これは、作成されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表します。

underlyingSource

次のオプションのデベロッパー定義メソッドを使用できます。

  • start(controller): オブジェクトが作成されるとすぐに呼び出されます。このメソッドは、ストリームソースにアクセスし、ストリーム機能の設定に必要なその他の処理を行うことができます。このプロセスを非同期で行う場合は、メソッドが Promise を返して成功または失敗を通知できます。このメソッドに渡される controller パラメータは ReadableStreamDefaultController です。
  • pull(controller): チャンクの取得が進むにつれてストリームを制御するために使用できます。ストリームの内部チャンク キューがいっぱいにならない限り、キューがハイウォーターマークに達するまで繰り返し呼び出されます。pull() の呼び出しの結果がプロミスの場合、そのプロミスが完了するまで pull() は再び呼び出されません。Promise が拒否されると、ストリームはエラーになります。
  • cancel(reason): ストリーム コンシューマがストリームをキャンセルしたときに呼び出されます。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController は次のメソッドをサポートしています。

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() コンストラクタの 2 つ目の引数(こちらも省略可)は queuingStrategy です。ストリームのキューイング戦略を必要に応じて定義するオブジェクトです。このオブジェクトは次の 2 つのパラメータを取ります。

  • highWaterMark: このキュー戦略を使用するストリームのハイウォーターマークを示す正の整数。
  • size(chunk): 指定されたチャンク値の有限の正のサイズを計算して返す関数。結果はバックプレッシャーの決定に使用され、適切な ReadableStreamDefaultController.desiredSize プロパティを介して表示されます。また、基盤となるソースの pull() メソッドが呼び出されるタイミングも制御します。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() メソッドと read() メソッド

読み取り可能なストリームから読み取るには、読み取り元(ReadableStreamDefaultReader)が必要です。ReadableStream インターフェースの getReader() メソッドは、リーダーを作成し、ストリームをロックします。ストリームがロックされている間は、このリーダーが解放されるまで他のリーダーを取得できません。

ReadableStreamDefaultReader インターフェースの read() メソッドは、ストリームの内部キュー内の次のチャンクにアクセスするためのプロミスを返します。ストリームの状態に応じて、結果を満たすか拒否します。考えられるケースは次のとおりです。

  • チャンクが利用可能な場合、Promise は
    { value: chunk, done: false } 形式のオブジェクトで満たされます。
  • ストリームが閉じられると、Promise は
    { value: undefined, done: true } 形式のオブジェクトで解決されます。
  • ストリームでエラーが発生すると、Promise は関連するエラーで拒否されます。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked プロパティ

読み取り可能なストリームがロックされているかどうかは、ReadableStream.locked プロパティにアクセスして確認できます。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

読み取り可能なストリームのコードサンプル

次のコードサンプルは、すべての手順を示しています。まず、underlyingSource 引数(TimestampSource クラス)で start() メソッドを定義する ReadableStream を作成します。このメソッドは、10 秒間、1 秒ごとにタイムスタンプを enqueue() するようにストリームの controller に指示します。最後に、ストリームを close() するようにコントローラに指示します。このストリームを使用するには、getReader() メソッドでリーダーを作成し、ストリームが done になるまで read() を呼び出します。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

非同期反復処理

ストリームが done かどうかを read() ループの反復処理ごとに確認するのは、最も便利な API とは言えません。幸い、この問題を解決するより優れた方法がまもなく登場します。それが非同期反復処理です。

for await (const chunk of stream) {
  console.log(chunk);
}

非同期イテレーションを使用するための回避策として、ポリフィルを使用して動作を実装します。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

読み取り可能なストリームのティーイン

ReadableStream インターフェースの tee() メソッドは、現在の読み取り可能なストリームを分岐し、結果として得られる 2 つのブランチを含む 2 要素の配列を新しい ReadableStream インスタンスとして返します。これにより、2 つのリーダーがストリームを同時に読み取ることができます。たとえば、サーバーからレスポンスを取得してブラウザにストリーミングするだけでなく、サービス ワーカー キャッシュにもストリーミングする場合は、サービス ワーカーでこれを行うことができます。レスポンスの本文は複数回使用できないため、2 つのコピーが必要です。ストリームをキャンセルするには、生成された両方のブランチをキャンセルする必要があります。ストリームをティーすると、通常はロックが適用され、他の読者がロックできなくなります。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

読み取り可能なバイト ストリーム

バイトを表すストリームの場合、特にコピーを最小限に抑えることで、バイトを効率的に処理するために、読み取り可能なストリームの拡張バージョンが用意されています。バイト ストリームを使用すると、BYOB リーダーを取得できます。デフォルトの実装では、WebSocket の場合は文字列や配列バッファなど、さまざまな出力を提供できますが、バイト ストリームではバイト出力が保証されます。また、BYOB リーダーは安定性にも優れています。これは、バッファが切断されると、同じバッファに 2 回書き込まれることが保証され、競合状態を回避できるためです。BYOB リーダーはバッファを再利用できるため、ブラウザでガベージ コレクションを実行する回数を減らすことができます。

読み取り可能なバイト ストリームを作成する

読み取り可能なバイト ストリームを作成するには、ReadableStream() コンストラクタに追加の type パラメータを渡します。

new ReadableStream({ type: 'bytes' });

underlyingSource

読み取り可能なバイト ストリームの基になるソースには、操作する ReadableByteStreamController が渡されます。ReadableByteStreamController.enqueue() メソッドは、値が ArrayBufferViewchunk 引数を取ります。プロパティ ReadableByteStreamController.byobRequest は、現在の BYOB プルリクエストを返します。存在しない場合は null を返します。最後に、ReadableByteStreamController.desiredSize プロパティは、制御対象のストリームの内部キューを埋めるために必要なサイズを返します。

queuingStrategy

ReadableStream() コンストラクタの 2 つ目の引数(こちらも省略可)は queuingStrategy です。ストリームのキューイング戦略を必要に応じて定義するオブジェクトです。このオブジェクトは 1 つのパラメータを受け取ります。

  • highWaterMark: このキューイング ストラテジーを使用するストリームのハイウォーターマークを示す正のバイト数。これは、適切な ReadableByteStreamController.desiredSize プロパティを介して表示されるバックプレッシャーを決定するために使用されます。また、基盤となるソースの pull() メソッドが呼び出されるタイミングも制御します。

getReader() メソッドと read() メソッド

次に、mode パラメータを適切に設定して ReadableStreamBYOBReader にアクセスします。ReadableStream.getReader({ mode: "byob" })これにより、コピーを回避するためにバッファ割り当てをより正確に制御できます。バイト ストリームから読み取るには、ReadableStreamBYOBReader.read(view) を呼び出す必要があります。ここで、viewArrayBufferView です。

読み取り可能なバイト ストリームのコードサンプル

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

次の関数は、ランダムに生成された配列を効率的にゼロコピーで読み取ることができる、読み取り可能なバイト ストリームを返します。事前定義された 1,024 のチャンクサイズを使用する代わりに、デベロッパーが指定したバッファを埋めようとします。これにより、完全な制御が可能になります。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

書き込み可能なストリームの仕組み

書き込み可能なストリームは、データを書き込むことができる宛先です。JavaScript では WritableStream オブジェクトで表されます。これは、基盤となるシンク(未加工データが書き込まれる下位レベルの I/O シンク)の上に位置する抽象化として機能します。

データは、書き込みツールを介して、一度に 1 つのチャンクずつストリームに書き込まれます。チャンクは、リーダー内のチャンクと同様に、さまざまな形式にすることができます。書き込み可能なチャンクを生成するために、任意のコードを使用できます。書き込みツールと関連コードはプロデューサーと呼ばれます。

書き込み元が作成され、ストリームへの書き込みを開始すると(アクティブな書き込み元)、そのストリームにロックされていると見なされます。書き込み可能なストリームに一度に書き込みできるのは 1 つの書き込み元のみです。別の書き込み元がストリームへの書き込みを開始するには、通常、ストリームを解放してから、別の書き込み元をストリームに接続する必要があります。

内部キューは、ストリームに書き込まれたが、基盤となるシンクによってまだ処理されていないチャンクを保持します。

キュー戦略は、内部キューの状態に基づいてストリームがバックプレッシャーを通知する方法を決定するオブジェクトです。キュー戦略は、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを、ハイウォーターマークと呼ばれる指定された数と比較します。

最後のコンストラクトはコントローラと呼ばれます。書き込み可能なストリームには、ストリームを制御(中止など)できるコントローラが関連付けられています。

書き込み可能なストリームを作成する

Streams API の WritableStream インターフェースは、シンクと呼ばれる宛先にストリーミング データを書き込むための標準の抽象化を提供します。このオブジェクトには、バックプレッシャーとキューイングが組み込まれています。書き込み可能なストリームを作成するには、コンストラクタ WritableStream() を呼び出します。オプションの underlyingSink パラメータがあります。これは、作成されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表します。

underlyingSink

underlyingSink には、デベロッパーが定義した次のオプション メソッドを含めることができます。一部のメソッドに渡される controller パラメータは WritableStreamDefaultController です。

  • start(controller): このメソッドは、オブジェクトが作成されるとすぐに呼び出されます。このメソッドの内容は、基盤となるシンクへのアクセスを取得することを目的とする必要があります。このプロセスを非同期で行う場合は、成功または失敗を通知する Promise を返すことができます。
  • write(chunk, controller): このメソッドは、新しいデータのチャンク(chunk パラメータで指定)が基盤となるシンクに書き込まれる準備ができたときに呼び出されます。書き込みオペレーションの成功または失敗を通知する Promise を返すことができます。このメソッドは、前の書き込みが成功した後にのみ呼び出され、ストリームが閉じられたり中断されたりした後に呼び出されることはありません。
  • close(controller): アプリがストリームへのチャンクの書き込みを完了したことを通知すると、このメソッドが呼び出されます。コンテンツは、基盤となるシンクへの書き込みを確定させ、そのアクセス権を解放するために必要なことをすべて行う必要があります。このプロセスが非同期の場合、成功または失敗を通知する Promise を返すことができます。このメソッドは、キューに登録されたすべての書き込みが成功した後にのみ呼び出されます。
  • abort(reason): アプリがストリームを突然閉じてエラー状態にすることを通知すると、このメソッドが呼び出されます。close() と同様に、保持されているリソースをクリーンアップできますが、書き込みがキューに登録されている場合でも abort() が呼び出されます。これらのチャンクは破棄されます。このプロセスが非同期の場合、成功または失敗を通知する Promise を返すことができます。reason パラメータには、ストリームが中止された理由を説明する DOMString が含まれます。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API の WritableStreamDefaultController インターフェースは、設定時、書き込みのためにさらにチャンクが送信されたとき、または書き込みの終了時に WritableStream の状態を制御できるコントローラを表します。WritableStream を作成すると、基盤となるシンクに、操作する対応する WritableStreamDefaultController インスタンスが渡されます。WritableStreamDefaultController には WritableStreamDefaultController.error() というメソッドが 1 つだけあります。このメソッドを使用すると、関連するストリームとの今後のやり取りでエラーが発生します。WritableStreamDefaultController は、AbortSignal のインスタンスを返す signal プロパティもサポートしています。これにより、必要に応じて WritableStream オペレーションを停止できます。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() コンストラクタの 2 つ目の引数(こちらも省略可)は queuingStrategy です。ストリームのキューイング戦略を必要に応じて定義するオブジェクトです。このオブジェクトは次の 2 つのパラメータを取ります。

  • highWaterMark: このキュー戦略を使用するストリームのハイウォーターマークを示す正の整数。
  • size(chunk): 指定されたチャンク値の有限の正のサイズを計算して返す関数。結果はバックプレッシャーの決定に使用され、適切な WritableStreamDefaultWriter.desiredSize プロパティを介して表示されます。

getWriter() メソッドと write() メソッド

書き込み可能なストリームに書き込むには、WritableStreamDefaultWriter である書き込み元が必要です。WritableStream インターフェースの getWriter() メソッドは、WritableStreamDefaultWriter の新しいインスタンスを返して、そのインスタンスにストリームをロックします。ストリームがロックされている間、現在の書き込み元が解放されるまで、他の書き込み元を取得することはできません。

WritableStreamDefaultWriter インターフェースの write() メソッドは、渡されたデータのチャンクを WritableStream とその基盤となるシンクに書き込み、書き込みオペレーションの成功または失敗を示す Promise を返します。「成功」の意味は基盤となるシンクによって異なります。チャンクが受け入れられたことを示す場合もありますが、最終的な宛先に安全に保存されたことを示すとは限りません。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked プロパティ

書き込み可能なストリームがロックされているかどうかは、WritableStream.locked プロパティにアクセスすることで確認できます。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

書き込み可能なストリームのコードサンプル

次のコードサンプルは、すべての手順を示しています。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

読み取り可能なストリームを書き込み可能なストリームにパイプする

読み取り可能なストリームは、読み取り可能なストリームの pipeTo() メソッドを介して書き込み可能なストリームにパイプできます。ReadableStream.pipeTo() は、現在の ReadableStream を指定された WritableStream にパイプし、パイプ処理が正常に完了すると解決される Promise を返します。エラーが発生した場合は拒否されます。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

変換ストリームの作成

Streams API の TransformStream インターフェースは、変換可能なデータのセットを表します。変換ストリームを作成するには、コンストラクタ TransformStream() を呼び出します。このコンストラクタは、指定されたハンドラから変換ストリーム オブジェクトを作成して返します。TransformStream() コンストラクタは、最初の引数として transformer を表す JavaScript オブジェクトを受け取ります(省略可)。このようなオブジェクトには、次のいずれかの方法を含めることができます。

transformer

  • start(controller): このメソッドは、オブジェクトが作成されるとすぐに呼び出されます。通常、これは controller.enqueue() を使用してプレフィックス チャンクをキューに追加するために使用されます。これらのチャンクは読み取り側から読み取られますが、書き込み側への書き込みには依存しません。この初期プロセスが非同期の場合(接頭辞チャンクの取得に時間がかかるためなど)、関数は成功または失敗を通知するプロミスを返すことができます。拒否されたプロミスはストリームにエラーを返します。スローされた例外は、TransformStream() コンストラクタによって再スローされます。
  • transform(chunk, controller): このメソッドは、書き込み可能な側に最初に書き込まれた新しいチャンクの変換が準備できたときに呼び出されます。ストリームの実装では、この関数が呼び出されるのは、前の変換が成功した後のみであり、start() の完了前や flush() の呼び出し後には呼び出されません。この関数は、変換ストリームの実際の変換処理を行います。controller.enqueue() を使用して結果をキューに追加できます。これにより、書き込み側に書き込まれた 1 つのチャンクが、controller.enqueue() が呼び出される回数に応じて、読み取り側でゼロまたは複数のチャンクになることができます。変換プロセスが非同期の場合、この関数は変換の成功または失敗を通知する Promise を返すことができます。拒否された Promise は、変換ストリームの読み取り側と書き込み側の両方でエラーになります。transform() メソッドが指定されていない場合は、ID 変換が使用されます。この変換では、書き込み側から読み取り側に変更されていないチャンクがキューに追加されます。
  • flush(controller): このメソッドは、書き込み可能な側に書き込まれたすべてのチャンクが transform() を正常に通過して変換され、書き込み可能な側が閉じられようとしているときに呼び出されます。通常、これは、読み取り側が閉じられてしまう前に、接尾辞チャンクを読み取り側にエンキューに追加するために使用されます。フラッシュ プロセスが非同期の場合、関数は成功または失敗を通知する Promise を返すことができます。結果は stream.writable.write() の呼び出し元に通知されます。また、拒否された Promise は、ストリームの読み取り側と書き込み側の両方でエラーになります。例外をスローすることは、拒否された Promise を返すことと同じように扱われます。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategy キューイング戦略と readableStrategy キューイング戦略

TransformStream() コンストラクタの 2 番目と 3 番目のオプション パラメータは、オプションの writableStrategy キュー戦略と readableStrategy キュー戦略です。これらは、読み取り可能ストリーム セクションと書き込み可能ストリーム セクションで説明されているように定義されます。

変換ストリームのコードサンプル

次のコードサンプルは、シンプルな変換ストリームの動作を示しています。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

読み取り可能なストリームを変換ストリームにパイプする

ReadableStream インターフェースの pipeThrough() メソッドは、変換ストリームまたは他の書き込み/読み取りペアで現在のストリームをパイプする連結可能な方法を提供します。通常、ストリームをパイプすると、パイプ中はロックされ、他の読み取り側がロックできなくなります。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

次のコードサンプル(少し不自然ですが)は、返されたレスポンス プロミスをストリームとして消費し、チャンクごとに大文字に変換して、すべてのテキストを大文字に変換する fetch() の「大声」バージョンを実装する方法を示しています。この方法の利点は、ドキュメント全体がダウンロードされるのを待つ必要がないことです。これは、サイズの大きいファイルを扱う場合に大きな違いを生む可能性があります。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

デモ

次のデモは、読み取り可能ストリーム、書き込み可能ストリーム、変換ストリームの動作を示しています。また、pipeThrough()pipeTo() のパイプチェーンの例も含まれており、tee() も示されています。必要に応じて、デモを独自のウィンドウで実行したり、ソースコードを表示したりできます。

ブラウザで利用できる便利なストリーム

ブラウザには、便利なストリームが組み込まれています。ReadableStream は、Blob から簡単に作成できます。Blob インターフェースの stream() メソッドは ReadableStream を返します。この ReadableStream は、読み取り時に blob に含まれるデータを返します。また、File オブジェクトは特定の種類の Blob であり、blob が使用できるすべてのコンテキストで使用できることも覚えておいてください。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() のストリーミング バリアントは、それぞれ TextDecoderStreamTextEncoderStream と呼ばれます。

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

ファイルの圧縮と解凍は、それぞれ CompressionStream 変換ストリームと DecompressionStream 変換ストリームを使用して簡単に行えます。次のコードサンプルは、Streams 仕様をダウンロードし、ブラウザ内で圧縮(gzip)して、圧縮ファイルをディスクに直接書き込む方法を示しています。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream と試験運用版の fetch() リクエスト ストリームは、実環境で書き込み可能なストリームの例です。

Serial API は、読み取り可能ストリームと書き込み可能ストリームの両方を多用します。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最後に、WebSocketStream API はストリームを WebSocket API と統合します。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

関連リソース

謝辞

この記事は、Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice によるレビューを経て公開されました。Jake Archibald のブログ投稿は、ストリームの理解に役立ちました。コードサンプルの一部は、GitHub ユーザー @bellbind の調査に基づいています。文章の一部は、MDN ウェブドキュメントのストリームをベースにしています。Streams Standard作成者は、この仕様の作成に多大な貢献をしました。ヘッダー画像は UnsplashRyan Lara によるものです。