Stream: la guida definitiva

Scopri come utilizzare stream leggibili, scrivibili e di trasformazione con l'API Streams.

L'API Streams consente di accedere in modo programmatico ai flussi di dati ricevuti tramite la rete o creati con qualsiasi mezzo localmente e di elaborarli con JavaScript. Lo streaming prevede la suddivisione di una risorsa che vuoi ricevere, inviare o trasformare in piccoli blocchi, per poi elaborarli bit per bit. Anche se lo streaming è un compito che i browser svolgono comunque quando ricevono asset come HTML o video da mostrare sulle pagine web, questa funzionalità non è mai stata disponibile per JavaScript prima dell'introduzione di fetch con gli stream nel 2015.

In precedenza, se volevi elaborare una risorsa di qualche tipo (un video, un file di testo e così via), dovevano scaricare l'intero file, attendere che venisse deserializzato in un formato adatto e poi elaborarlo. Poiché i flussi sono disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript progressivamente, non appena sono disponibili sul client, senza dover generare un buffer, una stringa o un blob. Ciò sblocca una serie di casi d'uso, alcuni dei quali sono elencati di seguito:

  • Effetti video:trasmette uno stream video leggibile attraverso uno stream di trasformazione che applica effetti in tempo reale.
  • (De)compressione dei dati: inoltro di uno stream di file tramite uno stream di trasformazione che lo (de)comprime in modo selettivo.
  • Decodifica delle immagini: inoltro di uno stream di risposta HTTP tramite uno stream di trasformazione che decodifica i byte in dati bitmap e poi tramite un altro stream di trasformazione che traduce le bitmap in file PNG. Se lo installi all'interno dell'handler fetch di un service worker, puoi eseguire il polyfill in modo trasparente di nuovi formati di immagini come AVIF.

Supporto browser

ReadableStream e WritableStream

Supporto dei browser

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Origine

TransformStream

Supporto dei browser

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Origine

Concetti principali

Prima di entrare nei dettagli sui vari tipi di stream, vorrei presentarti alcuni concetti fondamentali.

Pezzi

Un blocco è un singolo dato che viene scritto o letto da un flusso. Può essere di qualsiasi tipo; i flussi possono anche contenere blocchi di tipi diversi. La maggior parte delle volte, un chunk non sarà l'unità di dati più atomica per un determinato stream. Ad esempio, uno stream di byte potrebbe contenere chunk costituiti da unità di 16 KiB Uint8Array anziché da singoli byte.

Stream leggibili

Uno stream leggibile rappresenta un'origine dati da cui puoi leggere. In altre parole, i dati vengono generati da uno stream leggibile. Concretamente, uno stream leggibile è un'istanza della classe ReadableStream.

Stream scrivibili

Un flusso scrivibile rappresenta una destinazione per i dati in cui è possibile scrivere. In altre parole, i dati vengono inseriti in un flusso scrivibile. Nello specifico, uno stream in scrittura è un'istanza della classeWritableStream.

Trasformare gli stream

Uno stream di trasformazione è costituito da una coppia di stream: uno stream in scrittura, noto come lato in scrittura, e uno stream in lettura, noto come lato in lettura. Una metafora reale per questo sarebbe un interprete simultaneo che traduce da una lingua all'altra in tempo reale. In modo specifico per lo stream di trasformazione, la scrittura sul lato in cui è possibile scrivere comporta la disponibilità di nuovi dati per la lettura dal lato in cui è possibile leggere. Concretamente, qualsiasi oggetto con una proprietà writable e una proprietà readable può fungere da flusso di trasformazione. Tuttavia, la classe TransformStream standard semplifica la creazione di una coppia di questo tipo che è correttamente inserita.

Catene per tubi

Gli stream vengono utilizzati principalmente collegandoli tra loro. Uno stream leggibile può essere incanalato direttamente in uno stream scrivibile utilizzando il metodo pipeTo() dello stream leggibile oppure può essere incanalato prima in uno o più stream di trasformazione utilizzando il metodo pipeTo() dello stream leggibile. Un insieme di stream concatenati in questo modo è chiamato catena di pipe.

Contropressione

Una volta creata una catena di pipe, vengono propagati gli indicatori relativi alla velocità con cui i chunk devono attraversarla. Se un passaggio della catena non è ancora in grado di accettare i chunk, viene propagato un segnale all'indietro tramite la catena di pipe, fino a quando all'origine originale non viene chiesto di interrompere la produzione di chunk così rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.

Teeing

Uno stream leggibile può essere suddiviso (dal nome della forma di una "T" maiuscola) utilizzando il relativo metodo tee(). In questo modo lo stream verrà bloccato, ovvero non sarà più utilizzabile direttamente, ma verranno creati due nuovi stream, chiamati branch, che possono essere utilizzati indipendentemente. È importante anche perché gli stream non possono essere riavvolti o riavviati. Scopri di più in seguito.

Diagramma di una catena di pipe composta da uno stream leggibile proveniente da una chiamata all'API fetch che viene poi incanalato in uno stream di trasformazione il cui output viene suddiviso e inviato al browser per il primo stream leggibile risultante e alla cache del service worker per il secondo stream leggibile risultante.
Una catena di tubi.

La struttura di uno stream leggibile

Uno stream leggibile è un'origine dati rappresentata in JavaScript da un oggetto ReadableStream che proviene da un'origine sottostante. Il costruttore ReadableStream() crea e restituisce un oggetto stream leggibile dai gestori specificati. Esistono due tipi di origini sottostanti:

  • Le origini push inviano costantemente dati quando li hai aperti ed è tua responsabilità avviare, mettere in pausa o annullare l'accesso allo stream. Alcuni esempi sono stream video in diretta, eventi inviati dal server o WebSocket.
  • Le origini pull richiedono di richiedere esplicitamente i dati dopo la connessione. Alcuni esempi sono le operazioni HTTP tramite chiamate fetch() o XMLHttpRequest.

I dati in streaming vengono letti in sequenza in piccoli pezzi chiamati chunk. I chunk inseriti in uno stream vengono messi in coda. Ciò significa che sono in attesa in coda, pronti per essere letti. Una coda interna tiene traccia dei chunk che non sono ancora stati letti.

Una strategia di accodamento è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di accodamento assegna una dimensione a ciascun blocco e confronta la dimensione totale di tutti i blocchi in coda con un numero specificato, noto come punta massima.

I chunk all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un chunk alla volta, consentendoti di eseguire qualsiasi tipo di operazione. Il lettore più l'altro codice di elaborazione associato viene chiamato consumer.

Il costrutto successivo in questo contesto è chiamato controller. A ogni stream leggibile è associato un controller che, come suggerisce il nome, ti consente di controllarlo.

Solo un lettore può leggere uno stream alla volta. Quando un lettore viene creato e inizia a leggere uno stream (ovvero diventa un lettore attivo), viene bloccato. Se vuoi che un altro lettore prenda il controllo della lettura dello stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi dirigere gli stream).

Creazione di uno stream leggibile

Puoi creare uno stream leggibile chiamando il relativo costruttore ReadableStream(). Il costruttore ha un argomento facoltativo underlyingSource, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza di stream creata.

underlyingSource

In questo modo è possibile utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:

  • start(controller): viene chiamato immediatamente al momento della costruzione dell'oggetto. Il metodo può accedere all'origine dello stream ed eseguire qualsiasi altra operazione necessaria per configurare la funzionalità di streaming. Se questa procedura deve essere eseguita in modo asincrono, il metodo può restituire una promessa per segnalare il successo o l'errore. Il parametro controller passato a questo metodo è un ReadableStreamDefaultController.
  • pull(controller): può essere utilizzato per controllare lo stream man mano che vengono recuperati più chunk. Viene chiamata ripetutamente finché la coda interna di blocchi del flusso non è piena, fino a quando la coda non raggiunge il suo limite massimo. Se il risultato della chiamata a pull() è una promessa,pull() non verrà richiamato di nuovo finché la promessa non verrà soddisfatta. Se la promessa viene rifiutata, lo stream genera un errore.
  • cancel(reason): viene chiamato quando il consumatore dello stream annulla lo stream.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController supporta i seguenti metodi:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. Si tratta di un oggetto che, facoltativamente, definisce una strategia di coda per lo stream, che accetta due parametri:

  • highWaterMark: un numero non negativo che indica il livello massimo dello stream che utilizza questa strategia di coda.
  • size(chunk): una funzione che calcola e restituisce la dimensione finita non negativa del valore del chunk specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableStreamDefaultController.desiredSize appropriata. Regola anche quando viene chiamato il metodo pull() dell'origine sottostante.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

I metodi getReader() e read()

Per leggere da uno stream leggibile, è necessario un lettore, che sarà un ReadableStreamDefaultReader. Il metodo getReader() dell'interfaccia ReadableStream crea un lettore e blocca lo stream su di esso. Mentre lo stream è bloccato, nessun altro lettore può essere acquisito fino a quando questo non viene rilasciato.

Il metodo read() dell'interfaccia ReadableStreamDefaultReader restituisce una promessa che fornisce l'accesso al prossimo blocco nella coda interna dello stream. Soddisfa o rifiuta con un risultato a seconda dello stato del stream. Le diverse possibilità sono le seguenti:

  • Se è disponibile un chunk, la promessa verrà soddisfatta con un oggetto del tipo
    { value: chunk, done: false }.
  • Se lo stream viene chiuso, la promessa verrà soddisfatta con un oggetto del tipo
    { value: undefined, done: true }.
  • In caso di errori relativi allo stream, la promessa viene rifiutata e viene restituito l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

La proprietà locked

Puoi verificare se uno stream leggibile è bloccato accedendo alla relativa proprietà ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempi di codice per stream leggibili

L'esempio di codice riportato di seguito mostra tutti i passaggi in azione. Per prima cosa, crea un ReadableStream che nel suo underlyingSource argomento (ovvero la classe TimestampSource) definisce un metodo start(). Questo metodo indica a controller dello stream di enqueue() un timestamp ogni secondo per dieci secondi. Infine, dice al controller di close() lo stream. Per fruire di questo stream, crei un lettore con il metodo getReader() e chiami read() finché lo stream è done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterazione asincrona

Controllare se lo stream è done in ogni iterazione del loop read() potrebbe non essere l'API più comoda. Fortunatamente, a breve sarà disponibile un modo migliore per farlo: l'iterazione asincrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Una soluzione alternativa per utilizzare l'iterazione asincrona oggi è implementare il comportamento con un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Creazione di uno stream leggibile

Il metodo tee() dell'interfaccia ReadableStream suddivide lo stream leggibile corrente, restituendo un array di due elementi contenente i due rami risultanti come nuove istanze ReadableStream. In questo modo, due lettori possono leggere uno stream contemporaneamente. Ad esempio, puoi farlo in un service worker se vuoi recuperare una risposta dal server e trasmetterla in streaming al browser, ma anche alla cache del service worker. Poiché il corpo di una risposta non può essere utilizzato più di una volta, per farlo sono necessarie due copie. Per annullare lo stream, devi annullare entrambi i rami risultanti. In genere, l'avvio di uno stream lo blocca per tutta la durata, impedendo ad altri lettori di bloccarlo.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flussi di byte leggibili

Per gli stream che rappresentano byte, viene fornita una versione estesa dello stream leggibile per gestire i byte in modo efficiente, in particolare riducendo al minimo le copie. Gli stream di byte consentono di acquisire lettori bring-your-own-buffer (BYOB). L'implementazione predefinita può fornire una serie di output diversi, come stringhe o buffer di array nel caso di WebSocket, mentre gli stream di byte garantiscono l'output di byte. Inoltre, i lettori BYOB offrono vantaggi in termini di stabilità. Questo accade perché se un buffer si scollega, è possibile garantire che non venga scritto nello stesso buffer due volte, evitando così le condizioni di gara. I lettori BYOB possono ridurre il numero di volte in cui il browser deve eseguire la raccolta dei rifiuti, perché può riutilizzare i buffer.

Creazione di un flusso di byte leggibile

Puoi creare uno stream di byte leggibile passando un parametro type aggiuntivo al costruttore ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

All'origine sottostante di uno stream di byte leggibile viene assegnato un ReadableByteStreamController da manipolare. Il suo metodo ReadableByteStreamController.enqueue() prende un argomento chunk il cui valore è un ArrayBufferView. La proprietà ReadableByteStreamController.byobRequest restituisce la richiesta pull BYOB corrente o null se non ce ne sono. Infine, la proprietà ReadableByteStreamController.desiredSize restituisce la dimensione desiderata per riempire la coda interna dello stream controllato.

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. Si tratta di un oggetto che, facoltativamente, definisce una strategia di coda per lo stream, che accetta un parametro:

  • highWaterMark: un numero non negativo di byte che indica il livello elevato dello stream utilizzando questa strategia di accodamento. Viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableByteStreamController.desiredSize appropriata. Regola anche quando viene chiamato il metodo pull() dell'origine sottostante.

I metodi getReader() e read()

Puoi quindi accedere a un ReadableStreamBYOBReader impostando di conseguenza il parametro mode: ReadableStream.getReader({ mode: "byob" }). Ciò consente un controllo più preciso sull'allocazione del buffer per evitare le copie. Per leggere dallo stream di byte, devi chiamare ReadableStreamBYOBReader.read(view), dove view è un ArrayBufferView.

Esempio di codice di stream di byte leggibile

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La seguente funzione restituisce flussi di byte leggibili che consentono una lettura zero-copy efficiente di un array generato in modo casuale. Anziché utilizzare una dimensione del chunk predeterminata di 1024, tenta di riempire il buffer fornito dallo sviluppatore, consentendo il controllo completo.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

La struttura di uno stream in cui è possibile scrivere

Uno stream in scrittura è una destinazione in cui puoi scrivere dati, rappresentata in JavaScript da un oggetto WritableStream. Questa funge da astrazione sopra un destinatario sottostante, un destinatario I/O di livello inferiore in cui vengono scritti i dati non elaborati.

I dati vengono scritti nello stream tramite un scrittore, un blocco alla volta. Un chunk può assumere una molteplicità di forme, proprio come i chunk in un reader. Puoi utilizzare il codice che preferisci per produrre i chunk pronti per la scrittura. Lo scriptwriter più il codice associato è chiamato producer.

Quando un autore viene creato e inizia a scrivere su uno stream (ossia un autore attivo), questo viene bloccato. Solo un autore può scrivere in uno stream scrivibile alla volta. Se vuoi che un altro autore inizi a scrivere nel tuo stream, in genere devi rilasciarlo prima di associarlo a un altro autore.

Una coda interna tiene traccia dei chunk che sono stati scritti nello stream, ma che non sono stati ancora elaborati dal sink sottostante.

Una strategia di coda è un oggetto che determina in che modo uno stream deve segnalare la contropressione in base allo stato della coda interna. La strategia di coda assegna una dimensione a ogni chunk e confronta la dimensione totale di tutti i chunk nella coda con un numero specificato, noto come soglia massima.

La struttura finale è chiamata controller. A ogni flusso scrivibile è associato un controller che ti consente di controllarlo (ad esempio, per interromperlo).

Creazione di uno stream in cui scrivere

L'interfaccia WritableStream dell'API Streams fornisce un'astrazione standard per la scrittura di flussi di dati in una destinazione, nota come sink. Questo oggetto è dotato di backpressure e code integrate. Puoi creare uno stream in cui scrivere chiamando il relativo costruttore WritableStream(). Ha un parametro facoltativo underlyingSink, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza dello stream creata.

underlyingSink

underlyingSink può includere i seguenti metodi facoltativi definiti dallo sviluppatore. Il parametro controller trasmesso ad alcuni dei metodi è un WritableStreamDefaultController.

  • start(controller): questo metodo viene chiamato immediatamente al momento della creazione dell'oggetto. I contenuti di questo metodo devono avere lo scopo di ottenere l'accesso al sink sottostante. Se questo processo deve essere eseguito in modo asincrono, può restituire una promessa per segnalare l'esito positivo o negativo.
  • write(chunk, controller): questo metodo viene chiamato quando un nuovo blocco di dati (specificato nel parametro chunk) è pronto per essere scritto nello scomparto sottostante. Può restituire una promessa per indicare il successo o l'errore dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo che le scritture precedenti sono andate a buon fine e mai dopo la chiusura o l'interruzione dello stream.
  • close(controller): questo metodo viene chiamato se l'app indica che ha terminato di scrivere gli chunk nello stream. I contenuti devono fare tutto il necessario per finalizzare le scritture nel flusso di destinazione sottostante e rilasciarne l'accesso. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. Questo metodo verrà chiamato solo dopo che tutte le scritture in coda sono andate a buon fine.
  • abort(reason): questo metodo viene chiamato se l'app segnala che vuole chiudere bruscamente lo stream e lo imposta in uno stato di errore. Può ripulire le risorse trattenute, in modo simile a close(), ma abort() verrà chiamato anche se le scritture sono in coda. Quei blocchi verranno scartati. Se questo processo è asincrono, può restituire una promessa per segnalare l'esito positivo o negativo. Il parametro reason contiene un DOMString che descrive il motivo dell'interruzione dello stream.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

L'interfaccia WritableStreamDefaultController dell'API Streams rappresenta un controller che consente di controllare lo stato di un WritableStream durante la configurazione, man mano che vengono inviati altri chunk per la scrittura o al termine della scrittura. Quando viene costruito un WritableStream, al sink sottostante viene assegnata un'istanza WritableStreamDefaultController corrispondente da manipolare. WritableStreamDefaultController ha un solo metodo: WritableStreamDefaultController.error(), che causa errori in qualsiasi interazione futura con lo stream associato. WritableStreamDefaultController supporta anche una proprietà signal che restituisce un'istanza di AbortSignal, consentendo di interrompere un'operazione WritableStream, se necessario.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream() è queuingStrategy. Si tratta di un oggetto che, facoltativamente, definisce una strategia di coda per lo stream, che accetta due parametri:

  • highWaterMark: un numero non negativo che indica il livello massimo dello stream che utilizza questa strategia di coda.
  • size(chunk): una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà WritableStreamDefaultWriter.desiredSize appropriata.

I metodi getWriter() e write()

Per scrivere in uno stream scrivibile, hai bisogno di un writer, che sarà un WritableStreamDefaultWriter. Il metodo getWriter() dell'interfaccia WritableStream restituisce una nuova istanza di WritableStreamDefaultWriter e blocca lo stream su quell'istanza. Mentre lo stream è bloccato, non è possibile acquisire altri autori finché non viene rilasciato quello corrente.

Il metodo write() dell'interfaccia WritableStreamDefaultWriter scrive un blocco di dati passato in un WritableStream e nel relativo sink sottostante, quindi restituisce una promessa che si risolve per indicare il successo o il fallimento dell'operazione di scrittura. Tieni presente che il significato di "successo" dipende dal sink sottostante; potrebbe indicare che il chunk è stato accettato e non necessariamente che sia stato salvato in modo sicuro nella destinazione finale.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

La proprietà locked

Puoi verificare se uno stream scrivibile è bloccato accedendo alla relativa proprietà WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempio di codice per stream modificabili

L'esempio di codice riportato di seguito mostra tutti i passaggi in azione.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Inoltro di uno stream leggibile a uno stream in cui è possibile scrivere

Uno stream leggibile può essere incanalato in uno stream scrivibile tramite il metodo pipeTo() dello stream leggibile. ReadableStream.pipeTo() inoltra l'attuale ReadableStream a un determinato WritableStream e restituisce una promessa che viene soddisfatta al termine della procedura di inoltro o rifiutata se si sono verificati errori.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Creazione di un flusso di trasformazione

L'interfaccia TransformStream dell'API Streams rappresenta un insieme di dati trasformabili. Puoi creare un flusso di trasformazione chiamando il suo costruttore TransformStream(), che crea e restituisce un oggetto flusso di trasformazione dai gestori specificati. Il costruttore TransformStream() accetta come primo argomento un oggetto JavaScript facoltativo che rappresenta il transformer. Questi oggetti possono contenere uno dei seguenti metodi:

transformer

  • start(controller): questo metodo viene chiamato immediatamente al momento della creazione dell'oggetto. In genere, questo viene utilizzato per mettere in coda i chunk di prefisso utilizzando controller.enqueue(). Questi chunk verranno letti dal lato leggibile, ma non dipendono da eventuali scritture sul lato scrivibile. Se questo processo iniziale è asincrono, ad esempio perché richiede un po' di impegno per acquisire i chunk del prefisso, la funzione può restituire una promessa per segnalare il successo o il fallimento; una promessa rifiutata causerà un errore nello stream. Eventuali eccezioni lanciate verranno rilanciate dal costruttore TransformStream().
  • transform(chunk, controller): questo metodo viene chiamato quando un nuovo chunk scritto inizialmente sul lato scrivibile è pronto per essere trasformato. L'implementazione dello stream garantisce che questa funzione verrà chiamata solo dopo il completamento delle trasformazioni precedenti e mai prima del completamento di start() o dopo la chiamata di flush(). Questa funzione esegue il lavoro di trasformazione effettivo dello stream di trasformazione. Può mettere in coda i risultati utilizzando controller.enqueue(). In questo modo, un singolo chunk scritto sul lato in scrittura può generare zero o più chunk sul lato in lettura, a seconda del numero di volte in cui viene chiamato controller.enqueue(). Se la procedura di trasformazione è asincrona, questa funzione può restituire una promessa per segnalare il successo o l'errore della trasformazione. Una promessa rifiutata genera un errore sia per i lati leggibili che per quelli scrivibili dello stream di trasformazione. Se non viene fornito alcun metodo transform(), viene utilizzata la trasformazione di identità, che mette in coda i chunk invariati dal lato in cui è possibile scrivere al lato in cui è possibile leggere.
  • flush(controller): questo metodo viene chiamato dopo che tutti i chunk scritti sul lato scrivibile sono stati trasformati passando correttamente per transform() e il lato scrivibile sta per essere chiuso. In genere viene utilizzato per mettere in coda i chunk dei suffissi sul lato leggibile, prima che anche questo si chiuda. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa per segnalare l'esito positivo o negativo. Il risultato verrà comunicato al chiamante di stream.writable.write(). Inoltre, una promessa rifiutata genera un errore sia sul lato leggibile sia su quello in scrittura dello stream. La generazione di un'eccezione viene considerata come la restituzione di una promessa rifiutata.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategie di accodamento writableStrategy e readableStrategy

Il secondo e il terzo parametro facoltativi del costruttore TransformStream() sono le strategie di accodamento writableStrategy e readableStrategy facoltative. Sono definiti rispettivamente nelle sezioni dei flussi di dati leggibili e in scrittura.

Esempio di codice per lo stream di trasformazione

Il seguente esempio di codice mostra un semplice stream di trasformazione in azione.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Inoltro di uno stream leggibile tramite uno stream di trasformazione

Il metodo pipeThrough() dell'interfaccia ReadableStream fornisce un modo incatenabile per incanalare lo stream corrente tramite uno stream di trasformazione o qualsiasi altra coppia di scrittura/lettura. La pipe di uno stream in genere blocca l'intera durata della pipeline, impedendo ad altri lettori di bloccarla.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Il successivo esempio di codice (un po' inventato) mostra come implementare una versione "shouting" di fetch() che racchiude tutto il testo in maiuscolo utilizzando la promessa di risposta restituita come flusso e utilizzando le maiuscole per blocco. Il vantaggio di questo approccio è che non devi attendere il download dell'intero documento, il che può fare un'enorme differenza quando si tratta di file di grandi dimensioni.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

La demo di seguito mostra gli stream leggibili, scrivibili e di trasformazione in azione. Include anche esempi di catene di tubi pipeThrough() e pipeTo() e mostra anche tee(). Se vuoi, puoi eseguire la demo in una finestra separata o visualizzare il codice sorgente.

Stream utili disponibili nel browser

Esistono diversi stream utili integrati direttamente nel browser. Puoi creare facilmente un ReadableStream da un blob. Il metodo stream() dell'interfaccia Blob restituisce un ReadableStream che, al momento della lettura, restituisce i dati contenuti nel blob. Inoltre, ricorda che un oggetto File è un tipo specifico di Blob e può essere utilizzato in qualsiasi contesto in cui può essere utilizzato un blob.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Le varianti di streaming di TextDecoder.decode() e TextEncoder.encode() si chiamano TextDecoderStream e TextEncoderStream rispettivamente.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Comprimere o decomprimere un file è facile con gli stream di trasformazione CompressionStream e DecompressionStream. L'esempio di codice riportato di seguito mostra come scaricare la specifica Streams, comprimerla (gzip) direttamente nel browser e scrivere il file compresso direttamente sul disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Gli FileSystemWritableFileStream dell'API File System Access e gli stream di richieste fetch() sperimentali sono esempi di stream scrivibili disponibili pubblicamente.

L'API Serial fa un uso intensivo di stream sia leggibili che scrivibili.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Infine, l'API WebSocketStream integra gli stream con l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Risorse utili

Ringraziamenti

Questo articolo è stato esaminato da Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. I post del blog di Jake Archibald mi hanno aiutato molto a comprendere gli stream. Alcuni esempi di codice sono ispirati alle esplorazioni dell'utente GitHub @bellbind e alcune parti del testo si basano molto sulle documentazioni web di MDN su Streams. Gli autori di Streams Standard hanno svolto un lavoro straordinario per scrivere questa specifica. Immagine hero di Ryan Lara su Unsplash.