Stream: la guida definitiva

Scopri come utilizzare i flussi leggibili, scrivibili e di trasformazione con l'API Streams.

L'API Streams consente di accedere in modo programmatico ai flussi di dati ricevuti tramite la rete o creati localmente con qualsiasi mezzo e di elaborarli con JavaScript. Lo streaming prevede la suddivisione di una risorsa che vuoi ricevere, inviare o trasformare in piccoli blocchi, che vengono poi elaborati un po' alla volta. Lo streaming è qualcosa che i browser fanno comunque quando ricevono asset come HTML o video da mostrare nelle pagine web, ma questa funzionalità non è mai stata disponibile per JavaScript prima dell'introduzione di fetch con gli stream nel 2015.

In precedenza, se volevi elaborare una risorsa di qualche tipo (che si tratti di un video, di un file di testo e così via), dovevi scaricare l'intero file, attendere che venisse deserializzato in un formato adatto e poi elaborarlo. Con gli stream disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript in modo progressivo non appena sono disponibili sul client, senza dover generare un buffer, una stringa o un blob. Ciò consente di sbloccare una serie di casi d'uso, alcuni dei quali sono elencati di seguito:

  • Effetti video:invio di un flusso video leggibile tramite un flusso di trasformazione che applica gli effetti in tempo reale.
  • Compressione/decompressione dei dati:invio di un flusso di file tramite un flusso di trasformazione che lo comprime/decomprime in modo selettivo.
  • Decodifica delle immagini:invio di un flusso di risposta HTTP tramite un flusso di trasformazione che decodifica i byte in dati bitmap e poi tramite un altro flusso di trasformazione che traduce le bitmap in PNG. Se installato all'interno del gestore fetch di un service worker, ciò consente di eseguire il polyfill in modo trasparente di nuovi formati di immagine come AVIF.

Supporto browser

ReadableStream e WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

Concetti principali

Prima di entrare nel dettaglio dei vari tipi di stream, vorrei presentarti alcuni concetti fondamentali.

Bocconcini

Un chunk è un singolo dato scritto o letto da uno stream. Può essere di qualsiasi tipo; gli stream possono contenere anche blocchi di tipi diversi. La maggior parte delle volte, un segmento non sarà l'unità di dati più atomica per un determinato stream. Ad esempio, un flusso di byte potrebbe contenere blocchi costituiti da 16 KiB Uint8Array, anziché singoli byte.

Stream leggibili

Uno stream leggibile rappresenta un'origine dati da cui puoi leggere. In altre parole, i dati vengono estratti da uno stream leggibile. In concreto, un flusso leggibile è un'istanza della classe ReadableStream.

Stream scrivibili

Uno stream scrivibile rappresenta una destinazione in cui puoi scrivere i dati. In altre parole, i dati vengono inseriti in uno stream scrivibile. Nello specifico, un flusso scrivibile è un'istanza della classe WritableStream.

Trasformare gli stream

Un flusso di trasformazione è costituito da una coppia di flussi: un flusso scrivibile, noto come lato scrivibile, e un flusso leggibile, noto come lato leggibile. Una metafora del mondo reale potrebbe essere un interprete simultaneo che traduce da una lingua all'altra in tempo reale. In modo specifico per lo stream di trasformazione, la scrittura sul lato scrivibile comporta la disponibilità di nuovi dati per la lettura dal lato leggibile. In concreto, qualsiasi oggetto con una proprietà writable e una proprietà readable può fungere da flusso di trasformazione. Tuttavia, la classe TransformStream standard semplifica la creazione di una coppia di questo tipo correttamente entangled.

Catene per tubi

Gli stream vengono utilizzati principalmente per collegarli tra loro. Un flusso leggibile può essere inviato direttamente a un flusso scrivibile utilizzando il metodo pipeTo() del flusso leggibile oppure può essere inviato tramite uno o più flussi di trasformazione utilizzando il metodo pipeThrough() del flusso leggibile. Un insieme di stream collegati tra loro in questo modo viene chiamato pipe chain.

Contropressione

Una volta costruita una catena di pipe, questa propagherà i segnali relativi alla velocità con cui i chunk devono fluire al suo interno. Se un passaggio della catena non può ancora accettare i segmenti, propaga un segnale all'indietro attraverso la catena di pipe, finché alla fine alla sorgente originale viene comunicato di interrompere la produzione di segmenti così rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.

Teeing

Uno stream leggibile può essere suddiviso (il nome deriva dalla forma della lettera maiuscola "T") utilizzando il metodo tee(). In questo modo, lo stream verrà bloccato, ovvero non sarà più utilizzabile direttamente. Tuttavia, verranno creati due nuovi stream, chiamati rami, che possono essere utilizzati in modo indipendente. Il teeing è importante anche perché gli stream non possono essere riavvolti o riavviati. Scopri di più in seguito.

Diagramma di una catena di pipe costituita da uno stream leggibile proveniente da una chiamata all'API Fetch, che viene poi inviato tramite pipe a uno stream di trasformazione il cui output viene suddiviso e inviato al browser per il primo stream leggibile risultante e alla cache del service worker per il secondo stream leggibile risultante.
Una catena di pipe.

Meccanismi di un flusso leggibile

Un flusso leggibile è un'origine dati rappresentata in JavaScript da un oggetto ReadableStream che deriva da un'origine sottostante. Il costruttore ReadableStream() crea e restituisce un oggetto stream leggibile dai gestori specificati. Esistono due tipi di origine sottostante:

  • Le origini push inviano costantemente dati quando vi accedi e spetta a te avviare, mettere in pausa o annullare l'accesso allo stream. Alcuni esempi includono stream video live, eventi inviati dal server o WebSocket.
  • Le origini pull richiedono di richiedere esplicitamente i dati una volta connessi. Alcuni esempi includono operazioni HTTP tramite chiamate fetch() o XMLHttpRequest.

I dati di streaming vengono letti in sequenza in piccole parti chiamate chunk. I chunk inseriti in un flusso vengono accodati. Ciò significa che sono in attesa in una coda pronti per essere letti. Una coda interna tiene traccia dei chunk che non sono ancora stati letti.

Una strategia di accodamento è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi nella coda con un numero specificato, noto come livello di riempimento.

I blocchi all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un blocco alla volta, consentendoti di eseguire qualsiasi tipo di operazione. Il lettore più l'altro codice di elaborazione associato viene chiamato consumatore.

Il costrutto successivo in questo contesto è chiamato controller. Ogni flusso leggibile ha un controller associato che, come suggerisce il nome, ti consente di controllare il flusso.

Un solo lettore può leggere un flusso alla volta; quando un lettore viene creato e inizia a leggere un flusso (ovvero diventa un lettore attivo), viene bloccato. Se vuoi che un altro lettore prenda il tuo posto nella lettura dello stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi fare tee degli stream).

Creare uno stream leggibile

Crea uno stream leggibile chiamando il relativo costruttore ReadableStream(). Il costruttore ha un argomento facoltativo underlyingSource, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza dello stream creata.

underlyingSource

Può utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:

  • start(controller): viene chiamato immediatamente quando viene creato l'oggetto. Il metodo può accedere all'origine del flusso ed eseguire qualsiasi altra operazione necessaria per configurare la funzionalità di streaming. Se questa operazione deve essere eseguita in modo asincrono, il metodo può restituire una promessa per segnalare l'esito positivo o negativo. Il parametro controller passato a questo metodo è un ReadableStreamDefaultController.
  • pull(controller): può essere utilizzato per controllare lo stream man mano che vengono recuperati altri chunk. Viene chiamato ripetutamente finché la coda interna di segmenti dello stream non è piena, fino a quando la coda non raggiunge il livello massimo. Se il risultato della chiamata di pull() è una promessa, pull() non verrà richiamato finché la promessa non sarà mantenuta. Se la promessa viene rifiutata, lo stream genererà un errore.
  • cancel(reason): chiamato quando il consumatore dello stream annulla lo stream.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController supporta i seguenti metodi:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. È un oggetto che definisce facoltativamente una strategia di accodamento per lo stream, che accetta due parametri:

  • highWaterMark: un numero non negativo che indica il livello massimo del flusso che utilizza questa strategia di accodamento.
  • size(chunk): una funzione che calcola e restituisce la dimensione finita non negativa del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableStreamDefaultController.desiredSize appropriata. Determina anche quando viene chiamato il metodo pull() dell'origine sottostante.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

I metodi getReader() e read()

Per leggere da uno stream leggibile, è necessario un lettore, che sarà un ReadableStreamDefaultReader. Il metodo getReader() dell'interfaccia ReadableStream crea un lettore e blocca lo stream. Mentre lo stream è bloccato, non è possibile acquisire altri lettori finché questo non viene rilasciato.

Il metodo read() dell'interfaccia ReadableStreamDefaultReader restituisce una promessa che fornisce l'accesso al successivo chunk nella coda interna dello stream. Esegue o rifiuta con un risultato a seconda dello stato dello stream. Le diverse possibilità sono le seguenti:

  • Se è disponibile un blocco, la promessa verrà soddisfatta con un oggetto del modulo
    { value: chunk, done: false }.
  • Se lo stream viene chiuso, la promessa verrà soddisfatta con un oggetto del modulo
    { value: undefined, done: true }.
  • Se lo stream genera un errore, la promessa verrà rifiutata con l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Proprietà locked

Puoi verificare se un flusso leggibile è bloccato accedendo alla relativa proprietà ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempi di codice di stream leggibili

L'esempio di codice riportato di seguito mostra tutti i passaggi in azione. Per prima cosa, crea un ReadableStream che nell'argomento underlyingSource (ovvero la classe TimestampSource) definisce un metodo start(). Questo metodo indica al controller dello stream di enqueue() un timestamp ogni secondo per dieci secondi. Infine, indica al controller di close() lo stream. Consumi questo stream creando un lettore tramite il metodo getReader() e chiamando read() finché lo stream non è done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterazione asincrona

Controllare a ogni iterazione del ciclo read() se lo stream è done potrebbe non essere l'API più conveniente. Fortunatamente, presto ci sarà un modo migliore per farlo: l'iterazione asincrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Una soluzione alternativa per utilizzare l'iterazione asincrona oggi è implementare il comportamento con un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Creare un flusso leggibile

Il metodo tee() dell'interfaccia ReadableStream prepara lo stream leggibile corrente, restituendo un array di due elementi contenente i due rami risultanti come nuove istanze ReadableStream. In questo modo due lettori possono leggere un flusso contemporaneamente. Ad esempio, potresti farlo in un service worker se vuoi recuperare una risposta dal server e trasmetterla in streaming al browser, ma anche alla cache del service worker. Poiché un corpo della risposta non può essere utilizzato più di una volta, sono necessarie due copie per farlo. Per annullare lo stream, devi annullare entrambi i rami risultanti. L'avvio di uno stream in genere lo blocca per tutta la durata, impedendo ad altri lettori di bloccarlo.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Stream di byte leggibili

Per i flussi che rappresentano byte, viene fornita una versione estesa del flusso leggibile per gestire i byte in modo efficiente, in particolare riducendo al minimo le copie. I flussi di byte consentono l'acquisizione di lettori BYOB (Bring Your Own Buffer). L'implementazione predefinita può fornire una gamma di output diversi, ad esempio stringhe o buffer di array nel caso di WebSocket, mentre i flussi di byte garantiscono l'output di byte. Inoltre, i lettori BYOB hanno vantaggi in termini di stabilità. Questo perché se un buffer si stacca, può garantire che non venga scritto due volte nello stesso buffer, evitando così le condizioni di competizione. I lettori BYOB possono ridurre il numero di volte in cui il browser deve eseguire la garbage collection, perché possono riutilizzare i buffer.

Creazione di uno stream di byte leggibile

Puoi creare un flusso di byte leggibile passando un parametro type aggiuntivo al costruttore ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

L'origine sottostante di un flusso di byte leggibile riceve un ReadableByteStreamController da manipolare. Il suo metodo ReadableByteStreamController.enqueue() accetta un argomento chunk il cui valore è un ArrayBufferView. La proprietà ReadableByteStreamController.byobRequest restituisce l'attuale pull request BYOB o null se non è presente. Infine, la proprietà ReadableByteStreamController.desiredSize restituisce le dimensioni desiderate per riempire la coda interna dello stream controllato.

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. È un oggetto che definisce facoltativamente una strategia di accodamento per lo stream, che accetta un parametro:

  • highWaterMark: un numero non negativo di byte che indica il livello massimo del flusso utilizzando questa strategia di accodamento. Viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableByteStreamController.desiredSize appropriata. Determina anche quando viene chiamato il metodo pull() dell'origine sottostante.

I metodi getReader() e read()

Puoi quindi accedere a un ReadableStreamBYOBReader impostando il parametro mode di conseguenza: ReadableStream.getReader({ mode: "byob" }). Ciò consente un controllo più preciso dell'allocazione del buffer per evitare copie. Per leggere dallo stream di byte, devi chiamare ReadableStreamBYOBReader.read(view), dove view è un ArrayBufferView.

Esempio di codice di flusso di byte leggibile

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La seguente funzione restituisce flussi di byte leggibili che consentono una lettura efficiente senza copia di un array generato in modo casuale. Anziché utilizzare una dimensione del blocco predeterminata di 1024, tenta di riempire il buffer fornito dallo sviluppatore, consentendo il controllo completo.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Meccanismi di un flusso scrivibile

Un flusso scrivibile è una destinazione in cui puoi scrivere dati, rappresentata in JavaScript da un oggetto WritableStream. Questo funge da astrazione sopra un sink sottostante, un sink I/O di livello inferiore in cui vengono scritti i dati non elaborati.

I dati vengono scritti nello stream tramite un writer, un blocco alla volta. Un blocco può assumere molte forme, proprio come i blocchi in un lettore. Puoi utilizzare il codice che preferisci per produrre i blocchi pronti per la scrittura; lo scrittore più il codice associato sono chiamati produttore.

Quando viene creato un writer e inizia a scrivere in uno stream (un writer attivo), si dice che è bloccato. Solo un writer può scrivere in uno stream scrivibile alla volta. Se vuoi che un altro autore inizi a scrivere nel tuo stream, in genere devi rilasciarlo prima di allegare un altro autore.

Una coda interna tiene traccia dei blocchi scritti nel flusso ma non ancora elaborati dal sink sottostante.

Una strategia di accodamento è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi nella coda con un numero specificato, noto come livello di riempimento.

L'ultimo costrutto è chiamato controller. Ogni stream scrivibile ha un controller associato che ti consente di controllarlo (ad esempio, di interromperlo).

Creare uno stream scrivibile

L'interfaccia WritableStream dell'API Streams fornisce un'astrazione standard per la scrittura di dati di streaming in una destinazione, nota come sink. Questo oggetto è dotato di contropressione e accodamento integrati. Crea uno stream scrivibile chiamando il relativo costruttore WritableStream(). Ha un parametro facoltativo underlyingSink, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza dello stream creata.

underlyingSink

underlyingSink può includere i seguenti metodi facoltativi definiti dallo sviluppatore. Il parametro controller passato ad alcuni metodi è un WritableStreamDefaultController.

  • start(controller): questo metodo viene chiamato immediatamente quando viene creato l'oggetto. I contenuti di questo metodo devono mirare a ottenere l'accesso al sink sottostante. Se questo processo deve essere eseguito in modo asincrono, può restituire una promessa per segnalare l'esito positivo o negativo.
  • write(chunk, controller): questo metodo verrà chiamato quando un nuovo blocco di dati (specificato nel parametro chunk) è pronto per essere scritto nel sink sottostante. Può restituire una promessa per segnalare l'esito positivo o negativo dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo che le scritture precedenti sono andate a buon fine e mai dopo la chiusura o l'interruzione del flusso.
  • close(controller): Questo metodo verrà chiamato se l'app segnala di aver terminato la scrittura dei chunk nel flusso. I contenuti devono fare tutto il necessario per finalizzare le scritture nel sink sottostante e rilasciarne l'accesso. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. Questo metodo verrà chiamato solo dopo che tutte le scritture in coda sono state eseguite correttamente.
  • abort(reason): questo metodo verrà chiamato se l'app segnala che vuole chiudere bruscamente lo stream e metterlo in uno stato di errore. Può pulire le risorse in attesa, proprio come close(), ma abort() verrà chiamato anche se le scritture sono in coda. Questi blocchi verranno eliminati. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. Il parametro reason contiene un DOMString che descrive il motivo per cui lo stream è stato interrotto.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

L'interfaccia WritableStreamDefaultController dell'API Streams rappresenta un controller che consente di controllare lo stato di un WritableStream durante la configurazione, man mano che vengono inviati altri chunk per la scrittura o al termine della scrittura. Quando viene creato un WritableStream, al sink sottostante viene fornita un'istanza WritableStreamDefaultController corrispondente da manipolare. WritableStreamDefaultController ha un solo metodo: WritableStreamDefaultController.error(), che causa errori in qualsiasi interazione futura con lo stream associato. WritableStreamDefaultController supporta anche una proprietà signal che restituisce un'istanza di AbortSignal, consentendo l'interruzione di un'operazione WritableStream, se necessario.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream() è queuingStrategy. È un oggetto che definisce facoltativamente una strategia di accodamento per lo stream, che accetta due parametri:

  • highWaterMark: un numero non negativo che indica il livello massimo del flusso che utilizza questa strategia di accodamento.
  • size(chunk): una funzione che calcola e restituisce la dimensione finita non negativa del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà WritableStreamDefaultWriter.desiredSize appropriata.

I metodi getWriter() e write()

Per scrivere in uno stream scrivibile, devi disporre di un writer, che sarà un WritableStreamDefaultWriter. Il metodo getWriter() dell'interfaccia WritableStream restituisce una nuova istanza di WritableStreamDefaultWriter e blocca lo stream su questa istanza. Mentre lo stream è bloccato, non è possibile acquisire un altro autore finché quello attuale non viene rilasciato.

Il metodo write() dell'interfaccia WritableStreamDefaultWriter scrive un blocco di dati passato in un WritableStream e nel relativo sink sottostante, quindi restituisce una promessa che si risolve per indicare l'esito positivo o negativo dell'operazione di scrittura. Tieni presente che il significato di "successo" dipende dal sink sottostante. Potrebbe indicare che il blocco è stato accettato e non necessariamente che è stato salvato in modo sicuro nella sua destinazione finale.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Proprietà locked

Puoi verificare se un flusso scrivibile è bloccato accedendo alla relativa proprietà WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempio di codice di stream scrivibile

Il codice di esempio riportato di seguito mostra tutti i passaggi in azione.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Trasferimento di un flusso leggibile a un flusso scrivibile

Un flusso leggibile può essere inviato a un flusso scrivibile tramite il metodo pipeTo() del flusso leggibile. ReadableStream.pipeTo() invia la ReadableStream corrente a un determinato WritableStream e restituisce una promessa che viene soddisfatta al termine del processo di piping o rifiutata in caso di errori.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Creazione di un flusso di trasformazione

L'interfaccia TransformStream dell'API Streams rappresenta un insieme di dati trasformabili. Crei un flusso di trasformazione chiamando il relativo costruttore TransformStream(), che crea e restituisce un oggetto flusso di trasformazione dai gestori specificati. Il costruttore TransformStream() accetta come primo argomento un oggetto JavaScript facoltativo che rappresenta transformer. Questi oggetti possono contenere uno dei seguenti metodi:

transformer

  • start(controller): questo metodo viene chiamato immediatamente quando viene creato l'oggetto. In genere viene utilizzato per accodare i blocchi di prefisso, utilizzando controller.enqueue(). Questi blocchi verranno letti dal lato leggibile, ma non dipendono da alcuna scrittura sul lato scrivibile. Se questo processo iniziale è asincrono, ad esempio perché è necessario un certo impegno per acquisire i blocchi di prefissi, la funzione può restituire una promessa per segnalare l'esito positivo o negativo; una promessa rifiutata genererà un errore nel flusso. Eventuali eccezioni generate verranno generate nuovamente dal costruttore TransformStream().
  • transform(chunk, controller): questo metodo viene chiamato quando un nuovo blocco scritto originariamente sul lato scrivibile è pronto per essere trasformato. L'implementazione dello stream garantisce che questa funzione venga chiamata solo dopo che le trasformazioni precedenti sono andate a buon fine e mai prima che start() sia stata completata o dopo che è stata chiamata flush(). Questa funzione esegue il lavoro di trasformazione effettivo dello stream di trasformazione. Può mettere in coda i risultati utilizzando controller.enqueue(). Ciò consente a un singolo blocco scritto sul lato scrivibile di generare zero o più blocchi sul lato leggibile, a seconda di quante volte viene chiamato controller.enqueue(). Se il processo di trasformazione è asincrono, questa funzione può restituire una promessa per segnalare l'esito positivo o negativo della trasformazione. Una promessa rifiutata genererà un errore sia per la parte leggibile che per quella scrivibile dello stream di trasformazione. Se non viene fornito alcun metodo transform(), viene utilizzata la trasformazione dell'identità, che accoda i blocchi invariati dal lato scrivibile al lato leggibile.
  • flush(controller): questo metodo viene chiamato dopo che tutti i blocchi scritti sul lato scrivibile sono stati trasformati superando correttamente transform() e il lato scrivibile sta per essere chiuso. In genere viene utilizzato per accodare i blocchi di suffisso al lato leggibile, prima che anche questo venga chiuso. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa per segnalare l'esito positivo o negativo; il risultato verrà comunicato al chiamante di stream.writable.write(). Inoltre, una promessa rifiutata genererà un errore sia nel lato leggibile che in quello scrivibile dello stream. Generare un'eccezione equivale a restituire una promessa rifiutata.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Le strategie di accodamento writableStrategy e readableStrategy

Il secondo e il terzo parametro facoltativo del costruttore TransformStream() sono le strategie di accodamento facoltative writableStrategy e readableStrategy. Sono definiti come descritto nelle sezioni stream leggibile e stream scrivibile rispettivamente.

Esempio di codice di trasformazione dello stream

Il seguente esempio di codice mostra un semplice stream di trasformazione in azione.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Invio di un flusso leggibile tramite un flusso di trasformazione

Il metodo pipeThrough() dell'interfaccia ReadableStream fornisce un modo concatenabile per inviare il flusso corrente tramite un flusso di trasformazione o qualsiasi altra coppia scrivibile/leggibile. Il piping di uno stream in genere lo blocca per la durata del pipe, impedendo ad altri lettori di bloccarlo.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Il seguente esempio di codice (un po' artificioso) mostra come implementare una versione "urlata" di fetch() che converte tutto il testo in maiuscolo consumando la promessa di risposta restituita come flusso e convertendo in maiuscolo blocco per blocco. Il vantaggio di questo approccio è che non devi attendere il download dell'intero documento, il che può fare una grande differenza quando si ha a che fare con file di grandi dimensioni.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

La demo seguente mostra i flussi leggibili, scrivibili e di trasformazione in azione. Include anche esempi di catene di pipe pipeThrough() e pipeTo() e mostra anche tee(). Se vuoi, puoi eseguire la demo in una finestra separata o visualizzare il codice sorgente.

Stream utili disponibili nel browser

Nel browser sono integrati diversi stream utili. Puoi creare facilmente un ReadableStream da un blob. Il metodo stream() dell'interfaccia Blob restituisce un ReadableStream che, una volta letto, restituisce i dati contenuti nel blob. Ricorda inoltre che un oggetto File è un tipo specifico di Blob e può essere utilizzato in qualsiasi contesto in cui può essere utilizzato un blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Le varianti di streaming di TextDecoder.decode() e TextEncoder.encode() sono chiamate TextDecoderStream e TextEncoderStream rispettivamente.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

La compressione o la decompressione di un file è semplice con i flussi di trasformazione CompressionStream e DecompressionStream rispettivamente. L'esempio di codice riportato di seguito mostra come scaricare la specifica Streams, comprimerla (gzip) direttamente nel browser e scrivere il file compresso direttamente sul disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

L'API File System Access, FileSystemWritableFileStream e gli stream di richieste sperimentali fetch() sono esempi di stream scrivibili in natura.

L'API Serial utilizza molto sia i flussi leggibili che quelli scrivibili.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Infine, l'API WebSocketStream integra gli stream con l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Risorse utili

Ringraziamenti

Questo articolo è stato rivisto da Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. I post del blog di Jake Archibald mi hanno aiutato molto a capire gli stream. Alcuni esempi di codice sono ispirati alle esplorazioni dell'utente GitHub @bellbind e parti del testo si basano in gran parte sulla documentazione MDN Web Docs su Streams. Gli autori dello standard Streams hanno fatto un ottimo lavoro nella stesura di questa specifica. Immagine hero di Ryan Lara su Unsplash.