Streaming—Panduan definitif

Pelajari cara menggunakan aliran yang dapat dibaca, dapat ditulis, dan dapat ditransformasi dengan Streams API.

Streams API memungkinkan Anda mengakses aliran data yang diterima melalui jaringan secara terprogram atau dibuat dengan cara apa pun secara lokal dan memprosesnya dengan JavaScript. Streaming melibatkan penguraian resource yang ingin Anda terima, kirim, atau ubah menjadi potongan kecil, lalu memproses potongan ini sedikit demi sedikit. Meskipun streaming adalah sesuatu yang dilakukan browser saat menerima aset seperti HTML atau video untuk ditampilkan di halaman web, kemampuan ini belum pernah tersedia untuk JavaScript sebelum fetch dengan streaming diperkenalkan pada tahun 2015.

Sebelumnya, jika Anda ingin memproses suatu jenis resource (baik itu video, file teks, dll.), Anda harus mendownload seluruh file, menunggu file tersebut dideserialisasi ke dalam format yang sesuai, lalu memprosesnya. Dengan tersedianya stream untuk JavaScript, semuanya berubah. Anda kini dapat memproses data mentah dengan JavaScript secara progresif segera setelah tersedia di klien, tanpa perlu membuat buffer, string, atau blob. Hal ini memungkinkan sejumlah kasus penggunaan, beberapa di antaranya saya cantumkan di bawah:

  • Efek video: menyalurkan streaming video yang dapat dibaca melalui streaming transformasi yang menerapkan efek secara real time.
  • (De)kompresi data: menyalurkan aliran file melalui aliran transformasi yang secara selektif (de)mengompresinya.
  • Dekode gambar: menyalurkan aliran respons HTTP melalui aliran transformasi yang mendekode byte menjadi data bitmap, lalu melalui aliran transformasi lain yang menerjemahkan bitmap menjadi PNG. Jika diinstal di dalam pengendali fetch pekerja layanan, Anda dapat melakukan polyfill format gambar baru seperti AVIF secara transparan.

Dukungan browser

ReadableStream dan WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

Konsep inti

Sebelum membahas detail berbagai jenis aliran, izinkan saya memperkenalkan beberapa konsep inti.

Potongan

Chunk adalah satu bagian data yang ditulis ke atau dibaca dari aliran. Data ini dapat berupa jenis apa pun; aliran bahkan dapat berisi potongan dari berbagai jenis. Sering kali, potongan tidak akan menjadi unit data paling atomik untuk aliran tertentu. Misalnya, aliran byte dapat berisi chunk yang terdiri dari unit Uint8Array 16 KiB, bukan byte tunggal.

Aliran data yang dapat dibaca

Aliran yang dapat dibaca merepresentasikan sumber data yang dapat Anda baca. Dengan kata lain, data keluar dari aliran yang dapat dibaca. Secara konkret, aliran yang dapat dibaca adalah instance class ReadableStream.

Aliran data yang dapat ditulis

Aliran yang dapat ditulis merepresentasikan tujuan data yang dapat Anda tulis. Dengan kata lain, data masuk ke aliran yang dapat ditulis. Secara konkret, stream yang dapat ditulis adalah instance class WritableStream.

Transformasi aliran

Aliran transformasi terdiri dari pasangan aliran: aliran yang dapat ditulis, yang dikenal sebagai sisi yang dapat ditulis, dan aliran yang dapat dibaca, yang dikenal sebagai sisi yang dapat dibaca. Metafora dunia nyata untuk hal ini adalah penerjemah simultan yang menerjemahkan dari satu bahasa ke bahasa lain secara langsung. Dengan cara yang khusus untuk aliran transformasi, penulisan ke sisi yang dapat ditulis akan menghasilkan data baru yang tersedia untuk dibaca dari sisi yang dapat dibaca. Secara konkret, setiap objek dengan properti writable dan properti readable dapat berfungsi sebagai aliran transformasi. Namun, class TransformStream standar memudahkan pembuatan pasangan yang terjalin dengan benar.

Rantai pipa

Aliran terutama digunakan dengan menyalurkannya satu sama lain. Aliran yang dapat dibaca dapat disalurkan langsung ke aliran yang dapat ditulis, menggunakan metode pipeTo() aliran yang dapat dibaca, atau dapat disalurkan melalui satu atau beberapa aliran transformasi terlebih dahulu, menggunakan metode pipeThrough() aliran yang dapat dibaca. Kumpulan aliran yang disalurkan bersama dengan cara ini disebut sebagai rantai saluran.

Tekanan balik

Setelah rantai pipa dibuat, rantai tersebut akan menyebarkan sinyal mengenai seberapa cepat potongan harus mengalir melaluinya. Jika ada langkah dalam rantai yang belum dapat menerima chunk, langkah tersebut akan menyebarkan sinyal ke belakang melalui rantai saluran, hingga akhirnya sumber asli diminta untuk berhenti memproduksi chunk dengan cepat. Proses menormalisasi alur ini disebut tekanan balik.

Memulai

Aliran yang dapat dibaca dapat di-tee (dinamai sesuai bentuk 'T' huruf besar) menggunakan metode tee(). Tindakan ini akan mengunci stream, yaitu membuatnya tidak dapat digunakan secara langsung lagi; namun, tindakan ini akan membuat dua stream baru, yang disebut cabang, yang dapat digunakan secara terpisah. Teeing juga penting karena streaming tidak dapat di-rewind atau dimulai ulang. Kami akan membahasnya lebih lanjut nanti.

Diagram rantai pipe yang terdiri dari aliran yang dapat dibaca yang berasal dari panggilan ke fetch API yang kemudian di-pipe melalui aliran transformasi yang outputnya di-tee lalu dikirim ke browser untuk aliran yang dapat dibaca pertama dan ke cache service worker untuk aliran yang dapat dibaca kedua.
Rantai pipa.

Mekanisme aliran yang dapat dibaca

Aliran yang dapat dibaca adalah sumber data yang direpresentasikan di JavaScript oleh objek ReadableStream yang mengalir dari sumber pokok. Konstruktor ReadableStream() membuat dan menampilkan objek stream yang dapat dibaca dari handler yang diberikan. Ada dua jenis sumber pokok:

  • Sumber push terus-menerus mengirimkan data kepada Anda saat Anda mengaksesnya, dan Anda dapat memulai, menjeda, atau membatalkan akses ke aliran data. Contohnya mencakup live stream video, peristiwa yang dikirim oleh server, atau WebSockets.
  • Sumber penarikan mengharuskan Anda meminta data secara eksplisit dari sumber tersebut setelah terhubung. Contohnya mencakup operasi HTTP melalui panggilan fetch() atau XMLHttpRequest.

Data streaming dibaca secara berurutan dalam potongan kecil yang disebut chunk. Chunk yang ditempatkan dalam aliran dikatakan dalam antrean. Artinya, mereka menunggu dalam antrean dan siap dibaca. Antrean internal melacak chunk yang belum dibaca.

Strategi antrean adalah objek yang menentukan cara aliran harus memberi sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap bagian, dan membandingkan total ukuran semua bagian dalam antrean dengan jumlah yang ditentukan, yang dikenal sebagai batas atas.

Potongan dalam aliran dibaca oleh pembaca. Pembaca ini mengambil data satu bagian dalam satu waktu, sehingga Anda dapat melakukan jenis operasi apa pun yang ingin Anda lakukan. Pembaca ditambah kode pemrosesan lainnya yang menyertainya disebut konsumen.

Konstruk berikutnya dalam konteks ini disebut pengontrol. Setiap aliran yang dapat dibaca memiliki pengontrol terkait yang, seperti namanya, memungkinkan Anda mengontrol aliran.

Hanya satu pembaca yang dapat membaca aliran data pada satu waktu; saat pembaca dibuat dan mulai membaca aliran data (yaitu, menjadi pembaca aktif), pembaca tersebut akan dikunci ke aliran data tersebut. Jika Anda ingin pembaca lain mengambil alih pembacaan streaming, Anda biasanya perlu melepaskan pembaca pertama sebelum melakukan hal lain (meskipun Anda dapat menyiapkan streaming).

Membuat stream yang dapat dibaca

Anda membuat stream yang dapat dibaca dengan memanggil konstruktornya ReadableStream(). Konstruktor memiliki argumen opsional underlyingSource, yang merepresentasikan objek dengan metode dan properti yang menentukan perilaku instance stream yang dibuat.

underlyingSource

Hal ini dapat menggunakan metode opsional yang ditentukan developer berikut:

  • start(controller): Dipanggil segera saat objek dibuat. Metode dapat mengakses sumber streaming, dan melakukan hal lain yang diperlukan untuk menyiapkan fungsi streaming. Jika proses ini akan dilakukan secara asinkron, metode dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameter controller yang diteruskan ke metode ini adalah ReadableStreamDefaultController.
  • pull(controller): Dapat digunakan untuk mengontrol streaming saat lebih banyak chunk diambil. Fungsi ini dipanggil berulang kali selama antrean internal potongan streaming tidak penuh, hingga antrean mencapai tanda batas atasnya. Jika hasil pemanggilan pull() adalah promise, pull() tidak akan dipanggil lagi hingga promise tersebut terpenuhi. Jika promise ditolak, aliran data akan menjadi error.
  • cancel(reason): Dipanggil saat konsumen stream membatalkan stream.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController mendukung metode berikut:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Argumen kedua, yang juga opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan tanda air tinggi dari aliran yang menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang ditampilkan melalui properti ReadableStreamDefaultController.desiredSize yang sesuai. Hal ini juga mengatur kapan metode pull() sumber pokok dipanggil.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metode getReader() dan read()

Untuk membaca dari aliran yang dapat dibaca, Anda memerlukan pembaca, yang akan menjadi ReadableStreamDefaultReader. Metode getReader() antarmuka ReadableStream membuat pembaca dan mengunci streaming ke pembaca tersebut. Saat aliran dikunci, tidak ada pembaca lain yang dapat diperoleh hingga pembaca ini dilepaskan.

Metode read() dari antarmuka ReadableStreamDefaultReader menampilkan promise yang memberikan akses ke potongan berikutnya dalam antrean internal stream. Metode ini akan memenuhi atau menolak dengan hasil bergantung pada status aliran. Kemungkinan yang berbeda adalah sebagai berikut:

  • Jika chunk tersedia, promise akan dipenuhi dengan objek formulir
    { value: chunk, done: false }.
  • Jika aliran ditutup, promise akan dipenuhi dengan objek dalam bentuk
    { value: undefined, done: true }.
  • Jika streaming mengalami error, promise akan ditolak dengan error yang relevan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Properti locked

Anda dapat memeriksa apakah aliran yang dapat dibaca dikunci dengan mengakses properti ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode aliran yang dapat dibaca

Contoh kode di bawah menunjukkan semua langkah yang sedang dilakukan. Pertama, Anda membuat ReadableStream yang dalam argumen underlyingSource (yaitu, class TimestampSource) menentukan metode start(). Metode ini memberi tahu controller aliran untuk enqueue() stempel waktu setiap detik selama sepuluh detik. Terakhir, ia memberi tahu pengontrol untuk close() streaming. Anda menggunakan aliran ini dengan membuat pembaca melalui metode getReader() dan memanggil read() hingga aliran menjadi done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterasi asinkron

Memeriksa apakah aliran done pada setiap iterasi loop read() mungkin bukan API yang paling nyaman. Untungnya, akan segera ada cara yang lebih baik untuk melakukannya: iterasi asinkron.

for await (const chunk of stream) {
  console.log(chunk);
}

Solusi untuk menggunakan iterasi asinkron saat ini adalah dengan menerapkan perilaku dengan polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Mengatur waktu aliran yang dapat dibaca

Metode tee() dari antarmuka ReadableStream akan mengalirkan stream yang dapat dibaca saat ini, menampilkan array dua elemen yang berisi dua cabang yang dihasilkan sebagai instance ReadableStream baru. Hal ini memungkinkan dua pembaca membaca stream secara bersamaan. Anda dapat melakukannya, misalnya, di service worker jika Anda ingin mengambil respons dari server dan melakukan streaming ke browser, tetapi juga melakukan streaming ke cache service worker. Karena isi respons tidak dapat digunakan lebih dari sekali, Anda memerlukan dua salinan untuk melakukannya. Untuk membatalkan streaming, Anda harus membatalkan kedua cabang yang dihasilkan. Mengawali streaming biasanya akan menguncinya selama durasi, sehingga mencegah pembaca lain menguncinya.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Aliran byte yang dapat dibaca

Untuk aliran yang merepresentasikan byte, versi yang diperluas dari aliran yang dapat dibaca disediakan untuk menangani byte secara efisien, khususnya dengan meminimalkan salinan. Aliran byte memungkinkan akuisisi pembaca bawa buffer Anda sendiri (BYOB). Implementasi default dapat memberikan berbagai output yang berbeda seperti string atau buffer array dalam kasus WebSockets, sedangkan aliran byte menjamin output byte. Selain itu, pembaca BYOB memiliki manfaat stabilitas. Hal ini karena jika buffer dilepas, buffer dapat menjamin bahwa buffer tidak ditulis ke buffer yang sama dua kali, sehingga menghindari kondisi persaingan. Pembaca BYOB dapat mengurangi jumlah pembersihan sampah memori yang perlu dijalankan browser, karena dapat menggunakan kembali buffer.

Membuat aliran byte yang dapat dibaca

Anda dapat membuat aliran byte yang dapat dibaca dengan meneruskan parameter type tambahan ke konstruktor ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Sumber pokok dari aliran byte yang dapat dibaca diberi ReadableByteStreamController untuk dimanipulasi. Metode ReadableByteStreamController.enqueue() menggunakan argumen chunk yang nilainya adalah ArrayBufferView. Properti ReadableByteStreamController.byobRequest menampilkan pull request BYOB saat ini, atau null jika tidak ada. Terakhir, properti ReadableByteStreamController.desiredSize menampilkan ukuran yang diinginkan untuk mengisi antrean internal stream yang dikontrol.

queuingStrategy

Argumen kedua, yang juga opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan satu parameter:

  • highWaterMark: Jumlah byte non-negatif yang menunjukkan tanda batas atas aliran menggunakan strategi antrean ini. Hal ini digunakan untuk menentukan tekanan balik, yang terwujud melalui properti ReadableByteStreamController.desiredSize yang sesuai. Hal ini juga mengatur kapan metode pull() sumber pokok dipanggil.

Metode getReader() dan read()

Kemudian, Anda dapat memperoleh akses ke ReadableStreamBYOBReader dengan menetapkan parameter mode dengan tepat: ReadableStream.getReader({ mode: "byob" }). Hal ini memungkinkan kontrol yang lebih presisi atas alokasi buffer untuk menghindari salinan. Untuk membaca dari aliran byte, Anda perlu memanggil ReadableStreamBYOBReader.read(view), dengan view adalah ArrayBufferView.

Contoh kode aliran byte yang dapat dibaca

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Fungsi berikut menampilkan aliran byte yang dapat dibaca yang memungkinkan pembacaan tanpa salinan yang efisien dari array yang dibuat secara acak. Daripada menggunakan ukuran potongan yang telah ditentukan sebelumnya sebesar 1.024, metode ini mencoba mengisi buffer yang disediakan developer, sehingga memungkinkan kontrol penuh.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mekanisme aliran yang dapat ditulis

Aliran yang dapat ditulis adalah tujuan tempat Anda dapat menulis data, yang direpresentasikan di JavaScript oleh objek WritableStream. Hal ini berfungsi sebagai abstraksi di atas sink yang mendasarinya—sink I/O tingkat bawah tempat data mentah ditulis.

Data ditulis ke aliran melalui penulis, satu bagian dalam satu waktu. Potongan dapat memiliki berbagai bentuk, seperti potongan dalam pembaca. Anda dapat menggunakan kode apa pun yang Anda suka untuk menghasilkan potongan yang siap ditulis; penulis beserta kode terkait disebut produser.

Saat pembuat dibuat dan mulai menulis ke aliran (penulis aktif), pembuat tersebut dikatakan dikunci ke aliran tersebut. Hanya satu penulis yang dapat menulis ke stream yang dapat ditulis pada satu waktu. Jika Anda ingin penulis lain mulai menulis ke aliran Anda, biasanya Anda perlu melepaskannya, sebelum Anda melampirkan penulis lain ke aliran tersebut.

Antrean internal melacak potongan yang telah ditulis ke aliran, tetapi belum diproses oleh sink yang mendasarinya.

Strategi antrean adalah objek yang menentukan cara aliran harus memberi sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap bagian, dan membandingkan total ukuran semua bagian dalam antrean dengan jumlah yang ditentukan, yang dikenal sebagai batas atas.

Konstruksi akhir disebut pengontrol. Setiap stream yang dapat ditulis memiliki pengontrol terkait yang memungkinkan Anda mengontrol stream (misalnya, untuk membatalkannya).

Membuat stream yang dapat ditulis

Antarmuka WritableStream dari Streams API menyediakan abstraksi standar untuk menulis data streaming ke tujuan, yang dikenal sebagai sink. Objek ini dilengkapi dengan backpressure dan antrean bawaan. Anda membuat stream yang dapat ditulis dengan memanggil konstruktornya WritableStream(). Metode ini memiliki parameter underlyingSink opsional, yang merepresentasikan objek dengan metode dan properti yang menentukan perilaku instance stream yang dibuat.

underlyingSink

underlyingSink dapat mencakup metode opsional yang ditentukan developer berikut. Parameter controller yang diteruskan ke beberapa metode adalah WritableStreamDefaultController.

  • start(controller): Metode ini dipanggil segera saat objek dibuat. Isi metode ini harus bertujuan untuk mendapatkan akses ke sink pokok. Jika proses ini akan dilakukan secara asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan.
  • write(chunk, controller): Metode ini akan dipanggil saat potongan data baru (yang ditentukan dalam parameter chunk) siap ditulis ke sink yang mendasarinya. Metode ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan operasi tulis. Metode ini hanya akan dipanggil setelah penulisan sebelumnya berhasil, dan tidak pernah setelah aliran ditutup atau dibatalkan.
  • close(controller): Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa aplikasi telah selesai menulis chunk ke aliran. Konten harus melakukan apa pun yang diperlukan untuk menyelesaikan penulisan ke sink yang mendasarinya, dan melepaskan akses ke sink tersebut. Jika proses ini asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Metode ini hanya akan dipanggil setelah semua penulisan yang diantrekan berhasil.
  • abort(reason): Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa aplikasi ingin menutup streaming secara tiba-tiba dan menempatkannya dalam status error. Metode ini dapat membersihkan resource yang ditahan, seperti close(), tetapi abort() akan dipanggil meskipun operasi tulis diantrekan. Chunk tersebut akan dibuang. Jika proses ini asinkron, proses dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameter reason berisi DOMString yang menjelaskan alasan streaming dibatalkan.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Antarmuka WritableStreamDefaultController Streams API merepresentasikan pengontrol yang memungkinkan kontrol status WritableStream selama penyiapan, saat lebih banyak potongan dikirimkan untuk penulisan, atau di akhir penulisan. Saat membuat WritableStream, sink yang mendasarinya diberi instance WritableStreamDefaultController yang sesuai untuk dimanipulasi. WritableStreamDefaultController hanya memiliki satu metode: WritableStreamDefaultController.error(), yang menyebabkan error pada interaksi mendatang dengan aliran data terkait. WritableStreamDefaultController juga mendukung properti signal yang menampilkan instance AbortSignal, sehingga operasi WritableStream dapat dihentikan jika diperlukan.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Argumen kedua, yang juga opsional, dari konstruktor WritableStream() adalah queuingStrategy. Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan tanda air tinggi dari aliran yang menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang ditampilkan melalui properti WritableStreamDefaultWriter.desiredSize yang sesuai.

Metode getWriter() dan write()

Untuk menulis ke stream yang dapat ditulis, Anda memerlukan penulis, yang akan berupa WritableStreamDefaultWriter. Metode getWriter() antarmuka WritableStream menampilkan instance baru WritableStreamDefaultWriter dan mengunci aliran ke instance tersebut. Saat stream dikunci, penulis lain tidak dapat diperoleh hingga penulis saat ini dilepaskan.

Metode write() dari antarmuka WritableStreamDefaultWriter menulis potongan data yang diteruskan ke WritableStream dan sink dasarnya, lalu menampilkan promise yang diselesaikan untuk menunjukkan keberhasilan atau kegagalan operasi tulis. Perhatikan bahwa arti "berhasil" bergantung pada sink yang mendasarinya; hal ini mungkin menunjukkan bahwa chunk telah diterima, dan tidak berarti chunk tersebut disimpan dengan aman ke tujuan akhirnya.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Properti locked

Anda dapat memeriksa apakah aliran data yang dapat ditulis dikunci dengan mengakses properti WritableStream.locked-nya.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode stream yang dapat ditulis

Contoh kode di bawah menunjukkan semua langkah yang sedang dilakukan.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Menyalurkan aliran yang dapat dibaca ke aliran yang dapat ditulis

Stream yang dapat dibaca dapat disalurkan ke stream yang dapat ditulis melalui metode pipeTo() stream yang dapat dibaca. ReadableStream.pipeTo() menyalurkan ReadableStream saat ini ke WritableStream tertentu dan menampilkan promise yang terpenuhi saat proses penyaluran berhasil diselesaikan, atau menolak jika ada error yang ditemukan.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Membuat stream transformasi

Antarmuka TransformStream Streams API merepresentasikan sekumpulan data yang dapat diubah. Anda membuat aliran transformasi dengan memanggil konstruktor TransformStream(), yang membuat dan menampilkan objek aliran transformasi dari handler yang diberikan. Konstruktor TransformStream() menerima sebagai argumen pertamanya objek JavaScript opsional yang merepresentasikan transformer. Objek tersebut dapat berisi salah satu metode berikut:

transformer

  • start(controller): Metode ini dipanggil segera saat objek dibuat. Biasanya, perintah ini digunakan untuk mengantrekan potongan awalan, menggunakan controller.enqueue(). Chunk tersebut akan dibaca dari sisi yang dapat dibaca, tetapi tidak bergantung pada penulisan apa pun ke sisi yang dapat ditulis. Jika proses awal ini bersifat asinkron, misalnya karena memerlukan upaya untuk mendapatkan potongan awalan, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; promise yang ditolak akan menyebabkan error pada aliran. Setiap pengecualian yang ditampilkan akan ditampilkan ulang oleh konstruktor TransformStream().
  • transform(chunk, controller): Metode ini dipanggil saat chunk baru yang awalnya ditulis ke sisi yang dapat ditulis siap diubah. Implementasi streaming menjamin bahwa fungsi ini hanya akan dipanggil setelah transformasi sebelumnya berhasil, dan tidak pernah sebelum start() selesai atau setelah flush() dipanggil. Fungsi ini melakukan pekerjaan transformasi yang sebenarnya dari aliran transformasi. Hasil dapat dimasukkan dalam antrean menggunakan controller.enqueue(). Hal ini memungkinkan satu bagian yang ditulis ke sisi yang dapat ditulis menghasilkan nol atau beberapa bagian di sisi yang dapat dibaca, bergantung pada berapa kali controller.enqueue() dipanggil. Jika proses transformasi bersifat asinkron, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan transformasi. Promise yang ditolak akan menyebabkan error pada sisi yang dapat dibaca dan dapat ditulis dari aliran transformasi. Jika tidak ada metode transform() yang diberikan, transformasi identitas akan digunakan, yang mengantrekan chunk yang tidak berubah dari sisi yang dapat ditulis ke sisi yang dapat dibaca.
  • flush(controller): Metode ini dipanggil setelah semua bagian yang ditulis ke sisi yang dapat ditulis telah diubah dengan berhasil melewati transform(), dan sisi yang dapat ditulis akan ditutup. Biasanya, hal ini digunakan untuk mengantrekan potongan akhiran ke sisi yang dapat dibaca, sebelum sisi tersebut juga ditutup. Jika proses penghapusan bersifat asinkron, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; hasilnya akan dikomunikasikan kepada pemanggil stream.writable.write(). Selain itu, promise yang ditolak akan menyebabkan error pada sisi stream yang dapat dibaca dan ditulis. Menampilkan pengecualian diperlakukan sama dengan menampilkan promise yang ditolak.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategi antrean writableStrategy dan readableStrategy

Parameter opsional kedua dan ketiga dari konstruktor TransformStream() adalah strategi antrean writableStrategy dan readableStrategy opsional. Proses tersebut didefinisikan seperti yang diuraikan di bagian dapat dibaca dan dapat ditulis.

Contoh kode aliran transformasi

Contoh kode berikut menunjukkan cara kerja aliran transformasi sederhana.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Menyalurkan aliran yang dapat dibaca melalui aliran transformasi

Metode pipeThrough() dari antarmuka ReadableStream menyediakan cara yang dapat dirangkai untuk menyalurkan aliran saat ini melalui aliran transformasi atau pasangan yang dapat ditulis/dibaca lainnya. Menyalurkan stream umumnya akan menguncinya selama durasi saluran, sehingga mencegah pembaca lain menguncinya.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Contoh kode berikutnya (sedikit dibuat-buat) menunjukkan cara menerapkan versi "berteriak" dari fetch() yang mengubah semua teks menjadi huruf besar dengan menggunakan respons promise yang ditampilkan sebagai aliran dan mengubah huruf besar per bagian. Keuntungan dari pendekatan ini adalah Anda tidak perlu menunggu seluruh dokumen didownload, yang dapat membuat perbedaan besar saat menangani file besar.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Demo di bawah menunjukkan cara kerja aliran yang dapat dibaca, ditulis, dan diubah. Dokumen ini juga menyertakan contoh rantai pipa pipeThrough() dan pipeTo(), serta menunjukkan tee(). Anda dapat menjalankan demo di jendelanya sendiri atau melihat kode sumber.

Aliran yang berguna tersedia di browser

Ada sejumlah aliran berguna yang langsung tersedia di browser. Anda dapat membuat ReadableStream dari blob dengan mudah. Metode stream() antarmuka Blob menampilkan ReadableStream yang setelah dibaca akan menampilkan data yang ada dalam blob. Ingat juga bahwa objek File adalah jenis Blob tertentu, dan dapat digunakan dalam konteks apa pun yang dapat dilakukan blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Varian streaming TextDecoder.decode() dan TextEncoder.encode() masing-masing disebut TextDecoderStream dan TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Mengompresi atau mendekompresi file menjadi mudah dengan aliran transformasi CompressionStream dan DecompressionStream. Contoh kode di bawah menunjukkan cara mendownload spesifikasi Streams, mengompresinya (gzip) langsung di browser, dan menulis file yang dikompresi langsung ke disk.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API FileSystemWritableFileStream dan aliran permintaan fetch() eksperimental adalah contoh aliran yang dapat ditulis di luar sana.

Serial API banyak menggunakan aliran yang dapat dibaca dan ditulis.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Terakhir, API WebSocketStream mengintegrasikan streaming dengan WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Referensi yang berguna

Ucapan terima kasih

Artikel ini ditinjau oleh Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, dan Adam Rice. Postingan blog Jake Archibald sangat membantu saya dalam memahami streaming. Beberapa contoh kode terinspirasi oleh eksplorasi pengguna GitHub @bellbind dan sebagian besar teksnya dibangun berdasarkan MDN Web Docs on Streams. Penulis Streams Standard telah melakukan pekerjaan yang luar biasa dalam menulis spesifikasi ini. Gambar unggulan oleh Ryan Lara di Unsplash.