Scopri come utilizzare stream leggibili, scrivibili e di trasformazione con l'API Streams.
L'API Streams consente di accedere in modo programmatico ai flussi di dati ricevuti tramite la rete o creati con qualsiasi mezzo localmente e di elaborarli con JavaScript. Lo streaming prevede la suddivisione di una risorsa che vuoi ricevere, inviare o trasformare in piccoli blocchi, per poi elaborarli bit per bit. Anche se lo streaming è un compito che i browser svolgono comunque quando ricevono asset come HTML o video da mostrare sulle pagine web, questa funzionalità non è mai stata disponibile per JavaScript prima dell'introduzione di fetch
con gli stream nel 2015.
In precedenza, se volevi elaborare una risorsa di qualche tipo (un video, un file di testo e così via), dovevano scaricare l'intero file, attendere che venisse deserializzato in un formato adatto e poi elaborarlo. Con gli stream disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript progressivamente, non appena sono disponibili sul client, senza dover generare un buffer, una stringa o un blob. Ciò sblocca una serie di casi d'uso, alcuni dei quali sono elencati di seguito:
- Effetti video: inoltro di uno stream video leggibile tramite uno stream di trasformazione che applica gli effetti in tempo reale.
- (De)compressione dei dati: inoltro di uno stream di file tramite uno stream di trasformazione che lo (de)comprime in modo selettivo.
- Decodifica delle immagini: inoltro di uno stream di risposta HTTP tramite uno stream di trasformazione che decodifica i byte in dati bitmap e poi tramite un altro stream di trasformazione che traduce le bitmap in PNG. Se lo installi all'interno dell'handler
fetch
di un service worker, puoi eseguire il polyfill in modo trasparente di nuovi formati di immagini come AVIF.
Supporto browser
ReadableStream e WritableStream
TransformStream
Concetti principali
Prima di entrare nei dettagli sui vari tipi di stream, vorrei presentarti alcuni concetti fondamentali.
Blocchi
Un chunk è un singolo dato scritto in uno stream o letto da uno stream. Può essere di qualsiasi tipo; gli stream possono anche contenere chunk di tipi diversi. Nella maggior parte dei casi, un chunk non sarà l'unità di dati più atomica per un determinato stream. Ad esempio, uno stream di byte potrebbe contenere chunk costituiti da unità di 16 KiB Uint8Array
anziché da singoli byte.
Stream leggibili
Uno stream leggibile rappresenta un'origine dati da cui puoi leggere. In altre parole, i dati vengono
generati da uno stream leggibile. Nello specifico, uno stream leggibile è un'istanza della classe ReadableStream
.
Stream scrivibili
Uno stream in cui è possibile scrivere rappresenta una destinazione per i dati in cui puoi scrivere. In altre parole, i dati
vengono inseriti in uno stream in cui è possibile scrivere. Nello specifico, uno stream in scrittura è un'istanza della classeWritableStream
.
Trasformare gli stream
Uno stream di trasformazione è costituito da una coppia di stream: uno stream in scrittura, noto come lato in scrittura,
e uno stream in lettura, noto come lato in lettura.
Una metafora del mondo reale potrebbe essere un
interprete simultaneo
che traduce da una lingua all'altra in tempo reale.
In modo specifico per lo stream di trasformazione, la scrittura sul lato in cui è possibile scrivere comporta la disponibilità di nuovi dati per la lettura dal lato in cui è possibile leggere. Nello specifico, qualsiasi oggetto con una proprietà writable
e una proprietà readable
può essere utilizzato come stream di trasformazione. Tuttavia, la classe TransformStream
standard semplifica la creazione di una coppia di questo tipo correttamente intrecciata.
Catene per tubi
Gli stream vengono utilizzati principalmente collegandoli tra loro. Uno stream leggibile può essere incanalato direttamente
in uno stream scrivibile, utilizzando il metodo pipeTo()
dello stream leggibile, oppure può essere incanalato prima in uno
o più stream di trasformazione, utilizzando il metodo pipeTo()
dello stream leggibile.pipeThrough()
Un insieme di stream collegati tra loro in questo modo è noto come catena di pipe.
Pressione di ricircolo
Una volta creata una catena di pipe, vengono propagati gli indicatori relativi alla velocità con cui i chunk devono attraversarla. Se un passaggio della catena non è ancora in grado di accettare i chunk, viene propagato un segnale all'indietro tramite la catena di pipe, fino a quando all'origine originale non viene chiesto di smettere di produrre chunk così rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.
Teeing
Uno stream leggibile può essere suddiviso (dal nome della forma di una "T" maiuscola) utilizzando il relativo metodo tee()
.
In questo modo, lo stream verrà bloccato, ovvero non sarà più utilizzabile direttamente, ma verranno creati due nuovi stream, chiamati branch, che possono essere utilizzati in modo indipendente.
È importante anche perché gli stream non possono essere riavvolti o riavviati. Scopri di più in seguito.
La struttura di uno stream leggibile
Uno stream leggibile è un'origine dati rappresentata in JavaScript da un oggetto
ReadableStream
che proviene da un'origine sottostante. Il
costruttore
ReadableStream()
crea e restituisce un oggetto stream leggibile dai gestori specificati. Esistono due tipi di origini sottostanti:
- Le origini push inviano costantemente dati quando li hai aperti ed è tua responsabilità avviare, mettere in pausa o annullare l'accesso allo stream. Alcuni esempi sono stream video in diretta, eventi inviati dal server o WebSocket.
- Le origini pull richiedono di richiedere esplicitamente i dati dopo la connessione. Alcuni esempi sono le operazioni HTTP tramite chiamate
fetch()
oXMLHttpRequest
.
I dati dello stream vengono letti in sequenza in piccoli pezzi chiamati chunk. I chunk inseriti in uno stream vengono messi in coda. Ciò significa che sono in attesa in una coda pronte per essere lette. Una coda interna tiene traccia dei chunk che non sono stati ancora letti.
Una strategia di coda è un oggetto che determina in che modo uno stream deve segnalare la contropressione in base allo stato della coda interna. La strategia di coda assegna una dimensione a ogni chunk e confronta la dimensione totale di tutti i chunk nella coda con un numero specificato, noto come soglia massima.
I chunk all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un chunk alla volta, consentendoti di eseguire qualsiasi tipo di operazione. Il lettore e l'altro codice di elaborazione associato sono chiamati consumer.
Il costrutto successivo in questo contesto è chiamato controller. Ogni stream leggibile ha un controllo associato che, come suggerisce il nome, ti consente di controllarlo.
Solo un lettore può leggere uno stream alla volta. Quando un lettore viene creato e inizia a leggere uno stream (ovvero diventa un lettore attivo), viene bloccato. Se vuoi che un altro lettore prenda il controllo della lettura dello stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi dirigere gli stream).
Creazione di uno stream leggibile
Puoi creare uno stream leggibile chiamando il relativo costruttore
ReadableStream()
.
Il costruttore ha un argomento facoltativo underlyingSource
, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza dello stream creata.
underlyingSource
È possibile utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:
start(controller)
: viene chiamato immediatamente al momento della costruzione dell'oggetto. Il metodo può accedere all'origine dello stream ed eseguire qualsiasi altra operazione necessaria per configurare la funzionalità di streaming. Se questa procedura deve essere eseguita in modo asincrono, il metodo può restituire una promessa per segnalare il successo o l'errore. Il parametrocontroller
passato a questo metodo è unReadableStreamDefaultController
.pull(controller)
: può essere utilizzato per controllare lo stream man mano che vengono recuperati altri chunk. Viene chiamato ripetutamente finché la coda interna di chunk dello stream non è piena, fino a quando la coda non raggiunge il suo picco. Se il risultato della chiamata apull()
è una promessa,pull()
non verrà richiamato di nuovo finché la promessa non verrà soddisfatta. Se la promessa viene rifiutata, lo stream genera un errore.cancel(reason)
: viene chiamato quando il consumatore dello stream annulla lo stream.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
supporta i seguenti metodi:
ReadableStreamDefaultController.close()
chiude lo stream associato.ReadableStreamDefaultController.enqueue()
inserisce in coda un determinato chunk nello stream associato.ReadableStreamDefaultController.error()
causerà errori in qualsiasi interazione futura con lo stream associato.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
Si tratta di un oggetto che, facoltativamente, definisce una strategia di coda per lo stream, che accetta due parametri:
highWaterMark
: un numero non negativo che indica il livello massimo dello stream che utilizza questa strategia di coda.size(chunk)
: una funzione che calcola e restituisce la dimensione finita non negativa del valore del chunk specificato. Il risultato viene utilizzato per determinare la contropressione, che viene visualizzata tramite la proprietàReadableStreamDefaultController.desiredSize
appropriata. Regola anche quando viene chiamato il metodopull()
dell'origine sottostante.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
I metodi getReader()
e read()
Per leggere da uno stream leggibile, è necessario un lettore, che sarà un
ReadableStreamDefaultReader
.
Il metodo getReader()
dell'interfaccia ReadableStream
crea un lettore e blocca lo stream su di esso. Mentre lo stream è bloccato, non è possibile acquisire altri lettori finché questo non viene rilasciato.
Il metodo read()
dell'interfaccia ReadableStreamDefaultReader
restituisce una promessa che fornisce l'accesso al prossimo
chunk nella coda interna dello stream. Soddisfa o rifiuta con un risultato a seconda dello stato del stream. Le diverse possibilità sono le seguenti:
- Se è disponibile un chunk, la promessa verrà soddisfatta con un oggetto del tipo
{ value: chunk, done: false }
. - Se lo stream viene chiuso, la promessa verrà soddisfatta con un oggetto del tipo
{ value: undefined, done: true }
. - Se lo stream genera un errore, la promessa verrà rifiutata con l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
La proprietà locked
Puoi verificare se uno stream leggibile è bloccato accedendo alla relativa proprietà ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempi di codice per stream leggibili
L'esempio di codice riportato di seguito mostra tutti i passaggi in azione. Devi prima creare un ReadableStream
che nel suo
underlyingSource
argomento (ovvero la classe TimestampSource
) definisce un metodo start()
.
Questo metodo indica al controller
dello stream di
enqueue()
un timestamp ogni secondo per dieci secondi.
Infine, dice al controller di close()
lo stream. Puoi utilizzare questo
stream creando un lettore tramite il metodo getReader()
e chiamando read()
finché lo stream non è
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iterazione asincrona
Controllare se lo stream è done
in ogni iterazione del loop read()
potrebbe non essere l'API più comoda.
Fortunatamente, a breve sarà disponibile un modo migliore per farlo: l'iterazione asincrona.
for await (const chunk of stream) {
console.log(chunk);
}
Una soluzione alternativa per utilizzare l'iterazione asincrona oggi è implementare il comportamento con un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Creazione di uno stream leggibile
Il metodo tee()
dell'interfaccia ReadableStream
suddivide lo stream leggibile corrente, restituendo un array di due elementi contenente i due rami risultanti come nuove istanze ReadableStream
. In questo modo, due lettori possono leggere uno stream contemporaneamente. Ad esempio, puoi farlo in un service worker se vuoi recuperare una risposta dal server e trasmetterla in streaming al browser, ma anche alla cache del service worker. Poiché il corpo di una risposta non può essere utilizzato più di una volta, per farlo sono necessarie due copie. Per annullare lo stream, devi annullare entrambi i rami risultanti. In genere, la messa in primo piano di uno stream lo blocca per tutta la durata, impedendo ad altri lettori di bloccarlo.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Stream di byte leggibili
Per gli stream che rappresentano byte, viene fornita una versione estesa dello stream leggibile per gestire i byte in modo efficiente, in particolare riducendo al minimo le copie. Gli stream di byte consentono di acquisire lettori bring-your-own-buffer (BYOB). L'implementazione predefinita può fornire una serie di output diversi, come stringhe o buffer di array nel caso di WebSocket, mentre gli stream di byte garantiscono l'output di byte. Inoltre, i lettori BYOB offrono vantaggi in termini di stabilità. Questo accade perché se un buffer si scollega, è possibile garantire che non venga scritto nello stesso buffer due volte, evitando così le condizioni di gara. I lettori BYOB possono ridurre il numero di volte in cui il browser deve eseguire la raccolta dei rifiuti, perché può riutilizzare i buffer.
Creazione di uno stream di byte leggibile
Puoi creare uno stream di byte leggibile passando un parametro type
aggiuntivo al
costruttore ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
All'origine sottostante di uno stream di byte leggibile viene assegnato un ReadableByteStreamController
da manipolare. Il suo metodo ReadableByteStreamController.enqueue()
accetta un argomento chunk
il cui valore è un ArrayBufferView
. La proprietà ReadableByteStreamController.byobRequest
restituisce la richiesta pull BYOB corrente o null se non esiste. Infine, la proprietà ReadableByteStreamController.desiredSize
restituisce le dimensioni desiderate per riempire la coda interna dello stream controllato.
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
Si tratta di un oggetto che, facoltativamente, definisce una strategia di coda per lo stream, che accetta un parametro:
highWaterMark
: un numero non negativo di byte che indica il limite massimo dello stream che utilizza questa strategia di coda. Viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàReadableByteStreamController.desiredSize
appropriata. Regola anche quando viene chiamato il metodopull()
dell'origine sottostante.
I metodi getReader()
e read()
Puoi quindi accedere a un ReadableStreamBYOBReader
impostando di conseguenza il parametro mode
:
ReadableStream.getReader({ mode: "byob" })
. Ciò consente un controllo più preciso sull'allocazione del buffer per evitare le copie. Per leggere dallo stream di byte, devi chiamare
ReadableStreamBYOBReader.read(view)
, dove view
è un
ArrayBufferView
.
Esempio di codice di stream di byte leggibile
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La seguente funzione restituisce stream di byte leggibili che consentono una lettura efficiente senza copia di un array generato in modo casuale. Anziché utilizzare una dimensione del chunk predeterminata di 1024, tenta di riempire il buffer fornito dallo sviluppatore, consentendo il pieno controllo.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
La struttura di uno stream in cui è possibile scrivere
Uno stream in scrittura è una destinazione in cui puoi scrivere dati, rappresentata in JavaScript da un oggetto
WritableStream
. Questa
funzione consente di astrarre un destinatario sottostante, ovvero un destinatario I/O di livello inferiore in cui vengono scritti i dati non elaborati.
I dati vengono scritti nello stream tramite un scrittore, un blocco alla volta. Un chunk può assumere una molteplicità di forme, proprio come i chunk in un reader. Puoi utilizzare il codice che preferisci per produrre i chunk pronti per la scrittura. Lo scriptwriter più il codice associato è chiamato producer.
Quando viene creato uno scrittore e inizia a scrivere in uno stream (uno scrittore attivo), si dice che sia bloccato. Solo un autore può scrivere in uno stream scrivibile alla volta. Se vuoi che un altro autore inizi a scrivere nel tuo stream, in genere devi rilasciarlo prima di associarlo a un altro autore.
Una coda interna tiene traccia dei chunk che sono stati scritti nello stream, ma che non sono stati ancora elaborati dal sink sottostante.
Una strategia di coda è un oggetto che determina in che modo uno stream deve segnalare la contropressione in base allo stato della coda interna. La strategia di coda assegna una dimensione a ogni chunk e confronta la dimensione totale di tutti i chunk nella coda con un numero specificato, noto come soglia massima.
La struttura finale è chiamata controller. A ogni stream scrivibile è associato un controller che consente di controllarlo (ad esempio di interromperlo).
Creazione di uno stream in cui scrivere
L'interfaccia WritableStream
dell'API Streams fornisce un'astrazione standard per la scrittura di dati in streaming in una destinazione, nota come destinazione. Questo oggetto è dotato di backpressure e code integrate. Puoi creare uno stream in cui scrivere chiamando il relativo costruttore
WritableStream()
.
Ha un parametro facoltativo underlyingSink
, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza dello stream creata.
underlyingSink
underlyingSink
può includere i seguenti metodi facoltativi definiti dallo sviluppatore. Il parametro controller
trasmesso ad alcuni dei metodi è un
WritableStreamDefaultController
.
start(controller)
: questo metodo viene chiamato immediatamente al momento della creazione dell'oggetto. I contenuti di questo metodo devono avere come obiettivo l'accesso alla destinazione sottostante. Se questo processo deve essere eseguito in modo asincrono, può restituire una promessa per segnalare l'esito positivo o negativo.write(chunk, controller)
: questo metodo viene chiamato quando un nuovo blocco di dati (specificato nel parametrochunk
) è pronto per essere scritto nello scopo sottostante. Può restituire una promessa per indicare il successo o l'errore dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo che le scritture precedenti sono andate a buon fine e mai dopo la chiusura o l'interruzione dello stream.close(controller)
: questo metodo viene chiamato se l'app indica di aver completato la scrittura di chunk nello stream. I contenuti devono fare tutto il necessario per finalizzare le scritture nel flusso di destinazione sottostante e rilasciarne l'accesso. Se questa procedura è asincrona, può restituire una promessa per segnalare l'esito positivo o negativo. Questo metodo verrà chiamato solo dopo che tutte le scritture messe in coda si sono concluse correttamente.abort(reason)
: questo metodo viene chiamato se l'app indica che vuole chiudere bruscamente lo stream e impostarlo in uno stato di errore. Può ripulire le risorse trattenute, in modo simile aclose()
, maabort()
verrà chiamato anche se le scritture sono in coda. Questi blocchi verranno eliminati. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. Il parametroreason
contiene unDOMString
che descrive il motivo dell'interruzione dello stream.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
L'interfaccia
WritableStreamDefaultController
dell'API Streams rappresenta un controller che consente di controllare lo stato di un WritableStream
durante la configurazione, man mano che vengono inviati altri chunk per la scrittura o al termine della scrittura. Quando viene costruito un WritableStream
, al sink sottostante viene assegnata un'istanza WritableStreamDefaultController
corrispondente da manipolare. WritableStreamDefaultController
ha un solo metodo:
WritableStreamDefaultController.error()
,
che causa errori in qualsiasi interazione futura con lo stream associato.
WritableStreamDefaultController
supporta anche una proprietà signal
che restituisce un'istanza di
AbortSignal
,
consentendo di interrompere un'operazione WritableStream
, se necessario.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream()
è queuingStrategy
.
Si tratta di un oggetto che, facoltativamente, definisce una strategia di coda per lo stream, che accetta due parametri:
highWaterMark
: un numero non negativo che indica il livello massimo dello stream che utilizza questa strategia di coda.size(chunk)
: una funzione che calcola e restituisce la dimensione finita non negativa del valore del chunk specificato. Il risultato viene utilizzato per determinare la contropressione, che viene visualizzata tramite la proprietàWritableStreamDefaultWriter.desiredSize
appropriata.
I metodi getWriter()
e write()
Per scrivere in uno stream modificabile, devi avere un autore, che sarà un
WritableStreamDefaultWriter
. Il metodo getWriter()
dell'interfaccia WritableStream
restituisce una nuova istanza di WritableStreamDefaultWriter
e blocca lo stream su quell'istanza. Mentre lo stream è bloccato, non è possibile acquisire altri autori finché non viene rilasciato quello corrente.
Il metodo write()
dell'interfaccia
WritableStreamDefaultWriter
scrive un blocco di dati passato in un WritableStream
e nel relativo sink sottostante, quindi restituisce
una promessa che si risolve per indicare il successo o l'errore dell'operazione di scrittura. Tieni presente che il significato di "successo" dipende dal sink sottostante; potrebbe indicare che il chunk è stato accettato e non necessariamente che sia stato salvato in modo sicuro nella destinazione finale.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
La proprietà locked
Puoi verificare se uno stream in scrittura è bloccato accedendo alla relativa proprietà WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempio di codice per stream modificabili
L'esempio di codice riportato di seguito mostra tutti i passaggi in azione.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Inoltro di uno stream leggibile a uno stream in cui è possibile scrivere
Uno stream leggibile può essere incanalato in uno stream scrivibile tramite il metodo
pipeTo()
dello stream leggibile.
ReadableStream.pipeTo()
inoltra l'attuale ReadableStream
a un determinato WritableStream
e restituisce una promessa che viene soddisfatta al termine del processo di inoltro o rifiutata se si sono verificati errori.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Creazione di uno stream di trasformazione
L'interfaccia TransformStream
dell'API Streams rappresenta un insieme di dati trasformabili. Puoi creare uno stream di trasformazione chiamando il relativo costruttore TransformStream()
, che crea e restituisce un oggetto stream di trasformazione dai gestori specificati. Il costruttore TransformStream()
accetta come
primo argomento un oggetto JavaScript facoltativo che rappresenta il transformer
. Questi oggetti possono contenere uno dei seguenti metodi:
transformer
start(controller)
: questo metodo viene chiamato immediatamente al momento della costruzione dell'oggetto. In genere, questo viene utilizzato per mettere in coda i chunk di prefisso utilizzandocontroller.enqueue()
. Questi chunk verranno letti dal lato leggibile, ma non dipendono da eventuali scritture sul lato scrivibile. Se questo processo iniziale è asincrono, ad esempio perché richiede un po' di impegno per acquisire i chunk del prefisso, la funzione può restituire una promessa per segnalare il successo o l'errore. Una promessa rifiutata causerà un errore nello stream. Eventuali eccezioni lanciate verranno rilanciate dal costruttoreTransformStream()
.transform(chunk, controller)
: questo metodo viene chiamato quando un nuovo chunk scritto inizialmente sul lato scrivibile è pronto per essere trasformato. L'implementazione dello stream garantisce che questa funzione verrà chiamata solo dopo il completamento delle trasformazioni precedenti e mai prima del completamento distart()
o dopo la chiamata diflush()
. Questa funzione esegue il lavoro di trasformazione effettivo dello stream di trasformazione. Può mettere in coda i risultati utilizzandocontroller.enqueue()
. In questo modo, un singolo chunk scritto sul lato in scrittura può generare zero o più chunk sul lato in lettura, a seconda del numero di volte in cui viene chiamatocontroller.enqueue()
. Se la procedura di trasformazione è asincrona, questa funzione può restituire una promessa per segnalare l'esito positivo o negativo della trasformazione. Una promessa rifiutata genera errori sia nei lati leggibili che in quelli scrivibili dello stream di trasformazione. Se non viene fornito alcun metodotransform()
, viene utilizzata la trasformazione dell'identità, che inserisce in coda i chunk invariati dal lato in cui è possibile scrivere a quello in cui è possibile leggere.flush(controller)
: questo metodo viene chiamato dopo che tutti i chunk scritti sul lato in scrittura sono stati trasformati passando correttamente pertransform()
e il lato in scrittura sta per essere chiuso. In genere viene utilizzato per mettere in coda i chunk dei suffissi sul lato leggibile, prima che anche questo si chiuda. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa per indicare il successo o l'errore. Il risultato verrà comunicato al chiamante distream.writable.write()
. Inoltre, una promessa rifiutata genera errori sia sul lato leggibile sia su quello scrivibile dello stream. L'invio di un'eccezione viene trattato come il ritorno di una promessa rifiutata.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Le strategie di coda writableStrategy
e readableStrategy
Il secondo e il terzo parametro facoltativo del costruttore TransformStream()
sono facoltativi
writableStrategy
e readableStrategy
strategie di coda. Sono definiti come descritto rispettivamente nelle sezioni dello stream leggibile e scrivibile.
Esempio di codice dello stream di trasformazione
Il seguente esempio di codice mostra un semplice stream di trasformazione in azione.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Inoltro di uno stream leggibile tramite uno stream di trasformazione
Il metodo pipeThrough()
dell'interfaccia ReadableStream
fornisce un modo incatenabile per incanalare lo stream corrente
tramite uno stream di trasformazione o qualsiasi altra coppia di scrittura/lettura. In genere, l'inserimento di uno stream lo blocca per la durata del pipe, impedendo ad altri lettori di bloccarlo.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Il seguente esempio di codice (un po' artificioso) mostra come implementare una versione "urlata" di fetch()
che mette tutto il testo in maiuscolo utilizzando la promessa di risposta restituita
come stream
e mettendo in maiuscolo ogni blocco. Il vantaggio di questo approccio è che non devi attendere il download dell'intero documento, il che può fare un'enorme differenza quando si tratta di file di grandi dimensioni.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
La demo di seguito mostra gli stream leggibili, scrivibili e di trasformazione in azione. Include anche esempi di catene di pipe pipeThrough()
e pipeTo()
e mostra tee()
. Se vuoi, puoi eseguire la demo in una finestra separata o visualizzare il codice sorgente.
Stream utili disponibili nel browser
Il browser integra una serie di stream utili. Puoi creare facilmente un
ReadableStream
da un blob. Il metodo stream() dell'interfaccia Blob
restituisce un ReadableStream
che, al momento della lettura, restituisce i dati contenuti nel blob. Inoltre, ricorda che un oggetto
File
è un tipo specifico di
Blob
e può essere utilizzato in qualsiasi contesto in cui può essere utilizzato un blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Le varianti di streaming di TextDecoder.decode()
e TextEncoder.encode()
si chiamano
TextDecoderStream
e
TextEncoderStream
rispettivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Comprimere o decomprimere un file è facile con gli stream di trasformazione
CompressionStream
e
DecompressionStream
. L'esempio di codice seguente mostra come scaricare la specifica Streams, comprimerla (gzip) direttamente nel browser e scrivere il file compresso direttamente sul disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
I flussi di richieste fetch()
sperimentali e FileSystemWritableFileStream
dell'API File System Access sono esempi di stream scrivibili disponibili pubblicamente.
L'API Serial fa un uso intensivo di stream sia leggibili che scrivibili.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Infine, l'API WebSocketStream
integra gli stream con l'API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Risorse utili
- Specifiche degli stream
- Demo di accompagnamento
- Polyfill di Streams
- 2016: l'anno degli stream web
- Generatori e iteratori asincroni
- Visualizzatore di stream
Ringraziamenti
Questo articolo è stato esaminato da Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. I post del blog di Jake Archibald mi hanno aiutato molto a comprendere gli stream. Alcuni esempi di codice sono ispirati alle esplorazioni dell'utente GitHub @bellbind e alcune parti del testo si basano molto sulle documentazioni web di MDN su Streams. Gli autori dello standard Streams hanno svolto un lavoro straordinario per scrivere questa specifica. Immagine hero di Ryan Lara su Unsplash.