Scopri come utilizzare flussi leggibili, scrivibili e trasformare i flussi con l'API Streams.
L'API Streams ti consente di accedere in modo programmatico agli stream di dati ricevuti sulla rete
o creato con qualsiasi mezzo, localmente
e l'elaborazione con JavaScript. I flussi di dati prevedono l'analisi di una risorsa che vuoi ricevere, inviare o trasformare
in piccoli blocchi, per poi elaborarli gradualmente. Mentre lo streaming è qualcosa
i browser effettuano comunque quando ricevono asset come HTML o video da mostrare sulle pagine web,
non era mai stata disponibile per JavaScript prima dell'introduzione di fetch
con gli stream nel 2015.
In precedenza, se volevi elaborare una risorsa di qualche tipo (che si tratti di un video, di un file di testo e così via), occorre scaricare l'intero file, attendere che venga deserializzato in un formato adatto. per poi elaborarli. Dato che gli stream sono disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript progressivamente non appena è disponibile sul client, senza dover generare un buffer, una stringa o un blob. Ciò sblocca una serie di casi d'uso, alcuni dei quali sono elencati di seguito:
- Effetti video:trasmette uno stream video leggibile attraverso uno stream di trasformazione che applica effetti in tempo reale.
- Decompressione dei dati:pipeline di un flusso di file attraverso un flusso di trasformazione che selettivamente (de)comprimela.
- Decodifica dell'immagine: pipeline di un flusso di risposta HTTP attraverso un flusso di trasformazione che decodifica i byte
in dati bitmap e quindi tramite un altro flusso di trasformazione che converte le bitmap in PNG. Se
installati all'interno del gestore
fetch
di un service worker, questo consente di eseguire il polyfill in modo trasparente nuovi formati di immagine come AVIF.
Supporto browser
ReadableStream e WritableStream
TransformStream
Concetti principali
Prima di entrare nel dettaglio dei vari tipi di stream, vorrei introdurre alcuni concetti fondamentali.
Pezzi
Un blocco è un singolo dato che viene scritto o letto da un flusso. Può essere qualsiasi
type; i flussi possono anche contenere blocchi di tipi diversi. La maggior parte delle volte, un blocco non sarà il più atomico
unità di dati per un determinato flusso. Ad esempio, un flusso di byte potrebbe contenere blocchi composti da 16
Uint8Array
unità KiB, anziché byte singoli.
Flussi leggibili
Un flusso leggibile rappresenta un'origine di dati da cui puoi leggere. In altre parole, i dati vengono forniti
da uno stream leggibile. Concretamente, uno stream leggibile è un'istanza dell'elemento ReadableStream
.
Flussi scrivibili
Un flusso scrivibile rappresenta una destinazione per i dati in cui è possibile scrivere. In altre parole, i dati
entra in uno stream scrivibile. Concretamente, un flusso scrivibile è un'istanza del
WritableStream
corso.
Trasformare i flussi
Un flusso di trasformazione è costituito da una coppia di flussi: uno stream scrivibile, noto come lato scrivibile.
e uno stream leggibile, noto come lato leggibile.
Una metafora del mondo reale potrebbe essere
interprete simultaneo
che traduce da una lingua all'altra al volo.
In una maniera specifica per il flusso di trasformazione, scrivendo
accessibile in scrittura fa sì che i nuovi dati vengano resi disponibili per la lettura
un lato leggibile. Concretamente, qualsiasi oggetto con una proprietà writable
e una proprietà readable
può essere pubblicato
come flusso di trasformazione. Tuttavia, la classe TransformStream
standard semplifica la creazione
una coppia di questo tipo che è ben intrecciata.
Catene per tubi
Gli stream vengono utilizzati principalmente collegandoli tra loro. Uno stream leggibile può essere trasmesso direttamente
a uno stream scrivibile utilizzando il metodo pipeTo()
del flusso leggibile oppure può essere inviato tramite pipe
o più trasformano i flussi, utilizzando il metodo pipeThrough()
del flusso leggibile. Una serie di
flussi collegati tra loro in questo modo sono dette "catene di tubi".
Contropressione
Una volta creata una catena di tubazioni, propaga i segnali relativi alla velocità di flusso dei blocchi e lo attraverso. Se un passaggio della catena non può ancora accettare chunk, propaga un segnale all'indietro attraverso la catena, fino a quando alla fonte originale non viene detto di smettere di produrre blocchi rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.
Maglietta a pallone
È possibile eseguire il tethering di uno stream leggibile (il cui nome segue la forma della "T" maiuscola) utilizzando il metodo tee()
.
Questa operazione bloccherà lo stream, ovvero non lo renderà più utilizzabile direttamente. vengono create due nuove versioni
detti rami, che possono essere consumati in modo indipendente.
Anche il teeing è importante perché gli stream non possono essere riavvolti o riavviati; ulteriori informazioni su questo argomento sono in seguito.
I meccanismi di uno stream leggibile
Uno stream leggibile è un'origine dati rappresentata in JavaScript da un
Oggetto ReadableStream
che
da un'origine sottostante. La
ReadableStream()
crea e restituisce un oggetto flusso leggibile dai gestori specificati. Esistono due metodi
tipi di origine sottostante:
- Le origini push inviano costantemente i dati verso di te quando le accedi e sta a te avviare, mettere in pausa o annullare l'accesso allo stream. Alcuni esempi includono video stream in diretta, eventi inviati dal server, o WebSocket.
- Le origini pull richiedono di richiedere esplicitamente i dati dopo la connessione. Esempi
includi operazioni HTTP tramite chiamate
fetch()
oXMLHttpRequest
.
I dati in streaming vengono letti in sequenza in piccole parti denominate blocchi. Si dice che i blocchi posizionati in uno stream siano in coda. Ciò significa che sono in attesa pronti per essere letti. Una coda interna tiene traccia dei blocchi che non sono stati ancora letti.
Una strategia di accodamento è un oggetto che determina come un flusso dovrebbe segnalare la contropressione in base a lo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ciascun blocco e confronta dimensione totale di tutti i blocchi in coda fino a un numero specificato, noto come livello massimo.
I blocchi all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un blocco di dati che ti consente di eseguire qualsiasi tipo di operazione. Il lettore più l'altro di elaborazione di questo codice è definito consumatore.
Il costrutto successivo in questo contesto è chiamato controller. A ogni stream leggibile è associato un che, come suggerisce il nome, ti consente di controllare lo stream.
Solo un lettore può leggere uno stream alla volta. Quando un lettore viene creato e inizia a leggere uno stream. (ovvero, diventa un lettore attivo), è bloccato. Se vuoi che un altro lettore assuma il controllo per leggere lo stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi usare la funzionalità tee per gli stream).
Creazione di uno stream leggibile
Puoi creare uno stream leggibile chiamando il relativo costruttore
ReadableStream()
Il costruttore ha un argomento facoltativo underlyingSource
, che rappresenta un oggetto
con metodi e proprietà che definiscono il comportamento dell'istanza di flusso creata.
underlyingSource
In questo modo è possibile utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:
start(controller)
: viene richiamata immediatamente quando l'oggetto viene creato. La può accedere all'origine del flusso e svolgere qualsiasi altra operazione necessari per configurare la funzionalità di streaming. Se questo processo deve essere eseguito in modo asincrono, il metodo può restituiscono una promessa che indica l'esito positivo o negativo. Il parametrocontroller
passato a questo metodo è unReadableStreamDefaultController
pull(controller)
: può essere utilizzata per controllare il flusso quando vengono recuperati più blocchi. it viene richiamata ripetutamente purché la coda di chunk interna dello stream non sia piena, fino a quando raggiunge il suo picco. Se il risultato della chiamata apull()
è una promessa,pull()
non verrà richiamato finché non verrà soddisfatta la promessa. Se la promessa viene rifiutata, lo streaming diventerà errato.cancel(reason)
: richiamato quando il consumatore dello stream annulla lo stream.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
supporta i seguenti metodi:
ReadableStreamDefaultController.close()
chiude lo stream associato.ReadableStreamDefaultController.enqueue()
accoda un determinato blocco nel flusso associato.ReadableStreamDefaultController.error()
causa l'errore di eventuali interazioni future con lo stream associato.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
È un oggetto che definisce facoltativamente una strategia di accodamento per il flusso, che richiede due
parametri:
highWaterMark
: un numero non negativo che indica il livello massimo dello stream utilizzando questa strategia di accodamento.size(chunk)
: una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàReadableStreamDefaultController.desiredSize
appropriata. Inoltre, stabilisce quando viene chiamato il metodopull()
dell'origine sottostante.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Metodi getReader()
e read()
Per leggere da uno stream leggibile, hai bisogno di un lettore, che sarà un
ReadableStreamDefaultReader
Il metodo getReader()
dell'interfaccia ReadableStream
crea un lettore e blocca lo stream su
li annotino. Mentre lo stream è bloccato, nessun altro lettore può essere acquisito fino a quando questo non viene rilasciato.
La read()
dell'interfaccia ReadableStreamDefaultReader
restituisce una promessa che fornisce l'accesso al
un blocco note nella coda interna del flusso. L'annuncio soddisfa o rifiuta con un risultato che dipende dallo stato di
durante lo streaming. Le diverse possibilità sono le seguenti:
- Se un blocco è disponibile, la promessa verrà soddisfatta con un oggetto del modulo
{ value: chunk, done: false }
. - Se lo stream viene chiuso, la promessa verrà soddisfatta con un oggetto nel formato
{ value: undefined, done: true }
. - In caso di errori relativi allo stream, la promessa viene rifiutata con l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
La proprietà locked
Puoi verificare se uno stream leggibile è bloccato accedendo al relativo
ReadableStream.locked
proprietà.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempi di codice di flusso leggibili
L'esempio di codice seguente mostra tutti i passaggi in azione. Innanzitutto, crei un ReadableStream
che nei suoi
L'argomento underlyingSource
(ovvero la classe TimestampSource
) definisce un metodo start()
.
Questo metodo indica all'controller
dello stream di
enqueue()
un timestamp ogni secondo per dieci secondi.
Infine, comunica al controller di close()
lo stream. Usi questo
lo stream creando un lettore con il metodo getReader()
e chiamando read()
finché lo stream non viene
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iterazione asincrona
Controllare a ogni iterazione del loop read()
se lo stream è done
potrebbe non essere l'API più comoda.
Fortunatamente, presto ci sarà un modo migliore per farlo: l'iterazione asincrona.
for await (const chunk of stream) {
console.log(chunk);
}
Una soluzione alternativa per utilizzare l'iterazione asincrona è implementare il comportamento con un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Riproduzione di uno stream leggibile
Il metodo tee()
del metodo
L'interfaccia ReadableStream
avvia lo stream leggibile corrente, restituendo un array a due elementi
contenente i due rami risultanti come nuove istanze ReadableStream
. Ciò consente
due lettori per leggere uno stream contemporaneamente. Potresti farlo, ad esempio, in un service worker se
si desidera recuperare una risposta dal server e trasmetterla in modalità flusso al browser, ma anche
e la cache del service worker. Poiché il corpo di una risposta non può essere utilizzato più di una volta, sono necessarie due copie
per eseguire questa operazione. Per annullare lo stream, devi annullare entrambi i rami risultanti. Avviamento di un live streaming
in genere lo bloccheranno per l'intera durata, impedendo ad altri lettori di bloccarlo.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Flussi di byte leggibili
Per gli stream che rappresentano i byte, viene fornita una versione estesa dello stream leggibile in modo efficiente, in particolare riducendo al minimo le copie. I flussi di byte consentono il buffer del modello lettori (BYOB) da acquisire. L'implementazione predefinita può fornire una gamma di output diversi come come stringhe o buffer di array nel caso di WebSocket, mentre i flussi di byte garantiscono l'output dei byte. Inoltre, i lettori BYOB offrono vantaggi in termini di stabilità. Questo è perché se un buffer si scollega, è possibile garantire che non scriva due volte nello stesso buffer, evitando così le condizioni di gara. I lettori BYOB possono ridurre il numero di volte in cui il browser deve essere eseguito garbage collection, perché può riutilizzare i buffer.
Creazione di un flusso di byte leggibile
Puoi creare uno stream di byte leggibile passando un parametro type
aggiuntivo alla
costruttore ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
All'origine sottostante di un flusso di byte leggibile viene assegnato un ReadableByteStreamController
a
manipolare. Il suo metodo ReadableByteStreamController.enqueue()
prende un argomento chunk
il cui valore
è un ArrayBufferView
. La proprietà ReadableByteStreamController.byobRequest
restituisce lo stato attuale
Richiesta di pull BYOB o nullo in caso contrario. Infine, ReadableByteStreamController.desiredSize
restituisce la dimensione desiderata per riempire la coda interna dello stream controllato.
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
È un oggetto che definisce facoltativamente una strategia di accodamento per il flusso, che prende una
:
highWaterMark
: un numero non negativo di byte che indica il livello elevato dello stream utilizzando questa strategia di accodamento. Viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàReadableByteStreamController.desiredSize
appropriata. Inoltre, stabilisce quando viene chiamato il metodopull()
dell'origine sottostante.
Metodi getReader()
e read()
Puoi quindi ottenere l'accesso a un ReadableStreamBYOBReader
impostando il parametro mode
di conseguenza:
ReadableStream.getReader({ mode: "byob" })
. Ciò consente un controllo più preciso sul buffer
allo scopo di evitare copie. Per leggere dal flusso di byte, devi chiamare
ReadableStreamBYOBReader.read(view)
, dove view
è un
ArrayBufferView
Esempio di codice di flusso di byte leggibili
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La seguente funzione restituisce flussi di byte leggibili che consentono una lettura zero-copy efficiente di un generato in modo casuale. Anziché utilizzare una dimensione del chunk predeterminata di 1024, tenta di riempire fornito dallo sviluppatore, consentendo il controllo completo.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
I meccanismi di uno stream scrivibile
Un flusso in scrittura è una destinazione in cui è possibile scrivere dati, rappresentata in JavaScript da un
WritableStream
. Questo
funge da astrazione sopra un sink sottostante, ovvero un sink di I/O di livello inferiore in cui
vengono scritti dati non elaborati.
I dati vengono scritti nel flusso tramite un writer, un blocco alla volta. Un chunk può prendere un una moltitudine di forme, proprio come i blocchi in un lettore. Puoi utilizzare il codice che desideri generare i blocchi pronti per la scrittura; l'autore e il codice associato è chiamato producer.
Quando un autore viene creato e inizia a scrivere su uno stream (ossia un autore attivo), si parla di bloccato. Solo un autore può scrivere in uno stream scrivibile alla volta. Se vuoi un'altra autore per iniziare a scrivere sul tuo stream, in genere devi rilasciarlo e poi allegarlo a un altro autore.
Una coda interna tiene traccia dei blocchi scritti nel flusso, ma non ancora vengono elaborati dal sink sottostante.
Una strategia di accodamento è un oggetto che determina come un flusso dovrebbe segnalare la contropressione in base a lo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ciascun blocco e confronta dimensione totale di tutti i blocchi in coda fino a un numero specificato, noto come livello massimo.
Il costrutto finale è chiamato controller. A ogni flusso scrivibile è associato un controller ti consente di controllare lo stream (ad esempio per interromperlo).
Creazione di un flusso scrivibile
L'interfaccia di WritableStream
di
l'API Streams fornisce un'astrazione standard per la scrittura di flussi di dati in una destinazione, nota
come lavandino. Questo oggetto è dotato di contropressione e accodamento integrati. Puoi creare uno stream scrivibile
chiama il suo costruttore
WritableStream()
Ha un parametro facoltativo underlyingSink
, che rappresenta un oggetto
con metodi e proprietà che definiscono il comportamento dell'istanza di flusso creata.
underlyingSink
underlyingSink
può includere i seguenti metodi facoltativi definiti dallo sviluppatore. controller
passato ad alcuni metodi è un parametro
WritableStreamDefaultController
start(controller)
: questo metodo viene chiamato immediatamente quando l'oggetto viene creato. La i contenuti di questo metodo dovrebbero avere l'accesso al sink sottostante. Se questo processo deve essere se eseguita in modo asincrono, può restituire una promessa per segnalare l'esito positivo o negativo.write(chunk, controller)
: questo metodo viene chiamato quando un nuovo blocco di dati (specificato nelchunk
) è pronto per essere scritto nel sink sottostante. Può restituire una promessa è un segnale di successo o errore dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo i precedenti scritture riuscite e mai dopo la chiusura o l'interruzione del flusso.close(controller)
: questo metodo verrà chiamato se l'app segnala che ha terminato la scrittura di grandi dimensioni nel flusso. I contenuti devono fare tutto ciò che è necessario per finalizzare le scritture sink sottostante e rilasciarne l'accesso. Se questo processo è asincrono, può restituire un promettere di indicare successo o fallimento. Questo metodo viene chiamato solo dopo tutte le scritture in coda hanno avuto successo.abort(reason)
: questo metodo verrà chiamato se l'app segnala che vuole chiudere bruscamente il flusso e lo metteranno in stato di errore. Può eseguire la pulizia di tutte le risorse conservate, comeclose()
, maabort()
verrà chiamato anche se le scritture sono in coda. Questi blocchi verranno lanciati di distanza. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. La Il parametroreason
contiene un valoreDOMString
che descrive perché lo stream è stato interrotto.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
La
WritableStreamDefaultController
dell'API Streams rappresenta un controller che consente il controllo dello stato di WritableStream
durante l'impostazione, man mano che vengono inviati più blocchi per la scrittura o alla fine della scrittura. Durante la creazione
un WritableStream
, al sink sottostante viene assegnato un WritableStreamDefaultController
da manipolare. WritableStreamDefaultController
ha un solo metodo:
WritableStreamDefaultController.error()
,
causando un errore in tutte le interazioni future con lo stream associato.
WritableStreamDefaultController
supporta anche una proprietà signal
che restituisce un'istanza di
AbortSignal
,
consentendo l'arresto di un'operazione WritableStream
se necessario.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream()
è queuingStrategy
.
È un oggetto che definisce facoltativamente una strategia di accodamento per il flusso, che richiede due
parametri:
highWaterMark
: un numero non negativo che indica il livello massimo dello stream utilizzando questa strategia di accodamento.size(chunk)
: una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàWritableStreamDefaultWriter.desiredSize
appropriata.
Metodi getWriter()
e write()
Per scrivere in uno stream scrivibile, hai bisogno di un writer, che sarà un
WritableStreamDefaultWriter
. Il metodo getWriter()
dell'interfaccia WritableStream
restituisce un
nuova istanza di WritableStreamDefaultWriter
e blocca il flusso a quell'istanza. Mentre
lo stream viene bloccato, nessun altro autore può essere acquisito fino a quando non viene rilasciato quello corrente.
La write()
del metodo
WritableStreamDefaultWriter
scrive un blocco di dati passato in WritableStream
e nel sink sottostante, quindi restituisce
una promessa che si risolve a indicare l'esito positivo o negativo dell'operazione di scrittura. Ricorda che
"riuscito" fino al sink sottostante; potrebbe indicare che il blocco è stato accettato
e non necessariamente che venga salvato in sicurezza
alla sua destinazione definitiva.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
La proprietà locked
Puoi verificare se uno stream scrivibile è bloccato accedendo alla relativa
WritableStream.locked
proprietà.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempio di codice di flusso scrivibile
L'esempio di codice seguente mostra tutti i passaggi nella pratica.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Collegamento di uno stream leggibile a uno stream scrivibile
Uno stream leggibile può essere inviato a uno stream scrivibile attraverso
pipeTo()
.
ReadableStream.pipeTo()
indirizza l'elemento ReadableStream
corrente a un determinato WritableStream
e restituisce un
una promessa che si soddisfa quando il processo di piping viene completato correttamente o rifiuta se eventuali errori sono stati
riscontrati.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Creazione di un flusso di trasformazione
L'interfaccia TransformStream
dell'API Streams rappresenta un set di dati trasformabili. Tu
crea un flusso di trasformazione chiamando il suo costruttore TransformStream()
, che crea e restituisce
un oggetto di flusso di trasformazione dai gestori specificati. Il costruttore TransformStream()
accetta
al suo primo argomento un oggetto JavaScript facoltativo che rappresenta transformer
. Questi oggetti possono
che contengono uno dei seguenti metodi:
transformer
start(controller)
: questo metodo viene chiamato immediatamente quando l'oggetto viene creato. Tipicamente questo viene utilizzato per accodare blocchi di prefissi, utilizzandocontroller.enqueue()
. Questi blocchi verranno letti dal lato leggibile ma non dipendono da scritture sul lato scrivibile. Se questo numero è asincrono, ad esempio perché l'acquisizione dei blocchi di prefisso richiede uno sforzo, la funzione può restituire una promessa di segnalare l'esito positivo o negativo. una promessa rifiutata genera un errore flusso di dati. Eventuali eccezioni generate verranno nuovamente generate dal costruttoreTransformStream()
.transform(chunk, controller)
: questo metodo viene chiamato quando un nuovo blocco originariamente scritto nel in scrittura è pronta per essere trasformata. L'implementazione del flusso garantisce che questa funzione verrà chiamato solo dopo che le trasformazioni precedenti sono andate a buon fine e mai prima chestart()
completata o dopo la chiamata diflush()
. Questa funzione esegue l'effettiva trasformazione del flusso di trasformazione. Può accodare i risultati utilizzandocontroller.enqueue()
. Questo consente a un singolo blocco scritto sul lato scrivibile di produrre zero o più blocchi sul un lato leggibile, a seconda di quante volte viene chiamatocontroller.enqueue()
. Se il processo di di trasformazione è asincrona, questa funzione può restituire una promessa per segnalare l'esito positivo o negativo del la trasformazione. Una promessa rifiutata genera un errore sia sul lato leggibile sia su quello scrivibile della e trasformare un flusso di dati. Se non viene fornito alcun metodotransform()
, viene utilizzata la trasformazione dell'identità, che accoda blocchi senza modifiche dal lato scrivibile al lato leggibile.flush(controller)
: questo metodo viene chiamato dopo che tutti i blocchi scritti sul lato scrivibile sono stati trasformata correttamente passando pertransform()
e il lato scrivibile sta per essere chiuso. In genere viene utilizzato per accodare blocchi di suffissi sul lato leggibile, anche prima di questo viene chiusa. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa a indicatori di esito positivo o negativo; il risultato verrà comunicato al chiamante distream.writable.write()
. Inoltre, una promessa rifiutata genera un errore sia nella risposta leggibile lati in scrittura dello stream. La generazione di un'eccezione viene considerata come la restituzione di un'eccezione la promessa.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategie di accodamento writableStrategy
e readableStrategy
Il secondo e il terzo parametro facoltativi del costruttore TransformStream()
sono facoltativi
Strategie di accodamento writableStrategy
e readableStrategy
. Sono definiti come descritto
leggibile e in scrittura
rispettivamente.
Esempio di codice di flusso per la trasformazione
Il seguente esempio di codice mostra un semplice flusso di trasformazione in azione.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Inserimento di un flusso leggibile attraverso un flusso di trasformazione
La pipeThrough()
dell'interfaccia ReadableStream
fornisce un modo concatenabile per collegare il flusso di corrente
attraverso un flusso di trasformazione o qualsiasi altra coppia scrivibile/leggibile. In genere, la segnalazione di uno stream blocca
per tutta la durata della pipeline, impedendo ad altri lettori di bloccarla.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Il prossimo esempio di codice (un po' inventato) mostra come implementare un "shouting" versione di fetch()
che maiuscole tutto il testo utilizzando la promessa di risposta restituita
come stream
e l'utilizzo delle maiuscole
in blocchi per blocco. Il vantaggio di questo approccio è che non occorre attendere
l'intero documento da scaricare, il che può fare un'enorme differenza quando si gestiscono file di grandi dimensioni.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
La demo seguente mostra i flussi leggibili, scrivibili e trasformabili in azione. Include anche esempi
di pipeThrough()
e pipeTo()
catene di tubi e dimostra anche tee()
. Se vuoi, puoi eseguire
alla demo nella sua finestra oppure consulta la
codice sorgente.
Stream utili disponibili nel browser
Esistono diversi stream utili integrati direttamente nel browser. Puoi creare facilmente un
ReadableStream
da un blob. La Blob
il metodo stream() dell'interfaccia restituisce
un ReadableStream
che, alla lettura, restituisce i dati contenuti nel blob. Ricorda inoltre che
L'oggetto File
è un tipo specifico di
Blob
e può essere utilizzato in qualsiasi contesto utilizzabile da un blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Le varianti di streaming di TextDecoder.decode()
e TextEncoder.encode()
vengono chiamate
TextDecoderStream
e
TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
È facile comprimere o decomprimere un file
CompressionStream
e
DecompressionStream
trasforma i flussi
rispettivamente. L'esempio di codice seguente mostra come scaricare la specifica Streams, comprimerla (gzip)
nel browser e scrivi il file compresso direttamente sul disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
L'API File System Access dell'API
FileSystemWritableFileStream
:
mentre gli stream di richiesta fetch()
sperimentali sono
esempi di flussi scrivibili in circolazione.
L'API Serial fa un uso intensivo degli stream leggibili e scrivibili.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Infine, l'API WebSocketStream
integra gli stream con l'API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Risorse utili
- Specifica degli stream
- Demo associate
- polyfill degli stream
- 2016: l'anno degli stream web
- Generatori e iteratori asincroni
- Visualizzatore stream
Ringraziamenti
Questo articolo è stato esaminato da Jake Archibald François Beaufort Mario Rossi Mattias Buelens, Surma, Joe Medley e Andrea Rossi. I post del blog di Jake Archibald mi hanno aiutato molto a capire i flussi di dati. Alcuni esempi di codice sono ispirati dall'utente GitHub Le esplorazioni e le funzionalità di @bellbind parti della prosa si basano fortemente sul Documenti web MDN sugli stream. La Streams standard autori hanno fatto un enorme lavoro scrivere questa specifica. Immagine hero di Ryan Lara su Annulla schermata.