Scopri come utilizzare i flussi leggibili, scrivibili e di trasformazione con l'API Streams.
L'API Streams consente di accedere in modo programmatico ai flussi di dati ricevuti tramite la rete o creati localmente con qualsiasi mezzo e di elaborarli con JavaScript. Lo streaming prevede la suddivisione di una risorsa che vuoi ricevere, inviare o trasformare
in piccoli blocchi, che vengono poi elaborati un po' alla volta. Lo streaming è qualcosa che i browser fanno comunque quando ricevono asset come HTML o video da mostrare nelle pagine web, ma questa funzionalità non è mai stata disponibile per JavaScript prima dell'introduzione di fetch
con gli stream nel 2015.
In precedenza, se volevi elaborare una risorsa di qualche tipo (che si tratti di un video, di un file di testo e così via), dovevi scaricare l'intero file, attendere che venisse deserializzato in un formato adatto e poi elaborarlo. Con gli stream disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript in modo progressivo non appena sono disponibili sul client, senza dover generare un buffer, una stringa o un blob. Ciò consente di sbloccare una serie di casi d'uso, alcuni dei quali sono elencati di seguito:
- Effetti video:invio di un flusso video leggibile tramite un flusso di trasformazione che applica gli effetti in tempo reale.
- Compressione/decompressione dei dati:invio di un flusso di file tramite un flusso di trasformazione che lo comprime/decomprime in modo selettivo.
- Decodifica delle immagini:invio di un flusso di risposta HTTP tramite un flusso di trasformazione che decodifica i byte
in dati bitmap e poi tramite un altro flusso di trasformazione che traduce le bitmap in PNG. Se
installato all'interno del gestore
fetch
di un service worker, ciò consente di eseguire il polyfill in modo trasparente di nuovi formati di immagine come AVIF.
Supporto browser
ReadableStream e WritableStream
TransformStream
Concetti principali
Prima di entrare nel dettaglio dei vari tipi di stream, vorrei presentarti alcuni concetti fondamentali.
Bocconcini
Un chunk è un singolo dato scritto o letto da uno stream. Può essere di qualsiasi
tipo; gli stream possono contenere anche blocchi di tipi diversi. La maggior parte delle volte, un segmento non sarà l'unità di dati più atomica per un determinato stream. Ad esempio, un flusso di byte potrebbe contenere blocchi costituiti da 16
KiB Uint8Array
, anziché singoli byte.
Stream leggibili
Uno stream leggibile rappresenta un'origine dati da cui puoi leggere. In altre parole, i dati vengono
estratti da uno stream leggibile. In concreto, un flusso leggibile è un'istanza della classe ReadableStream
.
Stream scrivibili
Uno stream scrivibile rappresenta una destinazione in cui puoi scrivere i dati. In altre parole, i dati
vengono inseriti in uno stream scrivibile. Nello specifico, un flusso scrivibile è un'istanza della classe WritableStream
.
Trasformare gli stream
Un flusso di trasformazione è costituito da una coppia di flussi: un flusso scrivibile, noto come lato scrivibile,
e un flusso leggibile, noto come lato leggibile.
Una metafora del mondo reale potrebbe essere un
interprete simultaneo
che traduce da una lingua all'altra in tempo reale.
In modo specifico per lo stream di trasformazione, la scrittura
sul lato scrivibile comporta la disponibilità di nuovi dati per la lettura dal
lato leggibile. In concreto, qualsiasi oggetto con una proprietà writable
e una proprietà readable
può fungere da flusso di trasformazione. Tuttavia, la classe TransformStream
standard semplifica la creazione
di una coppia di questo tipo correttamente entangled.
Catene per tubi
Gli stream vengono utilizzati principalmente per collegarli tra loro. Un flusso leggibile può essere inviato direttamente
a un flusso scrivibile utilizzando il metodo pipeTo()
del flusso leggibile oppure può essere inviato tramite uno
o più flussi di trasformazione utilizzando il metodo pipeThrough()
del flusso leggibile. Un insieme di
stream collegati tra loro in questo modo viene chiamato pipe chain.
Contropressione
Una volta costruita una catena di pipe, questa propagherà i segnali relativi alla velocità con cui i chunk devono fluire al suo interno. Se un passaggio della catena non può ancora accettare i segmenti, propaga un segnale all'indietro attraverso la catena di pipe, finché alla fine alla sorgente originale viene comunicato di interrompere la produzione di segmenti così rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.
Teeing
Uno stream leggibile può essere suddiviso (il nome deriva dalla forma della lettera maiuscola "T") utilizzando il metodo tee()
.
In questo modo, lo stream verrà bloccato, ovvero non sarà più utilizzabile direttamente. Tuttavia, verranno creati due nuovi stream, chiamati rami, che possono essere utilizzati in modo indipendente.
Il teeing è importante anche perché gli stream non possono essere riavvolti o riavviati. Scopri di più in seguito.
Meccanismi di un flusso leggibile
Un flusso leggibile è un'origine dati rappresentata in JavaScript da un oggetto
ReadableStream
che
deriva da un'origine sottostante. Il costruttore
ReadableStream()
crea e restituisce un oggetto stream leggibile dai gestori specificati. Esistono due tipi di origine sottostante:
- Le origini push inviano costantemente dati quando vi accedi e spetta a te avviare, mettere in pausa o annullare l'accesso allo stream. Alcuni esempi includono stream video live, eventi inviati dal server o WebSocket.
- Le origini pull richiedono di richiedere esplicitamente i dati una volta connessi. Alcuni esempi
includono operazioni HTTP tramite chiamate
fetch()
oXMLHttpRequest
.
I dati di streaming vengono letti in sequenza in piccole parti chiamate chunk. I chunk inseriti in un flusso vengono accodati. Ciò significa che sono in attesa in una coda pronti per essere letti. Una coda interna tiene traccia dei chunk che non sono ancora stati letti.
Una strategia di accodamento è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi nella coda con un numero specificato, noto come livello di riempimento.
I blocchi all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un blocco alla volta, consentendoti di eseguire qualsiasi tipo di operazione. Il lettore più l'altro codice di elaborazione associato viene chiamato consumatore.
Il costrutto successivo in questo contesto è chiamato controller. Ogni flusso leggibile ha un controller associato che, come suggerisce il nome, ti consente di controllare il flusso.
Un solo lettore può leggere un flusso alla volta; quando un lettore viene creato e inizia a leggere un flusso (ovvero diventa un lettore attivo), viene bloccato. Se vuoi che un altro lettore prenda il tuo posto nella lettura dello stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi fare tee degli stream).
Creare uno stream leggibile
Crea uno stream leggibile chiamando il relativo costruttore
ReadableStream()
.
Il costruttore ha un argomento facoltativo underlyingSource
, che rappresenta un oggetto
con metodi e proprietà che definiscono il comportamento dell'istanza dello stream creata.
underlyingSource
Può utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:
start(controller)
: viene chiamato immediatamente quando viene creato l'oggetto. Il metodo può accedere all'origine del flusso ed eseguire qualsiasi altra operazione necessaria per configurare la funzionalità di streaming. Se questa operazione deve essere eseguita in modo asincrono, il metodo può restituire una promessa per segnalare l'esito positivo o negativo. Il parametrocontroller
passato a questo metodo è unReadableStreamDefaultController
.pull(controller)
: può essere utilizzato per controllare lo stream man mano che vengono recuperati altri chunk. Viene chiamato ripetutamente finché la coda interna di segmenti dello stream non è piena, fino a quando la coda non raggiunge il livello massimo. Se il risultato della chiamata dipull()
è una promessa,pull()
non verrà richiamato finché la promessa non sarà mantenuta. Se la promessa viene rifiutata, lo stream genererà un errore.cancel(reason)
: chiamato quando il consumatore dello stream annulla lo stream.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
supporta i seguenti metodi:
ReadableStreamDefaultController.close()
chiude lo stream associato.ReadableStreamDefaultController.enqueue()
accoda un determinato blocco nello stream associato.ReadableStreamDefaultController.error()
causa errori nelle interazioni future con lo stream associato.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
È un oggetto che definisce facoltativamente una strategia di accodamento per lo stream, che accetta due
parametri:
highWaterMark
: un numero non negativo che indica il livello massimo del flusso che utilizza questa strategia di accodamento.size(chunk)
: una funzione che calcola e restituisce la dimensione finita non negativa del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàReadableStreamDefaultController.desiredSize
appropriata. Determina anche quando viene chiamato il metodopull()
dell'origine sottostante.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
I metodi getReader()
e read()
Per leggere da uno stream leggibile, è necessario un lettore, che sarà un
ReadableStreamDefaultReader
.
Il metodo getReader()
dell'interfaccia ReadableStream
crea un lettore e blocca lo stream. Mentre lo stream è bloccato, non è possibile acquisire altri lettori finché questo non viene rilasciato.
Il metodo read()
dell'interfaccia ReadableStreamDefaultReader
restituisce una promessa che fornisce l'accesso al successivo
chunk nella coda interna dello stream. Esegue o rifiuta con un risultato a seconda dello stato
dello stream. Le diverse possibilità sono le seguenti:
- Se è disponibile un blocco, la promessa verrà soddisfatta con un oggetto del modulo
{ value: chunk, done: false }
. - Se lo stream viene chiuso, la promessa verrà soddisfatta con un oggetto del modulo
{ value: undefined, done: true }
. - Se lo stream genera un errore, la promessa verrà rifiutata con l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Proprietà locked
Puoi verificare se un flusso leggibile è bloccato accedendo alla relativa proprietà
ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempi di codice di stream leggibili
L'esempio di codice riportato di seguito mostra tutti i passaggi in azione. Per prima cosa, crea un ReadableStream
che nell'argomento
underlyingSource
(ovvero la classe TimestampSource
) definisce un metodo start()
.
Questo metodo indica al controller
dello stream di
enqueue()
un timestamp ogni secondo per dieci secondi.
Infine, indica al controller di close()
lo stream. Consumi questo
stream creando un lettore tramite il metodo getReader()
e chiamando read()
finché lo stream non è
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iterazione asincrona
Controllare a ogni iterazione del ciclo read()
se lo stream è done
potrebbe non essere l'API più conveniente.
Fortunatamente, presto ci sarà un modo migliore per farlo: l'iterazione asincrona.
for await (const chunk of stream) {
console.log(chunk);
}
Una soluzione alternativa per utilizzare l'iterazione asincrona oggi è implementare il comportamento con un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Creare un flusso leggibile
Il metodo tee()
dell'interfaccia
ReadableStream
prepara lo stream leggibile corrente, restituendo un array di due elementi
contenente i due rami risultanti come nuove istanze ReadableStream
. In questo modo
due lettori possono leggere un flusso contemporaneamente. Ad esempio, potresti farlo in un service worker se vuoi recuperare una risposta dal server e trasmetterla in streaming al browser, ma anche alla cache del service worker. Poiché un corpo della risposta non può essere utilizzato più di una volta, sono necessarie due copie
per farlo. Per annullare lo stream, devi annullare entrambi i rami risultanti. L'avvio di uno stream
in genere lo blocca per tutta la durata, impedendo ad altri lettori di bloccarlo.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Stream di byte leggibili
Per i flussi che rappresentano byte, viene fornita una versione estesa del flusso leggibile per gestire i byte in modo efficiente, in particolare riducendo al minimo le copie. I flussi di byte consentono l'acquisizione di lettori BYOB (Bring Your Own Buffer). L'implementazione predefinita può fornire una gamma di output diversi, ad esempio stringhe o buffer di array nel caso di WebSocket, mentre i flussi di byte garantiscono l'output di byte. Inoltre, i lettori BYOB hanno vantaggi in termini di stabilità. Questo perché se un buffer si stacca, può garantire che non venga scritto due volte nello stesso buffer, evitando così le condizioni di competizione. I lettori BYOB possono ridurre il numero di volte in cui il browser deve eseguire la garbage collection, perché possono riutilizzare i buffer.
Creazione di uno stream di byte leggibile
Puoi creare un flusso di byte leggibile passando un parametro type
aggiuntivo al
costruttore ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
L'origine sottostante di un flusso di byte leggibile riceve un ReadableByteStreamController
da
manipolare. Il suo metodo ReadableByteStreamController.enqueue()
accetta un argomento chunk
il cui valore
è un ArrayBufferView
. La proprietà ReadableByteStreamController.byobRequest
restituisce l'attuale
pull request BYOB o null se non è presente. Infine, la proprietà ReadableByteStreamController.desiredSize
restituisce le dimensioni desiderate per riempire la coda interna dello stream controllato.
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
È un oggetto che definisce facoltativamente una strategia di accodamento per lo stream, che accetta un
parametro:
highWaterMark
: un numero non negativo di byte che indica il livello massimo del flusso utilizzando questa strategia di accodamento. Viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàReadableByteStreamController.desiredSize
appropriata. Determina anche quando viene chiamato il metodopull()
dell'origine sottostante.
I metodi getReader()
e read()
Puoi quindi accedere a un ReadableStreamBYOBReader
impostando il parametro mode
di conseguenza:
ReadableStream.getReader({ mode: "byob" })
. Ciò consente un controllo più preciso dell'allocazione del buffer per evitare copie. Per leggere dallo stream di byte, devi chiamare
ReadableStreamBYOBReader.read(view)
, dove view
è un
ArrayBufferView
.
Esempio di codice di flusso di byte leggibile
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La seguente funzione restituisce flussi di byte leggibili che consentono una lettura efficiente senza copia di un array generato in modo casuale. Anziché utilizzare una dimensione del blocco predeterminata di 1024, tenta di riempire il buffer fornito dallo sviluppatore, consentendo il controllo completo.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Meccanismi di un flusso scrivibile
Un flusso scrivibile è una destinazione in cui puoi scrivere dati, rappresentata in JavaScript da un oggetto
WritableStream
. Questo
funge da astrazione sopra un sink sottostante, un sink I/O di livello inferiore in cui
vengono scritti i dati non elaborati.
I dati vengono scritti nello stream tramite un writer, un blocco alla volta. Un blocco può assumere molte forme, proprio come i blocchi in un lettore. Puoi utilizzare il codice che preferisci per produrre i blocchi pronti per la scrittura; lo scrittore più il codice associato sono chiamati produttore.
Quando viene creato un writer e inizia a scrivere in uno stream (un writer attivo), si dice che è bloccato. Solo un writer può scrivere in uno stream scrivibile alla volta. Se vuoi che un altro autore inizi a scrivere nel tuo stream, in genere devi rilasciarlo prima di allegare un altro autore.
Una coda interna tiene traccia dei blocchi scritti nel flusso ma non ancora elaborati dal sink sottostante.
Una strategia di accodamento è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi nella coda con un numero specificato, noto come livello di riempimento.
L'ultimo costrutto è chiamato controller. Ogni stream scrivibile ha un controller associato che ti consente di controllarlo (ad esempio, di interromperlo).
Creare uno stream scrivibile
L'interfaccia WritableStream
dell'API Streams fornisce un'astrazione standard per la scrittura di dati di streaming in una destinazione, nota come sink. Questo oggetto è dotato di contropressione e accodamento integrati. Crea uno stream scrivibile chiamando il relativo costruttore WritableStream()
.
Ha un parametro facoltativo underlyingSink
, che rappresenta un oggetto
con metodi e proprietà che definiscono il comportamento dell'istanza dello stream creata.
underlyingSink
underlyingSink
può includere i seguenti metodi facoltativi definiti dallo sviluppatore. Il parametro controller
passato ad alcuni metodi è un
WritableStreamDefaultController
.
start(controller)
: questo metodo viene chiamato immediatamente quando viene creato l'oggetto. I contenuti di questo metodo devono mirare a ottenere l'accesso al sink sottostante. Se questo processo deve essere eseguito in modo asincrono, può restituire una promessa per segnalare l'esito positivo o negativo.write(chunk, controller)
: questo metodo verrà chiamato quando un nuovo blocco di dati (specificato nel parametrochunk
) è pronto per essere scritto nel sink sottostante. Può restituire una promessa per segnalare l'esito positivo o negativo dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo che le scritture precedenti sono andate a buon fine e mai dopo la chiusura o l'interruzione del flusso.close(controller)
: Questo metodo verrà chiamato se l'app segnala di aver terminato la scrittura dei chunk nel flusso. I contenuti devono fare tutto il necessario per finalizzare le scritture nel sink sottostante e rilasciarne l'accesso. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. Questo metodo verrà chiamato solo dopo che tutte le scritture in coda sono state eseguite correttamente.abort(reason)
: questo metodo verrà chiamato se l'app segnala che vuole chiudere bruscamente lo stream e metterlo in uno stato di errore. Può pulire le risorse in attesa, proprio comeclose()
, maabort()
verrà chiamato anche se le scritture sono in coda. Questi blocchi verranno eliminati. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. Il parametroreason
contiene unDOMString
che descrive il motivo per cui lo stream è stato interrotto.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
L'interfaccia
WritableStreamDefaultController
dell'API Streams rappresenta un controller che consente di controllare lo stato di un WritableStream
durante la configurazione, man mano che vengono inviati altri chunk per la scrittura o al termine della scrittura. Quando viene creato
un WritableStream
, al sink sottostante viene fornita un'istanza WritableStreamDefaultController
corrispondente da manipolare. WritableStreamDefaultController
ha un solo metodo:
WritableStreamDefaultController.error()
,
che causa errori in qualsiasi interazione futura con lo stream associato.
WritableStreamDefaultController
supporta anche una proprietà signal
che restituisce un'istanza di
AbortSignal
,
consentendo l'interruzione di un'operazione WritableStream
, se necessario.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream()
è queuingStrategy
.
È un oggetto che definisce facoltativamente una strategia di accodamento per lo stream, che accetta due
parametri:
highWaterMark
: un numero non negativo che indica il livello massimo del flusso che utilizza questa strategia di accodamento.size(chunk)
: una funzione che calcola e restituisce la dimensione finita non negativa del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàWritableStreamDefaultWriter.desiredSize
appropriata.
I metodi getWriter()
e write()
Per scrivere in uno stream scrivibile, devi disporre di un writer, che sarà un
WritableStreamDefaultWriter
. Il metodo getWriter()
dell'interfaccia WritableStream
restituisce una nuova istanza di WritableStreamDefaultWriter
e blocca lo stream su questa istanza. Mentre lo
stream è bloccato, non è possibile acquisire un altro autore finché quello attuale non viene rilasciato.
Il metodo write()
dell'interfaccia
WritableStreamDefaultWriter
scrive un blocco di dati passato in un WritableStream
e nel relativo sink sottostante, quindi restituisce
una promessa che si risolve per indicare l'esito positivo o negativo dell'operazione di scrittura. Tieni presente che il significato di "successo" dipende dal sink sottostante. Potrebbe indicare che il blocco è stato accettato e non necessariamente che è stato salvato in modo sicuro nella sua destinazione finale.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Proprietà locked
Puoi verificare se un flusso scrivibile è bloccato accedendo alla relativa proprietà
WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempio di codice di stream scrivibile
Il codice di esempio riportato di seguito mostra tutti i passaggi in azione.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Trasferimento di un flusso leggibile a un flusso scrivibile
Un flusso leggibile può essere inviato a un flusso scrivibile tramite il metodo
pipeTo()
del flusso leggibile.
ReadableStream.pipeTo()
invia la ReadableStream
corrente a un determinato WritableStream
e restituisce una
promessa che viene soddisfatta al termine del processo di piping o rifiutata in caso di errori.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Creazione di un flusso di trasformazione
L'interfaccia TransformStream
dell'API Streams rappresenta un insieme di dati trasformabili. Crei
un flusso di trasformazione chiamando il relativo costruttore TransformStream()
, che crea e restituisce
un oggetto flusso di trasformazione dai gestori specificati. Il costruttore TransformStream()
accetta come
primo argomento un oggetto JavaScript facoltativo che rappresenta transformer
. Questi oggetti possono
contenere uno dei seguenti metodi:
transformer
start(controller)
: questo metodo viene chiamato immediatamente quando viene creato l'oggetto. In genere viene utilizzato per accodare i blocchi di prefisso, utilizzandocontroller.enqueue()
. Questi blocchi verranno letti dal lato leggibile, ma non dipendono da alcuna scrittura sul lato scrivibile. Se questo processo iniziale è asincrono, ad esempio perché è necessario un certo impegno per acquisire i blocchi di prefissi, la funzione può restituire una promessa per segnalare l'esito positivo o negativo; una promessa rifiutata genererà un errore nel flusso. Eventuali eccezioni generate verranno generate nuovamente dal costruttoreTransformStream()
.transform(chunk, controller)
: questo metodo viene chiamato quando un nuovo blocco scritto originariamente sul lato scrivibile è pronto per essere trasformato. L'implementazione dello stream garantisce che questa funzione venga chiamata solo dopo che le trasformazioni precedenti sono andate a buon fine e mai prima chestart()
sia stata completata o dopo che è stata chiamataflush()
. Questa funzione esegue il lavoro di trasformazione effettivo dello stream di trasformazione. Può mettere in coda i risultati utilizzandocontroller.enqueue()
. Ciò consente a un singolo blocco scritto sul lato scrivibile di generare zero o più blocchi sul lato leggibile, a seconda di quante volte viene chiamatocontroller.enqueue()
. Se il processo di trasformazione è asincrono, questa funzione può restituire una promessa per segnalare l'esito positivo o negativo della trasformazione. Una promessa rifiutata genererà un errore sia per la parte leggibile che per quella scrivibile dello stream di trasformazione. Se non viene fornito alcun metodotransform()
, viene utilizzata la trasformazione dell'identità, che accoda i blocchi invariati dal lato scrivibile al lato leggibile.flush(controller)
: questo metodo viene chiamato dopo che tutti i blocchi scritti sul lato scrivibile sono stati trasformati superando correttamentetransform()
e il lato scrivibile sta per essere chiuso. In genere viene utilizzato per accodare i blocchi di suffisso al lato leggibile, prima che anche questo venga chiuso. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa per segnalare l'esito positivo o negativo; il risultato verrà comunicato al chiamante distream.writable.write()
. Inoltre, una promessa rifiutata genererà un errore sia nel lato leggibile che in quello scrivibile dello stream. Generare un'eccezione equivale a restituire una promessa rifiutata.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Le strategie di accodamento writableStrategy
e readableStrategy
Il secondo e il terzo parametro facoltativo del costruttore TransformStream()
sono le strategie di accodamento facoltative
writableStrategy
e readableStrategy
. Sono definiti come descritto nelle sezioni
stream leggibile e stream scrivibile rispettivamente.
Esempio di codice di trasformazione dello stream
Il seguente esempio di codice mostra un semplice stream di trasformazione in azione.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Invio di un flusso leggibile tramite un flusso di trasformazione
Il metodo pipeThrough()
dell'interfaccia ReadableStream
fornisce un modo concatenabile per inviare il flusso corrente
tramite un flusso di trasformazione o qualsiasi altra coppia scrivibile/leggibile. Il piping di uno stream in genere lo blocca
per la durata del pipe, impedendo ad altri lettori di bloccarlo.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Il seguente esempio di codice (un po' artificioso) mostra come implementare una versione "urlata" di fetch()
che converte tutto il testo in maiuscolo consumando la promessa di risposta restituita
come flusso
e convertendo in maiuscolo blocco per blocco. Il vantaggio di questo approccio è che non devi attendere il download dell'intero documento, il che può fare una grande differenza quando si ha a che fare con file di grandi dimensioni.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
La demo seguente mostra i flussi leggibili, scrivibili e di trasformazione in azione. Include anche esempi
di catene di pipe pipeThrough()
e pipeTo()
e mostra anche tee()
. Se vuoi, puoi eseguire la demo in una finestra separata o visualizzare il codice sorgente.
Stream utili disponibili nel browser
Nel browser sono integrati diversi stream utili. Puoi creare facilmente un
ReadableStream
da un blob. Il metodo stream() dell'interfaccia Blob
restituisce un ReadableStream
che, una volta letto, restituisce i dati contenuti nel blob. Ricorda inoltre che un oggetto
File
è un tipo specifico di
Blob
e può essere utilizzato in qualsiasi contesto in cui può essere utilizzato un blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Le varianti di streaming di TextDecoder.decode()
e TextEncoder.encode()
sono chiamate
TextDecoderStream
e
TextEncoderStream
rispettivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
La compressione o la decompressione di un file è semplice con i flussi di trasformazione
CompressionStream
e
DecompressionStream
rispettivamente. L'esempio di codice riportato di seguito mostra come scaricare la specifica Streams, comprimerla (gzip)
direttamente nel browser e scrivere il file compresso direttamente sul disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
L'API File System Access,
FileSystemWritableFileStream
e gli stream di richieste sperimentali fetch()
sono
esempi di stream scrivibili in natura.
L'API Serial utilizza molto sia i flussi leggibili che quelli scrivibili.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Infine, l'API WebSocketStream
integra gli stream con l'API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Risorse utili
- Specifiche degli stream
- Demo di accompagnamento
- Polyfill per Streams
- 2016: l'anno degli stream web
- Iteratori e generatori asincroni
- Visualizzatore di stream
Ringraziamenti
Questo articolo è stato rivisto da Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. I post del blog di Jake Archibald mi hanno aiutato molto a capire gli stream. Alcuni esempi di codice sono ispirati alle esplorazioni dell'utente GitHub @bellbind e parti del testo si basano in gran parte sulla documentazione MDN Web Docs su Streams. Gli autori dello standard Streams hanno fatto un ottimo lavoro nella stesura di questa specifica. Immagine hero di Ryan Lara su Unsplash.