Streaming—Panduan definitif

Pelajari cara menggunakan stream yang dapat dibaca, ditulis, dan mentransformasi dengan Streams API.

Dengan Streams API, Anda dapat mengakses secara terprogram aliran data yang diterima melalui jaringan atau dibuat dengan cara apa pun secara lokal dan memprosesnya dengan JavaScript. Streaming berarti memecah resource yang ingin Anda terima, kirim, atau ubah menjadi potongan-potongan kecil, lalu pemrosesan potongan ini sedikit demi sedikit. Meskipun streaming adalah sesuatu yang dilakukan browser saat menerima aset seperti HTML atau video untuk ditampilkan di halaman web, kemampuan ini belum pernah tersedia untuk JavaScript sebelum fetch dengan streaming diperkenalkan pada tahun 2015.

Sebelumnya, jika ingin memproses jenis resource (baik itu video atau file teks, dll.), Anda harus mendownload seluruh file, menunggu hingga dideserialisasi ke dalam format yang sesuai, lalu memprosesnya. Dengan stream yang tersedia untuk JavaScript, ini semua akan berubah. Anda sekarang dapat memproses data mentah dengan JavaScript secara bertahap segera setelah tersedia di klien, tanpa perlu membuat buffer, string, atau blob. Tindakan ini akan membuka sejumlah kasus penggunaan, beberapa di antaranya saya cantumkan di bawah:

  • Efek video: menyisipkan streaming video yang dapat dibaca melalui streaming transformasi yang menerapkan efek secara real time.
  • Data (de)kompresi: menghubungkan aliran file melalui stream transformasi yang secara selektif (mende) mengompresinya.
  • Dekode gambar: menghubungkan aliran respons HTTP melalui aliran transformasi yang mendekode byte menjadi data bitmap, lalu melalui aliran transformasi lain yang menerjemahkan bitmap menjadi PNG. Jika diinstal di dalam pengendali fetch dari pekerja layanan, Anda dapat mem-polyfill format gambar baru seperti AVIF secara transparan.

Dukungan browser

ReadableStream dan WritableStream

Dukungan Browser

  • 43
  • 14
  • 65
  • 10.1

Sumber

TransformStream

Dukungan Browser

  • 67
  • 79
  • 102
  • 14.1

Sumber

Konsep inti

Sebelum membahas berbagai jenis streaming secara mendetail, saya akan memperkenalkan beberapa konsep inti.

Potongan

Potongan adalah satu bagian dari data yang ditulis atau dibaca dari aliran data. Jenis apa pun; {i>stream<i} bahkan dapat berisi potongan-potongan dari jenis yang berbeda. Sering kali, potongan tidak akan menjadi unit data paling atomik untuk aliran data tertentu. Misalnya, aliran byte mungkin berisi potongan yang terdiri dari 16 unit Uint8Array KiB, bukan byte tunggal.

Streaming yang dapat dibaca

Aliran data yang dapat dibaca mewakili sumber data yang dapat Anda baca. Dengan kata lain, data keluar dari aliran yang dapat dibaca. Konkretnya, aliran yang dapat dibaca adalah instance class ReadableStream.

Feed yang dapat ditulis

Aliran yang dapat ditulis mewakili tujuan untuk data yang dapat Anda tulis. Dengan kata lain, data masuk ke aliran yang dapat ditulis. Secara konkret, stream yang dapat ditulis adalah instance dari class WritableStream.

Mentransformasi streaming

Aliran transformasi terdiri dari sepasang aliran: aliran yang dapat ditulis, yang dikenal sebagai sisi yang dapat ditulis, dan aliran yang dapat dibaca, yang dikenal sebagai sisi yang dapat dibaca. Metafora di dunia nyata untuk hal ini adalah penerjemah simultan yang menerjemahkan dari satu bahasa ke bahasa lain dengan cepat. Dengan cara yang khusus untuk aliran transformasi, menulis ke sisi yang dapat ditulis akan membuat data baru tersedia untuk dibaca dari sisi yang dapat dibaca. Secara konkret, objek apa pun dengan properti writable dan properti readable dapat berfungsi sebagai aliran transformasi. Namun, class TransformStream standar mempermudah pembuatan pasangan yang terjerat dengan tepat.

Rantai pipa

Streaming terutama digunakan dengan menghubungkan streaming tersebut ke satu sama lain. Stream yang dapat dibaca dapat diarahkan langsung ke stream yang dapat ditulis, menggunakan metode pipeTo() stream yang dapat dibaca, atau dapat disalurkan melalui satu atau beberapa stream transformasi terlebih dahulu, menggunakan metode pipeThrough() stream yang dapat dibaca tersebut. Kumpulan aliran yang disalurkan bersama-sama dengan cara ini disebut sebagai rantai pipe.

Tekanan Balik

Setelah rantai pipa dibuat, rantai tersebut akan menyebarkan sinyal terkait seberapa cepat potongan harus mengalir melaluinya. Jika salah satu langkah dalam rantai tersebut belum dapat menerima potongan, langkah tersebut akan menyebarkan sinyal ke belakang melalui rantai pipa, hingga akhirnya sumber asli diberi tahu untuk berhenti menghasilkan potongan dengan begitu cepat. Proses normalisasi aliran ini disebut tekanan balik.

Kaus

Stream yang dapat dibaca dapat di-teed (dinamai berdasarkan bentuk huruf besar 'T') menggunakan metode tee(). Tindakan ini akan mengunci aliran data, sehingga tidak dapat digunakan lagi secara langsung; namun, tindakan ini akan membuat dua aliran data baru, yang disebut cabang, yang dapat digunakan secara terpisah. Teeing juga penting karena streaming tidak dapat diputar ulang atau dimulai ulang, hal ini akan dibahas nanti.

Diagram rantai pipa yang terdiri dari streaming yang dapat dibaca yang berasal dari panggilan ke API pengambilan yang kemudian disalurkan melalui aliran transformasi yang output-nya di-teed, lalu dikirim ke browser untuk streaming pertama yang dapat dibaca dan ke cache pekerja layanan untuk aliran yang dapat dibaca kedua.
Rantai pipa.

Mekanisme streaming yang dapat dibaca

Aliran yang dapat dibaca adalah sumber data yang direpresentasikan dalam JavaScript oleh objek ReadableStream yang mengalir dari sumber pokok. Konstruktor ReadableStream() membuat dan menampilkan objek streaming yang dapat dibaca dari pengendali yang diberikan. Ada dua jenis sumber dasar:

  • Sumber push terus-menerus mengirimkan data saat Anda mengaksesnya, dan terserah Anda untuk memulai, menjeda, atau membatalkan akses ke aliran data. Contohnya mencakup streaming video live, peristiwa yang dikirim server, atau WebSocket.
  • Sumber pengambilan mengharuskan Anda untuk secara eksplisit meminta data dari sumber tersebut setelah terhubung. Contohnya mencakup operasi HTTP melalui panggilan fetch() atau XMLHttpRequest.

Data streaming dibaca secara berurutan dalam potongan-potongan kecil yang disebut potongan. Potongan yang ditempatkan dalam aliran data dikatakan diantrekan. Ini berarti mereka sedang menunggu dalam antrean siap untuk dibaca. Antrean internal melacak potongan yang belum dibaca.

Strategi antrean adalah objek yang menentukan cara streaming seharusnya memberikan sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran ke setiap bagian, dan membandingkan ukuran total bagian dalam antrean dengan angka yang ditentukan, yang dikenal sebagai tanda air tinggi.

Potongan-potongan di dalam streaming tersebut dibaca oleh pembaca. Pembaca ini mengambil data satu per satu, sehingga Anda dapat melakukan jenis operasi apa pun yang ingin dilakukan pada data tersebut. Pembaca dan kode pemrosesan lain yang menyertainya disebut konsumen.

Konstruksi berikutnya dalam konteks ini disebut pengontrol. Setiap aliran data yang dapat dibaca memiliki pengontrol terkait yang, seperti namanya, memungkinkan Anda untuk mengontrol aliran.

Hanya satu pembaca yang dapat membaca streaming pada satu waktu; saat pembaca dibuat dan mulai membaca streaming (yaitu, menjadi pembaca aktif), pembaca akan terkunci pada streaming tersebut. Jika ingin pembaca lain mengambil alih membaca streaming, Anda biasanya perlu melepaskan pembaca pertama sebelum melakukan hal lainnya (meskipun Anda dapat melakukan tee streaming).

Membuat feed yang dapat dibaca

Anda membuat aliran yang dapat dibaca dengan memanggil konstruktornya, ReadableStream(). Konstruktor ini memiliki argumen opsional underlyingSource, yang mewakili sebuah objek dengan metode dan properti yang menentukan perilaku dari instance streaming yang dibuat.

underlyingSource

Hal ini dapat menggunakan metode opsional yang ditentukan developer berikut:

  • start(controller): Dipanggil langsung saat objek dibuat. Metode ini dapat mengakses sumber streaming, dan melakukan hal lain yang diperlukan untuk menyiapkan fungsi streaming. Jika proses ini dilakukan secara asinkron, metode ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameter controller yang diteruskan ke metode ini adalah ReadableStreamDefaultController.
  • pull(controller): Dapat digunakan untuk mengontrol streaming saat lebih banyak potongan diambil. Metode ini akan dipanggil berulang kali selama antrean internal potongan streaming tidak penuh, hingga antrean mencapai tanda air tinggi. Jika hasil pemanggilan pull() adalah promise, pull() tidak akan dipanggil lagi hingga promise tersebut terpenuhi. Jika promise ditolak, streaming akan mengalami error.
  • cancel(reason): Dipanggil saat konsumen streaming membatalkan streaming.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController mendukung metode berikut:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Argumen kedua, yang juga bersifat opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan tanda air tinggi pada aliran menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang diwujudkan melalui properti ReadableStreamDefaultController.desiredSize yang sesuai. Ini juga mengatur kapan metode pull() sumber dasar dipanggil.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metode getReader() dan read()

Untuk membaca dari aliran yang dapat dibaca, Anda memerlukan alat pembaca, yang akan berupa ReadableStreamDefaultReader. Metode getReader() antarmuka ReadableStream membuat pembaca dan mengunci aliran data ke pembaca. Saat streaming dikunci, tidak ada pembaca lain yang dapat diperoleh hingga pembaca ini dirilis.

Metode read() dari antarmuka ReadableStreamDefaultReader menampilkan promise yang memberikan akses ke bagian berikutnya dalam antrean internal aliran data. Fungsi ini memenuhi atau menolak dengan hasil, bergantung pada status streaming. Berikut ini beberapa kemungkinan yang tersedia:

  • Jika ada bagian yang tersedia, promise tersebut akan dipenuhi dengan objek berbentuk
    { value: chunk, done: false }.
  • Jika aliran data ditutup, promise akan dipenuhi dengan objek berbentuk
    { value: undefined, done: true }.
  • Jika streaming tersebut mengalami error, promise akan ditolak dengan error yang relevan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Properti locked

Anda dapat memeriksa apakah streaming yang dapat dibaca dikunci dengan mengakses properti ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode streaming yang dapat dibaca

Contoh kode di bawah ini menunjukkan semua langkah yang dilakukan. Pertama-tama, buat ReadableStream bahwa dalam argumen underlyingSource-nya (yaitu, class TimestampSource) menentukan metode start(). Metode ini memberi tahu controller aliran data ke enqueue() stempel waktu setiap detik selama sepuluh detik. Terakhir, kode ini akan memberi tahu pengontrol untuk melakukan close() aliran data. Gunakan streaming ini dengan membuat pembaca melalui metode getReader(), lalu memanggil read() hingga streaming tersebut menjadi done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterasi asinkron

Memeriksa setiap iterasi loop read() jika stream-nya adalah done mungkin bukan API yang paling praktis. Untungnya, akan segera ada cara yang lebih baik untuk melakukan ini: iterasi asinkron.

for await (const chunk of stream) {
  console.log(chunk);
}

Solusi untuk menggunakan iterasi asinkron saat ini adalah dengan menerapkan perilaku dengan polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Menyesuaikan feed untuk feed yang dapat dibaca

Metode tee() dari antarmuka ReadableStream mengarahkan aliran yang dapat dibaca saat ini, yang menampilkan array dua elemen yang berisi dua cabang yang dihasilkan sebagai instance ReadableStream baru. Hal ini memungkinkan dua pembaca membaca streaming secara bersamaan. Anda dapat melakukan hal ini, misalnya, pada pekerja layanan jika ingin mengambil respons dari server dan menstreamingnya ke browser, serta melakukan streaming ke cache pekerja layanan. Karena isi respons tidak dapat digunakan lebih dari sekali, Anda memerlukan dua salinan untuk melakukannya. Untuk membatalkan aliran data, Anda harus membatalkan kedua cabang yang dihasilkan. Pemberian rating pada aliran umumnya akan menguncinya selama durasi tersebut, sehingga pembaca lain tidak dapat menguncinya.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Streaming byte yang dapat dibaca

Untuk streaming yang merepresentasikan byte, versi panjang dari streaming yang dapat dibaca disediakan untuk menangani byte secara efisien, khususnya dengan meminimalkan salinan. Dengan streaming byte, pembaca bring your own buffer (BYOB) dapat diperoleh. Implementasi default dapat memberikan berbagai output yang berbeda, seperti string atau buffer array dalam kasus WebSockets, sedangkan streaming byte menjamin output byte. Selain itu, pembaca BYOB memiliki manfaat stabilitas. Hal ini karena jika buffer terlepas, ada jaminan bahwa buffer tersebut tidak akan ditulis ke buffer yang sama dua kali, sehingga menghindari kondisi race. Pembaca BYOB dapat mengurangi jumlah waktu yang diperlukan browser untuk menjalankan pembersihan sampah memori, karena dapat menggunakan kembali buffer.

Membuat aliran byte yang dapat dibaca

Anda dapat membuat aliran byte yang dapat dibaca dengan meneruskan parameter type tambahan ke konstruktor ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Sumber dasar aliran byte yang dapat dibaca diberi ReadableByteStreamController untuk dimanipulasi. Metode ReadableByteStreamController.enqueue() mengambil argumen chunk yang nilainya adalah ArrayBufferView. Properti ReadableByteStreamController.byobRequest menampilkan permintaan pull BYOB saat ini, atau null jika tidak ada. Terakhir, properti ReadableByteStreamController.desiredSize akan menampilkan ukuran yang diinginkan untuk mengisi antrean internal aliran data yang dikontrol.

queuingStrategy

Argumen kedua, yang juga bersifat opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Ini adalah objek yang secara opsional menentukan strategi antrean untuk aliran data, yang memerlukan satu parameter:

  • highWaterMark: Jumlah byte non-negatif yang menunjukkan tanda air tinggi pada aliran menggunakan strategi antrean ini. Ini digunakan untuk menentukan tekanan balik, yang diwujudkan melalui properti ReadableByteStreamController.desiredSize yang sesuai. Ini juga mengatur kapan metode pull() sumber dasar dipanggil.

Metode getReader() dan read()

Anda kemudian bisa mendapatkan akses ke ReadableStreamBYOBReader dengan menetapkan parameter mode yang sesuai: ReadableStream.getReader({ mode: "byob" }). Hal ini memungkinkan kontrol alokasi buffer yang lebih akurat untuk menghindari salinan. Untuk membaca dari aliran byte, Anda perlu memanggil ReadableStreamBYOBReader.read(view), dengan view sebagai ArrayBufferView.

Contoh kode aliran byte yang dapat dibaca

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Fungsi berikut menampilkan aliran byte yang dapat dibaca yang memungkinkan pembacaan zero-copy yang efisien dari array yang dihasilkan secara acak. Alih-alih menggunakan ukuran potongan 1.024 yang telah ditentukan, fungsi ini mencoba mengisi buffer yang disediakan developer, sehingga memungkinkan kontrol penuh.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mekanisme streaming yang dapat ditulis

Stream yang dapat ditulis adalah tujuan tempat Anda dapat menulis data, yang diwakili dalam JavaScript oleh objek WritableStream. Fungsi ini berfungsi sebagai abstraksi di bagian atas sink yang mendasarinya—sink I/O tingkat rendah tempat data mentah ditulis.

Data ditulis ke streaming melalui writer, satu bagian per satu bagian. Sebuah potongan data dapat memiliki banyak bentuk, sama seperti potongan di pembaca. Anda dapat menggunakan kode apa pun yang diinginkan untuk menghasilkan potongan yang siap ditulis; penulis beserta kode terkait disebut produser.

Saat penulis dibuat dan mulai menulis ke aliran data (penulis aktif), penulis dianggap terkunci pada aliran tersebut. Hanya satu penulis yang dapat menulis ke aliran yang dapat ditulis dalam satu waktu. Jika ingin penulis lain mulai menulis ke streaming, Anda biasanya perlu melepaskannya, sebelum melampirkan penulis lain ke penulis tersebut.

Antrean internal melacak potongan yang telah ditulis ke aliran data, tetapi belum diproses oleh sink yang mendasarinya.

Strategi antrean adalah objek yang menentukan cara streaming seharusnya memberikan sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran ke setiap bagian, dan membandingkan ukuran total bagian dalam antrean dengan angka yang ditentukan, yang dikenal sebagai tanda air tinggi.

Konstruksi akhir disebut pengontrol. Setiap streaming yang dapat ditulis memiliki pengontrol terkait yang memungkinkan Anda mengontrol streaming (misalnya, untuk membatalkannya).

Membuat streaming yang dapat ditulis

Antarmuka WritableStream Streams API menyediakan abstraksi standar untuk menulis data streaming ke tujuan, yang dikenal sebagai sink. Objek ini dilengkapi dengan backpressure dan antrean bawaan. Anda akan membuat stream yang dapat ditulis dengan memanggil konstruktornya WritableStream(). Class ini memiliki parameter underlyingSink opsional, yang mewakili sebuah objek dengan metode dan properti yang menentukan perilaku instance aliran data yang dibuat.

underlyingSink

underlyingSink dapat menyertakan metode opsional berikut yang ditentukan developer. Parameter controller yang diteruskan ke beberapa metode adalah WritableStreamDefaultController.

  • start(controller): Metode ini langsung dipanggil saat objek dibuat. Konten metode ini harus ditujukan untuk mendapatkan akses ke sink yang mendasarinya. Jika proses ini dilakukan secara asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan.
  • write(chunk, controller): Metode ini akan dipanggil saat potongan data baru (yang ditentukan dalam parameter chunk) siap ditulis ke sink yang mendasarinya. Fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan operasi tulis. Metode ini hanya akan dipanggil setelah penulisan sebelumnya berhasil, dan tidak pernah setelah streaming ditutup atau dibatalkan.
  • close(controller): Metode ini akan dipanggil jika aplikasi menandakan bahwa aplikasi telah selesai menulis potongan ke aliran data. Konten harus melakukan apa pun yang diperlukan untuk menyelesaikan penulisan ke sink yang mendasarinya, dan melepaskan akses ke dalamnya. Jika proses ini asinkron, proses dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Metode ini hanya akan dipanggil setelah semua operasi tulis yang diantrekan berhasil.
  • abort(reason): Metode ini akan dipanggil jika aplikasi memberikan sinyal bahwa aplikasi ingin menutup aliran data secara tiba-tiba dan menempatkannya dalam status error. Tindakan ini dapat membersihkan resource yang ditahan, seperti close(), tetapi abort() akan dipanggil meskipun operasi tulis diantrekan. Potongan tersebut akan dibuang. Jika proses ini asinkron, proses dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameter reason berisi DOMString yang menjelaskan alasan aliran data dibatalkan.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Antarmuka WritableStreamDefaultController dari Streams API mewakili pengontrol yang memungkinkan kontrol atas status WritableStream selama penyiapan, saat lebih banyak potongan dikirimkan untuk ditulis, atau di akhir penulisan. Saat membuat WritableStream, sink yang mendasarinya diberi instance WritableStreamDefaultController yang sesuai untuk dimanipulasi. WritableStreamDefaultController hanya memiliki satu metode: WritableStreamDefaultController.error(), yang menyebabkan error pada interaksi mendatang dengan aliran data yang terkait. WritableStreamDefaultController juga mendukung properti signal yang menampilkan instance AbortSignal, sehingga operasi WritableStream dapat dihentikan jika diperlukan.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Argumen kedua, yang juga bersifat opsional, dari konstruktor WritableStream() adalah queuingStrategy. Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan tanda air tinggi pada aliran menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang diwujudkan melalui properti WritableStreamDefaultWriter.desiredSize yang sesuai.

Metode getWriter() dan write()

Untuk menulis ke aliran yang dapat ditulis, Anda memerlukan penulis, yang akan berupa WritableStreamDefaultWriter. Metode getWriter() dari antarmuka WritableStream akan menampilkan instance baru WritableStreamDefaultWriter dan mengunci aliran ke instance tersebut. Saat streaming dikunci, tidak ada penulis lain yang dapat diperoleh hingga penulis saat ini dirilis.

Metode write() dari antarmuka WritableStreamDefaultWriter menulis potongan data yang diteruskan ke WritableStream dan sink yang mendasarinya, lalu menampilkan promise yang di-resolve untuk menunjukkan keberhasilan atau kegagalan operasi tulis. Perlu diperhatikan bahwa arti "berhasil" bergantung pada sink yang mendasarinya; hal ini mungkin menunjukkan bahwa potongan telah diterima, dan belum tentu bahwa potongan tersebut telah disimpan dengan aman ke tujuan akhirnya.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Properti locked

Anda dapat memeriksa apakah stream yang dapat ditulis dikunci dengan mengakses properti WritableStream.locked-nya.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode streaming yang dapat ditulis

Contoh kode di bawah ini menunjukkan semua langkah yang dilakukan.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Menambahkan stream yang dapat dibaca ke stream yang dapat ditulis

Stream yang dapat dibaca dapat diarahkan ke stream yang dapat ditulis melalui metode pipeTo() stream yang dapat dibaca. ReadableStream.pipeTo() menyalurkan ReadableStream saat ini ke WritableStream yang ditentukan dan menampilkan promise yang terpenuhi saat proses pemipaan berhasil diselesaikan, atau menolak jika terjadi error.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Membuat stream transformasi

Antarmuka TransformStream Streams API mewakili sekumpulan data yang dapat ditransformasi. Anda akan membuat aliran transformasi dengan memanggil TransformStream() konstruktornya, yang membuat dan menampilkan objek aliran transformasi dari pengendali yang diberikan. Konstruktor TransformStream() menerima objek JavaScript opsional yang mewakili transformer sebagai argumen pertamanya. Objek tersebut dapat berisi salah satu metode berikut:

transformer

  • start(controller): Metode ini langsung dipanggil saat objek dibuat. Biasanya ini digunakan untuk mengantrekan potongan awalan, menggunakan controller.enqueue(). Potongan tersebut akan dibaca dari sisi yang dapat dibaca, tetapi tidak bergantung pada penulisan ke sisi yang dapat ditulis. Jika proses awal ini asinkron, misalnya karena memerlukan upaya untuk memperoleh potongan awalan, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; promise yang ditolak akan melakukan error pada streaming. Setiap pengecualian yang ditampilkan akan ditampilkan ulang oleh konstruktor TransformStream().
  • transform(chunk, controller): Metode ini dipanggil saat potongan baru yang awalnya ditulis ke sisi yang dapat ditulis siap untuk diubah. Implementasi streaming menjamin bahwa fungsi ini hanya akan dipanggil setelah transformasi sebelumnya berhasil, dan belum pernah sebelum start() selesai atau setelah flush() dipanggil. Fungsi ini melakukan tugas transformasi yang sebenarnya dari aliran transformasi. Dapat mengantrekan hasil menggunakan controller.enqueue(). Tindakan ini mengizinkan satu bagian yang ditulis ke sisi yang dapat ditulis menghasilkan nol atau beberapa potongan pada sisi yang dapat dibaca, bergantung pada frekuensi controller.enqueue() dipanggil. Jika proses transformasi bersifat asinkron, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan transformasi. Promise yang ditolak akan menimbulkan error pada sisi aliran transformasi yang dapat dibaca dan ditulis. Jika tidak ada metode transform() yang diberikan, transformasi identitas akan digunakan, yang mengantrekan potongan yang tidak berubah dari sisi yang dapat ditulis ke sisi yang dapat dibaca.
  • flush(controller): Metode ini dipanggil setelah semua bagian yang ditulis ke sisi yang dapat ditulis diubah dengan berhasil melewati transform(), dan sisi yang dapat ditulis akan ditutup. Biasanya digunakan untuk mengantrekan potongan akhiran ke sisi yang dapat dibaca, sebelum itu juga ditutup. Jika proses penghapusan bersifat asinkron, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; hasilnya akan disampaikan kepada pemanggil stream.writable.write(). Selain itu, promise yang ditolak akan menimbulkan error pada sisi streaming yang dapat dibaca dan ditulis. Menampilkan pengecualian akan diperlakukan sama seperti menampilkan promise yang ditolak.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategi antrean writableStrategy dan readableStrategy

Parameter opsional kedua dan ketiga dari konstruktor TransformStream() adalah strategi antrean writableStrategy dan readableStrategy opsional. Semuanya didefinisikan seperti yang diuraikan dalam bagian aliran yang dapat dibaca dan dapat ditulis.

Mentransformasi contoh kode streaming

Contoh kode berikut menunjukkan cara kerja streaming transformasi sederhana.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Menambahkan stream yang dapat dibaca melalui stream transformasi

Metode pipeThrough() dari antarmuka ReadableStream menyediakan cara berantai untuk menyalurkan aliran saat ini melalui aliran transformasi atau pasangan lainnya yang dapat ditulis/dibaca. Penyertaan aliran data umumnya akan menguncinya selama durasi pipe, sehingga pembaca lain tidak akan menguncinya.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Contoh kode berikutnya (sedikit rumit) menunjukkan cara menerapkan versi "teriakan" fetch() yang menulis semua teks dalam huruf besar dengan menggunakan promise respons yang ditampilkan sebagai stream dan menuliskan huruf besar per bagian. Keuntungan dari pendekatan ini adalah Anda tidak perlu menunggu seluruh dokumen didownload, sehingga dapat membuat perbedaan besar saat menangani file berukuran besar.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Demo di bawah ini menunjukkan cara kerja stream yang dapat dibaca, ditulis, dan mentransformasikan. Contoh ini juga menyertakan contoh rantai pipe pipeThrough() dan pipeTo(), serta menunjukkan tee(). Jika ingin, Anda dapat menjalankan demo di jendelanya sendiri atau melihat kode sumber.

Streaming bermanfaat tersedia di browser

Ada sejumlah streaming berguna yang dibangun langsung ke dalam browser. Anda dapat dengan mudah membuat ReadableStream dari blob. Metode stream() antarmuka Blob menampilkan ReadableStream yang, setelah dibaca, menampilkan data yang terdapat dalam blob. Ingat juga bahwa objek File adalah jenis Blob tertentu, dan dapat digunakan dalam konteks apa pun yang dapat dilakukan oleh blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Varian streaming TextDecoder.decode() dan TextEncoder.encode() masing-masing disebut TextDecoderStream dan TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Mengompresi atau mendekompresi file dapat dilakukan dengan mudah menggunakan aliran transformasi CompressionStream dan DecompressionStream. Contoh kode di bawah ini menunjukkan cara mendownload spesifikasi Streams, mengompresi (gzip) langsung di browser, dan menulis file yang dikompresi langsung ke disk.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream File System Access API dan streaming permintaan fetch() eksperimental adalah contoh streaming yang dapat ditulis di mana pun.

Serial API memanfaatkan banyak streaming yang dapat dibaca dan ditulis.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Terakhir, WebSocketStream API mengintegrasikan stream dengan WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Referensi yang berguna

Ucapan terima kasih

Artikel ini ditinjau oleh Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, dan Adam Rice. Postingan blog Jake Archibald telah banyak membantu saya dalam memahami aliran data. Beberapa contoh kode terinspirasi oleh eksplorasi @bellbind pengguna GitHub dan bagian dari prosa yang banyak dibangun di MDN Web Docs on Streams. Penulis Streams Standard telah melakukan pekerjaan yang luar biasa dalam menulis spesifikasi ini. Banner besar oleh Ryan Lara di Unsplash.