Flux : guide définitif

Découvrez comment utiliser les flux lisibles, inscriptibles et de transformation avec l'API Streams.

L'API Streams vous permet d'accéder par programmation aux flux de données reçus sur le réseau ou créés localement par quelque moyen que ce soit, et de les traiter avec JavaScript. Le streaming consiste à décomposer une ressource que vous souhaitez recevoir, envoyer ou transformer en petits blocs, puis à traiter ces blocs bit par bit. Bien que les navigateurs diffusent déjà des éléments tels que le code HTML ou les vidéos à afficher sur les pages Web, cette fonctionnalité n'a jamais été disponible pour JavaScript avant l'introduction des flux fetch en 2015.

Auparavant, si vous souhaitiez traiter une ressource quelconque (vidéo, fichier texte, etc.), vous deviez télécharger l'intégralité du fichier, attendre qu'il soit désérialisé dans un format approprié, puis le traiter. Avec les flux disponibles pour JavaScript, tout change. Vous pouvez désormais traiter les données brutes avec JavaScript de manière progressive dès qu'elles sont disponibles sur le client, sans avoir à générer de tampon, de chaîne ni de blob. Cela ouvre la voie à de nombreux cas d'utilisation, dont certains sont listés ci-dessous :

  • Effets vidéo : en transmettant un flux vidéo lisible via un flux de transformation qui applique des effets en temps réel.
  • (Dé)compression des données : canalisation d'un flux de fichiers via un flux de transformation qui le (dé)compresse de manière sélective.
  • Décodage d'images : canalisation d'un flux de réponse HTTP via un flux de transformation qui décode les octets en données bitmap, puis via un autre flux de transformation qui traduit les bitmaps en PNG. Si elle est installée dans le gestionnaire fetch d'un service worker, cela vous permet de polyfiller de manière transparente de nouveaux formats d'image comme AVIF.

Prise en charge des navigateurs

ReadableStream et WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

Concepts fondamentaux

Avant de vous présenter en détail les différents types de flux, laissez-moi vous expliquer quelques concepts de base.

Morceaux

Un bloc est une donnée unique écrite dans un flux ou lue à partir de celui-ci. Il peut être de n'importe quel type. Les flux peuvent même contenir des blocs de différents types. La plupart du temps, un bloc ne sera pas l'unité de données la plus atomique pour un flux donné. Par exemple, un flux d'octets peut contenir des blocs de 16 Kio Uint8Array, au lieu d'octets individuels.

Flux lisibles

Un flux lisible représente une source de données que vous pouvez lire. En d'autres termes, les données proviennent d'un flux lisible. Concrètement, un flux lisible est une instance de la classe ReadableStream.

Flux accessibles en écriture

Un flux accessible en écriture représente une destination pour les données dans laquelle vous pouvez écrire. En d'autres termes, les données sont insérées dans un flux accessible en écriture. Concrètement, un flux accessible en écriture est une instance de la classe WritableStream.

Transformer des flux

Un flux de transformation se compose d'une paire de flux : un flux accessible en écriture, appelé côté accessible en écriture, et un flux accessible en lecture, appelé côté accessible en lecture. Une métaphore concrète serait celle d'un interprète simultané qui traduit d'une langue à une autre à la volée. D'une manière spécifique au flux de transformation, l'écriture sur le côté accessible en écriture entraîne la mise à disposition de nouvelles données pour la lecture à partir du côté accessible en lecture. Concrètement, tout objet comportant une propriété writable et une propriété readable peut servir de flux de transformation. Cependant, la classe TransformStream standard facilite la création d'une telle paire correctement intriquée.

Chaînes de tuyaux

Les flux sont principalement utilisés en les redirigeant les uns vers les autres. Un flux lisible peut être redirigé directement vers un flux inscriptible à l'aide de la méthode pipeTo() du flux lisible, ou il peut être redirigé d'abord vers un ou plusieurs flux de transformation à l'aide de la méthode pipeThrough() du flux lisible. Un ensemble de flux regroupés de cette manière est appelé "chaîne de tuyaux".

Contre-pression

Une fois une chaîne de canalisations construite, elle propage des signaux concernant la vitesse à laquelle les blocs doivent y circuler. Si une étape de la chaîne ne peut pas encore accepter de blocs, elle propage un signal en arrière dans la chaîne de canalisations, jusqu'à ce que la source d'origine soit invitée à arrêter de produire des blocs aussi rapidement. Ce processus de flux de normalisation est appelé "rétropression".

Teeing

Un flux lisible peut être dupliqué (d'après la forme d'un "T" majuscule) à l'aide de sa méthode tee(). Cela verrouillera le flux, c'est-à-dire qu'il ne sera plus directement utilisable. Toutefois, cela créera deux nouveaux flux, appelés branches, qui pourront être utilisés indépendamment. La mise en file d'attente est également importante, car les flux ne peuvent pas être rembobinés ni redémarrés. Nous y reviendrons plus tard.

Diagramme d'une chaîne de canalisations composée d'un flux lisible provenant d'un appel à l'API Fetch, qui est ensuite canalisé via un flux de transformation dont la sortie est divisée et envoyée au navigateur pour le premier flux lisible résultant et au cache du service worker pour le deuxième flux lisible résultant.
Chaîne de pipelines.

Mécanismes d'un flux lisible

Un flux lisible est une source de données représentée en JavaScript par un objet ReadableStream qui provient d'une source sous-jacente. Le constructeur ReadableStream() crée et renvoie un objet de flux lisible à partir des gestionnaires fournis. Il existe deux types de sources sous-jacentes :

  • Les sources push vous envoient constamment des données lorsque vous y avez accédé. C'est à vous de démarrer, de mettre en pause ou d'annuler l'accès au flux. Par exemple, les flux vidéo en direct, les événements envoyés par le serveur ou les WebSockets.
  • Les sources d'extraction nécessitent que vous demandiez explicitement des données une fois que vous y êtes connecté. Par exemple, les opérations HTTP via des appels fetch() ou XMLHttpRequest.

Les données de flux sont lues de manière séquentielle par petits éléments appelés blocs. Les blocs placés dans un flux sont dits en file d'attente. Cela signifie qu'ils sont en attente dans une file d'attente et prêts à être lus. Une file d'attente interne permet de suivre les blocs qui n'ont pas encore été lus.

Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque bloc et compare la taille totale de tous les blocs de la file d'attente à un nombre spécifié, appelé seuil haut.

Les blocs du flux sont lus par un lecteur. Ce lecteur récupère les données par blocs, ce qui vous permet d'effectuer n'importe quel type d'opération sur celles-ci. Le lecteur et le code de traitement qui l'accompagne sont appelés consommateur.

La prochaine construction dans ce contexte est appelée contrôleur. Chaque flux lisible est associé à un contrôleur qui, comme son nom l'indique, vous permet de contrôler le flux.

Un seul lecteur peut lire un flux à la fois. Lorsqu'un lecteur est créé et commence à lire un flux (c'est-à-dire qu'il devient un lecteur actif), il est verrouillé. Si vous souhaitez qu'un autre lecteur prenne le relais pour lire votre flux, vous devez généralement libérer le premier lecteur avant toute autre action (bien que vous puissiez dupliquer les flux).

Créer un flux lisible

Pour créer un flux lisible, vous devez appeler son constructeur ReadableStream(). Le constructeur comporte un argument facultatif underlyingSource, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.

Le underlyingSource

Il peut utiliser les méthodes facultatives suivantes, définies par le développeur :

  • start(controller) : appelé immédiatement lors de la construction de l'objet. La méthode peut accéder à la source du flux et effectuer toute autre opération nécessaire à la configuration de la fonctionnalité de flux. Si ce processus doit être effectué de manière asynchrone, la méthode peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètre controller transmis à cette méthode est un ReadableStreamDefaultController.
  • pull(controller) : peut être utilisé pour contrôler le flux à mesure que d'autres blocs sont récupérés. Elle est appelée à plusieurs reprises tant que la file d'attente interne des blocs du flux n'est pas pleine, jusqu'à ce que la file d'attente atteigne son seuil haut. Si l'appel de pull() renvoie une promesse, pull() ne sera pas rappelé tant que cette promesse ne sera pas tenue. Si la promesse est rejetée, le flux génère une erreur.
  • cancel(reason) : appelé lorsque le consommateur de flux annule le flux.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController accepte les méthodes suivantes :

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

Le queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres :

  • highWaterMark : nombre non négatif indiquant le seuil haut du flux utilisant cette stratégie de mise en file d'attente.
  • size(chunk) : fonction qui calcule et renvoie la taille finie et non négative de la valeur de bloc donnée. Le résultat est utilisé pour déterminer la contre-pression, qui se manifeste par la propriété ReadableStreamDefaultController.desiredSize appropriée. Il régit également le moment où la méthode pull() de la source sous-jacente est appelée.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Les méthodes getReader() et read()

Pour lire un flux lisible, vous avez besoin d'un lecteur, qui sera un ReadableStreamDefaultReader. La méthode getReader() de l'interface ReadableStream crée un lecteur et verrouille le flux sur celui-ci. Tant que le flux est verrouillé, aucun autre lecteur ne peut être acquis tant que celui-ci n'est pas libéré.

La méthode read() de l'interface ReadableStreamDefaultReader renvoie une promesse donnant accès au prochain bloc de la file d'attente interne du flux. Il est exécuté ou rejeté avec un résultat en fonction de l'état du flux. Voici les différentes possibilités :

  • Si un bloc est disponible, la promesse sera tenue avec un objet de la forme
    { value: chunk, done: false }.
  • Si le flux est fermé, la promesse sera tenue avec un objet de la forme
    { value: undefined, done: true }.
  • Si le flux génère une erreur, la promesse sera rejetée avec l'erreur correspondante.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Propriété locked

Vous pouvez vérifier si un flux lisible est verrouillé en accédant à sa propriété ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemples de code de flux lisibles

L'exemple de code ci-dessous montre toutes les étapes en action. Vous créez d'abord un ReadableStream qui, dans son argument underlyingSource (c'est-à-dire la classe TimestampSource), définit une méthode start(). Cette méthode indique au controller du flux de enqueue() un code temporel toutes les secondes pendant dix secondes. Enfin, il indique au contrôleur de close() le flux. Vous consommez ce flux en créant un lecteur via la méthode getReader() et en appelant read() jusqu'à ce que le flux soit done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Itération asynchrone

Vérifier à chaque itération de boucle read() si le flux est done n'est peut-être pas l'API la plus pratique. Heureusement, il existera bientôt une meilleure façon de procéder : l'itération asynchrone.

for await (const chunk of stream) {
  console.log(chunk);
}

Pour utiliser l'itération asynchrone aujourd'hui, vous pouvez implémenter le comportement avec un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Redirection d'un flux lisible

La méthode tee() de l'interface ReadableStream met en file d'attente le flux lisible actuel, renvoyant un tableau à deux éléments contenant les deux branches résultantes en tant que nouvelles instances ReadableStream. Cela permet à deux lecteurs de lire un flux simultanément. Par exemple, vous pouvez le faire dans un service worker si vous souhaitez récupérer une réponse du serveur et la diffuser en continu vers le navigateur, mais aussi vers le cache du service worker. Étant donné qu'un corps de réponse ne peut pas être consommé plusieurs fois, vous avez besoin de deux copies pour ce faire. Pour annuler le flux, vous devez ensuite annuler les deux branches résultantes. En général, le fait de mettre un flux en file d'attente le verrouille pour toute sa durée, ce qui empêche les autres lecteurs de le verrouiller.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flux d'octets lisibles

Pour les flux représentant des octets, une version étendue du flux lisible est fournie pour gérer efficacement les octets, en particulier en minimisant les copies. Les flux d'octets permettent d'acquérir des lecteurs BYOB (Bring Your Own Buffer). L'implémentation par défaut peut générer différentes sorties, telles que des chaînes ou des tampons de tableau dans le cas des WebSockets, tandis que les flux d'octets garantissent une sortie d'octets. De plus, les lecteurs BYOB offrent des avantages en termes de stabilité. En effet, si un tampon se détache, il peut garantir qu'il n'y a pas d'écriture dans le même tampon deux fois, ce qui évite les conditions de concurrence. Les lecteurs BYOB peuvent réduire le nombre de fois où le navigateur doit exécuter la récupération de mémoire, car ils peuvent réutiliser les tampons.

Créer un flux d'octets lisible

Vous pouvez créer un flux d'octets lisible en transmettant un paramètre type supplémentaire au constructeur ReadableStream().

new ReadableStream({ type: 'bytes' });

Le underlyingSource

La source sous-jacente d'un flux d'octets lisible reçoit un ReadableByteStreamController à manipuler. Sa méthode ReadableByteStreamController.enqueue() accepte un argument chunk dont la valeur est un ArrayBufferView. La propriété ReadableByteStreamController.byobRequest renvoie la demande d'extraction BYOB actuelle ou la valeur null si aucune n'est disponible. Enfin, la propriété ReadableByteStreamController.desiredSize renvoie la taille souhaitée pour remplir la file d'attente interne du flux contrôlé.

Le queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend un paramètre :

  • highWaterMark : nombre non négatif d'octets indiquant le seuil maximal du flux utilisant cette stratégie de mise en file d'attente. Cela permet de déterminer la contre-pression, qui se manifeste par le biais de la propriété ReadableByteStreamController.desiredSize appropriée. Il régit également le moment où la méthode pull() de la source sous-jacente est appelée.

Les méthodes getReader() et read()

Vous pouvez ensuite accéder à un ReadableStreamBYOBReader en définissant le paramètre mode en conséquence : ReadableStream.getReader({ mode: "byob" }). Cela permet de contrôler plus précisément l'allocation des tampons afin d'éviter les copies. Pour lire à partir du flux d'octets, vous devez appeler ReadableStreamBYOBReader.read(view), où view est un ArrayBufferView.

Exemple de code de flux d'octets lisible

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La fonction suivante renvoie des flux d'octets lisibles qui permettent une lecture efficace sans copie d'un tableau généré de manière aléatoire. Au lieu d'utiliser une taille de bloc prédéterminée de 1 024, il tente de remplir le tampon fourni par le développeur, ce qui permet un contrôle total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mécanismes d'un flux accessible en écriture

Un flux inscriptible est une destination dans laquelle vous pouvez écrire des données, représentée en JavaScript par un objet WritableStream. Il sert d'abstraction au-dessus d'un récepteur sous-jacent, qui est un récepteur d'E/S de niveau inférieur dans lequel les données brutes sont écrites.

Les données sont écrites dans le flux via un writer, un bloc à la fois. Un bloc peut prendre de nombreuses formes, tout comme les blocs d'un lecteur. Vous pouvez utiliser le code de votre choix pour générer les blocs prêts à être écrits. L'écrivain et le code associé sont appelés producteur.

Lorsqu'un writer est créé et commence à écrire dans un flux (un writer actif), il est dit verrouillé. Un seul rédacteur peut écrire dans un flux accessible en écriture à la fois. Si vous souhaitez qu'un autre rédacteur commence à écrire dans votre flux, vous devez généralement le libérer avant d'y associer un autre rédacteur.

Une file d'attente interne permet de suivre les blocs qui ont été écrits dans le flux, mais qui n'ont pas encore été traités par le récepteur sous-jacent.

Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque bloc et compare la taille totale de tous les blocs de la file d'attente à un nombre spécifié, appelé seuil haut.

La construction finale est appelée contrôleur. Chaque flux inscriptible est associé à un contrôleur qui vous permet de contrôler le flux (par exemple, pour l'annuler).

Créer un flux accessible en écriture

L'interface WritableStream de l'API Streams fournit une abstraction standard pour écrire des données de flux vers une destination, appelée récepteur. Cet objet est fourni avec une gestion de la contre-pression et une mise en file d'attente intégrées. Pour créer un flux accessible en écriture, vous devez appeler son constructeur WritableStream(). Il comporte un paramètre underlyingSink facultatif, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.

Le underlyingSink

underlyingSink peut inclure les méthodes facultatives définies par le développeur suivantes. Le paramètre controller transmis à certaines méthodes est un WritableStreamDefaultController.

  • start(controller) : cette méthode est appelée immédiatement lors de la construction de l'objet. Le contenu de cette méthode doit viser à accéder au récepteur sous-jacent. Si ce processus doit être effectué de manière asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec.
  • write(chunk, controller) : cette méthode est appelée lorsqu'un nouveau bloc de données (spécifié dans le paramètre chunk) est prêt à être écrit dans le récepteur sous-jacent. Il peut renvoyer une promesse pour signaler la réussite ou l'échec de l'opération d'écriture. Cette méthode ne sera appelée qu'une fois les écritures précédentes effectuées, et jamais après la fermeture ou l'abandon du flux.
  • close(controller) : cette méthode est appelée si l'application signale qu'elle a terminé d'écrire des blocs dans le flux. Le contenu doit faire tout ce qui est nécessaire pour finaliser les écritures dans le récepteur sous-jacent et libérer l'accès à celui-ci. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Cette méthode ne sera appelée qu'une fois que toutes les écritures mises en file d'attente auront réussi.
  • abort(reason) : cette méthode est appelée si l'application indique qu'elle souhaite fermer brusquement le flux et le mettre dans un état d'erreur. Il peut nettoyer toutes les ressources retenues, comme close(), mais abort() sera appelé même si les écritures sont mises en file d'attente. Ces blocs seront supprimés. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètre reason contient un DOMString décrivant pourquoi le flux a été abandonné.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

L'interface WritableStreamDefaultController de l'API Streams représente un contrôleur permettant de contrôler l'état d'un WritableStream lors de la configuration, à mesure que des blocs sont envoyés pour l'écriture ou à la fin de l'écriture. Lors de la construction d'un WritableStream, le récepteur sous-jacent reçoit une instance WritableStreamDefaultController correspondante à manipuler. WritableStreamDefaultController ne comporte qu'une seule méthode : WritableStreamDefaultController.error(), qui provoque une erreur pour toute interaction future avec le flux associé. WritableStreamDefaultController accepte également une propriété signal qui renvoie une instance de AbortSignal, ce qui permet d'arrêter une opération WritableStream si nécessaire.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

Le queuingStrategy

Le deuxième argument, également facultatif, du constructeur WritableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres :

  • highWaterMark : nombre non négatif indiquant le seuil haut du flux utilisant cette stratégie de mise en file d'attente.
  • size(chunk) : fonction qui calcule et renvoie la taille finie non négative de la valeur de bloc donnée. Le résultat est utilisé pour déterminer la contre-pression, qui se manifeste par la propriété WritableStreamDefaultWriter.desiredSize appropriée.

Les méthodes getWriter() et write()

Pour écrire dans un flux accessible en écriture, vous avez besoin d'un writer, qui sera un WritableStreamDefaultWriter. La méthode getWriter() de l'interface WritableStream renvoie une nouvelle instance de WritableStreamDefaultWriter et verrouille le flux sur cette instance. Tant que le flux est verrouillé, aucun autre rédacteur ne peut être acquis tant que celui en cours n'est pas libéré.

La méthode write() de l'interface WritableStreamDefaultWriter écrit un bloc de données transmis dans un WritableStream et son récepteur sous-jacent, puis renvoie une promesse qui se résout pour indiquer la réussite ou l'échec de l'opération d'écriture. Notez que la signification du terme "réussite" dépend du récepteur sous-jacent. Il peut indiquer que le bloc a été accepté, et pas nécessairement qu'il a été enregistré de manière sécurisée dans sa destination finale.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Propriété locked

Vous pouvez vérifier si un flux accessible en écriture est verrouillé en accédant à sa propriété WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemple de code de flux inscriptible

L'exemple de code ci-dessous montre toutes les étapes en action.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Transférer un flux lisible vers un flux inscriptible

Un flux lisible peut être redirigé vers un flux accessible en écriture via la méthode pipeTo() du flux lisible. ReadableStream.pipeTo() transmet le ReadableStream actuel à un WritableStream donné et renvoie une promesse qui est tenue lorsque le processus de transmission se termine correctement, ou qui est refusée si des erreurs ont été rencontrées.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Créer un flux de transformation

L'interface TransformStream de l'API Streams représente un ensemble de données transformables. Vous créez un flux de transformation en appelant son constructeur TransformStream(), qui crée et renvoie un objet de flux de transformation à partir des gestionnaires fournis. Le constructeur TransformStream() accepte comme premier argument un objet JavaScript facultatif représentant le transformer. Ces objets peuvent contenir l'une des méthodes suivantes :

Le transformer

  • start(controller) : cette méthode est appelée immédiatement lors de la construction de l'objet. Il est généralement utilisé pour mettre en file d'attente des blocs de préfixe à l'aide de controller.enqueue(). Ces blocs seront lus du côté lisible, mais ne dépendent d'aucune écriture du côté accessible en écriture. Si ce processus initial est asynchrone, par exemple parce qu'il faut un certain effort pour acquérir les blocs de préfixes, la fonction peut renvoyer une promesse pour signaler le succès ou l'échec. Une promesse refusée entraînera une erreur dans le flux. Toutes les exceptions générées seront régénérées par le constructeur TransformStream().
  • transform(chunk, controller) : cette méthode est appelée lorsqu'un nouveau bloc initialement écrit sur le côté accessible en écriture est prêt à être transformé. L'implémentation du flux garantit que cette fonction ne sera appelée qu'une fois les transformations précédentes réussies, et jamais avant la fin de start() ni après l'appel de flush(). Cette fonction effectue le travail de transformation réel du flux de transformation. Il peut mettre en file d'attente les résultats à l'aide de controller.enqueue(). Cela permet à un seul bloc écrit sur le côté accessible en écriture de générer zéro ou plusieurs blocs sur le côté accessible en lecture, selon le nombre de fois où controller.enqueue() est appelé. Si le processus de transformation est asynchrone, cette fonction peut renvoyer une promesse pour signaler la réussite ou l'échec de la transformation. Une promesse refusée générera une erreur sur les côtés lisibles et accessibles en écriture du flux de transformation. Si aucune méthode transform() n'est fournie, la transformation d'identité est utilisée, ce qui met en file d'attente les blocs inchangés du côté accessible en écriture vers le côté accessible en lecture.
  • flush(controller) : cette méthode est appelée une fois que tous les blocs écrits sur le côté accessible en écriture ont été transformés en passant avec succès par transform() et que le côté accessible en écriture est sur le point d'être fermé. Cette méthode est généralement utilisée pour mettre en file d'attente des blocs de suffixe du côté lisible, avant que celui-ci ne soit également fermé. Si le processus de vidage est asynchrone, la fonction peut renvoyer une promesse pour signaler la réussite ou l'échec. Le résultat sera communiqué à l'appelant de stream.writable.write(). De plus, une promesse refusée générera une erreur pour les côtés lisibles et accessibles en écriture du flux. Le déclenchement d'une exception est traité de la même manière que le renvoi d'une promesse refusée.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Stratégies de mise en file d'attente writableStrategy et readableStrategy

Les deuxième et troisième paramètres facultatifs du constructeur TransformStream() sont les stratégies de mise en file d'attente facultatives writableStrategy et readableStrategy. Elles sont définies comme indiqué dans les sections sur les flux lisibles et inscriptibles, respectivement.

Exemple de code de flux de transformation

L'exemple de code suivant montre un simple flux de transformation en action.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Transférer un flux lisible via un flux de transformation

La méthode pipeThrough() de l'interface ReadableStream permet de canaliser le flux actuel de manière chaînable via un flux de transformation ou toute autre paire lisible/inscriptible. Le transfert d'un flux le verrouille généralement pendant toute la durée du transfert, ce qui empêche d'autres lecteurs de le verrouiller.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

L'exemple de code suivant (un peu artificiel) montre comment implémenter une version "criée" de fetch() qui met en majuscules tout le texte en consommant la promesse de réponse renvoyée en tant que flux et en mettant en majuscules bloc par bloc. L'avantage de cette approche est que vous n'avez pas besoin d'attendre le téléchargement complet du document, ce qui peut faire une énorme différence lorsque vous traitez des fichiers volumineux.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Démo

La démonstration ci-dessous montre les flux lisibles, inscriptibles et de transformation en action. Il inclut également des exemples de chaînes de pipes pipeThrough() et pipeTo(), et illustre tee(). Vous pouvez éventuellement exécuter la démonstration dans sa propre fenêtre ou afficher le code source.

Flux utiles disponibles dans le navigateur

Un certain nombre de flux utiles sont intégrés au navigateur. Vous pouvez facilement créer un ReadableStream à partir d'un blob. La méthode stream() de l'interface Blob renvoie un ReadableStream qui, une fois lu, renvoie les données contenues dans le blob. Rappelez-vous également qu'un objet File est un type spécifique de Blob et peut être utilisé dans n'importe quel contexte où un blob peut l'être.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Les variantes de flux de TextDecoder.decode() et TextEncoder.encode() sont appelées respectivement TextDecoderStream et TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

La compression ou la décompression d'un fichier est facile avec les flux de transformation CompressionStream et DecompressionStream, respectivement. L'exemple de code ci-dessous montre comment télécharger la spécification Streams, la compresser (gzip) directement dans le navigateur et écrire le fichier compressé directement sur le disque.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Les FileSystemWritableFileStream de l'API File System Access et les flux de requêtes fetch() expérimentaux sont des exemples de flux inscriptibles.

L'API Serial utilise intensivement les flux lisibles et inscriptibles.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Enfin, l'API WebSocketStream intègre les flux à l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Ressources utiles

Remerciements

Cet article a été examiné par Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley et Adam Rice. Les articles de blog de Jake Archibald m'ont beaucoup aidé à comprendre les flux. Certains exemples de code s'inspirent des explorations de l'utilisateur GitHub @bellbind, et certaines parties du texte s'appuient fortement sur la documentation MDN Web Docs sur les flux. Les auteurs de la spécification Streams Standard ont fait un travail remarquable. Image de couverture par Ryan Lara sur Unsplash.