Flux : guide définitif

Découvrez comment utiliser des flux lisibles et accessibles en écriture, et comment transformer des flux avec l'API Streams.

L'API Streams vous permet d'accéder de manière programmatique aux flux de données reçus sur le réseau ou créés par quelque moyen que ce soit localement, et de les traiter avec JavaScript. Le traitement par flux consiste à décomposer une ressource que vous souhaitez recevoir, envoyer ou transformer en petites fragments, puis à traiter ces fragments petit à petit. Bien que les navigateurs effectuent de toute façon la diffusion en continu lorsqu'ils reçoivent des éléments tels que du code HTML ou des vidéos à afficher sur des pages Web, cette fonctionnalité n'a jamais été disponible pour JavaScript avant l'introduction de fetch avec des flux en 2015.

Auparavant, si vous souhaitiez traiter une ressource (une vidéo, un fichier texte, etc.), vous deviez télécharger l'intégralité du fichier, attendre qu'il soit désérialisé dans un format approprié, puis le traiter. Avec la disponibilité des flux dans JavaScript, tout cela change. Vous pouvez désormais traiter progressivement les données brutes avec JavaScript dès qu'elles sont disponibles sur le client, sans avoir à générer de tampon, de chaîne ni de blob. Cela débloque un certain nombre de cas d'utilisation, dont certains sont listés ci-dessous:

  • Effets vidéo : transfert d'un flux vidéo lisible via un flux de transformation qui applique des effets en temps réel.
  • (Dé)compression de données : transfert d'un flux de fichiers via un flux de transformation qui le (dé)compresse de manière sélective.
  • Décodage d'image:canalisation d'un flux de réponse HTTP par le biais d'un flux de transformation qui décode les octets en données bitmap, puis via un autre flux de transformation qui convertit les bitmaps en fichiers PNG. S'il est installé dans le gestionnaire fetch d'un service worker, vous pouvez émuler de manière transparente de nouveaux formats d'image tels que AVIF.

Prise en charge des navigateurs

ReadableStream et WritableStream

Navigateurs pris en charge

  • Chrome : 43.
  • Edge : 14.
  • Firefox : 65.
  • Safari : 10.1.

Source

TransformStream

Navigateurs pris en charge

  • Chrome : 67
  • Edge : 79.
  • Firefox: 102
  • Safari : 14.1.

Source

Concepts fondamentaux

Avant d'aborder plus en détail les différents types de flux, laissez-moi vous présenter quelques concepts de base.

Morceaux

Un bloc est un élément de données unique écrit dans un flux ou lu à partir de celui-ci. Il peut être de n'importe quel type. Les flux peuvent même contenir des fragments de différents types. La plupart du temps, un bloc ne sera pas l'unité de données la plus atomique pour un flux donné. Par exemple, un flux d'octets peut contenir des fragments constitués de 16 Kio d'unités Uint8Array, au lieu d'octets uniques.

Flux lisibles

Un flux lisible représente une source de données que vous pouvez lire. En d'autres termes, les données sortent d'un flux lisible. Concrètement, un flux lisible est une instance de la classe ReadableStream.

Flux accessibles en écriture

Un flux en écriture représente une destination dans laquelle vous pouvez écrire des données. En d'autres termes, les données entrent dans un flux enregistrable. Concrètement, un flux en écriture est une instance de la classe WritableStream.

Transformer des flux

Un flux de transformation se compose d'une paire de flux : un flux en écriture, appelé côté en écriture, et un flux en lecture, appelé côté en lecture. Une métaphore concrète de ce processus serait un interprète simultané qui traduit d'une langue à une autre en temps réel. De manière spécifique au flux de transformation, l'écriture sur le côté en écriture permet de mettre de nouvelles données à disposition pour la lecture à partir du côté lisible. Concrètement, tout objet avec une propriété writable et une propriété readable peut servir de flux de transformation. Toutefois, la classe TransformStream standard permet de créer plus facilement une telle paire correctement enchevêtrée.

Chaînes de tuyauterie

Les flux sont principalement utilisés par transmission entre eux. Un flux lisible peut être redirigé directement vers un flux enregistrable à l'aide de la méthode pipeTo() du flux lisible, ou il peut d'abord être redirigé via un ou plusieurs flux de transformation à l'aide de la méthode pipeThrough() du flux lisible. Un ensemble de flux connectés de cette manière est appelé chaîne de canalisation.

Contre-pression

Une fois qu'une chaîne de canaux est créée, elle propage des signaux sur la vitesse à laquelle les blocs doivent y circuler. Si une étape de la chaîne ne peut pas encore accepter de blocs, elle propage un signal à rebours dans la chaîne de canalisation, jusqu'à ce que la source d'origine soit invitée à arrêter de produire des blocs si rapidement. Ce processus de normalisation du débit est appelé contre-pression.

Teeing

Un flux lisible peut être branché (du nom de la forme d'un "T" majuscule) à l'aide de sa méthode tee(). Cela verrouille le flux, c'est-à-dire qu'il n'est plus utilisable directement. Toutefois, cela crée deux nouveaux flux, appelés branches, qui peuvent être consommés indépendamment. Il est également important de définir un point de départ, car les flux ne peuvent pas être rembobinés ni redémarrés (nous y reviendrons plus tard).

Schéma d'une chaîne de canalisation composée d'un flux lisible provenant d'un appel à l'API fetch, qui est ensuite canalisé via un flux de transformation dont la sortie est branchée, puis envoyée au navigateur pour le premier flux lisible obtenu et au cache du service worker pour le deuxième flux lisible obtenu.
Chaîne de canaux.

Mécanisme d'un flux lisible

Un flux lisible est une source de données représentée en JavaScript par un objet ReadableStream qui provient d'une source sous-jacente. Le constructeur ReadableStream() crée et renvoie un objet de flux lisible à partir des gestionnaires donnés. Il existe deux types de sources sous-jacentes :

  • Les sources push vous envoient constamment des données lorsque vous y avez accédé. C'est à vous de démarrer, de suspendre ou d'annuler l'accès au flux. Il peut s'agir de flux vidéo en direct, d'événements envoyés par le serveur ou de WebSockets.
  • Les sources pull vous obligent à demander explicitement des données à ces sources une fois que vous y êtes connecté. Les opérations HTTP via des appels fetch() ou XMLHttpRequest en sont des exemples.

Les données de flux sont lues de manière séquentielle en petits fragments appelés segments. Les segments placés dans un flux sont mis en file d'attente. Cela signifie qu'ils sont en attente et prêts à être lus. Une file d'attente interne assure le suivi des fragments qui n'ont pas encore été lus.

Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare la taille totale de tous les fragments de la file d'attente à un nombre spécifié, appelé point d'inflexion.

Les fragments du flux sont lus par un lecteur. Ce lecteur récupère les données un bloc à la fois, ce qui vous permet d'effectuer n'importe quel type d'opération dessus. Le lecteur et l'autre code de traitement qui l'accompagne sont appelés consommateur.

La structure suivante dans ce contexte s'appelle un contrôleur. Chaque flux lisible est associé à un contrôleur qui, comme son nom l'indique, vous permet de le contrôler.

Un seul lecteur peut lire un flux à la fois. Lorsqu'un lecteur est créé et commence à lire un flux (c'est-à-dire qu'il devient un lecteur actif), il est verrouillé sur celui-ci. Si vous souhaitez qu'un autre lecteur prenne en charge la lecture de votre flux, vous devez généralement libérer le premier lecteur avant toute autre action (bien que vous puissiez retirer les flux).

Créer un flux lisible

Vous créez un flux lisible en appelant son constructeur ReadableStream(). Le constructeur possède un argument facultatif underlyingSource, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux créée.

underlyingSource

Pour ce faire, vous pouvez utiliser les méthodes facultatives définies par le développeur suivantes :

  • start(controller) : appelé immédiatement lors de la création de l'objet. La méthode peut accéder à la source du flux et effectuer toute autre opération nécessaire à la configuration de la fonctionnalité de diffusion. Si ce processus doit être effectué de manière asynchrone, la méthode peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètre controller transmis à cette méthode est un ReadableStreamDefaultController.
  • pull(controller) : permet de contrôler le flux à mesure que d'autres segments sont récupérés. Elle est appelée à plusieurs reprises tant que la file d'attente interne de blocs du flux n'est pas pleine, jusqu'à ce qu'elle atteigne son niveau maximal. Si le résultat de l'appel de pull() est une promesse, pull() n'est pas appelé à nouveau tant que cette promesse n'est pas remplie. Si la promesse est rejetée, le flux devient erroné.
  • cancel(reason) : appelé lorsque le consommateur du flux l'annule.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController accepte les méthodes suivantes :

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres :

  • highWaterMark: nombre non négatif indiquant la marque des hautes eaux du cours d'eau utilisant cette stratégie de mise en file d'attente.
  • size(chunk) : fonction qui calcule et renvoie la taille finie non négative de la valeur de bloc donnée. Le résultat permet de déterminer la contre-pression, qui se manifeste via la propriété ReadableStreamDefaultController.desiredSize appropriée. Elle régit également l'appel de la méthode pull() de la source sous-jacente.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Les méthodes getReader() et read()

Pour lire à partir d'un flux lisible, vous avez besoin d'un lecteur, qui sera un ReadableStreamDefaultReader. La méthode getReader() de l'interface ReadableStream crée un lecteur et verrouille le flux dessus. Tant que le flux est verrouillé, aucun autre lecteur ne peut être acquis tant que celui-ci n'est pas libéré.

La méthode read() de l'interface ReadableStreamDefaultReader renvoie une promesse permettant d'accéder au prochain segment de la file d'attente interne du flux. Il remplit ou refuse avec un résultat en fonction de l'état du flux. Les différentes possibilités sont les suivantes :

  • Si un fragment est disponible, la promesse est tenue avec un objet au format
    { value: chunk, done: false }.
  • Si le flux est fermé, la promesse est remplie avec un objet au format
    { value: undefined, done: true }.
  • Si le flux est associé à une erreur, la promesse est rejetée avec l'erreur correspondante.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Propriété locked

Vous pouvez vérifier si un flux lisible est verrouillé en accédant à sa propriété ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemples de code de flux lisibles

L'exemple de code ci-dessous montre toutes les étapes en action. Vous devez d'abord créer un ReadableStream qui, dans son argument underlyingSource (c'est-à-dire la classe TimestampSource), définit une méthode start(). Cette méthode indique au controller du flux de enqueue() un code temporel toutes les secondes pendant dix secondes. Enfin, il demande au contrôleur de close() le flux. Pour consommer ce flux, créez un lecteur via la méthode getReader() et appelez read() jusqu'à ce que le flux soit done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Itération asynchrone

Vérifier à chaque itération de la boucle read() si le flux est done n'est peut-être pas l'API la plus pratique. Heureusement, il existe bientôt une meilleure façon de procéder : l'itération asynchrone.

for await (const chunk of stream) {
  console.log(chunk);
}

Une solution de contournement pour utiliser l'itération asynchrone aujourd'hui consiste à implémenter le comportement avec un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Créer un flux lisible

La méthode tee() de l'interface ReadableStream lit le flux lisible actuel et renvoie un tableau à deux éléments contenant les deux branches obtenues en tant que nouvelles instances ReadableStream. Cela permet à deux lecteurs de lire un flux simultanément. Vous pouvez le faire, par exemple, dans un service worker si vous souhaitez récupérer une réponse du serveur et la diffuser dans le navigateur, mais aussi dans le cache du service worker. Étant donné qu'un corps de réponse ne peut pas être utilisé plus d'une fois, vous avez besoin de deux copies pour ce faire. Pour annuler le flux, vous devez annuler les deux branches générées. Le teeing d'un flux le verrouille généralement pendant toute la durée, ce qui empêche les autres lecteurs de le verrouiller.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flux d'octets lisibles

Pour les flux représentant des octets, une version étendue du flux lisible est fournie pour gérer efficacement les octets, en particulier en réduisant au maximum les copies. Les flux d'octets permettent d'acquérir des lecteurs BYOB (Bring Your Own Buffer). L'implémentation par défaut peut générer une gamme de sorties différentes, telles que des chaînes ou des tampons de tableau dans le cas des WebSockets, tandis que les flux d'octets garantissent la sortie d'octets. De plus, les lecteurs BYOB offrent des avantages en termes de stabilité. En effet, si un tampon se détache, il peut garantir qu'on n'écrit pas deux fois dans le même tampon, ce qui évite les conditions de concurrence. Les lecteurs BYOB peuvent réduire le nombre de fois où le navigateur doit exécuter la récupération de mémoire, car ils peuvent réutiliser les tampons.

Créer un flux d'octets lisible

Vous pouvez créer un flux d'octets lisible en transmettant un paramètre type supplémentaire au constructeur ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

La source sous-jacente d'un flux d'octets lisible reçoit un ReadableByteStreamController à manipuler. Sa méthode ReadableByteStreamController.enqueue() accepte un argument chunk dont la valeur est un ArrayBufferView. La propriété ReadableByteStreamController.byobRequest renvoie la requête de pull BYOB actuelle, ou la valeur null si elle n'existe pas. Enfin, la propriété ReadableByteStreamController.desiredSize renvoie la taille souhaitée pour remplir la file d'attente interne du flux contrôlé.

queuingStrategy

Le deuxième argument, également facultatif, du constructeur ReadableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend un paramètre :

  • highWaterMark : nombre non négatif d'octets indiquant le niveau maximal du flux à l'aide de cette stratégie de mise en file d'attente. Il permet de déterminer la contre-pression, qui se manifeste via la propriété ReadableByteStreamController.desiredSize appropriée. Il détermine également le moment où la méthode pull() de la source sous-jacente est appelée.

Les méthodes getReader() et read()

Vous pouvez ensuite accéder à un ReadableStreamBYOBReader en définissant le paramètre mode en conséquence : ReadableStream.getReader({ mode: "byob" }). Cela permet de contrôler plus précisément l'allocation de tampons afin d'éviter les copies. Pour lire à partir du flux d'octets, vous devez appeler ReadableStreamBYOBReader.read(view), où view est un ArrayBufferView.

Exemple de code de flux d'octets lisible

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La fonction suivante renvoie des flux d'octets lisibles qui permettent une lecture efficace sans copie d'un tableau généré de manière aléatoire. Au lieu d'utiliser une taille de bloc prédéterminée de 1 024, il tente de remplir la mémoire tampon fournie par le développeur, ce qui permet un contrôle total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mécanisme d'un flux en écriture

Un flux en écriture est une destination dans laquelle vous pouvez écrire des données, représentées en JavaScript par un objet WritableStream. Il s'agit d'une abstraction au-dessus d'un évier sous-jacent, un évier d'E/S de niveau inférieur dans lequel les données brutes sont écrites.

Les données sont écrites dans le flux via un écrivain, un bloc à la fois. Un bloc peut prendre de nombreuses formes, tout comme les blocs d'un lecteur. Vous pouvez utiliser le code de votre choix pour produire les fragments prêts à être écrits. Le rédacteur et le code associé sont appelés producteurs.

Lorsqu'un éditeur est créé et commence à écrire dans un flux (un éditeur actif), il est dit verrouillé. Un seul rédacteur peut écrire dans un flux accessible en écriture à la fois. Si vous souhaitez qu'un autre éditeur commence à écrire dans votre flux, vous devez généralement le libérer avant d'y associer un autre éditeur.

Une file d'attente interne permet de suivre les segments qui ont été écrits dans le flux, mais qui n'ont pas encore été traités par le sink sous-jacent.

Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare la taille totale de tous les fragments de la file d'attente à un nombre spécifié, appelé point d'inflexion.

La construction finale est appelée contrôleur. Chaque flux en écriture est associé à un contrôleur qui vous permet de le contrôler (par exemple, pour l'arrêter).

Créer un flux en écriture

L'interface WritableStream de l'API Streams fournit une abstraction standard pour écrire des données de streaming vers une destination, appelée "récepteur". Cet objet est fourni avec une rétropression et une mise en file d'attente intégrées. Vous créez un flux en écrivant en appelant son constructeur WritableStream(). Il comporte un paramètre underlyingSink facultatif, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux créée.

underlyingSink

underlyingSink peut inclure les méthodes facultatives définies par le développeur suivantes. Le paramètre controller transmis à certaines méthodes est WritableStreamDefaultController.

  • start(controller) : cette méthode est appelée immédiatement lors de la création de l'objet. Le contenu de cette méthode doit viser à accéder au collecteur sous-jacent. Si ce processus doit être effectué de manière asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec.
  • write(chunk, controller) : cette méthode est appelée lorsqu'un nouveau bloc de données (spécifié dans le paramètre chunk) est prêt à être écrit dans le collecteur sous-jacent. Il peut renvoyer une promesse pour signaler la réussite ou l'échec de l'opération d'écriture. Cette méthode n'est appelée que lorsque les écritures précédentes ont réussi, et jamais après la fermeture ou l'abandon du flux.
  • close(controller) : cette méthode est appelée si l'application signale qu'elle a terminé d'écrire des segments dans le flux. Le contenu doit faire tout ce qui est nécessaire pour finaliser les écritures dans le récepteur sous-jacent et libérer l'accès à celui-ci. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Cette méthode ne sera appelée qu'après la réussite de toutes les écritures mises en file d'attente.
  • abort(reason) : cette méthode est appelée si l'application indique qu'elle souhaite fermer brusquement le flux et le placer dans un état d'erreur. Il peut nettoyer toutes les ressources détenues, comme close(), mais abort() est appelé même si des écritures sont mises en file d'attente. Ces fragments seront jetés. Si ce processus est asynchrone, il peut renvoyer une promesse indiquant la réussite ou l'échec de l'opération. Le paramètre reason contient un DOMString décrivant pourquoi le flux a été interrompu.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

L'interface WritableStreamDefaultController de l'API Streams représente un contrôleur permettant de contrôler l'état d'un WritableStream lors de la configuration, à mesure que d'autres blocs sont envoyés pour l'écriture ou à la fin de l'écriture. Lors de la création d'un WritableStream, une instance WritableStreamDefaultController correspondante est attribuée au collecteur sous-jacent pour le manipuler. WritableStreamDefaultController n'a qu'une seule méthode : WritableStreamDefaultController.error(), ce qui entraîne des erreurs lors des futures interactions avec le flux associé. WritableStreamDefaultController accepte également une propriété signal qui renvoie une instance de AbortSignal, ce qui permet d'arrêter une opération WritableStream si nécessaire.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Le deuxième argument, également facultatif, du constructeur WritableStream() est queuingStrategy. Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres :

  • highWaterMark: nombre non négatif indiquant la marque des hautes eaux du cours d'eau utilisant cette stratégie de mise en file d'attente.
  • size(chunk) : fonction qui calcule et renvoie la taille finie non négative de la valeur de bloc donnée. Le résultat permet de déterminer la contre-pression, qui s'affiche via la propriété WritableStreamDefaultWriter.desiredSize appropriée.

Les méthodes getWriter() et write()

Pour écrire dans un flux en écriture, vous avez besoin d'un écrivain, qui sera un WritableStreamDefaultWriter. La méthode getWriter() de l'interface WritableStream renvoie une nouvelle instance de WritableStreamDefaultWriter et verrouille le flux sur cette instance. Lorsque le flux est verrouillé, aucun autre auteur ne peut être acquis tant que le flux actuel n'est pas libéré.

La méthode write() de l'interface WritableStreamDefaultWriter écrit un bloc de données transmis dans un WritableStream et son évier sous-jacent, puis renvoie une promesse qui se résout pour indiquer la réussite ou l'échec de l'opération d'écriture. Notez que la "réussite" dépend du récepteur sous-jacent. Cela peut indiquer que le fragment a été accepté, mais pas nécessairement qu'il est enregistré en toute sécurité dans sa destination finale.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Propriété locked

Vous pouvez vérifier si un flux en écriture est verrouillé en accédant à sa propriété WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Exemple de code de flux en écriture

L'exemple de code ci-dessous illustre toutes les étapes en action.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Canaliser un flux lisible vers un flux enregistrable

Un flux lisible peut être redirigé vers un flux en écriture via la méthode pipeTo() du flux lisible. ReadableStream.pipeTo() dirige le ReadableStream actuel vers un WritableStream donné et renvoie une promesse qui se termine une fois le processus terminé, ou le rejette en cas d'erreur.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Créer un flux de transformation

L'interface TransformStream de l'API Streams représente un ensemble de données transformables. Vous créez un flux de transformation en appelant son constructeur TransformStream(), qui crée et renvoie un objet de flux de transformation à partir des gestionnaires donnés. Le constructeur TransformStream() accepte comme premier argument un objet JavaScript facultatif représentant transformer. Ces objets peuvent contenir l'une des méthodes suivantes:

transformer

  • start(controller) : cette méthode est appelée immédiatement lors de la création de l'objet. En règle générale, il permet de mettre en file d'attente des fragments de préfixes à l'aide de controller.enqueue(). Ces blocs seront lus à partir du côté lisible, mais ne dépendent d'aucune écriture sur le côté en écriture. Si ce processus initial est asynchrone, par exemple parce qu'il faut un certain effort pour acquérir les segments de préfixe, la fonction peut renvoyer une promesse pour signaler un succès ou un échec. Une promesse refusée entraînera une erreur dans le flux. Toutes les exceptions générées seront renvoyées par le constructeur TransformStream().
  • transform(chunk, controller) : cette méthode est appelée lorsqu'un nouveau bloc écrit à l'origine sur le côté en écriture est prêt à être transformé. L'implémentation du flux garantit que cette fonction ne sera appelée qu'après la réussite des transformations précédentes, et jamais avant la fin de start() ou après l'appel de flush(). Cette fonction effectue le travail de transformation réel du flux de transformation. Il peut mettre en file d'attente les résultats à l'aide de controller.enqueue(). Cela permet à un seul bloc écrit sur le côté en écriture de générer zéro ou plusieurs blocs sur le côté en lecture, en fonction du nombre d'appels de controller.enqueue(). Si le processus de transformation est asynchrone, cette fonction peut renvoyer une promesse pour signaler la réussite ou l'échec de la transformation. Une promesse refusée génère une erreur à la fois sur les côtés lisibles et enregistrables du flux de transformation. Si aucune méthode transform() n'est fournie, la transformation d'identité est utilisée, ce qui met en file d'attente des segments inchangés du côté en écriture vers le côté en lecture.
  • flush(controller): cette méthode est appelée une fois que tous les fragments écrits sur le côté accessible en écriture ont été transformés en passant avec succès via transform(), et que le côté accessible en écriture est sur le point d'être fermé. Cette méthode est généralement utilisée pour mettre en file d'attente des segments de suffixe sur le côté lisible, avant que celui-ci ne soit également fermé. Si le processus de vidage est asynchrone, la fonction peut renvoyer une promesse pour signaler un succès ou un échec. Le résultat sera communiqué à l'appelant de stream.writable.write(). De plus, une promesse refusée générera une erreur au niveau des côtés lisibles et en écriture du flux. Le traitement d'une exception est le même que pour le renvoi d'une promesse refusée.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Stratégies de mise en file d'attente writableStrategy et readableStrategy

Les deuxième et troisième paramètres facultatifs du constructeur TransformStream() sont des stratégies de mise en file d'attente writableStrategy et readableStrategy facultatives. Elles sont définies comme indiqué dans les sections sur les flux lisibles et inscriptibles, respectivement.

Exemple de code de flux de transformation

L'exemple de code suivant montre un flux de transformation simple en action.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Transmettre un flux lisible via un flux de transformation

La méthode pipeThrough() de l'interface ReadableStream permet de canaliser le flux actuel via un flux de transformation ou toute autre paire en lecture/écriture. Le pipeage d'un flux le verrouille généralement pendant toute la durée du pipeage, ce qui empêche les autres lecteurs de le verrouiller.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

L'exemple de code suivant (un peu artificiel) montre comment mettre en œuvre une version "shouting" de fetch() qui met tout le texte en majuscules en consommant la promesse de réponse renvoyée sous la forme d'un flux et en mettant en majuscules une par une. L'avantage de cette approche est que vous n'avez pas besoin d'attendre que l'intégralité du document soit téléchargée, ce qui peut faire une énorme différence lorsque vous travaillez avec de gros fichiers.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Démo

La démonstration ci-dessous montre les flux lisibles, enregistrables et de transformation en action. Il inclut également des exemples de chaînes de canaux pipeThrough() et pipeTo(), ainsi qu'une démonstration de tee(). Vous pouvez éventuellement exécuter la démonstration dans sa propre fenêtre ou afficher le code source.

Flux utiles disponibles dans le navigateur

Le navigateur intègre un certain nombre de flux utiles. Vous pouvez facilement créer un ReadableStream à partir d'un blob. La méthode stream() de l'interface Blob renvoie un ReadableStream qui, lors de la lecture, renvoie les données contenues dans le blob. N'oubliez pas non plus qu'un objet File est un type spécifique de Blob et qu'il peut être utilisé dans n'importe quel contexte où un blob peut l'être.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Les variantes de streaming de TextDecoder.decode() et TextEncoder.encode() sont appelées respectivement TextDecoderStream et TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

La compression ou la décompression d'un fichier est facile avec les flux de transformation CompressionStream et DecompressionStream, respectivement. L'exemple de code ci-dessous montre comment télécharger la spécification Streams, la compresser (gzip) directement dans le navigateur et écrire le fichier compressé directement sur le disque.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Le FileSystemWritableFileStream de l'API File System Access et les flux de requêtes fetch() expérimentaux sont des exemples de flux accessibles en écriture.

L'API Serial utilise beaucoup de flux lisibles et enregistrables.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Enfin, l'API WebSocketStream intègre les flux à l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Ressources utiles

Remerciements

Cet article a été relu par Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley et Adam Rice. Les articles de blog de Jake Archibald m'ont beaucoup aidé à comprendre les flux. Certains des exemples de code sont inspirés des explorations de l'utilisateur GitHub @bellbind, et certaines parties du texte s'appuient fortement sur les documentations Web MDN sur les flux. Les auteurs de la norme Streams ont fait un travail remarquable en rédigeant cette spécification. Image principale par Ryan Lara sur Unsplash.