訊息串:這份指南

瞭解如何使用 Streams API 使用可讀、可寫和轉換資料流。

Streams API 可讓您透過程式輔助存取透過網路接收或以任何方式在本機建立的資料串流,並使用 JavaScript 加以處理。串流處理涉及將您要接收、傳送或轉換的資源分解為小型區塊,然後逐位處理這些區塊。雖然瀏覽器在接收 HTML 或影片等素材資源時,會在網頁上顯示串流內容,但在 2015 年推出 fetch 串流之前,JavaScript 並未提供這項功能。

先前,如果您想處理某種資源 (例如影片或文字檔等),就必須下載整個檔案,等待其以適當格式進行反序列化,然後再進行處理。有了可供 JavaScript 使用的串流,這一切都會改變。您現在可以使用 JavaScript 逐步處理原始資料,只要在用戶端上可用,即可處理,不必產生緩衝區、字串或 blob。這項功能可用於多種用途,以下列舉其中幾項:

  • 影片特效:透過即時套用特效的轉換串流,傳送可讀的影片串流。
  • 資料 (解) 壓縮:透過轉換串流將檔案串流傳送至系統,以便系統選擇性地對檔案進行 (解) 壓縮。
  • 圖片解碼:將 HTTP 回應串流透過轉換串流傳送,該串流會將位元組解碼為位塊資料,然後透過另一個轉換串流將位塊轉換為 PNG。如果在服務工作者的 fetch 處理常式中安裝,您就能以透明方式為 AVIF 等新圖片格式提供 polyfill。

瀏覽器支援

ReadableStream 和 WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

核心概念

在深入探討各種串流類型之前,讓我先介紹一些核心概念。

區塊

區塊是寫入或讀取串流的單一資料。可以是任何類型,甚至可以包含不同類型的區塊。在大多數情況下,資料流的資料單位並非最細微的單元。舉例來說,位元組串流可能包含由 16 KiB Uint8Array 單位組成的區塊,而不是單一位元組。

可讀取的串流

可讀取的串流代表可讀取的資料來源。換句話說,資料會從可讀取的串流中傳出。具體來說,可讀取的串流是 ReadableStream 類別的例項。

可寫入的串流

可寫入的串流代表可寫入資料的目的地。換句話說,資料會進入可寫入式串流。具體來說,可寫入的串流是 WritableStream 類別的例項。

轉換串流

轉換串流由一組串流組成:可寫串流 (稱為可寫端) 和可讀串流 (稱為可讀端)。這就像是同聲傳譯員,他們會即時將某種語言翻譯成另一種語言。在轉換串流的特定方式中,寫入可寫入端會導致新資料可供讀取端讀取。具體來說,任何含有 writable 屬性和 readable 屬性的物件都可以做為轉換串流。不過,標準 TransformStream 類別可讓您更輕鬆地建立正確交錯的這類組合。

管鏈

串流主要用於管道彼此之間。可讀取的串流可以使用可讀取串流的 pipeTo() 方法,直接傳送至可寫入的串流,也可以先使用可讀取串流的 pipeThrough() 方法,透過一或多個轉換串流傳送。以這種方式匯集在一起的一系列串流稱為管道鏈。

回壓

管道鏈結建構完成後,就會傳播信號,指出應以多快的速度將區塊流經管道。如果鏈條中的任何步驟都無法接受區塊,就會透過管道鏈條向後傳播訊號,直到最終原始來源停止快速產生區塊為止。這項流量規格化程序稱為回壓。

開球

可讀取的資料流可使用其 tee() 方法進行分流 (以大寫字母「T」的形狀命名)。這會鎖定串流,也就是讓串流無法再直接使用;不過,這會建立兩個新的串流 (稱為分支),可獨立使用。分流也是重要的步驟,因為串流無法倒帶或重新開始,我們會在稍後進一步說明。

管道鏈結圖表,其中包含可讀取串流,該串流來自對 fetch API 的呼叫,然後透過轉換串流管道傳送,該串流的輸出內容會分流,並傳送至瀏覽器 (第一個結果可讀取串流) 和服務工作者快取 (第二個結果可讀取串流)。
管道鏈結。

可讀取串流的機制

可讀取的串流是透過 ReadableStream 物件在 JavaScript 中表示的資料來源,該物件會從底層來源流入。ReadableStream() 建構函式會根據指定的處理常式建立並傳回可讀取的串流物件。基礎來源分為兩種類型:

  • 推送來源會在您存取時持續推送資料,您可以自行決定是否要開始、暫停或取消存取串流。例如即時影像串流、伺服器傳送的事件或 WebSocket。
  • 拉取來源:連線後,您必須明確要求資料。例如透過 fetch()XMLHttpRequest 呼叫的 HTTP 作業。

串流資料會以稱為「區塊」的小型片段依序讀取。放置在串流中的區塊稱為「已排入佇列」。這表示這些訊息正在佇列中等待讀取。內部佇列會追蹤尚未讀取的區塊。

佇列策略是一種物件,可決定串流應如何根據內部佇列的狀態,發出回壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定數字 (稱為「高水位標記」) 進行比較。

串流中的區塊會由讀取器讀取。這個讀取器會一次擷取一小部分資料,讓您執行所需的任何作業。讀取器加上其他相關的處理程式碼,稱為「消費者」

這個情境中的下一個結構體稱為「控制器」。每個可讀取的串流都會與控制器建立關聯,如名稱所示,可讓您控制串流。

一次只能有一個讀取器讀取串流;當讀取器建立並開始讀取串流 (也就是成為「有效讀取器」) 時,就會與該串流「上鎖」。如果您希望其他讀取器接管讀取串流,通常需要先釋放第一個讀取器,才能執行其他操作 (但您可以分流串流)。

建立可讀取的串流

您可以呼叫建構函式 ReadableStream() 來建立可讀取的串流。建構函式有一個選用引數 underlyingSource,代表物件,其中包含定義建構的串流例項行為方式的方法和屬性。

underlyingSource

這可以使用下列選用的開發人員定義方法:

  • start(controller):在物件建構時立即呼叫。該方法可存取串流來源,並執行設定串流功能所需的其他任何操作。如果要以非同步方式執行這個程序,方法可以傳回承諾,用來表示成功或失敗。傳遞至此方法的 controller 參數是 ReadableStreamDefaultController
  • pull(controller):可用於在擷取更多區塊時控制串流。只要串流的內部區塊佇列未滿,就會重複呼叫,直到佇列達到高水位為止。如果呼叫 pull() 的結果是承諾,則在承諾完成之前,系統不會再次呼叫 pull()。如果承諾遭到拒絕,串流就會發生錯誤。
  • cancel(reason):當串流使用者取消串流時呼叫。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支援下列方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 建構函式的第二個 (同樣為選用) 引數為 queuingStrategy。這是一個物件,可視需要為串流定義佇列策略,並接受兩個參數:

  • highWaterMark:非負數,表示使用此佇列策略的串流高水位標記。
  • size(chunk):這個函式會計算並傳回指定區塊值的有限非負大小。結果會用來判斷回壓,並透過適當的 ReadableStreamDefaultController.desiredSize 屬性顯示。它也會決定何時呼叫基礎來源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如要從可讀取的串流讀取資料,您需要使用讀取器,也就是 ReadableStreamDefaultReaderReadableStream 介面的 getReader() 方法會建立讀取器,並將串流鎖定在該讀取器上。當串流處於鎖定狀態時,您必須等到這個讀取器釋出,才能取得其他讀取器。

ReadableStreamDefaultReader 介面的 read() 方法會傳回承諾,提供對串流內部佇列中下一個區塊的存取權。它會根據串流的狀態,以結果滿足或拒絕。可能的情況如下:

  • 如果有可用區塊,承諾會以
    { value: chunk, done: false } 形式的物件完成。
  • 如果串流關閉,承諾會以
    { value: undefined, done: true } 表單的物件來履行。
  • 如果串流發生錯誤,承諾會因相關錯誤而遭到拒絕。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 屬性

您可以存取可讀取串流的 ReadableStream.locked 屬性,確認可讀取串流是否已鎖定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可讀取的串流程式碼範例

以下程式碼範例會顯示所有步驟的運作情形。您首先建立 ReadableStream,並在其 underlyingSource 引數 (即 TimestampSource 類別) 中定義 start() 方法。這個方法會告知串流的 controller,在十秒內每秒將時間戳記傳送至 enqueue()。最後,它會指示控制器 close() 串流。您可以透過 getReader() 方法建立讀取器,然後呼叫 read(),直到串流為 done 為止,藉此使用此串流。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

非同步疊代

在每次 read() 迴圈疊代時,檢查串流是否為 done 可能不是最方便的 API。幸好,我們很快就會推出更好的方法:非同步疊代。

for await (const chunk of stream) {
  console.log(chunk);
}

目前使用非同步疊代功能的解決方法,是使用 polyfill 實作行為。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

分流可讀取的串流

ReadableStream 介面的 tee() 方法會分流目前可讀取的串流,並傳回包含兩個結果分支的兩個元素陣列,做為新的 ReadableStream 例項。這樣一來,兩個讀取器就能同時讀取資料流。舉例來說,如果您想從伺服器擷取回應並串流傳輸至瀏覽器,但也想將其串流傳輸至服務工作站快取,就可以在服務工作站中執行這項操作。由於回應主體無法使用超過一次,因此您需要兩個副本才能執行此操作。如要取消串流,您必須取消兩個產生的分支。分流串流通常會在整個過程中鎖定,防止其他讀取器鎖定。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可讀取的位元組串流

針對代表位元的串流,我們會提供可讀串流的擴充版本,以便有效處理位元,特別是盡量減少複製次數。位元組串流可讓您取得自備緩衝區 (BYOB) 讀取器。預設實作方式可提供一系列不同的輸出內容,例如字串或陣列緩衝區 (在 WebSocket 的情況下),而位元組串流可保證位元組輸出內容。此外,自備讀取器具有穩定性優勢。這是因為如果緩衝區分離,則可確保不會寫入相同的緩衝區兩次,進而避免競爭狀態。BYOB 讀取器可重複使用緩衝區,因此可減少瀏覽器執行垃圾收集的次數。

建立可讀取的位元組串流

您可以將額外的 type 參數傳遞至 ReadableStream() 建構函式,藉此建立可讀取的位元組串流。

new ReadableStream({ type: 'bytes' });

underlyingSource

可讀位元組串流的基礎來源會提供 ReadableByteStreamController 供操控。其 ReadableByteStreamController.enqueue() 方法會採用 chunk 引數,其值為 ArrayBufferView。屬性 ReadableByteStreamController.byobRequest 會傳回目前的 BYOB 提取要求,如果沒有則傳回空值。最後,ReadableByteStreamController.desiredSize 屬性會傳回所需大小,以填滿受控流量的內部佇列。

queuingStrategy

ReadableStream() 建構函式的第二個 (同樣為選用) 引數為 queuingStrategy。這是一個可選物件,可為串流定義佇列策略,並採用一個參數:

  • highWaterMark:非負的位元組數,表示使用此佇列策略的串流高水位標記。這會用來判斷回壓,並透過適當的 ReadableByteStreamController.desiredSize 屬性顯示。它也會決定何時呼叫基礎來源的 pull() 方法。

getReader()read() 方法

接著,您可以據此設定 mode 參數,以便存取 ReadableStreamBYOBReaderReadableStream.getReader({ mode: "byob" })。這可讓您更精確地控管緩衝區配置,以免產生副本。如要從位元組串流讀取資料,您需要呼叫 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可讀取的位元組串流程式碼範例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

下列函式會傳回可讀取的位元組串流,可讓您有效地讀取隨機產生的陣列,而無需複製資料。它不會使用預先設定的 1,024 位元組大小,而是嘗試填入開發人員提供的緩衝區,以便完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可寫入串流的機制

可寫入的串流是可寫入資料的目的地,在 JavaScript 中以 WritableStream 物件表示。這會在底層匯出端 (也就是寫入原始資料的較低層級 I/O 匯出端) 之上,提供抽象層。

資料會透過writer 寫入串流,一次一個區塊。就像讀取器中的區塊一樣,區塊可以採用多種形式。您可以使用任何程式碼產生可供編寫的區塊;編寫器加上相關程式碼稱為產生器

當寫入器建立並開始寫入串流 (有效寫入器) 時,就會被視為鎖定串流。一次只能有一個寫入器寫入可寫入式串流。如要讓其他寫入器開始寫入串流,通常需要先釋放串流,然後再將其他寫入器附加至串流。

內部佇列會追蹤已寫入串流但尚未由基礎接收端處理的區塊。

佇列策略是一種物件,可決定串流應如何根據內部佇列的狀態,發出回壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定數字 (稱為「高水位標記」) 進行比較。

最終的建構項目稱為控制器。每個可寫入的串流都有相關聯的控制器,可讓您控制串流 (例如中止串流)。

建立可寫入的串流

Streams API 的 WritableStream 介面提供標準抽象,可將串流資料寫入目的地 (稱為接收器)。此物件內建回壓和佇列功能。您可以呼叫其建構函式 WritableStream() 來建立可寫入的串流。它具有選用的 underlyingSink 參數,可代表物件,其中包含定義建構的串流例項行為方式的方法和屬性。

underlyingSink

underlyingSink 可包含下列選用的開發人員定義方法。傳遞至部分方法的 controller 參數為 WritableStreamDefaultController

  • start(controller):系統會在建構物件時立即呼叫此方法。此方法的內容應旨在取得對基礎接收器的存取權。如果要以非同步方式執行這項程序,則可傳回承諾,以示成功或失敗。
  • write(chunk, controller):當新資料區塊 (在 chunk 參數中指定) 準備寫入基礎接收來源時,系統會呼叫這個方法。它可以傳回承諾,用於指出寫入作業的成功或失敗。只有在先前的寫入作業成功後才會呼叫此方法,且不會在串流關閉或中止後呼叫。
  • close(controller):如果應用程式傳送已完成將區塊寫入串流的信號,系統就會呼叫這個方法。內容應採取必要措施,將寫入作業完成並釋出對基礎接收端的存取權。如果這個程序是非同步的,則可傳回承諾,用來表示成功或失敗。只有在所有排隊的寫入作業成功後,系統才會呼叫這個方法。
  • abort(reason):如果應用程式傳送訊號表示要突然關閉串流並將其置於錯誤狀態,系統就會呼叫這個方法。它可以清理任何已保留的資源,與 close() 非常相似,但即使寫入作業已排入佇列,系統仍會呼叫 abort()。系統會丟棄這些區塊。如果這項程序是非同步的,則可傳回承諾,用來表示成功或失敗。reason 參數包含 DOMString,說明流程中斷的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 介面代表控制器,可在設定期間 (更多區塊提交供寫入) 或寫入結束時控制 WritableStream 的狀態。建構 WritableStream 時,基礎接收端會獲得對應的 WritableStreamDefaultController 例項,以便進行操作。WritableStreamDefaultController 只有一個方法:WritableStreamDefaultController.error(),這會導致日後與相關串流的任何互動都會發生錯誤。WritableStreamDefaultController 也支援 signal 屬性,可傳回 AbortSignal 的例項,讓您在需要時停止 WritableStream 作業。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 建構函式的第二個 (同樣為選用) 引數為 queuingStrategy。這是一個物件,可選擇定義串流的佇列策略,該策略會採用兩個參數:

  • highWaterMark:非負數,表示使用此佇列策略的串流高水位標記。
  • size(chunk):函式會計算並傳回指定區塊值的有限非負大小。結果會用來判斷回壓,並透過適當的 WritableStreamDefaultWriter.desiredSize 屬性顯示。

getWriter()write() 方法

如要寫入可寫入式串流,您需要使用寫入器,也就是 WritableStreamDefaultWriterWritableStream 介面的 getWriter() 方法會傳回新的 WritableStreamDefaultWriter 例項,並將串流鎖定至該例項。當串流處於鎖定狀態時,除非釋放目前的寫入器,否則無法取得其他寫入器。

WritableStreamDefaultWriter 介面的 write() 方法會將傳入的資料片段寫入 WritableStream 及其底層接收器,然後傳回解析的應許,用於指出寫入作業的成功或失敗。請注意,「成功」一詞的意思取決於基礎接收端;它可能表示已接受區塊,但不一定表示已安全地儲存至最終目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 屬性

您可以存取可寫串流的 WritableStream.locked 屬性,確認串流是否已上鎖。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可寫入串流程式碼範例

以下程式碼範例會顯示所有步驟的運作情形。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

將可讀取的串流管道傳送至可寫入的串流

可讀取的串流可透過可讀取串流的 pipeTo() 方法,傳送至可寫入的串流。ReadableStream.pipeTo() 會將目前的 ReadableStream 管道傳送至指定的 WritableStream,並傳回承諾,如果管道程序順利完成,則會履行承諾,如果發生任何錯誤,則會拒絕。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

建立轉換串流

Streams API 的 TransformStream 介面代表一組可轉換的資料。您可以透過呼叫其建構函式 TransformStream() 來建立轉換串流,該函式會從指定的處理常建立並傳回轉換串流物件。TransformStream() 建構函式會接受代表 transformer 的選用 JavaScript 物件做為第一個引數。這類物件可包含下列任一方法:

transformer

  • start(controller):系統會在建構物件時立即呼叫此方法。通常用於使用 controller.enqueue() 將前置字串分段排入佇列。這些區塊會從可讀端讀取,但不依賴對可寫端的任何寫入作業。如果這個初始程序是異步的,例如因為取得前置字串區塊需要花費一些力氣,函式可以傳回承諾,用來表示成功或失敗;遭到拒絕的承諾會導致串流發生錯誤。TransformStream() 建構函式會重新擲回任何擲回的例外狀況。
  • transform(chunk, controller):當原本寫入可寫端的新區塊已準備好進行轉換時,系統會呼叫這個方法。串流實作可確保系統只會在先前的轉換成功後呼叫此函式,不會在 start() 完成前或 flush() 呼叫後呼叫此函式。這個函式會執行轉換串流的實際轉換工作。它可以使用 controller.enqueue() 將結果排入佇列。這可讓寫入可寫端的單一區塊,在可讀端產生零或多個區塊,具體取決於 controller.enqueue() 的呼叫次數。如果轉換程序是非同步的,此函式可以傳回承諾,用來表示轉換是否成功。遭到拒絕的應許會在轉換串流的可讀和可寫端都發生錯誤。如果未提供 transform() 方法,系統會使用身分轉換,將未變更的區塊從可寫端排入佇列,傳送至可讀端。
  • flush(controller):當寫入可寫入端的所有區塊已成功傳送至 transform() 並轉換完成,且可寫入端即將關閉時,系統會呼叫這個方法。通常,這會用於將尾碼區塊排入可讀取端,然後才關閉。如果清除程序是非同步的,函式可以傳回承諾,用於表示成功或失敗;結果會傳達給 stream.writable.write() 的呼叫端。此外,遭到拒絕的承諾會在可讀和可寫的串流端都發生錯誤。擲回例外狀況的處理方式與傳回已拒絕的承諾相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 排隊策略

TransformStream() 建構函式的第二個和第三個選用參數是選用的 writableStrategyreadableStrategy 佇列策略。這些定義分別在「可讀取」和「可寫入」串流部分中說明。

轉換串流程式碼範例

以下程式碼範例顯示簡單的轉換串流運作情形。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

透過轉換串流管道傳送可讀串流

ReadableStream 介面的 pipeThrough() 方法提供可鏈結的方式,可透過轉換串流或任何其他可寫入/可讀取組合,管道傳送目前串流。管道化串流通常會在管道期間鎖定串流,以免其他讀取器鎖定串流。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下列程式碼範例 (稍微複雜) 說明如何實作「大聲」版本的 fetch(),這個版本會將傳回的回應應承諾視為流量,並逐個區塊將所有文字轉為大寫。這種做法的好處是,您不必等待整份文件下載完畢,這在處理大型檔案時,可帶來極大的差異。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

示範

以下示範可讀、可寫和轉換串流的實際運作情形。並提供 pipeThrough()pipeTo() 管道鏈結的範例,以及 tee() 的示範。您可以選擇在專屬視窗中執行示範,或查看原始碼

瀏覽器提供的實用串流

瀏覽器內建了許多實用的串流。您可以輕鬆從 blob 建立 ReadableStreamBlob 介面的 stream() 方法會傳回 ReadableStream,該方法在讀取時會傳回 Blob 內含的資料。請注意,File 物件是 Blob 的特定類型,可用於 Blob 可用的任何情境。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的串流變體分別稱為 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

您可以分別使用 CompressionStreamDecompressionStream 轉換串流,輕鬆壓縮或解壓縮檔案。下列程式碼範例說明如何下載 Streams 規格、直接在瀏覽器中壓縮 (gzip) 檔案,並將壓縮檔案直接寫入磁碟。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和實驗性 fetch() 要求串流,就是實際可寫入串流的例子。

Serial API 大量使用可讀取和可寫入的串流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最後,WebSocketStream API 會將串流與 WebSocket API 整合。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

實用資源

特別銘謝

本文由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 共同審查。Jake Archibald 的網誌文章對我瞭解串流非常有幫助。部分程式碼範例的靈感來自 GitHub 使用者 @bellbind 的探索,而部分文字則大量參考 MDN Web Streams 說明文件Streams Standard作者在撰寫此規格時付出極大努力。主圖由 Ryan LaraUnsplash 上提供。