Streaming—Panduan definitif

Pelajari cara menggunakan streaming yang dapat dibaca, ditulis, dan ditransformasi dengan Streams API.

Streams API memungkinkan Anda mengakses secara terprogram aliran data yang diterima melalui jaringan atau dibuat dengan cara apa pun secara lokal dan memprosesnya dengan JavaScript. Streaming melibatkan pengelompokan resource yang ingin Anda terima, kirim, atau ubah menjadi potongan-potongan kecil, dan kemudian memproses potongan-potongan ini sedikit demi sedikit. Sementara streaming adalah browser saat menerima aset seperti HTML atau video untuk ditampilkan di halaman web, kemampuan ini belum pernah tersedia untuk JavaScript sebelum fetch dengan streaming diperkenalkan pada tahun 2015.

Sebelumnya, jika Anda ingin memproses semacam resource (baik video, file teks, dll.), Anda harus mengunduh seluruh file, menunggu hingga file selesai di-deserialisasi ke dalam format yang sesuai, kemudian memprosesnya. Dengan tersedianya streaming JavaScript, semuanya berubah. Sekarang Anda dapat memproses data mentah dengan JavaScript secara bertahap sebagai segera setelah tersedia di klien, tanpa perlu membuat buffer, string, atau blob. Langkah ini membuka sejumlah kasus penggunaan, beberapa di antaranya saya cantumkan di bawah ini:

  • Efek video: menyisipkan streaming video yang dapat dibaca melalui streaming transformasi yang menerapkan efek secara real-time.
  • Kompresi data (de): menyisipkan aliran file melalui aliran transformasi yang selektif (de)mengompresinya.
  • Pendekodean gambar: menyisipkan aliran respons HTTP melalui aliran transformasi yang mendekode byte menjadi data bitmap, lalu melalui aliran transformasi lain yang menerjemahkan bitmap menjadi PNG. Jika diinstal di dalam pengendali fetch dari pekerja layanan, hal ini memungkinkan Anda mem-polyfill secara transparan format gambar baru seperti AVIF.

Dukungan browser

ReadableStream dan WritableStream

Dukungan Browser

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Sumber

TransformStream

Dukungan Browser

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Sumber

Konsep inti

Sebelum membahas berbagai jenis streaming, saya akan memperkenalkan beberapa konsep inti.

Potongan

Potongan adalah satu bagian data yang ditulis atau dibaca dari stream. Dapat berupa apa saja ketik; {i>stream<i} bahkan dapat berisi potongan dari berbagai jenis. Sering kali, sebuah potongan tidak akan menjadi yang paling atomik unit data untuk aliran tertentu. Misalnya, satu byte bisa berisi potongan yang terdiri dari 16 byte KiB unit Uint8Array, bukan byte tunggal.

Streaming yang dapat dibaca

Aliran data yang dapat dibaca mewakili sumber data yang dapat Anda baca. Dengan kata lain, data berasal out streaming yang dapat dibaca. Konkretnya, streaming yang dapat dibaca adalah instance ReadableStream .

Streaming yang dapat ditulis

Aliran data yang dapat ditulisi merupakan tujuan data yang dapat Anda tulis. Dengan kata lain, data ini masuk ke aliran yang dapat ditulis. Konkretnya, aliran yang dapat ditulis adalah contoh dari Class WritableStream.

Mengubah aliran data

Streaming transformasi terdiri dari sepasang streaming: streaming yang dapat ditulis, yang dikenal sebagai sisi yang dapat ditulis, dan aliran yang dapat dibaca, yang dikenal sebagai sisi yang dapat dibaca. Metafora dunia nyata tentang ini adalah penerjemah simultan yang menerjemahkan dari satu bahasa ke bahasa lain dengan cepat. Dengan cara yang khusus untuk aliran transformasi, menulis ke sisi yang dapat ditulis menghasilkan data baru yang tersedia untuk dibaca dari dapat dibaca. Secara konkret, objek apa pun dengan properti writable dan properti readable dapat ditayangkan sebagai stream transformasi. Namun, class TransformStream standar mempermudah pembuatan pasangan yang terjerat dengan benar.

Rantai pipa

Streaming terutama digunakan dengan melakukan perutean ke satu sama lain. Aliran yang dapat dibaca dapat langsung disalurkan ke streaming yang dapat ditulis, menggunakan metode pipeTo() streaming yang dapat dibaca, atau dapat disalurkan melalui satu atau lebih streaming transformasi terlebih dahulu, menggunakan metode pipeThrough() streaming yang dapat dibaca. Serangkaian aliran data yang disalurkan bersama dengan cara ini disebut sebagai rantai pipa.

Tekanan balik

Setelah rantai pipa dibangun, ia akan menyebarkan sinyal tentang seberapa cepat potongan harus mengalir melewatinya. Jika ada langkah apa pun dalam rantai yang belum bisa menerima potongan, maka akan menyebarkan sinyal ke belakang melalui rantai pipa, sampai akhirnya sumber aslinya diberitahu untuk berhenti menghasilkan potongan sehingga dengan cepat. Proses normalisasi aliran ini disebut backpressure.

Menyusui Tee

Aliran data yang dapat dibaca dapat diberi judul (dinamai berdasarkan bentuk huruf besar 'T') menggunakan metode tee(). Tindakan ini akan mengunci streaming sehingga tidak dapat lagi digunakan secara langsung; namun, hal itu akan membuat dua stream, yang disebut cabang, yang dapat digunakan secara mandiri. Teeing juga penting karena streaming tidak dapat diputar ulang atau dimulai ulang. Hal ini akan dibahas lebih lanjut nanti.

Diagram pipeline yang terdiri dari stream yang dapat dibaca dan berasal dari panggilan ke pengambilan API yang kemudian disalurkan melalui stream transformasi yang outputnya di-teed, lalu dikirim ke browser untuk menghasilkan streaming pertama yang dapat dibaca dan ke cache pekerja layanan untuk menghasilkan streaming kedua yang dapat dibaca.
Rantai pipa.

Mekanisme streaming yang dapat dibaca

Streaming yang dapat dibaca adalah sumber data yang direpresentasikan dalam JavaScript oleh Objek ReadableStream yang yang mengalir dari sumber yang mendasarinya. Tujuan ReadableStream() membuat dan menampilkan objek aliran yang dapat dibaca dari pengendali yang diberikan. Ada dua jenis sumber yang mendasarinya:

  • Sumber push secara terus-menerus mendorong data kepada Anda saat Anda telah mengaksesnya, dan Anda bertanggung jawab untuk memulai, menjeda, atau membatalkan akses ke streaming. Contohnya mencakup streaming video live, peristiwa yang dikirim ke server, atau WebSockets.
  • Sumber pull mengharuskan Anda secara eksplisit meminta data dari sumber tersebut setelah terhubung. Contoh menyertakan operasi HTTP melalui panggilan fetch() atau XMLHttpRequest.

Data streaming dibaca secara berurutan dalam bagian kecil yang disebut bagian. Potongan yang ditempatkan dalam stream dikatakan diantrekan. Ini berarti mereka sedang menunggu dalam antrean siap untuk dibaca. Antrean internal melacak potongan yang belum dibaca.

Strategi antrean adalah objek yang menentukan cara streaming memberikan sinyal backpressure berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap potongan, dan membandingkan ukuran total dari semua potongan dalam antrean ke angka tertentu, yang dikenal sebagai tanda air tinggi.

Potongan di dalam streaming akan dibaca oleh pembaca. Pembaca ini mengambil data satu per satu waktu, sehingga Anda dapat melakukan jenis operasi apa pun yang Anda inginkan. Pembaca dan yang lainnya kode pemrosesan yang sesuai dengannya disebut konsumen.

Konstruksi berikutnya dalam konteks ini disebut pengontrol. Tiap aliran yang dapat dibaca memiliki sebagai pengontrol yang, seperti namanya, memungkinkan Anda mengontrol streaming.

Hanya satu pembaca yang dapat membaca streaming pada satu waktu; saat pembaca dibuat dan mulai membaca streaming (yaitu, menjadi pembaca aktif), pembaca akan terkunci. Jika Anda ingin pembaca lain mengambil alih membaca streaming, Anda biasanya perlu merilis pembaca pertama sebelum melakukan hal lain (meskipun Anda dapat melakukan streaming).

Membuat streaming yang dapat dibaca

Anda membuat streaming yang dapat dibaca dengan memanggil konstruktornya ReadableStream() Konstruktor memiliki argumen opsional underlyingSource, yang mewakili sebuah objek dengan metode dan properti yang menentukan perilaku instance streaming yang dibuat.

underlyingSource

Hal ini bisa menggunakan metode opsional berikut yang ditentukan developer:

  • start(controller): Segera dipanggil saat objek dibuat. Tujuan bisa mengakses sumber streaming, dan melakukan hal lain yang diperlukan untuk menyiapkan fungsi streaming. Jika proses ini harus dilakukan secara asinkron, metode dapat mengembalikan janji untuk menandakan keberhasilan atau kegagalan. Parameter controller yang diteruskan ke metode ini adalah suatu ReadableStreamDefaultController.
  • pull(controller): Dapat digunakan untuk mengontrol streaming saat semakin banyak potongan yang diambil. Ini dipanggil berulang kali selama antrean internal potongan stream tidak penuh, sampai antrean mencapai tanda air yang tinggi. Jika hasil pemanggilan pull() adalah promise, pull() tidak akan dipanggil lagi hingga promise tersebut terpenuhi. Jika promise ditolak, streaming akan mengalami error.
  • cancel(reason): Dipanggil saat konsumen streaming membatalkan streaming.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController mendukung metode berikut:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Argumen kedua, yang juga opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang membutuhkan waktu dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan tanda air sungai yang tinggi menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan backpressure, yang dimanifes melalui properti ReadableStreamDefaultController.desiredSize yang sesuai. Kolom ini juga mengatur kapan metode pull() sumber pokok dipanggil.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metode getReader() dan read()

Untuk membaca dari aliran yang dapat dibaca, Anda membutuhkan pembaca, yang akan menjadi ReadableStreamDefaultReader Metode getReader() dari antarmuka ReadableStream membuat pembaca dan mengunci aliran data ke anotasi. Saat streaming dikunci, tidak ada pembaca lain yang dapat diperoleh hingga streaming ini dirilis.

read() antarmuka ReadableStreamDefaultReader akan menampilkan promise yang memberikan akses ke metode dalam antrean internal stream. Memenuhi atau menolak dengan hasil tergantung pada status {i>stream <i}itu. Kemungkinan yang berbeda adalah sebagai berikut:

  • Jika potongan tersedia, promise akan dipenuhi dengan objek berformat
    { value: chunk, done: false }.
  • Jika aliran data menjadi ditutup, promise akan dipenuhi dengan objek berformat
    { value: undefined, done: true }.
  • Jika streaming mengalami error, promise akan ditolak dengan error yang relevan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Properti locked

Anda dapat memeriksa apakah streaming yang dapat dibaca terkunci dengan mengakses ReadableStream.locked saat ini.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode streaming yang dapat dibaca

Contoh kode di bawah menunjukkan penerapan semua langkah. Pertama, Anda membuat ReadableStream yang di Argumen underlyingSource (yaitu, class TimestampSource) menentukan metode start(). Metode ini memberi tahu controller aliran data untuk enqueue() stempel waktu setiap detik selama sepuluh detik. Terakhir, ini akan memberi tahu pengontrol untuk melakukan close() aliran. Anda memakai ini streaming dengan membuat pembaca melalui metode getReader() dan memanggil read() hingga streaming selesai done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterasi asinkron

Memeriksa setiap iterasi loop read() jika streaming berupa done mungkin bukan API yang paling praktis. Untungnya, akan segera ada cara yang lebih baik untuk melakukan ini: iterasi asinkron.

for await (const chunk of stream) {
  console.log(chunk);
}

Solusi untuk menggunakan iterasi asinkron saat ini adalah dengan menerapkan perilaku dengan polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Memasukkan streaming yang dapat dibaca

Metode tee() atribut Antarmuka ReadableStream memproses streaming yang dapat dibaca saat ini, yang menampilkan array dua elemen yang berisi dua cabang yang dihasilkan sebagai instance ReadableStream baru. Hal ini memungkinkan dua pembaca untuk membaca aliran secara bersamaan. Anda mungkin melakukannya, misalnya, dalam pekerja layanan jika Anda akan mengambil respons dari server dan mengalirkannya ke browser, sekaligus mengalirkannya ke cache pekerja layanan. Karena isi respons tidak dapat digunakan lebih dari sekali, Anda memerlukan dua salinan untuk melakukan hal ini. Untuk membatalkan streaming, Anda harus membatalkan kedua cabang yang dihasilkan. Melakukan live streaming umumnya akan menguncinya selama jangka waktu tertentu, sehingga mencegah pembaca lain menguncinya.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Streaming byte yang dapat dibaca

Untuk streaming yang mewakili byte, versi tambahan dari streaming yang dapat dibaca akan disediakan untuk menangani {i>byte<i} secara efisien, khususnya dengan meminimalkan salinan. Byte stream memungkinkan buffer bawa sendiri (BYOB) pembaca akan diakuisisi. Implementasi default dapat memberikan berbagai output yang berbeda seperti sebagai string atau buffer array pada kasus WebSockets, sedangkan aliran byte menjamin output byte. Selain itu, pembaca BYOB memiliki manfaat stabilitas. Ini adalah karena jika {i>buffer<i} terputus, itu dapat menjamin bahwa satu per satu tidak akan menulis ke {i>buffer <i}yang sama dua kali, sehingga dapat menghindari kondisi race. Pembaca BYOB dapat mengurangi frekuensi browser perlu dijalankan pembersihan sampah memori, karena dapat menggunakan kembali {i>buffer<i}.

Membuat aliran byte yang dapat dibaca

Anda dapat membuat aliran byte yang dapat dibaca dengan meneruskan parameter type tambahan ke Konstruktor ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Sumber pokok aliran byte yang dapat dibaca diberi ReadableByteStreamController ke memanipulasi. Metode ReadableByteStreamController.enqueue()-nya mengambil argumen chunk yang nilainya adalah ArrayBufferView. Properti ReadableByteStreamController.byobRequest menampilkan BYOB pull request, atau null jika tidak ada. Terakhir, ReadableByteStreamController.desiredSize menampilkan ukuran yang diinginkan untuk mengisi antrean internal streaming yang dikontrol.

queuingStrategy

Argumen kedua, yang juga opsional, dari konstruktor ReadableStream() adalah queuingStrategy. Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang membutuhkan satu :

  • highWaterMark: Jumlah byte non-negatif yang menunjukkan tanda air yang tinggi pada streaming menggunakan strategi antrean ini. Ini digunakan untuk menentukan backpressure, yang dimanifes melalui properti ReadableByteStreamController.desiredSize yang sesuai. Kolom ini juga mengatur kapan metode pull() sumber pokok dipanggil.

Metode getReader() dan read()

Anda kemudian bisa mendapatkan akses ke ReadableStreamBYOBReader dengan menetapkan parameter mode sebagaimana mestinya: ReadableStream.getReader({ mode: "byob" }). Hal ini memungkinkan kontrol yang lebih tepat atas buffer alokasi data untuk menghindari salinan. Untuk membaca dari aliran byte, Anda perlu memanggil ReadableStreamBYOBReader.read(view), dengan view adalah ArrayBufferView

Contoh kode streaming byte yang dapat dibaca

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Fungsi berikut menampilkan aliran byte yang dapat dibaca yang memungkinkan pembacaan nol salinan yang efisien dari array yang dihasilkan secara acak. Alih-alih menggunakan ukuran potongan 1.024 yang telah ditentukan sebelumnya, ia mencoba mengisi {i>buffer<i} yang disediakan oleh pengembang, yang memungkinkan kontrol penuh.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mekanisme streaming yang dapat ditulis

Aliran data yang dapat ditulis adalah tujuan di mana Anda dapat menulis data, yang direpresentasikan dalam JavaScript oleh Objek WritableStream. Ini berfungsi sebagai abstraksi di atas sink dasar—sink I/O tingkat rendah ke data mentah ditulis.

Data ditulis ke streaming melalui penulis, satu bagian dalam satu waktu. Sebuah potongan foto dapat memerlukan banyak bentuk, seperti potongan yang ditulis di pembaca. Anda dapat menggunakan kode apa pun yang Anda inginkan untuk menghasilkan potongan-potongan yang siap untuk ditulis; penulis ditambah kode terkait disebut produser.

Saat seorang penulis dibuat dan mulai menulis ke streaming (penulis aktif), hal ini disebut terkunci padanya. Hanya satu penulis yang dapat menulis ke streaming yang dapat ditulis pada satu waktu. Jika Anda ingin yang lain penulis untuk mulai menulis ke aliran, biasanya Anda perlu melepaskannya, sebelum kemudian melampirkan penulis lain.

Antrean internal melacak potongan yang telah ditulis ke aliran data, tetapi belum telah diproses oleh sink yang mendasarinya.

Strategi antrean adalah objek yang menentukan cara streaming memberikan sinyal backpressure berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap potongan, dan membandingkan ukuran total dari semua potongan dalam antrean ke angka tertentu, yang dikenal sebagai tanda air tinggi.

Konstruksi terakhir disebut controller. Setiap streaming yang dapat ditulis memiliki satu {i>controller<i} terkait yang memungkinkan Anda mengontrol streaming (misalnya, untuk membatalkannya).

Membuat streaming yang dapat ditulis

Antarmuka WritableStream Streams API menyediakan abstraksi standar untuk menulis data streaming ke tujuan, yang dikenal sebagai sink. Objek ini dilengkapi dengan backpressure dan antrean bawaan. Anda membuat streaming yang dapat ditulis dengan memanggil konstruktornya WritableStream() Class ini memiliki parameter underlyingSink opsional, yang mewakili sebuah objek dengan metode dan properti yang menentukan perilaku instance streaming yang dibuat.

underlyingSink

underlyingSink dapat menyertakan metode opsional berikut yang ditentukan developer. controller yang diteruskan ke beberapa metode adalah WritableStreamDefaultController.

  • start(controller): Metode ini segera dipanggil saat objek dibuat. Tujuan dari metode ini harus bertujuan untuk mendapatkan akses ke sink yang mendasarinya. Jika proses ini harus dilakukan secara asinkron, proses itu dapat memberikan janji untuk menandakan keberhasilan atau kegagalan.
  • write(chunk, controller): Metode ini akan dipanggil saat potongan data baru (ditentukan dalam chunk) siap ditulis ke sink yang mendasarinya. Fungsi ini dapat mengembalikan janji untuk menandakan keberhasilan atau kegagalan operasi tulis. Metode ini hanya akan dipanggil setelah tindakan sebelumnya operasi tulis berhasil, dan tidak pernah setelah {i> stream <i}ditutup atau dibatalkan.
  • close(controller): Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa penulisan telah selesai potongan ke aliran. Konten harus melakukan apa pun yang diperlukan untuk menyelesaikan penulisan ke yang mendasari, lalu melepaskan akses ke sana. Jika proses ini asinkron, proses ini dapat mengembalikan berjanji untuk menandakan keberhasilan atau kegagalan. Metode ini hanya akan dipanggil setelah semua penulisan dalam antrean telah berhasil.
  • abort(reason): Metode ini akan dipanggil jika aplikasi memberikan sinyal bahwa aplikasi ingin menutup tiba-tiba {i>stream <i}dan memasukkannya dalam status {i>error<i}. Cara ini dapat membersihkan semua sumber daya yang tersimpan, seperti close(), tetapi abort() akan dipanggil meskipun operasi tulis dalam antrean. Potongan tersebut akan ditampilkan pergi. Jika proses ini asinkron, proses ini dapat mengembalikan janji untuk menandakan keberhasilan atau kegagalan. Tujuan Parameter reason berisi DOMString yang menjelaskan alasan streaming dibatalkan.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Tujuan WritableStreamDefaultController antarmuka Streams API mewakili pengontrol yang memungkinkan kontrol status WritableStream selama pengaturan, karena lebih banyak potongan yang dikirim untuk penulisan, atau di akhir penulisan. Saat membuat WritableStream, sink yang mendasarinya akan diberi WritableStreamDefaultController yang sesuai untuk dimanipulasi. WritableStreamDefaultController hanya memiliki satu metode: WritableStreamDefaultController.error(), sehingga menyebabkan error pada interaksi mendatang dengan aliran data terkait. WritableStreamDefaultController juga mendukung properti signal yang menampilkan instance AbortSignal, yang memungkinkan operasi WritableStream dihentikan jika diperlukan.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Argumen kedua, yang juga opsional, dari konstruktor WritableStream() adalah queuingStrategy. Ini adalah objek yang secara opsional menentukan strategi antrean untuk streaming, yang membutuhkan waktu dua parameter:

  • highWaterMark: Angka non-negatif yang menunjukkan tanda air sungai yang tinggi menggunakan strategi antrean ini.
  • size(chunk): Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan backpressure, yang dimanifes melalui properti WritableStreamDefaultWriter.desiredSize yang sesuai.

Metode getWriter() dan write()

Untuk menulis ke aliran yang dapat ditulis, Anda membutuhkan penulis, yang akan menjadi WritableStreamDefaultWriter. Metode getWriter() dari antarmuka WritableStream menampilkan instance baru WritableStreamDefaultWriter dan mengunci streaming ke instance tersebut. Sementara streaming dikunci, tidak ada penulis lain yang dapat diperoleh hingga penulis saat ini dirilis.

write() dari WritableStreamDefaultWriter menulis sepotong data yang diteruskan ke WritableStream dan sink yang mendasarinya, kemudian mengembalikan promise yang diselesaikan untuk menunjukkan keberhasilan atau kegagalan operasi tulis. Perhatikan bahwa apa "berhasil" berarti bergantung pada sink yang mendasarinya; mungkin menunjukkan bahwa potongan tersebut telah diterima, dan belum tentu data itu disimpan dengan aman ke tujuan akhirnya.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Properti locked

Anda dapat memeriksa apakah streaming yang dapat ditulis dikunci dengan mengakses WritableStream.locked saat ini.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Contoh kode streaming yang dapat ditulis

Contoh kode di bawah menunjukkan penerapan semua langkah.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Menyisipkan streaming yang dapat dibaca ke aliran yang dapat ditulis

Aliran yang dapat dibaca, dapat disalurkan ke aliran yang dapat ditulis melalui aliran data yang dapat dibaca Metode pipeTo(). ReadableStream.pipeTo() menyalurkan ReadableStream saat ini ke WritableStream yang ditentukan dan menampilkan yang terpenuhi ketika proses pemipaan berhasil diselesaikan, atau ditolak jika ada kesalahan temui.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Membuat stream transformasi

Antarmuka TransformStream Streams API mewakili kumpulan data yang dapat ditransformasi. Anda buat aliran transformasi dengan memanggil konstruktornya TransformStream(), yang membuat dan menampilkan objek aliran transformasi dari pengendali yang diberikan. Konstruktor TransformStream() menerima sebagai argumen pertamanya objek JavaScript opsional yang mewakili transformer. Objek tersebut dapat berisi salah satu metode berikut:

transformer

  • start(controller): Metode ini segera dipanggil saat objek dibuat. Biasanya ini digunakan untuk mengantrekan potongan awalan, menggunakan controller.enqueue(). Potongan informasi itu akan dibaca dari sisi yang dapat dibaca tetapi tidak tergantung pada penulisan apa pun ke sisi yang dapat ditulis. Jika inisial ini proses asinkron, misalnya karena diperlukan usaha untuk memperoleh potongan awalan, {i>function<i} itu bisa memberikan janji untuk menandakan keberhasilan atau kegagalan; promise yang ditolak akan membuat error feed. Setiap pengecualian yang ditampilkan akan ditampilkan ulang oleh konstruktor TransformStream().
  • transform(chunk, controller): Metode ini dipanggil jika potongan baru awalnya ditulis ke sisi yang dapat ditulisi siap diubah. Implementasi streaming menjamin bahwa fungsi ini hanya akan dipanggil setelah transformasi sebelumnya berhasil, dan belum pernah sebelum start() selesai atau setelah flush() dipanggil. Fungsi ini melakukan transformasi aktual pekerjaan aliran transformasi. Fitur ini dapat mengantrekan hasilnya menggunakan controller.enqueue(). Ini memungkinkan satu potongan yang ditulis di sisi yang dapat ditulis untuk menghasilkan nol atau beberapa potongan pada sisi yang dapat dibaca, bergantung pada seberapa sering controller.enqueue() dipanggil. Jika proses bersifat asinkron, fungsi ini dapat mengembalikan janji untuk menandakan keberhasilan atau kegagalan melakukan transformasi. Promise yang ditolak akan menimbulkan error pada sisi yang dapat dibaca dan ditulis transformasi stream. Jika tidak ada metode transform() yang disediakan, transformasi identitas akan digunakan, yang mengantrekan potongan tanpa perubahan dari sisi yang bisa ditulis ke sisi yang bisa dibaca.
  • flush(controller): Metode ini dipanggil setelah semua potongan yang ditulis ke sisi yang dapat ditulisi diubah dengan berhasil meneruskan transform(), dan sisi yang dapat ditulis akan segera tutup. Biasanya ini digunakan untuk mengantrekan potongan akhiran ke sisi yang dapat dibaca, sebelum itu juga menjadi ditutup. Jika proses flushing asinkron, fungsi ini bisa menampilkan promise ke menunjukkan keberhasilan atau kegagalan; hasilnya akan dikomunikasikan ke pemanggil dari stream.writable.write(). Selain itu, promise yang ditolak akan membuat error antara promise yang dibaca dan sisi streaming yang dapat ditulis. Menampilkan pengecualian diperlakukan sama seperti menampilkan pengecualian yang ditolak yang menjanjikan.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategi antrean writableStrategy dan readableStrategy

Parameter opsional kedua dan ketiga dari konstruktor TransformStream() bersifat opsional Strategi antrean writableStrategy dan readableStrategy. Hal tersebut didefinisikan sebagaimana diuraikan dalam dapat dibaca dan aliran yang dapat ditulis bagian masing-masing.

Contoh kode streaming transformasi

Contoh kode berikut menunjukkan cara kerja aliran transformasi sederhana.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Menyisipkan stream yang dapat dibaca melalui stream transformasi

pipeThrough() metode antarmuka ReadableStream memberikan cara berantai untuk melakukan pemipaan aliran data saat ini melalui aliran transformasi atau pasangan lainnya yang dapat ditulis/dibaca. Meliputi aliran data biasanya akan mengunci sepanjang waktu, sehingga mencegah pembaca lain menguncinya.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Contoh kode berikutnya (sedikit dibuat-buat) menunjukkan cara menerapkan "teriakan" versi fetch() menggunakan huruf besar semua teks dengan menggunakan promise respons yang ditampilkan sebagai aliran data dan mengapit potongan demi potongan. Keuntungan dari pendekatan ini adalah Anda tidak perlu menunggu seluruh dokumen untuk diunduh, yang dapat membuat perbedaan besar ketika berhadapan dengan file besar.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Demo di bawah ini menunjukkan cara kerja streaming yang dapat dibaca, ditulis, dan diubah. Artikel ini juga menyertakan contoh dari rantai pipa pipeThrough() dan pipeTo(), serta menunjukkan tee(). Anda juga dapat menjalankan demo di jendelanya sendiri atau lihat kode sumber.

Streaming bermanfaat yang tersedia di browser

Ada sejumlah aliran data berguna yang terintegrasi langsung ke dalam browser. Anda dapat dengan mudah membuat ReadableStream dari blob. Blob metode stream() antarmuka ditampilkan ReadableStream yang setelah dibaca akan menampilkan data yang terdapat dalam blob. Ingat juga bahwa Objek File adalah jenis instance Blob, dan dapat digunakan dalam konteks apa pun seperti yang dapat dilakukan blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Varian streaming TextDecoder.decode() dan TextEncoder.encode() disebut TextDecoderStream dan TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Mengompresi atau melakukan dekompresi file mudah dengan CompressionStream dan Aliran data transformasi DecompressionStream secara berurutan. Contoh kode di bawah menunjukkan cara mendownload spesifikasi Streams, kompres (gzip) langsung di browser, dan menulis file yang dikompresi langsung ke {i>disk<i}.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access API FileSystemWritableFileStream dan aliran permintaan fetch() eksperimental contoh {i>stream<i} yang dapat ditulis di alam bebas.

Serial API sangat sering menggunakan streaming yang dapat dibaca dan ditulis.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Terakhir, WebSocketStream API mengintegrasikan streaming dengan WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Referensi yang berguna

Ucapan terima kasih

Artikel ini telah ditinjau oleh Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, dan Beras Adam. Postingan blog Jake Archibald telah banyak membantu saya dalam memahami feed. Beberapa contoh kode terinspirasi oleh pengguna GitHub Eksplorasi @bellbind dan bagian-bagian prosanya sangat bergantung pada Dokumen Web MDN tentang Streams. Tujuan Streams Standard penulis telah melakukan pekerjaan yang luar biasa dalam menulis spesifikasi ini. Gambar pahlawan oleh Ryan Lara di Buka Percikan.