Streams API で読み取り可能ストリーム、書き込み可能ストリーム、変換ストリームを使用する方法について説明します。
Streams API を使用すると、ネットワーク経由で受信したデータやローカルで作成したデータのストリームにプログラムでアクセスし、JavaScript で処理できます。ストリーミングでは、受信、送信、変換するリソースを小さなチャンクに分割し、これらのチャンクをビット単位で処理します。ストリーミングは、ウェブページに表示する HTML や動画などのアセットを受信するときにブラウザが実行する処理ですが、2015 年に fetch
でストリームが導入されるまで、JavaScript でこの機能を利用することはできませんでした。
以前は、何らかのリソース(動画やテキスト ファイルなど)を処理する場合、ファイル全体をダウンロードし、適切な形式に逆シリアル化されるのを待ってから処理する必要がありました。JavaScript でストリームを利用できるようになると、すべてが変わります。バッファ、文字列、BLOB を生成することなく、クライアントで利用可能になったらすぐに JavaScript で生データを段階的に処理できるようになりました。これにより、さまざまなユースケースが可能になります。以下にその一部を示します。
- 動画エフェクト: 読み取り可能な動画ストリームを、エフェクトをリアルタイムで適用する変換ストリームにパイプします。
- データの(非)圧縮: 選択的に(非)圧縮する変換ストリームを介してファイル ストリームをパイプ処理します。
- 画像デコード: HTTP レスポンス ストリームを、バイトをビットマップ データにデコードする変換ストリームにパイプし、次にビットマップを PNG に変換する別の変換ストリームにパイプします。Service Worker の
fetch
ハンドラ内でインストールすると、AVIF などの新しい画像形式を透過的にポリフィルできます。
ブラウザ サポート
ReadableStream と WritableStream
TransformStream
基本コンセプト
さまざまなタイプのストリームについて詳しく説明する前に、いくつかの基本的なコンセプトを紹介します。
チャンク
チャンクは、ストリームに書き込まれるか、ストリームから読み取られる単一のデータです。任意の型にできます。ストリームには異なる型のチャンクを含めることもできます。ほとんどの場合、チャンクは特定のストリームの最もアトミックなデータ単位ではありません。たとえば、バイトストリームには、単一のバイトではなく、16 KiB の Uint8Array
単位で構成されるチャンクが含まれることがあります。
読み取り可能なストリーム
読み取り可能なストリームは、読み取り可能なデータソースを表します。つまり、データは読み取り可能なストリームから出てきます。具体的には、読み取り可能なストリームは ReadableStream
クラスのインスタンスです。
書き込み可能なストリーム
書き込み可能なストリームは、書き込み可能なデータの宛先を表します。つまり、データは書き込み可能なストリームに入ります。具体的には、書き込み可能なストリームは WritableStream
クラスのインスタンスです。
ストリーミングを変換する
変換ストリームは、書き込み可能なストリーム(書き込み側)と読み取り可能なストリーム(読み取り側)のストリームのペアで構成されます。この例えとして、ある言語から別の言語に即座に翻訳する同時通訳者を挙げることができます。変換ストリームに固有の方法で、書き込み可能な側に書き込むと、読み取り可能な側から読み取れる新しいデータが作成されます。具体的には、writable
プロパティと readable
プロパティを持つオブジェクトは、変換ストリームとして機能します。ただし、標準の TransformStream
クラスを使用すると、適切にエンタングルされたペアを簡単に作成できます。
パイプチェーン
ストリームは、主に互いにパイプして使用されます。読み取り可能なストリームは、読み取り可能なストリームの pipeTo()
メソッドを使用して書き込み可能なストリームに直接パイプできます。また、読み取り可能なストリームの pipeThrough()
メソッドを使用して、1 つ以上の変換ストリームを介してパイプすることもできます。このようにパイプで接続された一連のストリームは、パイプ チェーンと呼ばれます。
バックプレッシャー
パイプチェーンが構築されると、チャンクがパイプチェーンを流れる速度に関するシグナルが伝播されます。チェーン内のいずれかのステップでまだチャンクを受け入れられない場合、パイプチェーンを逆方向にシグナルが伝播し、最終的に元のソースにチャンクの生成を停止するよう指示が出されます。この正規化フローのプロセスはバックプレッシャーと呼ばれます。
ティーイング
読み取り可能なストリームは、tee()
メソッドを使用してティー(大文字の「T」の形にちなんで名付けられました)できます。これにより、ストリームがロックされ、直接使用できなくなります。ただし、ブランチと呼ばれる2 つの新しいストリームが作成され、これらは個別に使用できます。ティーイングは、ストリームを巻き戻したり再開したりできないため、重要です。詳しくは後述します。
読み取り可能なストリームの仕組み
読み取り可能なストリームは、基盤となるソースから流れる ReadableStream
オブジェクトによって JavaScript で表されるデータソースです。ReadableStream()
コンストラクタは、指定されたハンドラから読み取り可能なストリーム オブジェクトを作成して返します。基盤となるソースには次の 2 種類があります。
- プッシュソースは、アクセスしたときに常にデータをプッシュします。ストリームへのアクセスを開始、一時停止、キャンセルするかどうかはユーザー次第です。たとえば、ライブ動画ストリーム、サーバー送信イベント、WebSocket などがあります。
- プルソースでは、接続後に明示的にデータをリクエストする必要があります。たとえば、
fetch()
またはXMLHttpRequest
呼び出しによる HTTP オペレーションなどがあります。
ストリーム データは、チャンクと呼ばれる小さな単位で順次読み取られます。ストリームに配置されたチャンクは、エンキューされたと言われます。つまり、読み取り可能な状態でキューで待機しています。内部キューは、まだ読み取られていないチャンクを追跡します。
キューイング戦略は、内部キューの状態に基づいてストリームがバックプレッシャーをどのようにシグナル通知するかを決定するオブジェクトです。キューイング戦略では、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを、ハイウォーター マークと呼ばれる指定された数値と比較します。
ストリーム内のチャンクは、リーダーによって読み取られます。このリーダーはデータを一度に 1 つのチャンクずつ取得するため、必要なオペレーションを自由に実行できます。リーダーとそれに関連する他の処理コードは、コンシューマーと呼ばれます。
このコンテキストの次のコンストラクトは、コントローラと呼ばれます。読み取り可能な各ストリームには、ストリームを制御できるコントローラが関連付けられています。
一度にストリームを読み取ることができるリーダーは 1 つだけです。リーダーが作成され、ストリームの読み取りを開始すると(つまり、アクティブなリーダーになると)、そのストリームにロックされます。別のリーダーにストリームの読み取りを引き継ぐ場合は、通常、他の処理を行う前に最初のリーダーをリリースする必要があります(ストリームをティーすることはできます)。
読み取り可能なストリームの作成
読み取り可能なストリームは、コンストラクタ ReadableStream()
を呼び出して作成します。コンストラクタには、構築されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表すオプションの引数 underlyingSource
があります。
underlyingSource
これには、次のオプションのデベロッパー定義メソッドを使用できます。
start(controller)
: オブジェクトが構築されるとすぐに呼び出されます。このメソッドはストリーム ソースにアクセスし、ストリーム機能を設定するために必要な処理を行います。このプロセスを非同期で行う場合、メソッドは成功または失敗を通知する Promise を返すことができます。このメソッドに渡されるcontroller
パラメータはReadableStreamDefaultController
です。pull(controller)
: チャンクが取得されるたびにストリームを制御するために使用できます。ストリームの内部チャンクキューが満杯になるまで、キューがハイウォーター マークに達するまで繰り返し呼び出されます。pull()
の呼び出しの結果が Promise の場合、その Promise が完了するまでpull()
は再度呼び出されません。Promise が拒否されると、ストリームはエラー状態になります。cancel(reason)
: ストリーム コンシューマーがストリームをキャンセルしたときに呼び出されます。
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
は次のメソッドをサポートしています。
ReadableStreamDefaultController.close()
は、関連付けられているストリームを閉じます。ReadableStreamDefaultController.enqueue()
は、関連付けられたストリームで指定されたチャンクをキューに入れます。ReadableStreamDefaultController.error()
は、関連付けられたストリームとの今後のやり取りでエラーが発生します。
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
ReadableStream()
コンストラクタの 2 番目の引数(これも省略可能)は queuingStrategy
です。これは、ストリームのキューイング戦略を必要に応じて定義するオブジェクトです。次の 2 つのパラメータを取ります。
highWaterMark
: このキューイング戦略を使用するストリームの上限を示す非負の数値。size(chunk)
: 指定されたチャンク値の有限の非負のサイズを計算して返す関数。この結果はバックプレッシャーの決定に使用され、適切なReadableStreamDefaultController.desiredSize
プロパティを介して表されます。また、基盤となるソースのpull()
メソッドが呼び出されるタイミングも制御します。
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
メソッドと read()
メソッド
読み取り可能なストリームから読み取るには、リーダーが必要です。これは ReadableStreamDefaultReader
になります。ReadableStream
インターフェースの getReader()
メソッドは、リーダーを作成し、ストリームをリーダーにロックします。ストリームがロックされている間は、このストリームが解放されるまで他のリーダーを取得できません。
ReadableStreamDefaultReader
インターフェースの read()
メソッドは、ストリームの内部キューの次のチャンクへのアクセスを提供する Promise を返します。ストリームの状態に応じて、結果を返して完了するか、拒否します。考えられる原因は次のとおりです。
- チャンクが利用可能な場合、Promise は
{ value: chunk, done: false }
形式のオブジェクトで解決されます。 - ストリームが閉じられると、Promise は
{ value: undefined, done: true }
形式のオブジェクトで解決されます。 - ストリームがエラーになった場合、Promise は関連するエラーで拒否されます。
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
プロパティ
読み取り可能なストリームがロックされているかどうかは、ReadableStream.locked
プロパティにアクセスして確認できます。
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
読み取り可能なストリームのコードサンプル
次のコードサンプルは、すべてのアクションの手順を示しています。まず、underlyingSource
引数(TimestampSource
クラス)で start()
メソッドを定義する ReadableStream
を作成します。このメソッドは、ストリームの controller
に 10 秒間 1 秒ごとにタイムスタンプを enqueue()
するよう指示します。最後に、コントローラにストリームの close()
を指示します。このストリームを使用するには、getReader()
メソッドでリーダーを作成し、ストリームが done
になるまで read()
を呼び出します。
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
非同期イテレーション
各 read()
ループの反復処理でストリームが done
かどうかを確認することは、最も便利な API ではない可能性があります。幸いなことに、非同期イテレーションというより良い方法がまもなく登場します。
for await (const chunk of stream) {
console.log(chunk);
}
非同期イテレーションを使用する現在の回避策は、ポリフィルで動作を実装することです。
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
読み取り可能なストリームのティーイング
ReadableStream
インターフェースの tee()
メソッドは、現在の読み取り可能なストリームを複製し、結果として得られた 2 つのブランチを新しい ReadableStream
インスタンスとして含む 2 要素の配列を返します。これにより、2 つのリーダーが同時にストリームを読み取ることができます。たとえば、サーバーからレスポンスを取得してブラウザにストリーミングするだけでなく、Service Worker キャッシュにもストリーミングしたい場合は、Service Worker でこれを行うことができます。レスポンスの本文は複数回使用できないため、この処理を行うには 2 つのコピーが必要です。ストリームをキャンセルするには、結果として得られた両方のブランチをキャンセルする必要があります。ストリームをティーイングすると、通常は期間中ロックされ、他のリーダーがロックできなくなります。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
読み取り可能なバイト ストリーム
バイトを表すストリームの場合、バイトを効率的に処理するために、特にコピーを最小限に抑えるために、読み取り可能なストリームの拡張バージョンが提供されます。バイト ストリームを使用すると、BYOB(Bring Your Own Buffer)リーダーを取得できます。デフォルトの実装では、WebSocket の場合は文字列や配列バッファなど、さまざまな出力が得られますが、バイトストリームではバイト出力が保証されます。また、BYOB リーダーには安定性のメリットもあります。これは、バッファが切り離されると、同じバッファに 2 回書き込まれないことが保証されるため、競合状態を回避できるためです。BYOB リーダーはバッファを再利用できるため、ブラウザがガベージ コレクションを実行する必要がある回数を減らすことができます。
読み取り可能なバイトストリームの作成
読み取り可能なバイトストリームを作成するには、追加の type
パラメータを ReadableStream()
コンストラクタに渡します。
new ReadableStream({ type: 'bytes' });
underlyingSource
読み取り可能なバイトストリームの基になるソースには、操作する ReadableByteStreamController
が指定されます。ReadableByteStreamController.enqueue()
メソッドは、値が ArrayBufferView
である chunk
引数を取ります。プロパティ ReadableByteStreamController.byobRequest
は、現在の BYOB プルリクエストを返します。存在しない場合は null を返します。最後に、ReadableByteStreamController.desiredSize
プロパティは、制御されたストリームの内部キューを埋めるために必要なサイズを返します。
queuingStrategy
ReadableStream()
コンストラクタの 2 番目の引数(これも省略可能)は queuingStrategy
です。これは、ストリームのキューイング戦略を必要に応じて定義するオブジェクトです。次の 1 つのパラメータを取ります。
highWaterMark
: このキューイング戦略を使用するストリームの上限を示す負でないバイト数。これは、適切なReadableByteStreamController.desiredSize
プロパティを介してバックプレッシャーを判断するために使用されます。また、基盤となるソースのpull()
メソッドが呼び出されるタイミングも制御します。
getReader()
メソッドと read()
メソッド
mode
パラメータを適切に設定することで、ReadableStreamBYOBReader
にアクセスできます。
ReadableStream.getReader({ mode: "byob" })
。これにより、コピーを回避するためにバッファ割り当てをより正確に制御できます。バイト ストリームから読み取るには、ReadableStreamBYOBReader.read(view)
を呼び出す必要があります。ここで、view
は ArrayBufferView
です。
読み取り可能なバイト ストリームのコードサンプル
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
次の関数は、ランダムに生成された配列の効率的なゼロコピー読み取りを可能にする、読み取り可能なバイト ストリームを返します。1,024 のチャンク サイズをあらかじめ使用するのではなく、デベロッパーが指定したバッファを埋めようとするため、完全に制御できます。
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
書き込み可能なストリームの仕組み
書き込み可能ストリームは、データを書き込むことができる宛先です。JavaScript では WritableStream
オブジェクトで表されます。これは、基盤となるシンク(未加工のデータが書き込まれる下位レベルの I/O シンク)の上に位置する抽象化として機能します。
データは、ライターを介して一度に 1 チャンクずつストリームに書き込まれます。チャンクは、リーダーのチャンクと同様に、さまざまな形式をとることができます。任意のコードを使用して、書き込み可能なチャンクを生成できます。ライターと関連するコードはプロデューサーと呼ばれます。
ライターが作成され、ストリームへの書き込みを開始すると(アクティブなライター)、そのストリームにロックされた状態になります。一度に書き込み可能なストリームに書き込むことができるのは 1 つのライターのみです。別のライターにストリームへの書き込みを開始させるには、通常、ストリームをリリースしてから、別のライターをアタッチする必要があります。
内部キューは、ストリームに書き込まれたが、基盤となるシンクによってまだ処理されていないチャンクを追跡します。
キューイング戦略は、内部キューの状態に基づいてストリームがバックプレッシャーをどのようにシグナル通知するかを決定するオブジェクトです。キューイング戦略では、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを、ハイウォーター マークと呼ばれる指定された数値と比較します。
最終的な構成はコントローラと呼ばれます。各書き込み可能ストリームには、ストリームを制御(中止など)できるコントローラが関連付けられています。
書き込み可能なストリームの作成
Streams API の WritableStream
インターフェースは、シンクと呼ばれる宛先にストリーミング データを書き込むための標準的な抽象化を提供します。このオブジェクトには、バックプレッシャーとキューイングが組み込まれています。書き込み可能なストリームを作成するには、コンストラクタ WritableStream()
を呼び出します。オプションの underlyingSink
パラメータがあります。これは、構築されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表します。
underlyingSink
underlyingSink
には、次のオプションのデベロッパー定義メソッドを含めることができます。一部のメソッドに渡される controller
パラメータは WritableStreamDefaultController
です。
start(controller)
: このメソッドは、オブジェクトが構築されるとすぐに呼び出されます。このメソッドの内容は、基盤となるシンクへのアクセスを取得することを目的とする必要があります。このプロセスを非同期で行う場合は、成功または失敗を通知する Promise を返すことができます。write(chunk, controller)
: このメソッドは、新しいデータチャンク(chunk
パラメータで指定)が基盤となるシンクに書き込まれる準備が整ったときに呼び出されます。書き込みオペレーションの成功または失敗を通知する Promise を返すことができます。このメソッドは、前の書き込みが成功した後にのみ呼び出され、ストリームが閉じられたり中止されたりした後は呼び出されません。close(controller)
: アプリがストリームへのチャンクの書き込みを終了したことを通知した場合、このメソッドが呼び出されます。コンテンツは、基盤となるシンクへの書き込みを完了し、そのシンクへのアクセス権を解放するために必要な処理を行う必要があります。このプロセスが非同期の場合、成功または失敗を通知する Promise を返すことができます。このメソッドは、キューに登録されたすべての書き込みが成功した後にのみ呼び出されます。abort(reason)
: アプリがストリームを突然閉じてエラー状態にしたいことを通知した場合、このメソッドが呼び出されます。close()
と同様に、保持されているリソースをクリーンアップできますが、書き込みがキューに登録されている場合でもabort()
が呼び出されます。これらのチャンクは破棄されます。このプロセスが非同期の場合、成功または失敗を通知する Promise を返すことができます。reason
パラメータには、ストリームが中止された理由を説明するDOMString
が含まれています。
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Streams API の WritableStreamDefaultController
インターフェースは、設定中、書き込みのためにさらに多くのチャンクが送信されたとき、または書き込みの終了時に WritableStream
の状態を制御できるコントローラを表します。WritableStream
を構築するときに、基盤となるシンクに操作用の対応する WritableStreamDefaultController
インスタンスが渡されます。WritableStreamDefaultController
には WritableStreamDefaultController.error()
というメソッドが 1 つだけあります。このメソッドは、関連付けられたストリームとの今後のやり取りでエラーが発生するようにします。WritableStreamDefaultController
は signal
プロパティもサポートしています。このプロパティは AbortSignal
のインスタンスを返し、必要に応じて WritableStream
オペレーションを停止できます。
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
WritableStream()
コンストラクタの 2 番目の引数(これも省略可能)は queuingStrategy
です。これは、ストリームのキューイング戦略を必要に応じて定義するオブジェクトです。次の 2 つのパラメータを取ります。
highWaterMark
: このキューイング戦略を使用するストリームの上限を示す非負の数値。size(chunk)
: 指定されたチャンク値の有限の非負のサイズを計算して返す関数。この結果はバックプレッシャーの決定に使用され、適切なWritableStreamDefaultWriter.desiredSize
プロパティを介して表されます。
getWriter()
メソッドと write()
メソッド
書き込み可能なストリームに書き込むには、ライター(WritableStreamDefaultWriter
)が必要です。WritableStream
インターフェースの getWriter()
メソッドは、WritableStreamDefaultWriter
の新しいインスタンスを返し、ストリームをそのインスタンスにロックします。ストリームがロックされている間は、現在のライターが解放されるまで、他のライターを取得できません。
WritableStreamDefaultWriter
インターフェースの write()
メソッドは、渡されたデータのチャンクを WritableStream
とその基盤となるシンクに書き込み、書き込みオペレーションの成功または失敗を示す Promise を返します。「成功」の意味は基盤となるシンクによって異なります。チャンクが受け入れられたことを示す場合もありますが、最終的な宛先に安全に保存されたことを必ずしも示すわけではありません。
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
プロパティ
書き込み可能なストリームがロックされているかどうかは、その WritableStream.locked
プロパティにアクセスして確認できます。
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
書き込み可能なストリームのコードサンプル
次のコードサンプルは、すべてのアクションの手順を示しています。
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
読み取り可能なストリームを書き込み可能なストリームにパイプ処理する
読み取り可能なストリームは、読み取り可能なストリームの pipeTo()
メソッドを介して、書き込み可能なストリームにパイプできます。ReadableStream.pipeTo()
は、現在の ReadableStream
を指定された WritableStream
にパイプし、パイプ処理が正常に完了したときにフルフィルされる Promise を返します。エラーが発生した場合は拒否されます。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
変換ストリームの作成
Streams API の TransformStream
インターフェースは、変換可能なデータのセットを表します。変換ストリームを作成するには、そのコンストラクタ TransformStream()
を呼び出します。これにより、指定されたハンドラから変換ストリーム オブジェクトが作成されて返されます。TransformStream()
コンストラクタは、最初の引数として transformer
を表す JavaScript オブジェクト(省略可)を受け取ります。このようなオブジェクトには、次のいずれかのメソッドを含めることができます。
transformer
start(controller)
: このメソッドは、オブジェクトが構築されるとすぐに呼び出されます。通常、これはcontroller.enqueue()
を使用して接頭辞チャンクをキューに入れるために使用されます。これらのチャンクは読み取り可能な側から読み取られますが、書き込み可能な側への書き込みには依存しません。この初期プロセスが非同期の場合(たとえば、プレフィックス チャンクの取得に手間がかかる場合)、関数は成功または失敗を通知するプロミスを返すことができます。拒否されたプロミスはストリームをエラーにします。スローされた例外はTransformStream()
コンストラクタによって再スローされます。transform(chunk, controller)
: 書き込み可能側に最初に書き込まれた新しいチャンクが変換の準備を整えたときに、このメソッドが呼び出されます。ストリーム実装では、この関数は前の変換が成功した後にのみ呼び出され、start()
が完了する前やflush()
が呼び出された後に呼び出されることはありません。この関数は、変換ストリームの実際の変換作業を行います。controller.enqueue()
を使用して結果をキューに追加できます。これにより、書き込み可能な側に書き込まれた 1 つのチャンクが、controller.enqueue()
が呼び出された回数に応じて、読み取り可能な側に 0 個または複数のチャンクをもたらすことができます。変換のプロセスが非同期の場合、この関数は変換の成功または失敗を通知する Promise を返すことができます。拒否された Promise は、変換ストリームの読み取り可能側と書き込み可能側の両方でエラーを返します。transform()
メソッドが指定されていない場合は、ID 変換が使用されます。これにより、書き込み可能な側から読み取り可能な側へチャンクが変更されずにキューに登録されます。flush(controller)
: このメソッドは、書き込み可能な側に書き込まれたすべてのチャンクがtransform()
を正常に通過して変換され、書き込み可能な側が閉じられようとしているときに呼び出されます。通常、これは、読み取り可能な側が閉じられる前に、接尾辞チャンクを読み取り可能な側にキューに入れるために使用されます。フラッシュ処理が非同期の場合、関数は成功または失敗を通知する Promise を返すことができます。結果はstream.writable.write()
の呼び出し元に通知されます。また、拒否された Promise は、ストリームの読み取り可能側と書き込み可能側の両方でエラーを返します。例外をスローすることは、拒否された Promise を返すことと同じように扱われます。
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
キューイング戦略と readableStrategy
キューイング戦略
TransformStream()
コンストラクタの 2 番目と 3 番目のオプション パラメータは、オプションの writableStrategy
と readableStrategy
のキューイング戦略です。これらは、それぞれ読み取り可能ストリームと書き込み可能ストリームのセクションで説明されているように定義されます。
変換ストリームのコードサンプル
次のコードサンプルは、シンプルな変換ストリームの動作を示しています。
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
読み取り可能なストリームを変換ストリームにパイプする
ReadableStream
インターフェースの pipeThrough()
メソッドは、現在のストリームを変換ストリームまたはその他の書き込み可能/読み取り可能なペアにパイプ処理するためのチェーン可能な方法を提供します。ストリームをパイプすると、通常はパイプの期間中ストリームがロックされ、他のリーダーがストリームをロックできなくなります。
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
次のコードサンプル(少し不自然ですが)は、返されたレスポンス プロミスをストリームとして使用し、チャンクごとに大文字にすることで、すべてのテキストを大文字にする fetch()
の「シャウト」バージョンを実装する方法を示しています。このアプローチの利点は、ドキュメント全体がダウンロードされるのを待つ必要がないことです。これは、大きなファイルを扱う場合に大きな違いが生じます。
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
デモ
次のデモは、読み取り可能ストリーム、書き込み可能ストリーム、変換ストリームの動作を示しています。また、pipeThrough()
と pipeTo()
のパイプチェーンの例も含まれており、tee()
も示されています。必要に応じて、デモを独自のウィンドウで実行したり、ソースコードを表示したりできます。
ブラウザで利用できる便利なストリーム
ブラウザには、便利なストリームが多数組み込まれています。Blob から ReadableStream
を簡単に作成できます。Blob
インターフェースの stream() メソッドは、読み取り時に BLOB 内のデータを返す ReadableStream
を返します。また、File
オブジェクトは Blob
の一種であり、BLOB が使用できるコンテキストで使用できることも思い出してください。
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
と TextEncoder.encode()
のストリーミング バリアントは、それぞれ TextDecoderStream
と TextEncoderStream
と呼ばれます。
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
CompressionStream
変換ストリームと DecompressionStream
変換ストリームを使用すると、ファイルの圧縮と解凍を簡単に行うことができます。次のコードサンプルは、Streams 仕様をダウンロードし、ブラウザで直接圧縮(gzip)して、圧縮ファイルをディスクに直接書き込む方法を示しています。
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API の FileSystemWritableFileStream
と試験運用版の fetch()
リクエスト ストリームは、書き込み可能なストリームの例です。
シリアル API は、読み取り可能ストリームと書き込み可能ストリームの両方を多用します。
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
最後に、WebSocketStream
API はストリームを WebSocket API と統合します。
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
関連リソース
謝辞
この記事は、Jake Archibald、François Beaufort、Sam Dutton、Mattias Buelens、Surma、Joe Medley、Adam Rice によってレビューされました。Jake Archibald 氏のブログ投稿は、ストリームを理解するうえで非常に役立ちました。コードサンプルのいくつかは、GitHub ユーザー @bellbind 氏の調査に触発されたものであり、散文の一部は MDN Web Docs の Streams に大きく依存しています。Streams Standard の作成者は、この仕様の作成に多大な貢献をしました。ヒーロー画像は Ryan Lara 氏が Unsplash で提供しています。