Pelajari cara menggunakan aliran yang dapat dibaca, dapat ditulis, dan dapat ditransformasi dengan Streams API.
Streams API memungkinkan Anda mengakses aliran data yang diterima melalui jaringan secara terprogram atau dibuat dengan cara apa pun secara lokal dan memprosesnya dengan JavaScript. Streaming melibatkan penguraian resource yang ingin Anda terima, kirim, atau ubah menjadi potongan kecil, lalu memproses potongan ini sedikit demi sedikit. Meskipun streaming adalah sesuatu yang dilakukan browser saat menerima aset seperti HTML atau video untuk ditampilkan di halaman web, kemampuan ini belum pernah tersedia untuk JavaScript sebelum fetch
dengan streaming diperkenalkan pada tahun 2015.
Sebelumnya, jika Anda ingin memproses suatu jenis resource (baik itu video, file teks, dll.), Anda harus mendownload seluruh file, menunggu file tersebut dideserialisasi ke dalam format yang sesuai, lalu memprosesnya. Dengan tersedianya stream untuk JavaScript, semuanya berubah. Anda kini dapat memproses data mentah dengan JavaScript secara progresif segera setelah tersedia di klien, tanpa perlu membuat buffer, string, atau blob. Hal ini memungkinkan sejumlah kasus penggunaan, beberapa di antaranya saya cantumkan di bawah:
- Efek video: menyalurkan streaming video yang dapat dibaca melalui streaming transformasi yang menerapkan efek secara real time.
- (De)kompresi data: menyalurkan aliran file melalui aliran transformasi yang secara selektif (de)mengompresinya.
- Dekode gambar: menyalurkan aliran respons HTTP melalui aliran transformasi yang mendekode byte menjadi data bitmap, lalu melalui aliran transformasi lain yang menerjemahkan bitmap menjadi PNG. Jika
diinstal di dalam pengendali
fetch
pekerja layanan, Anda dapat melakukan polyfill format gambar baru seperti AVIF secara transparan.
Dukungan browser
ReadableStream dan WritableStream
TransformStream
Konsep inti
Sebelum membahas detail berbagai jenis aliran, izinkan saya memperkenalkan beberapa konsep inti.
Potongan
Chunk adalah satu bagian data yang ditulis ke atau dibaca dari aliran. Data ini dapat berupa jenis apa pun; aliran bahkan dapat berisi potongan dari berbagai jenis. Sering kali, potongan tidak akan menjadi unit data paling atomik untuk aliran tertentu. Misalnya, aliran byte dapat berisi chunk yang terdiri dari unit Uint8Array
16 KiB, bukan byte tunggal.
Aliran data yang dapat dibaca
Aliran yang dapat dibaca merepresentasikan sumber data yang dapat Anda baca. Dengan kata lain, data keluar dari aliran yang dapat dibaca. Secara konkret, aliran yang dapat dibaca adalah instance class ReadableStream
.
Aliran data yang dapat ditulis
Aliran yang dapat ditulis merepresentasikan tujuan data yang dapat Anda tulis. Dengan kata lain, data
masuk ke aliran yang dapat ditulis. Secara konkret, stream yang dapat ditulis adalah instance class
WritableStream
.
Transformasi aliran
Aliran transformasi terdiri dari pasangan aliran: aliran yang dapat ditulis, yang dikenal sebagai sisi yang dapat ditulis, dan aliran yang dapat dibaca, yang dikenal sebagai sisi yang dapat dibaca.
Metafora dunia nyata untuk hal ini adalah
penerjemah simultan
yang menerjemahkan dari satu bahasa ke bahasa lain secara langsung.
Dengan cara yang khusus untuk aliran transformasi, penulisan
ke sisi yang dapat ditulis akan menghasilkan data baru yang tersedia untuk dibaca dari
sisi yang dapat dibaca. Secara konkret, setiap objek dengan properti writable
dan properti readable
dapat berfungsi
sebagai aliran transformasi. Namun, class TransformStream
standar memudahkan pembuatan
pasangan yang terjalin dengan benar.
Rantai pipa
Aliran terutama digunakan dengan menyalurkannya satu sama lain. Aliran yang dapat dibaca dapat disalurkan langsung
ke aliran yang dapat ditulis, menggunakan metode pipeTo()
aliran yang dapat dibaca, atau dapat disalurkan melalui satu
atau beberapa aliran transformasi terlebih dahulu, menggunakan metode pipeThrough()
aliran yang dapat dibaca. Kumpulan
aliran yang disalurkan bersama dengan cara ini disebut sebagai rantai saluran.
Tekanan balik
Setelah rantai pipa dibuat, rantai tersebut akan menyebarkan sinyal mengenai seberapa cepat potongan harus mengalir melaluinya. Jika ada langkah dalam rantai yang belum dapat menerima chunk, langkah tersebut akan menyebarkan sinyal ke belakang melalui rantai saluran, hingga akhirnya sumber asli diminta untuk berhenti memproduksi chunk dengan cepat. Proses menormalisasi alur ini disebut tekanan balik.
Memulai
Aliran yang dapat dibaca dapat di-tee (dinamai sesuai bentuk 'T' huruf besar) menggunakan metode tee()
.
Tindakan ini akan mengunci stream, yaitu membuatnya tidak dapat digunakan secara langsung lagi; namun, tindakan ini akan membuat dua stream baru, yang disebut cabang, yang dapat digunakan secara terpisah.
Teeing juga penting karena streaming tidak dapat di-rewind atau dimulai ulang. Kami akan membahasnya lebih lanjut nanti.
Mekanisme aliran yang dapat dibaca
Aliran yang dapat dibaca adalah sumber data yang direpresentasikan di JavaScript oleh objek
ReadableStream
yang
mengalir dari sumber pokok. Konstruktor
ReadableStream()
membuat dan menampilkan objek stream yang dapat dibaca dari handler yang diberikan. Ada dua jenis sumber pokok:
- Sumber push terus-menerus mengirimkan data kepada Anda saat Anda mengaksesnya, dan Anda dapat memulai, menjeda, atau membatalkan akses ke aliran data. Contohnya mencakup live stream video, peristiwa yang dikirim oleh server, atau WebSockets.
- Sumber penarikan mengharuskan Anda meminta data secara eksplisit dari sumber tersebut setelah terhubung. Contohnya mencakup operasi HTTP melalui panggilan
fetch()
atauXMLHttpRequest
.
Data streaming dibaca secara berurutan dalam potongan kecil yang disebut chunk. Chunk yang ditempatkan dalam aliran dikatakan dalam antrean. Artinya, mereka menunggu dalam antrean dan siap dibaca. Antrean internal melacak chunk yang belum dibaca.
Strategi antrean adalah objek yang menentukan cara aliran harus memberi sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap bagian, dan membandingkan total ukuran semua bagian dalam antrean dengan jumlah yang ditentukan, yang dikenal sebagai batas atas.
Potongan dalam aliran dibaca oleh pembaca. Pembaca ini mengambil data satu bagian dalam satu waktu, sehingga Anda dapat melakukan jenis operasi apa pun yang ingin Anda lakukan. Pembaca ditambah kode pemrosesan lainnya yang menyertainya disebut konsumen.
Konstruk berikutnya dalam konteks ini disebut pengontrol. Setiap aliran yang dapat dibaca memiliki pengontrol terkait yang, seperti namanya, memungkinkan Anda mengontrol aliran.
Hanya satu pembaca yang dapat membaca aliran data pada satu waktu; saat pembaca dibuat dan mulai membaca aliran data (yaitu, menjadi pembaca aktif), pembaca tersebut akan dikunci ke aliran data tersebut. Jika Anda ingin pembaca lain mengambil alih pembacaan streaming, Anda biasanya perlu melepaskan pembaca pertama sebelum melakukan hal lain (meskipun Anda dapat menyiapkan streaming).
Membuat stream yang dapat dibaca
Anda membuat stream yang dapat dibaca dengan memanggil konstruktornya
ReadableStream()
.
Konstruktor memiliki argumen opsional underlyingSource
, yang merepresentasikan objek
dengan metode dan properti yang menentukan perilaku instance stream yang dibuat.
underlyingSource
Hal ini dapat menggunakan metode opsional yang ditentukan developer berikut:
start(controller)
: Dipanggil segera saat objek dibuat. Metode dapat mengakses sumber streaming, dan melakukan hal lain yang diperlukan untuk menyiapkan fungsi streaming. Jika proses ini akan dilakukan secara asinkron, metode dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parametercontroller
yang diteruskan ke metode ini adalahReadableStreamDefaultController
.pull(controller)
: Dapat digunakan untuk mengontrol streaming saat lebih banyak chunk diambil. Fungsi ini dipanggil berulang kali selama antrean internal potongan streaming tidak penuh, hingga antrean mencapai tanda batas atasnya. Jika hasil pemanggilanpull()
adalah promise,pull()
tidak akan dipanggil lagi hingga promise tersebut terpenuhi. Jika promise ditolak, aliran data akan menjadi error.cancel(reason)
: Dipanggil saat konsumen stream membatalkan stream.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
mendukung metode berikut:
ReadableStreamDefaultController.close()
menutup aliran data terkait.ReadableStreamDefaultController.enqueue()
mengantrekan potongan tertentu dalam streaming terkait.ReadableStreamDefaultController.error()
menyebabkan error pada interaksi mendatang dengan aliran data terkait.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Argumen kedua, yang juga opsional, dari konstruktor ReadableStream()
adalah queuingStrategy
.
Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:
highWaterMark
: Angka non-negatif yang menunjukkan tanda air tinggi dari aliran yang menggunakan strategi antrean ini.size(chunk)
: Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang ditampilkan melalui propertiReadableStreamDefaultController.desiredSize
yang sesuai. Hal ini juga mengatur kapan metodepull()
sumber pokok dipanggil.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Metode getReader()
dan read()
Untuk membaca dari aliran yang dapat dibaca, Anda memerlukan pembaca, yang akan menjadi
ReadableStreamDefaultReader
.
Metode getReader()
antarmuka ReadableStream
membuat pembaca dan mengunci streaming ke
pembaca tersebut. Saat aliran dikunci, tidak ada pembaca lain yang dapat diperoleh hingga pembaca ini dilepaskan.
Metode read()
dari antarmuka ReadableStreamDefaultReader
menampilkan promise yang memberikan akses ke potongan
berikutnya dalam antrean internal stream. Metode ini akan memenuhi atau menolak dengan hasil bergantung pada status
aliran. Kemungkinan yang berbeda adalah sebagai berikut:
- Jika chunk tersedia, promise akan dipenuhi dengan objek formulir
{ value: chunk, done: false }
. - Jika aliran ditutup, promise akan dipenuhi dengan objek dalam bentuk
{ value: undefined, done: true }
. - Jika streaming mengalami error, promise akan ditolak dengan error yang relevan.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Properti locked
Anda dapat memeriksa apakah aliran yang dapat dibaca dikunci dengan mengakses properti
ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Contoh kode aliran yang dapat dibaca
Contoh kode di bawah menunjukkan semua langkah yang sedang dilakukan. Pertama, Anda membuat ReadableStream
yang dalam argumen
underlyingSource
(yaitu, class TimestampSource
) menentukan metode start()
.
Metode ini memberi tahu controller
aliran untuk
enqueue()
stempel waktu setiap detik selama sepuluh detik.
Terakhir, ia memberi tahu pengontrol untuk close()
streaming. Anda menggunakan aliran ini dengan membuat pembaca melalui metode getReader()
dan memanggil read()
hingga aliran menjadi done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iterasi asinkron
Memeriksa apakah aliran done
pada setiap iterasi loop read()
mungkin bukan API yang paling nyaman.
Untungnya, akan segera ada cara yang lebih baik untuk melakukannya: iterasi asinkron.
for await (const chunk of stream) {
console.log(chunk);
}
Solusi untuk menggunakan iterasi asinkron saat ini adalah dengan menerapkan perilaku dengan polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Mengatur waktu aliran yang dapat dibaca
Metode tee()
dari antarmuka
ReadableStream
akan mengalirkan stream yang dapat dibaca saat ini, menampilkan array dua elemen
yang berisi dua cabang yang dihasilkan sebagai instance ReadableStream
baru. Hal ini memungkinkan
dua pembaca membaca stream secara bersamaan. Anda dapat melakukannya, misalnya, di service worker jika Anda ingin mengambil respons dari server dan melakukan streaming ke browser, tetapi juga melakukan streaming ke cache service worker. Karena isi respons tidak dapat digunakan lebih dari sekali, Anda memerlukan dua salinan
untuk melakukannya. Untuk membatalkan streaming, Anda harus membatalkan kedua cabang yang dihasilkan. Mengawali streaming
biasanya akan menguncinya selama durasi, sehingga mencegah pembaca lain menguncinya.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Aliran byte yang dapat dibaca
Untuk aliran yang merepresentasikan byte, versi yang diperluas dari aliran yang dapat dibaca disediakan untuk menangani byte secara efisien, khususnya dengan meminimalkan salinan. Aliran byte memungkinkan akuisisi pembaca bawa buffer Anda sendiri (BYOB). Implementasi default dapat memberikan berbagai output yang berbeda seperti string atau buffer array dalam kasus WebSockets, sedangkan aliran byte menjamin output byte. Selain itu, pembaca BYOB memiliki manfaat stabilitas. Hal ini karena jika buffer dilepas, buffer dapat menjamin bahwa buffer tidak ditulis ke buffer yang sama dua kali, sehingga menghindari kondisi persaingan. Pembaca BYOB dapat mengurangi jumlah pembersihan sampah memori yang perlu dijalankan browser, karena dapat menggunakan kembali buffer.
Membuat aliran byte yang dapat dibaca
Anda dapat membuat aliran byte yang dapat dibaca dengan meneruskan parameter type
tambahan ke
konstruktor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Sumber pokok dari aliran byte yang dapat dibaca diberi ReadableByteStreamController
untuk dimanipulasi. Metode ReadableByteStreamController.enqueue()
menggunakan argumen chunk
yang nilainya
adalah ArrayBufferView
. Properti ReadableByteStreamController.byobRequest
menampilkan pull request BYOB saat ini, atau null jika tidak ada. Terakhir, properti ReadableByteStreamController.desiredSize
menampilkan ukuran yang diinginkan untuk mengisi antrean internal stream yang dikontrol.
queuingStrategy
Argumen kedua, yang juga opsional, dari konstruktor ReadableStream()
adalah queuingStrategy
.
Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan satu
parameter:
highWaterMark
: Jumlah byte non-negatif yang menunjukkan tanda batas atas aliran menggunakan strategi antrean ini. Hal ini digunakan untuk menentukan tekanan balik, yang terwujud melalui propertiReadableByteStreamController.desiredSize
yang sesuai. Hal ini juga mengatur kapan metodepull()
sumber pokok dipanggil.
Metode getReader()
dan read()
Kemudian, Anda dapat memperoleh akses ke ReadableStreamBYOBReader
dengan menetapkan parameter mode
dengan tepat:
ReadableStream.getReader({ mode: "byob" })
. Hal ini memungkinkan kontrol yang lebih presisi atas alokasi buffer untuk menghindari salinan. Untuk membaca dari aliran byte, Anda perlu memanggil
ReadableStreamBYOBReader.read(view)
, dengan view
adalah
ArrayBufferView
.
Contoh kode aliran byte yang dapat dibaca
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Fungsi berikut menampilkan aliran byte yang dapat dibaca yang memungkinkan pembacaan tanpa salinan yang efisien dari array yang dibuat secara acak. Daripada menggunakan ukuran potongan yang telah ditentukan sebelumnya sebesar 1.024, metode ini mencoba mengisi buffer yang disediakan developer, sehingga memungkinkan kontrol penuh.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mekanisme aliran yang dapat ditulis
Aliran yang dapat ditulis adalah tujuan tempat Anda dapat menulis data, yang direpresentasikan di JavaScript oleh objek
WritableStream
. Hal ini berfungsi sebagai abstraksi di atas sink yang mendasarinya—sink I/O tingkat bawah tempat data mentah ditulis.
Data ditulis ke aliran melalui penulis, satu bagian dalam satu waktu. Potongan dapat memiliki berbagai bentuk, seperti potongan dalam pembaca. Anda dapat menggunakan kode apa pun yang Anda suka untuk menghasilkan potongan yang siap ditulis; penulis beserta kode terkait disebut produser.
Saat pembuat dibuat dan mulai menulis ke aliran (penulis aktif), pembuat tersebut dikatakan dikunci ke aliran tersebut. Hanya satu penulis yang dapat menulis ke stream yang dapat ditulis pada satu waktu. Jika Anda ingin penulis lain mulai menulis ke aliran Anda, biasanya Anda perlu melepaskannya, sebelum Anda melampirkan penulis lain ke aliran tersebut.
Antrean internal melacak potongan yang telah ditulis ke aliran, tetapi belum diproses oleh sink yang mendasarinya.
Strategi antrean adalah objek yang menentukan cara aliran harus memberi sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran untuk setiap bagian, dan membandingkan total ukuran semua bagian dalam antrean dengan jumlah yang ditentukan, yang dikenal sebagai batas atas.
Konstruksi akhir disebut pengontrol. Setiap stream yang dapat ditulis memiliki pengontrol terkait yang memungkinkan Anda mengontrol stream (misalnya, untuk membatalkannya).
Membuat stream yang dapat ditulis
Antarmuka WritableStream
dari
Streams API menyediakan abstraksi standar untuk menulis data streaming ke tujuan, yang dikenal
sebagai sink. Objek ini dilengkapi dengan backpressure dan antrean bawaan. Anda membuat stream yang dapat ditulis dengan
memanggil konstruktornya
WritableStream()
.
Metode ini memiliki parameter underlyingSink
opsional, yang merepresentasikan objek
dengan metode dan properti yang menentukan perilaku instance stream yang dibuat.
underlyingSink
underlyingSink
dapat mencakup metode opsional yang ditentukan developer berikut. Parameter controller
yang diteruskan ke beberapa metode adalah
WritableStreamDefaultController
.
start(controller)
: Metode ini dipanggil segera saat objek dibuat. Isi metode ini harus bertujuan untuk mendapatkan akses ke sink pokok. Jika proses ini akan dilakukan secara asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan.write(chunk, controller)
: Metode ini akan dipanggil saat potongan data baru (yang ditentukan dalam parameterchunk
) siap ditulis ke sink yang mendasarinya. Metode ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan operasi tulis. Metode ini hanya akan dipanggil setelah penulisan sebelumnya berhasil, dan tidak pernah setelah aliran ditutup atau dibatalkan.close(controller)
: Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa aplikasi telah selesai menulis chunk ke aliran. Konten harus melakukan apa pun yang diperlukan untuk menyelesaikan penulisan ke sink yang mendasarinya, dan melepaskan akses ke sink tersebut. Jika proses ini asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Metode ini hanya akan dipanggil setelah semua penulisan yang diantrekan berhasil.abort(reason)
: Metode ini akan dipanggil jika aplikasi memberi sinyal bahwa aplikasi ingin menutup streaming secara tiba-tiba dan menempatkannya dalam status error. Metode ini dapat membersihkan resource yang ditahan, seperticlose()
, tetapiabort()
akan dipanggil meskipun operasi tulis diantrekan. Chunk tersebut akan dibuang. Jika proses ini asinkron, proses dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameterreason
berisiDOMString
yang menjelaskan alasan streaming dibatalkan.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Antarmuka
WritableStreamDefaultController
Streams API merepresentasikan pengontrol yang memungkinkan kontrol status WritableStream
selama penyiapan, saat lebih banyak potongan dikirimkan untuk penulisan, atau di akhir penulisan. Saat membuat
WritableStream
, sink yang mendasarinya diberi instance WritableStreamDefaultController
yang sesuai untuk dimanipulasi. WritableStreamDefaultController
hanya memiliki satu metode:
WritableStreamDefaultController.error()
,
yang menyebabkan error pada interaksi mendatang dengan aliran data terkait.
WritableStreamDefaultController
juga mendukung properti signal
yang menampilkan instance
AbortSignal
,
sehingga operasi WritableStream
dapat dihentikan jika diperlukan.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Argumen kedua, yang juga opsional, dari konstruktor WritableStream()
adalah queuingStrategy
.
Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua parameter:
highWaterMark
: Angka non-negatif yang menunjukkan tanda air tinggi dari aliran yang menggunakan strategi antrean ini.size(chunk)
: Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang ditampilkan melalui propertiWritableStreamDefaultWriter.desiredSize
yang sesuai.
Metode getWriter()
dan write()
Untuk menulis ke stream yang dapat ditulis, Anda memerlukan penulis, yang akan berupa
WritableStreamDefaultWriter
. Metode getWriter()
antarmuka WritableStream
menampilkan instance
baru WritableStreamDefaultWriter
dan mengunci aliran ke instance tersebut. Saat
stream dikunci, penulis lain tidak dapat diperoleh hingga penulis saat ini dilepaskan.
Metode write()
dari antarmuka
WritableStreamDefaultWriter
menulis potongan data yang diteruskan ke WritableStream
dan sink dasarnya, lalu menampilkan
promise yang diselesaikan untuk menunjukkan keberhasilan atau kegagalan operasi tulis. Perhatikan bahwa arti "berhasil" bergantung pada sink yang mendasarinya; hal ini mungkin menunjukkan bahwa chunk telah diterima, dan tidak berarti chunk tersebut disimpan dengan aman ke tujuan akhirnya.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Properti locked
Anda dapat memeriksa apakah aliran data yang dapat ditulis dikunci dengan mengakses properti
WritableStream.locked
-nya.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Contoh kode stream yang dapat ditulis
Contoh kode di bawah menunjukkan semua langkah yang sedang dilakukan.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Menyalurkan aliran yang dapat dibaca ke aliran yang dapat ditulis
Stream yang dapat dibaca dapat disalurkan ke stream yang dapat ditulis melalui metode
pipeTo()
stream yang dapat dibaca.
ReadableStream.pipeTo()
menyalurkan ReadableStream
saat ini ke WritableStream
tertentu dan menampilkan
promise yang terpenuhi saat proses penyaluran berhasil diselesaikan, atau menolak jika ada error yang
ditemukan.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Membuat stream transformasi
Antarmuka TransformStream
Streams API merepresentasikan sekumpulan data yang dapat diubah. Anda
membuat aliran transformasi dengan memanggil konstruktor TransformStream()
, yang membuat dan menampilkan
objek aliran transformasi dari handler yang diberikan. Konstruktor TransformStream()
menerima sebagai
argumen pertamanya objek JavaScript opsional yang merepresentasikan transformer
. Objek tersebut dapat
berisi salah satu metode berikut:
transformer
start(controller)
: Metode ini dipanggil segera saat objek dibuat. Biasanya, perintah ini digunakan untuk mengantrekan potongan awalan, menggunakancontroller.enqueue()
. Chunk tersebut akan dibaca dari sisi yang dapat dibaca, tetapi tidak bergantung pada penulisan apa pun ke sisi yang dapat ditulis. Jika proses awal ini bersifat asinkron, misalnya karena memerlukan upaya untuk mendapatkan potongan awalan, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; promise yang ditolak akan menyebabkan error pada aliran. Setiap pengecualian yang ditampilkan akan ditampilkan ulang oleh konstruktorTransformStream()
.transform(chunk, controller)
: Metode ini dipanggil saat chunk baru yang awalnya ditulis ke sisi yang dapat ditulis siap diubah. Implementasi streaming menjamin bahwa fungsi ini hanya akan dipanggil setelah transformasi sebelumnya berhasil, dan tidak pernah sebelumstart()
selesai atau setelahflush()
dipanggil. Fungsi ini melakukan pekerjaan transformasi yang sebenarnya dari aliran transformasi. Hasil dapat dimasukkan dalam antrean menggunakancontroller.enqueue()
. Hal ini memungkinkan satu bagian yang ditulis ke sisi yang dapat ditulis menghasilkan nol atau beberapa bagian di sisi yang dapat dibaca, bergantung pada berapa kalicontroller.enqueue()
dipanggil. Jika proses transformasi bersifat asinkron, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan transformasi. Promise yang ditolak akan menyebabkan error pada sisi yang dapat dibaca dan dapat ditulis dari aliran transformasi. Jika tidak ada metodetransform()
yang diberikan, transformasi identitas akan digunakan, yang mengantrekan chunk yang tidak berubah dari sisi yang dapat ditulis ke sisi yang dapat dibaca.flush(controller)
: Metode ini dipanggil setelah semua bagian yang ditulis ke sisi yang dapat ditulis telah diubah dengan berhasil melewatitransform()
, dan sisi yang dapat ditulis akan ditutup. Biasanya, hal ini digunakan untuk mengantrekan potongan akhiran ke sisi yang dapat dibaca, sebelum sisi tersebut juga ditutup. Jika proses penghapusan bersifat asinkron, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; hasilnya akan dikomunikasikan kepada pemanggilstream.writable.write()
. Selain itu, promise yang ditolak akan menyebabkan error pada sisi stream yang dapat dibaca dan ditulis. Menampilkan pengecualian diperlakukan sama dengan menampilkan promise yang ditolak.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategi antrean writableStrategy
dan readableStrategy
Parameter opsional kedua dan ketiga dari konstruktor TransformStream()
adalah strategi antrean writableStrategy
dan readableStrategy
opsional. Proses tersebut didefinisikan seperti yang diuraikan di bagian
dapat dibaca dan dapat ditulis.
Contoh kode aliran transformasi
Contoh kode berikut menunjukkan cara kerja aliran transformasi sederhana.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Menyalurkan aliran yang dapat dibaca melalui aliran transformasi
Metode pipeThrough()
dari antarmuka ReadableStream
menyediakan cara yang dapat dirangkai untuk menyalurkan aliran saat ini
melalui aliran transformasi atau pasangan yang dapat ditulis/dibaca lainnya. Menyalurkan stream umumnya akan menguncinya selama durasi saluran, sehingga mencegah pembaca lain menguncinya.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Contoh kode berikutnya (sedikit dibuat-buat) menunjukkan cara menerapkan versi "berteriak" dari fetch()
yang mengubah semua teks menjadi huruf besar dengan menggunakan respons promise yang ditampilkan
sebagai aliran
dan mengubah huruf besar per bagian. Keuntungan dari pendekatan ini adalah Anda tidak perlu menunggu seluruh dokumen didownload, yang dapat membuat perbedaan besar saat menangani file besar.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Demo di bawah menunjukkan cara kerja aliran yang dapat dibaca, ditulis, dan diubah. Dokumen ini juga menyertakan contoh
rantai pipa pipeThrough()
dan pipeTo()
, serta menunjukkan tee()
. Anda dapat menjalankan
demo di jendelanya sendiri atau melihat
kode sumber.
Aliran yang berguna tersedia di browser
Ada sejumlah aliran berguna yang langsung tersedia di browser. Anda dapat membuat
ReadableStream
dari blob dengan mudah. Metode stream() antarmuka Blob
menampilkan ReadableStream
yang setelah dibaca akan menampilkan data yang ada dalam blob. Ingat juga bahwa objek
File
adalah jenis
Blob
tertentu, dan dapat digunakan dalam konteks apa pun yang dapat dilakukan blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Varian streaming TextDecoder.decode()
dan TextEncoder.encode()
masing-masing disebut
TextDecoderStream
dan
TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Mengompresi atau mendekompresi file menjadi mudah dengan aliran transformasi
CompressionStream
dan
DecompressionStream
. Contoh kode di bawah menunjukkan cara mendownload spesifikasi Streams, mengompresinya (gzip) langsung di browser, dan menulis file yang dikompresi langsung ke disk.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API
FileSystemWritableFileStream
dan aliran permintaan fetch()
eksperimental adalah
contoh aliran yang dapat ditulis di luar sana.
Serial API banyak menggunakan aliran yang dapat dibaca dan ditulis.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Terakhir, API WebSocketStream
mengintegrasikan streaming dengan WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Referensi yang berguna
- Spesifikasi streaming
- Demo pendamping
- Polyfill Streams
- 2016—tahun aliran data web
- Iterator dan generator asinkron
- Stream Visualizer
Ucapan terima kasih
Artikel ini ditinjau oleh Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, dan Adam Rice. Postingan blog Jake Archibald sangat membantu saya dalam memahami streaming. Beberapa contoh kode terinspirasi oleh eksplorasi pengguna GitHub @bellbind dan sebagian besar teksnya dibangun berdasarkan MDN Web Docs on Streams. Penulis Streams Standard telah melakukan pekerjaan yang luar biasa dalam menulis spesifikasi ini. Gambar unggulan oleh Ryan Lara di Unsplash.