Streaming—Panduan definitif

Pelajari cara menggunakan streaming yang dapat dibaca, ditulis, dan diubah dengan Streams API.

Streams API memungkinkan Anda mengakses aliran data yang diterima melalui jaringan secara terprogram atau dibuat dengan cara apa pun secara lokal dan memprosesnya dengan JavaScript. Streaming melibatkan pemecahan resource yang ingin Anda terima, kirim, atau ubah menjadi potongan kecil, lalu memproses potongan ini sedikit demi sedikit. Meskipun streaming adalah hal yang dilakukan browser saat menerima aset seperti HTML atau video untuk ditampilkan di halaman web, kemampuan ini belum pernah tersedia untuk JavaScript sebelum fetch dengan streaming diperkenalkan pada tahun 2015.

Sebelumnya, jika ingin memproses resource apa pun (baik video, file teks, dll.), Anda harus mendownload seluruh file, menunggu file tersebut dideserialisasi ke dalam format yang sesuai, lalu memprosesnya. Dengan streaming yang tersedia untuk JavaScript, semua ini akan berubah. Anda kini dapat memproses data mentah dengan JavaScript secara bertahap begitu data tersedia di klien, tanpa perlu membuat buffering, string, atau blob. Hal ini membuka sejumlah kasus penggunaan, beberapa di antaranya saya cantumkan di bawah:

  • Efek video: menyalurkan streaming video yang dapat dibaca melalui streaming transformasi yang menerapkan efek secara real time.
  • (De)kompresi data: menyalurkan aliran file melalui aliran transformasi yang secara selektif (mendekompresi) data.
  • Dekode gambar: menyalurkan aliran respons HTTP melalui aliran transformasi yang mendekode byte menjadi data bitmap, lalu melalui aliran transformasi lain yang menerjemahkan bitmap menjadi PNG. Jika diinstal di dalam pengendali fetch pekerja layanan, hal ini memungkinkan Anda melakukan polyfill format gambar baru seperti AVIF secara transparan.

Dukungan browser

ReadableStream dan WritableStream

Dukungan Browser

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Sumber

TransformStream

Dukungan Browser

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Sumber

Konsep inti

Sebelum membahas berbagai jenis streaming secara mendetail, izinkan saya memperkenalkan beberapa konsep inti.

Potongan

Bagian adalah satu bagian data yang ditulis ke atau dibaca dari aliran data. Jenisnya dapat berupa apa saja; aliran data bahkan dapat berisi potongan dari berbagai jenis. Sering kali, bagian tidak akan menjadi unit data yang paling atomik untuk aliran tertentu. Misalnya, aliran byte mungkin berisi potongan yang terdiri dari 16 unit Uint8Array kibibyte, bukan byte tunggal.

Aliran data yang dapat dibaca

Aliran data yang dapat dibaca mewakili sumber data yang dapat Anda baca. Dengan kata lain, data keluar dari aliran data yang dapat dibaca. Secara konkret, aliran yang dapat dibaca adalah instance class ReadableStream.

Aliran data yang dapat ditulis

Aliran data yang dapat ditulis mewakili tujuan untuk data yang dapat Anda tulis. Dengan kata lain, data masuk ke aliran yang dapat ditulis. Secara konkret, aliran yang dapat ditulis adalah instance dari class WritableStream.

Mentransformasi aliran data

Aliran transformasi terdiri dari sepasang aliran: aliran yang dapat ditulis, yang dikenal sebagai sisi yang dapat ditulis, dan aliran yang dapat dibaca, yang dikenal sebagai sisi yang dapat dibaca. Metafora dunia nyata untuk hal ini adalah penerjemah simultan yang menerjemahkan dari satu bahasa ke bahasa lain secara langsung. Dengan cara khusus untuk aliran transformasi, penulisan ke sisi yang dapat ditulis akan menghasilkan data baru yang tersedia untuk dibaca dari sisi yang dapat dibaca. Secara konkret, setiap objek dengan properti writable dan properti readable dapat berfungsi sebagai aliran transformasi. Namun, class TransformStream standar memudahkan pembuatan pasangan tersebut yang terjalin dengan benar.

Rantai pipa

Streaming terutama digunakan dengan menggabungkan satu sama lain. Streaming yang dapat dibaca dapat disalurkan langsung ke streaming yang dapat ditulis, menggunakan metode pipeTo() streaming yang dapat dibaca, atau dapat disalurkan melalui satu atau beberapa streaming transformasi terlebih dahulu, menggunakan metode pipeThrough() streaming yang dapat dibaca. Kumpulan aliran yang digabungkan dengan cara ini disebut sebagai rantai pipa.

Tekanan balik

Setelah rantai pipa dibuat, rantai tersebut akan menyebarkan sinyal terkait seberapa cepat bagian harus mengalir melalui rantai tersebut. Jika ada langkah dalam rantai yang belum dapat menerima potongan, langkah tersebut akan menyebarkan sinyal ke belakang melalui rantai pipa, hingga akhirnya sumber asli diberi tahu untuk berhenti menghasilkan potongan begitu cepat. Proses menormalisasi alur ini disebut backpressure.

Teeing

Streaming yang dapat dibaca dapat di-tee (dinamai sesuai bentuk huruf besar 'T') menggunakan metode tee()-nya. Tindakan ini akan mengunci streaming, yaitu membuatnya tidak dapat digunakan secara langsung; namun, tindakan ini akan membuat dua streaming baru, yang disebut cabang, yang dapat digunakan secara independen. Memulai streaming juga penting karena streaming tidak dapat diputar ulang atau dimulai ulang. Nanti kita akan membahasnya lebih lanjut.

Diagram rantai pipa yang terdiri dari aliran data yang dapat dibaca yang berasal dari panggilan ke fetch API yang kemudian disalurkan melalui aliran transformasi yang outputnya dihubungkan, lalu dikirim ke browser untuk aliran data yang dapat dibaca pertama yang dihasilkan dan ke cache pekerja layanan untuk aliran data yang dapat dibaca kedua yang dihasilkan.
Rantai pipa.

Mekanisme streaming yang dapat dibaca

Aliran data yang dapat dibaca adalah sumber data yang direpresentasikan dalam JavaScript oleh objek ReadableStream yang mengalir dari sumber pokok. Konstruktor ReadableStream() membuat dan menampilkan objek streaming yang dapat dibaca dari pengendali yang diberikan. Ada dua jenis sumber pokok:

  • Sumber push terus mendorong data kepada Anda saat Anda mengaksesnya, dan Anda dapat memulai, menjeda, atau membatalkan akses ke streaming. Contohnya mencakup streaming video live, peristiwa yang dikirim server, atau WebSocket.
  • Sumber pull mengharuskan Anda meminta data secara eksplisit dari sumber tersebut setelah terhubung. Contohnya meliputi operasi HTTP melalui panggilan fetch() atau XMLHttpRequest.

Data streaming dibaca secara berurutan dalam potongan kecil yang disebut chunk. Potongan yang ditempatkan dalam streaming disebut diantrekan. Artinya, data tersebut menunggu dalam antrean yang siap dibaca. Antrean internal melacak bagian yang belum dibaca.

Strategi antrean adalah objek yang menentukan cara streaming memberikan sinyal backpressure berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap bagian, dan membandingkan ukuran total semua bagian dalam antrean dengan angka yang ditentukan, yang dikenal sebagai high water mark.

Potongan di dalam aliran dibaca oleh pembaca. Pembaca ini mengambil data satu bagian sekaligus, sehingga Anda dapat melakukan jenis operasi apa pun yang ingin dilakukan. Pembaca beserta kode pemrosesan lainnya yang menyertainya disebut konsumen.

Konstruksi berikutnya dalam konteks ini disebut pengontrol. Setiap streaming yang dapat dibaca memiliki pengontrol terkait yang, seperti namanya, memungkinkan Anda mengontrol streaming.

Hanya satu pembaca yang dapat membaca streaming dalam satu waktu; saat pembaca dibuat dan mulai membaca streaming (yaitu, menjadi pembaca aktif), pembaca akan dikunci ke streaming tersebut. Jika Anda ingin pembaca lain mengambilalih pembacaan streaming, Anda biasanya perlu melepaskan pembaca pertama sebelum melakukan hal lain (meskipun Anda dapat memisahkan streaming).

Membuat streaming yang dapat dibaca

Anda membuat streaming yang dapat dibaca dengan memanggil konstruktornya ReadableStream(). Konstruktor memiliki argumen opsional underlyingSource, yang mewakili objek dengan metode dan properti yang menentukan perilaku instance streaming yang dibuat.

underlyingSource

Hal ini dapat menggunakan metode opsional yang ditentukan developer berikut:

  • start(controller): Segera dipanggil saat objek dibuat. Metode ini dapat mengakses sumber aliran data, dan melakukan hal lain yang diperlukan untuk menyiapkan fungsi aliran data. Jika proses ini akan dilakukan secara asinkron, metode ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameter controller yang diteruskan ke metode ini adalah ReadableStreamDefaultController.
  • pull(controller): Dapat digunakan untuk mengontrol streaming saat lebih banyak bagian diambil. Fungsi ini dipanggil berulang kali selama antrean internal chunk streaming tidak penuh, hingga antrean mencapai nilai maksimumnya. Jika hasil pemanggilan pull() adalah promise, pull() tidak akan dipanggil lagi hingga promise tersebut terpenuhi. Jika promise ditolak, streaming akan mengalami error.
  • cancel(reason): Dipanggil saat konsumen streaming membatalkan streaming.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController mendukung metode berikut:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Argumen kedua, yang juga bersifat opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan nilai maksimum aliran data menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif yang terbatas dari nilai bagian yang diberikan. Hasilnya digunakan untuk menentukan backpressure, yang ditampilkan melalui properti ReadableStreamDefaultController.desiredSize yang sesuai. Hal ini juga mengatur kapan metode pull() sumber yang mendasarinya dipanggil.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metode getReader() dan read()

Untuk membaca dari aliran yang dapat dibaca, Anda memerlukan pembaca, yang akan berupa ReadableStreamDefaultReader. Metode getReader() antarmuka ReadableStream membuat pembaca dan mengunci streaming ke antarmuka tersebut. Saat streaming terkunci, tidak ada pembaca lain yang dapat diperoleh hingga pembaca ini dirilis.

Metode read() antarmuka ReadableStreamDefaultReader menampilkan promise yang memberikan akses ke bagian berikutnya dalam antrean internal streaming. Fungsi ini memenuhi atau menolak dengan hasil yang bergantung pada status streaming. Kemungkinan yang berbeda adalah sebagai berikut:

  • Jika bagian tersedia, promise akan terpenuhi dengan objek dalam bentuk
    { value: chunk, done: false }.
  • Jika streaming ditutup, promise akan terpenuhi dengan objek dalam bentuk
    { value: undefined, done: true }.
  • Jika streaming mengalami error, promise akan ditolak dengan error yang relevan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Properti locked

Anda dapat memeriksa apakah aliran data yang dapat dibaca terkunci dengan mengakses properti ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode streaming yang dapat dibaca

Contoh kode di bawah menunjukkan semua langkah yang sedang berjalan. Pertama-tama, Anda membuat ReadableStream yang dalam argumen underlyingSource-nya (yaitu class TimestampSource) menentukan metode start(). Metode ini memberi tahu controller streaming untuk enqueue() stempel waktu setiap detik selama sepuluh detik. Terakhir, kode ini akan memberi tahu pengontrol untuk close() streaming. Anda menggunakan streaming ini dengan membuat pembaca melalui metode getReader() dan memanggil read() hingga streaming done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterasi asinkron

Memeriksa setiap iterasi loop read() jika streamingnya adalah done mungkin bukan API yang paling praktis. Untungnya, akan segera ada cara yang lebih baik untuk melakukannya: iterasi asinkron.

for await (const chunk of stream) {
  console.log(chunk);
}

Solusi untuk menggunakan iterasi asinkron saat ini adalah dengan menerapkan perilaku dengan polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Membuat tee streaming yang dapat dibaca

Metode tee() dari antarmuka ReadableStream menghubungkan aliran yang dapat dibaca saat ini, yang menampilkan array dua elemen yang berisi dua cabang yang dihasilkan sebagai instance ReadableStream baru. Hal ini memungkinkan dua pembaca membaca streaming secara bersamaan. Anda dapat melakukannya, misalnya, di pekerja layanan jika ingin mengambil respons dari server dan melakukan streaming ke browser, tetapi juga melakukan streaming ke cache pekerja layanan. Karena isi respons tidak dapat digunakan lebih dari sekali, Anda memerlukan dua salinan untuk melakukannya. Untuk membatalkan streaming, Anda harus membatalkan kedua cabang yang dihasilkan. Memulai streaming umumnya akan menguncinya selama durasi, sehingga mencegah pembaca lain menguncinya.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Aliran byte yang dapat dibaca

Untuk aliran yang mewakili byte, versi yang diperluas dari aliran yang dapat dibaca disediakan untuk menangani byte secara efisien, terutama dengan meminimalkan salinan. Byte stream memungkinkan pembaca bring-your-own-buffer (BYOB) diperoleh. Implementasi default dapat memberikan berbagai output seperti string atau buffering array dalam kasus WebSocket, sedangkan byte stream menjamin output byte. Selain itu, pembaca BYOB memiliki manfaat stabilitas. Hal ini karena jika buffer dilepas, buffer tersebut dapat menjamin bahwa buffer tidak ditulis dua kali, sehingga menghindari kondisi race. Pembaca BYOB dapat mengurangi frekuensi browser perlu menjalankan pembersihan sampah memori, karena dapat menggunakan kembali buffering.

Membuat aliran byte yang dapat dibaca

Anda dapat membuat aliran byte yang dapat dibaca dengan meneruskan parameter type tambahan ke konstruktor ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Sumber pokok aliran byte yang dapat dibaca diberi ReadableByteStreamController untuk dimanipulasi. Metode ReadableByteStreamController.enqueue()-nya menggunakan argumen chunk yang nilainya adalah ArrayBufferView. Properti ReadableByteStreamController.byobRequest menampilkan permintaan pull BYOB saat ini, atau null jika tidak ada. Terakhir, properti ReadableByteStreamController.desiredSize menampilkan ukuran yang diinginkan untuk mengisi antrean internal streaming yang dikontrol.

queuingStrategy

Argumen kedua, yang juga bersifat opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang menggunakan satu parameter:

  • highWaterMark: Jumlah byte non-negatif yang menunjukkan nilai maksimum aliran data menggunakan strategi antrean ini. Ini digunakan untuk menentukan backpressure, yang ditampilkan melalui properti ReadableByteStreamController.desiredSize yang sesuai. Hal ini juga mengatur kapan metode pull() sumber yang mendasarinya dipanggil.

Metode getReader() dan read()

Kemudian, Anda bisa mendapatkan akses ke ReadableStreamBYOBReader dengan menetapkan parameter mode yang sesuai: ReadableStream.getReader({ mode: "byob" }). Hal ini memungkinkan kontrol yang lebih akurat atas alokasi buffer untuk menghindari salinan. Untuk membaca dari aliran byte, Anda harus memanggil ReadableStreamBYOBReader.read(view), dengan view adalah ArrayBufferView.

Contoh kode byte stream yang dapat dibaca

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Fungsi berikut menampilkan aliran byte yang dapat dibaca yang memungkinkan pembacaan zero-copy yang efisien dari array yang dihasilkan secara acak. Alih-alih menggunakan ukuran bagian yang telah ditentukan sebelumnya sebesar 1.024, buffer ini mencoba mengisi buffer yang disediakan developer, sehingga memungkinkan kontrol penuh.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mekanisme streaming yang dapat ditulis

Aliran data yang dapat ditulis adalah tujuan tempat Anda dapat menulis data, yang direpresentasikan dalam JavaScript oleh objek WritableStream. Hal ini berfungsi sebagai abstraksi di atas sink yang mendasari—sink I/O tingkat rendah tempat data mentah ditulis.

Data ditulis ke aliran melalui penulis, satu bagian dalam satu waktu. Bagian dapat memiliki berbagai bentuk, seperti bagian dalam pembaca. Anda dapat menggunakan kode apa pun yang Anda sukai untuk menghasilkan bagian yang siap ditulis; penulis ditambah kode terkait disebut produser.

Saat penulis dibuat dan mulai menulis ke streaming (penulis aktif), penulis tersebut dianggap terkunci ke streaming. Hanya satu penulis yang dapat menulis ke stream yang dapat ditulis dalam satu waktu. Jika ingin penulis lain mulai menulis ke streaming, Anda biasanya harus merilisnya, sebelum melampirkan penulis lain ke streaming.

Antrean internal melacak bagian yang telah ditulis ke aliran data, tetapi belum diproses oleh sink yang mendasarinya.

Strategi antrean adalah objek yang menentukan cara streaming memberikan sinyal backpressure berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap bagian, dan membandingkan ukuran total semua bagian dalam antrean dengan angka yang ditentukan, yang dikenal sebagai high water mark.

Konstruksi akhir disebut pengontrol. Setiap aliran data yang dapat ditulis memiliki pengontrol terkait yang memungkinkan Anda mengontrol aliran data (misalnya, untuk membatalkannya).

Membuat streaming yang dapat ditulis

Antarmuka WritableStream Streams API menyediakan abstraksi standar untuk menulis data streaming ke tujuan, yang dikenal sebagai sink. Objek ini dilengkapi dengan backpressure dan antrean bawaan. Anda membuat aliran yang dapat ditulis dengan memanggil konstruktornya WritableStream(). Class ini memiliki parameter underlyingSink opsional, yang mewakili objek dengan metode dan properti yang menentukan perilaku instance streaming yang dibuat.

underlyingSink

underlyingSink dapat menyertakan metode opsional yang ditentukan developer berikut. Parameter controller yang diteruskan ke beberapa metode adalah WritableStreamDefaultController.

  • start(controller): Metode ini langsung dipanggil saat objek dibuat. Konten metode ini harus bertujuan untuk mendapatkan akses ke sink yang mendasarinya. Jika proses ini akan dilakukan secara asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan.
  • write(chunk, controller): Metode ini akan dipanggil saat bagian data baru (ditentukan dalam parameter chunk) siap ditulis ke sink yang mendasarinya. Fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan operasi tulis. Metode ini hanya akan dipanggil setelah penulisan sebelumnya berhasil, dan tidak pernah setelah streaming ditutup atau dibatalkan.
  • close(controller): Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa aplikasi telah selesai menulis bagian ke aliran data. Konten harus melakukan apa pun yang diperlukan untuk menyelesaikan penulisan ke sink yang mendasarinya, dan melepaskan akses ke sink tersebut. Jika proses ini asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Metode ini hanya akan dipanggil setelah semua operasi tulis dalam antrean berhasil.
  • abort(reason): Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa aplikasi ingin menutup streaming secara tiba-tiba dan menempatkannya dalam status error. Fungsi ini dapat membersihkan resource yang disimpan, seperti close(), tetapi abort() akan dipanggil meskipun operasi tulis diantre. Potongan tersebut akan dibuang. Jika asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameter reason berisi DOMString yang menjelaskan alasan streaming dibatalkan.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Antarmuka WritableStreamDefaultController Streams API mewakili pengontrol yang memungkinkan kontrol status WritableStream selama penyiapan, karena lebih banyak bagian dikirim untuk ditulis, atau di akhir penulisan. Saat membuat WritableStream, sink yang mendasarinya diberi instance WritableStreamDefaultController yang sesuai untuk dimanipulasi. WritableStreamDefaultController hanya memiliki satu metode: WritableStreamDefaultController.error(), yang menyebabkan interaksi mendatang dengan aliran data terkait mengalami error. WritableStreamDefaultController juga mendukung properti signal yang menampilkan instance AbortSignal, sehingga operasi WritableStream dapat dihentikan jika diperlukan.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Argumen kedua, yang juga bersifat opsional, dari konstruktor WritableStream() adalah queuingStrategy. Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan nilai maksimum aliran data menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif yang terbatas dari nilai bagian yang diberikan. Hasilnya digunakan untuk menentukan backpressure, yang ditampilkan melalui properti WritableStreamDefaultWriter.desiredSize yang sesuai.

Metode getWriter() dan write()

Untuk menulis ke aliran yang dapat ditulis, Anda memerlukan penulis, yang akan menjadi WritableStreamDefaultWriter. Metode getWriter() antarmuka WritableStream menampilkan instance WritableStreamDefaultWriter baru dan mengunci streaming ke instance tersebut. Saat aliran terkunci, tidak ada penulis lain yang dapat diperoleh hingga penulis saat ini dirilis.

Metode write() antarmuka WritableStreamDefaultWriter menulis bagian data yang diteruskan ke WritableStream dan sink yang mendasarinya, lalu menampilkan promise yang di-resolve untuk menunjukkan keberhasilan atau kegagalan operasi tulis. Perhatikan bahwa makna "sukses" bergantung pada sink yang mendasarinya; hal ini mungkin menunjukkan bahwa bagian telah diterima, dan tidak selalu disimpan dengan aman ke tujuan akhirnya.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Properti locked

Anda dapat memeriksa apakah aliran data yang dapat ditulis terkunci dengan mengakses properti WritableStream.locked -nya.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode streaming yang dapat ditulis

Contoh kode di bawah menunjukkan semua langkah yang sedang berjalan.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Mengalirkan streaming yang dapat dibaca ke streaming yang dapat ditulis

Stream yang dapat dibaca dapat disalurkan ke stream yang dapat ditulis melalui metode pipeTo() stream yang dapat dibaca. ReadableStream.pipeTo() menyalurkan ReadableStream saat ini ke WritableStream tertentu dan menampilkan promise yang terpenuhi saat proses penyaluran berhasil diselesaikan, atau menolak jika terjadi error.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Membuat streaming transformasi

Antarmuka TransformStream Streams API mewakili sekumpulan data yang dapat ditransformasi. Anda membuat aliran transformasi dengan memanggil konstruktornya TransformStream(), yang membuat dan menampilkan objek aliran transformasi dari pengendali yang diberikan. Konstruktor TransformStream() menerima sebagai argumen pertamanya objek JavaScript opsional yang mewakili transformer. Objek tersebut dapat berisi salah satu metode berikut:

transformer

  • start(controller): Metode ini langsung dipanggil saat objek dibuat. Biasanya, hal ini digunakan untuk mengantrekan potongan awalan, menggunakan controller.enqueue(). Potongan tersebut akan dibaca dari sisi yang dapat dibaca, tetapi tidak bergantung pada penulisan apa pun ke sisi yang dapat ditulis. Jika proses awal ini bersifat asinkron, misalnya karena perlu beberapa upaya untuk mendapatkan potongan awalan, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; promise yang ditolak akan membuat error pada streaming. Setiap pengecualian yang ditampilkan akan ditampilkan kembali oleh konstruktor TransformStream().
  • transform(chunk, controller): Metode ini dipanggil saat bagian baru yang awalnya ditulis ke sisi yang dapat ditulis siap diubah. Implementasi streaming menjamin bahwa fungsi ini hanya akan dipanggil setelah transformasi sebelumnya berhasil, dan tidak pernah sebelum start() selesai atau setelah flush() dipanggil. Fungsi ini melakukan pekerjaan transformasi yang sebenarnya dari aliran transformasi. Fungsi ini dapat mengantrekan hasil menggunakan controller.enqueue(). Hal ini memungkinkan satu bagian yang ditulis ke sisi yang dapat ditulis menghasilkan nol atau beberapa bagian di sisi yang dapat dibaca, bergantung pada frekuensi controller.enqueue() dipanggil. Jika proses transformasi bersifat asinkron, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan transformasi. Promise yang ditolak akan menampilkan error pada sisi yang dapat dibaca dan ditulis dari streaming transformasi. Jika tidak ada metode transform() yang disediakan, transformasi identitas akan digunakan, yang menambahkan chunk ke antrean tanpa perubahan dari sisi yang dapat ditulis ke sisi yang dapat dibaca.
  • flush(controller): Metode ini dipanggil setelah semua bagian yang ditulis ke sisi yang dapat ditulis telah diubah dengan berhasil meneruskan transform(), dan sisi yang dapat ditulis akan ditutup. Biasanya, ini digunakan untuk mengantrekan potongan akhiran ke sisi yang dapat dibaca, sebelum ditutup juga. Jika proses penghapusan data bersifat asinkron, fungsi dapat menampilkan promise untuk memberi sinyal keberhasilan atau kegagalan; hasilnya akan disampaikan kepada pemanggil stream.writable.write(). Selain itu, promise yang ditolak akan menampilkan error pada sisi stream yang dapat dibaca dan ditulis. Menampilkan pengecualian diperlakukan sama seperti menampilkan promise yang ditolak.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategi antrean writableStrategy dan readableStrategy

Parameter opsional kedua dan ketiga dari konstruktor TransformStream() adalah strategi antrean writableStrategy dan readableStrategy opsional. Keduanya ditentukan seperti yang diuraikan di bagian streaming dapat dibaca dan dapat ditulis.

Contoh kode streaming transformasi

Contoh kode berikut menunjukkan cara kerja streaming transformasi sederhana.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Mengalirkan streaming yang dapat dibaca melalui streaming transformasi

Metode pipeThrough() antarmuka ReadableStream menyediakan cara yang dapat dirantai untuk menyalurkan streaming saat ini melalui streaming transformasi atau pasangan lain yang dapat ditulis/dibaca. Pemipaan aliran data biasanya akan menguncinya selama durasi pemipaan, sehingga mencegah pembaca lain menguncinya.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Contoh kode berikutnya (agak dibuat-buat) menunjukkan cara menerapkan versi fetch() "berteriak" yang menggunakan huruf besar untuk semua teks dengan menggunakan promise respons yang ditampilkan sebagai streaming dan menggunakan huruf besar untuk setiap bagian. Keuntungan dari pendekatan ini adalah Anda tidak perlu menunggu seluruh dokumen didownload, yang dapat membuat perbedaan besar saat menangani file berukuran besar.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Demo di bawah menunjukkan cara kerja streaming yang dapat dibaca, ditulis, dan diubah. Contoh ini juga mencakup contoh rantai pipa pipeThrough() dan pipeTo(), serta menunjukkan tee(). Anda dapat menjalankan demo di jendelanya sendiri atau melihat kode sumber.

Streaming yang berguna tersedia di browser

Ada sejumlah streaming berguna yang terintegrasi langsung di browser. Anda dapat dengan mudah membuat ReadableStream dari blob. Metode stream() antarmuka Blob menampilkan ReadableStream yang setelah dibaca akan menampilkan data yang terdapat dalam blob. Ingat juga bahwa objek File adalah jenis khusus dari Blob, dan dapat digunakan dalam konteks apa pun yang dapat digunakan blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Varian streaming TextDecoder.decode() dan TextEncoder.encode() masing-masing disebut TextDecoderStream dan TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Mengompresi atau mendekompresi file menjadi mudah dengan aliran transformasi CompressionStream dan DecompressionStream. Contoh kode di bawah menunjukkan cara mendownload spesifikasi Streams, mengompresi (gzip) langsung di browser, dan menulis file yang dikompresi langsung ke disk.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream File System Access API dan fetch() request stream eksperimental adalah contoh stream yang dapat ditulis di dunia nyata.

Serial API banyak menggunakan streaming yang dapat dibaca dan ditulis.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Terakhir, WebSocketStream API mengintegrasikan streaming dengan WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Referensi yang berguna

Ucapan terima kasih

Artikel ini ditinjau oleh Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, dan Adam Rice. Postingan blog Jake Archibald sangat membantu saya dalam memahami aliran data. Beberapa contoh kode terinspirasi oleh eksplorasi pengguna GitHub @bellbind dan bagian prosa yang sangat bergantung pada MDN Web Docs on Streams. Penulis Standar Aliran Data telah melakukan pekerjaan yang luar biasa dalam menulis spesifikasi ini. Gambar hero oleh Ryan Lara di Unsplash.