Découvrez comment utiliser des flux lisibles, enregistrables et transformables avec l'API Streams.
L'API Streams vous permet d'accéder de manière programmatique aux flux de données reçus sur le réseau ou créés par quelque moyen que ce soit localement, et de les traiter avec JavaScript. Le streaming consiste à décomposer une ressource que vous souhaitez recevoir, envoyer ou transformer en petits blocs, puis à traiter ces blocs bit par bit. Bien que les navigateurs effectuent de toute façon la diffusion en continu lorsqu'ils reçoivent des éléments tels que du code HTML ou des vidéos à afficher sur des pages Web, cette fonctionnalité n'a jamais été disponible pour JavaScript avant l'introduction de fetch
avec des flux en 2015.
Auparavant, si vous souhaitiez traiter une ressource (une vidéo, un fichier texte, etc.), vous deviez télécharger l'intégralité du fichier, attendre qu'il soit désérialisé dans un format approprié, puis le traiter. Avec les flux disponibles pour JavaScript, tout cela change. Vous pouvez désormais traiter progressivement les données brutes avec JavaScript dès qu'elles sont disponibles sur le client, sans avoir à générer de tampon, de chaîne ni de blob. Cela ouvre un certain nombre de cas d'utilisation, dont certains sont listés ci-dessous:
- Effets vidéo:transfert d'un flux vidéo lisible via un flux de transformation qui applique des effets en temps réel.
- (Dé)compression des données:transfert d'un flux de fichiers via un flux de transformation qui le (dé)compresse de manière sélective.
- Décodage d'image:transfert d'un flux de réponse HTTP via un flux de transformation qui décode les octets en données bitmap, puis via un autre flux de transformation qui traduit les bitmaps en fichiers PNG. S'il est installé dans le gestionnaire
fetch
d'un service worker, vous pouvez polyfiller de manière transparente de nouveaux formats d'image tels que AVIF.
Prise en charge des navigateurs
ReadableStream et WritableStream
TransformStream
Concepts fondamentaux
Avant d'entrer dans les détails des différents types de flux, laissez-moi vous présenter quelques concepts fondamentaux.
Morceaux
Un bloc est un élément de données unique écrit dans un flux ou lu à partir de celui-ci. Il peut être de n'importe quel type. Les flux peuvent même contenir des segments de différents types. La plupart du temps, un bloc ne sera pas l'unité de données la plus atomique pour un flux donné. Par exemple, un flux d'octets peut contenir des segments composés de 16 ko d'unités Uint8Array
au lieu d'octets individuels.
Flux lisibles
Un flux lisible représente une source de données que vous pouvez lire. En d'autres termes, les données sortent d'un flux lisible. Concrètement, un flux lisible est une instance de la classe ReadableStream
.
Flux en écriture
Un flux en écriture représente une destination dans laquelle vous pouvez écrire des données. En d'autres termes, les données entrent dans un flux enregistrable. Concrètement, un flux en écriture est une instance de la classe WritableStream
.
Flux de transformation
Un flux de transformation se compose d'une paire de flux: un flux en écriture, appelé côté en écriture, et un flux en lecture, appelé côté en lecture.
Pour illustrer ce concept, imaginons un interprète simultané qui traduit d'une langue à une autre en temps réel.
De manière spécifique au flux de transformation, l'écriture sur le côté en écriture permet de mettre de nouvelles données à disposition pour la lecture à partir du côté lisible. Concrètement, tout objet avec une propriété writable
et une propriété readable
peut servir de flux de transformation. Toutefois, la classe TransformStream
standard permet de créer plus facilement une telle paire correctement enchevêtrée.
Chaînes de tuyaux
Les flux sont principalement utilisés en les transmettant les uns aux autres. Un flux lisible peut être redirigé directement vers un flux enregistrable à l'aide de la méthode pipeTo()
du flux lisible, ou il peut d'abord être redirigé via un ou plusieurs flux de transformation à l'aide de la méthode pipeThrough()
du flux lisible. Un ensemble de flux connectés de cette manière est appelé chaîne de canalisation.
Contre-pression
Une fois qu'une chaîne de canaux est créée, elle propage des signaux indiquant la vitesse à laquelle les blocs doivent y circuler. Si une étape de la chaîne ne peut pas encore accepter de blocs, elle propage un signal à rebours dans la chaîne de canalisation, jusqu'à ce que la source d'origine soit invitée à arrêter de produire des blocs si rapidement. Ce processus de normalisation du débit est appelé contre-pression.
Teeing
Un flux lisible peut être branché (du nom de la forme d'un "T" majuscule) à l'aide de sa méthode tee()
.
Cela bloque le flux, c'est-à-dire qu'il n'est plus directement utilisable. Toutefois, cela crée deux nouveaux flux, appelés branches, qui peuvent être consommés indépendamment.
Le teeing est également important, car les flux ne peuvent pas être rembobinés ni redémarrés. Nous y reviendrons plus tard.
Fonctionnement d'un flux lisible
Un flux lisible est une source de données représentée en JavaScript par un objet ReadableStream
qui provient d'une source sous-jacente. Le constructeur ReadableStream()
crée et renvoie un objet de flux lisible à partir des gestionnaires donnés. Il existe deux types de sources sous-jacentes:
- Les sources push vous envoient constamment des données lorsque vous y avez accédé. C'est à vous de démarrer, de suspendre ou d'annuler l'accès au flux. Exemples : flux vidéo en direct, événements envoyés par le serveur ou WebSockets.
- Les sources pull vous obligent à leur demander explicitement des données une fois que vous y êtes connecté. Par exemple, les opérations HTTP via des appels
fetch()
ouXMLHttpRequest
.
Les données de flux sont lues de manière séquentielle en petits morceaux appelés blocs. Les segments placés dans un flux sont dits mis en file d'attente. Cela signifie qu'elles sont en attente dans une file d'attente et prêtes à être lues. Une file d'attente interne permet de suivre les blocs qui n'ont pas encore été lus.
Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque segment et compare la taille totale de tous les segments de la file d'attente à un nombre spécifié, appelé point d'inflexion.
Les segments du flux sont lus par un lecteur. Ce lecteur récupère les données un bloc à la fois, ce qui vous permet d'effectuer n'importe quel type d'opération dessus. Le lecteur et l'autre code de traitement qui l'accompagne sont appelés consommateur.
Le concept suivant dans ce contexte s'appelle un contrôleur. Chaque flux lisible est associé à un contrôleur qui, comme son nom l'indique, vous permet de contrôler le flux.
Un seul lecteur peut lire un flux à la fois. Lorsqu'un lecteur est créé et commence à lire un flux (c'est-à-dire qu'il devient un lecteur actif), il est verrouillé. Si vous souhaitez qu'un autre lecteur prenne en charge la lecture de votre flux, vous devez généralement libérer le premier lecteur avant de faire quoi que ce soit d'autre (bien que vous puissiez brancher des flux).
Créer un flux lisible
Pour créer un flux lisible, appelez son constructeur ReadableStream()
.
Le constructeur possède un argument facultatif underlyingSource
, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux créée.
Le underlyingSource
Pour ce faire, vous pouvez utiliser les méthodes facultatives définies par le développeur suivantes:
start(controller)
: appelé immédiatement lors de la création de l'objet. La méthode peut accéder à la source de flux et effectuer toutes les autres opérations requises pour configurer la fonctionnalité de flux. Si ce processus doit être effectué de manière asynchrone, la méthode peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètrecontroller
transmis à cette méthode est unReadableStreamDefaultController
.pull(controller)
: permet de contrôler le flux à mesure que d'autres segments sont récupérés. Il est appelé à plusieurs reprises tant que la file d'attente interne de blocs du flux n'est pas pleine, jusqu'à ce que la file d'attente atteigne son niveau maximal. Si le résultat de l'appel depull()
est une promesse,pull()
n'est pas appelé à nouveau tant que cette promesse n'est pas remplie. Si la promesse est refusée, le flux devient erroné.cancel(reason)
: appelé lorsque le client du flux annule le flux.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
accepte les méthodes suivantes:
ReadableStreamDefaultController.close()
ferme le flux associé.ReadableStreamDefaultController.enqueue()
met en file d'attente un bloc donné dans le flux associé.ReadableStreamDefaultController.error()
entraîne une erreur lors des futures interactions avec le flux associé.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
Le queuingStrategy
Le deuxième argument, également facultatif, du constructeur ReadableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres:
highWaterMark
: nombre non négatif indiquant le niveau maximal du flux à l'aide de cette stratégie de mise en file d'attente.size(chunk)
: fonction qui calcule et renvoie la taille finie non négative de la valeur de bloc donnée. Le résultat permet de déterminer la contre-pression, qui s'affiche via la propriétéReadableStreamDefaultController.desiredSize
appropriée. Il détermine également le moment où la méthodepull()
de la source sous-jacente est appelée.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Méthodes getReader()
et read()
Pour lire à partir d'un flux lisible, vous avez besoin d'un lecteur, qui sera un ReadableStreamDefaultReader
.
La méthode getReader()
de l'interface ReadableStream
crée un lecteur et verrouille le flux dessus. Tant que le flux est verrouillé, aucun autre lecteur ne peut être acquis tant que celui-ci n'est pas libéré.
La méthode read()
de l'interface ReadableStreamDefaultReader
renvoie une promesse permettant d'accéder au prochain segment de la file d'attente interne du flux. Il répond ou refuse avec un résultat en fonction de l'état du flux. Voici les différentes possibilités:
- Si un bloc est disponible, la promesse est remplie avec un objet au format
{ value: chunk, done: false }
. - Si le flux est fermé, la promesse est remplie avec un objet de la forme
{ value: undefined, done: true }
. - Si le flux est associé à une erreur, la promesse est rejetée avec l'erreur correspondante.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Propriété locked
Vous pouvez vérifier si un flux lisible est verrouillé en accédant à sa propriété ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemples de code de flux lisibles
L'exemple de code ci-dessous montre toutes les étapes en action. Vous devez d'abord créer un ReadableStream
qui, dans son argument underlyingSource
(c'est-à-dire la classe TimestampSource
), définit une méthode start()
.
Cette méthode indique au controller
du flux de enqueue()
un code temporel toutes les secondes pendant dix secondes.
Enfin, il indique au contrôleur de close()
le flux. Vous consommez ce flux en créant un lecteur via la méthode getReader()
et en appelant read()
jusqu'à ce que le flux soit done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Itération asynchrone
Vérifier à chaque itération de la boucle read()
si le flux est done
n'est peut-être pas l'API la plus pratique.
Heureusement, il existe bientôt une meilleure façon de procéder: l'itération asynchrone.
for await (const chunk of stream) {
console.log(chunk);
}
Pour utiliser l'itération asynchrone aujourd'hui, vous pouvez implémenter le comportement avec un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Brancher un flux lisible
La méthode tee()
de l'interface ReadableStream
scinde le flux lisible actuel, renvoyant un tableau à deux éléments contenant les deux branches résultantes en tant que nouvelles instances ReadableStream
. Cela permet à deux lecteurs de lire un flux simultanément. Vous pouvez le faire, par exemple, dans un service worker si vous souhaitez récupérer une réponse du serveur et la diffuser dans le navigateur, mais aussi dans le cache du service worker. Étant donné qu'un corps de réponse ne peut pas être utilisé plus d'une fois, vous avez besoin de deux copies pour ce faire. Pour annuler le flux, vous devez ensuite annuler les deux branches générées. Le teeing d'un flux le verrouille généralement pendant toute la durée, ce qui empêche les autres lecteurs de le verrouiller.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Flux d'octets lisibles
Pour les flux représentant des octets, une version étendue du flux lisible est fournie pour gérer efficacement les octets, en particulier en réduisant les copies. Les flux d'octets permettent d'acquérir des lecteurs BYOB (Bring Your Own Buffer). L'implémentation par défaut peut générer différentes sorties, telles que des chaînes ou des tampons de tableau dans le cas des WebSockets, tandis que les flux d'octets garantissent la sortie d'octets. De plus, les lecteurs BYOB offrent des avantages en termes de stabilité. En effet, si un tampon se détache, il peut garantir qu'on n'écrit pas deux fois dans le même tampon, ce qui évite les conditions de course. Les lecteurs BYOB peuvent réduire le nombre de fois où le navigateur doit exécuter le nettoyage de la mémoire, car ils peuvent réutiliser les tampons.
Créer un flux d'octets lisible
Vous pouvez créer un flux d'octets lisible en transmettant un paramètre type
supplémentaire au constructeur ReadableStream()
.
new ReadableStream({ type: 'bytes' });
Le underlyingSource
La source sous-jacente d'un flux d'octets lisible est associée à un ReadableByteStreamController
à manipuler. Sa méthode ReadableByteStreamController.enqueue()
accepte un argument chunk
dont la valeur est un ArrayBufferView
. La propriété ReadableByteStreamController.byobRequest
renvoie la requête de pull BYOB actuelle, ou "null" si elle n'existe pas. Enfin, la propriété ReadableByteStreamController.desiredSize
renvoie la taille souhaitée pour remplir la file d'attente interne du flux contrôlé.
Le queuingStrategy
Le deuxième argument, également facultatif, du constructeur ReadableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend un paramètre:
highWaterMark
: nombre non négatif d'octets indiquant le niveau de la ligne de flottaison du flux à l'aide de cette stratégie de mise en file d'attente. Il permet de déterminer la contre-pression, qui se manifeste via la propriétéReadableByteStreamController.desiredSize
appropriée. Il détermine également le moment où la méthodepull()
de la source sous-jacente est appelée.
Méthodes getReader()
et read()
Vous pouvez ensuite accéder à un ReadableStreamBYOBReader
en définissant le paramètre mode
en conséquence : ReadableStream.getReader({ mode: "byob" })
. Cela permet de contrôler plus précisément l'allocation de tampons afin d'éviter les copies. Pour lire à partir du flux d'octets, vous devez appeler ReadableStreamBYOBReader.read(view)
, où view
est un ArrayBufferView
.
Exemple de code de flux d'octets lisible
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La fonction suivante renvoie des flux d'octets lisibles qui permettent une lecture efficace sans copie d'un tableau généré de manière aléatoire. Au lieu d'utiliser une taille de fragment prédéterminée de 1 024, il tente de remplir la mémoire tampon fournie par le développeur, ce qui permet un contrôle total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mécanisme d'un flux en écriture
Un flux en écriture est une destination dans laquelle vous pouvez écrire des données, représentées en JavaScript par un objet WritableStream
. Il s'agit d'une abstraction au-dessus d'un évier sous-jacent, un évier d'E/S de niveau inférieur dans lequel les données brutes sont écrites.
Les données sont écrites dans le flux via un enregistreur, un bloc à la fois. Un bloc peut prendre une multitude de formes, tout comme les blocs d'un lecteur. Vous pouvez utiliser le code de votre choix pour produire les blocs prêts à être écrits. L'écrivain et le code associé sont appelés producteur.
Lorsqu'un écrivain est créé et commence à écrire dans un flux (un écrivain actif), il est dit qu'il est verrouillé. Un seul écrivain peut écrire dans un flux en écriture à la fois. Si vous souhaitez qu'un autre éditeur commence à écrire dans votre flux, vous devez généralement le libérer avant d'y associer un autre éditeur.
Une file d'attente interne permet de suivre les segments qui ont été écrits dans le flux, mais qui n'ont pas encore été traités par le sink sous-jacent.
Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque segment et compare la taille totale de tous les segments de la file d'attente à un nombre spécifié, appelé point d'inflexion.
La construction finale est appelée contrôleur. Chaque flux en écriture est associé à un contrôleur qui vous permet de le contrôler (par exemple, pour l'arrêter).
Créer un flux en écriture
L'interface WritableStream
de l'API Streams fournit une abstraction standard pour écrire des données de streaming vers une destination, appelée "récepteur". Cet objet est fourni avec une rétropression et une mise en file d'attente intégrées. Vous créez un flux en écrivant en appelant son constructeur WritableStream()
.
Il comporte un paramètre underlyingSink
facultatif, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux créée.
Le underlyingSink
underlyingSink
peut inclure les méthodes facultatives définies par le développeur suivantes. Le paramètre controller
transmis à certaines des méthodes est un WritableStreamDefaultController
.
start(controller)
: cette méthode est appelée immédiatement lors de la création de l'objet. Le contenu de cette méthode doit viser à accéder au collecteur sous-jacent. Si ce processus doit être effectué de manière asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec.write(chunk, controller)
: cette méthode est appelée lorsqu'un nouveau bloc de données (spécifié dans le paramètrechunk
) est prêt à être écrit dans le collecteur sous-jacent. Il peut renvoyer une promesse pour signaler la réussite ou l'échec de l'opération d'écriture. Cette méthode n'est appelée que lorsque les écritures précédentes ont réussi, et jamais après la fermeture ou l'abandon du flux.close(controller)
: cette méthode est appelée si l'application signale qu'elle a terminé d'écrire des segments dans le flux. Le contenu doit faire tout ce qui est nécessaire pour finaliser les écritures dans le récepteur sous-jacent et libérer l'accès à celui-ci. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Cette méthode ne sera appelée qu'après la réussite de toutes les écritures mises en file d'attente.abort(reason)
: cette méthode est appelée si l'application signale qu'elle souhaite fermer brusquement le flux et le placer dans un état d'erreur. Il peut nettoyer toutes les ressources détenues, commeclose()
, maisabort()
est appelé même si des écritures sont mises en file d'attente. Ces blocs seront supprimés. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler un succès ou un échec. Le paramètrereason
contient unDOMString
décrivant pourquoi le flux a été interrompu.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
L'interface WritableStreamDefaultController
de l'API Streams représente un contrôleur permettant de contrôler l'état d'un WritableStream
lors de la configuration, à mesure que d'autres segments sont envoyés pour l'écriture ou à la fin de l'écriture. Lors de la création d'un WritableStream
, une instance WritableStreamDefaultController
correspondante est attribuée au collecteur sous-jacent pour le manipuler. WritableStreamDefaultController
n'a qu'une seule méthode : WritableStreamDefaultController.error()
, ce qui entraîne des erreurs lors des futures interactions avec le flux associé.
WritableStreamDefaultController
accepte également une propriété signal
qui renvoie une instance de AbortSignal
, ce qui permet d'arrêter une opération WritableStream
si nécessaire.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
Le queuingStrategy
Le deuxième argument, également facultatif, du constructeur WritableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres:
highWaterMark
: nombre non négatif indiquant le niveau maximal du flux à l'aide de cette stratégie de mise en file d'attente.size(chunk)
: fonction qui calcule et renvoie la taille finie non négative de la valeur de bloc donnée. Le résultat permet de déterminer la contre-pression, qui s'affiche via la propriétéWritableStreamDefaultWriter.desiredSize
appropriée.
Méthodes getWriter()
et write()
Pour écrire dans un flux en écriture, vous avez besoin d'un éditeur, qui sera un WritableStreamDefaultWriter
. La méthode getWriter()
de l'interface WritableStream
renvoie une nouvelle instance de WritableStreamDefaultWriter
et verrouille le flux sur cette instance. Tant que le flux est verrouillé, aucun autre éditeur ne peut être acquis tant que l'éditeur actuel n'est pas libéré.
La méthode write()
de l'interface WritableStreamDefaultWriter
écrit un bloc de données transmis dans un WritableStream
et son évier sous-jacent, puis renvoie une promesse qui se résout pour indiquer le succès ou l'échec de l'opération d'écriture. Notez que la signification de "succès" dépend du collecteur sous-jacent. Il peut indiquer que le fragment a été accepté, mais pas nécessairement qu'il est enregistré de manière sécurisée à sa destination finale.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Propriété locked
Vous pouvez vérifier si un flux en écriture est verrouillé en accédant à sa propriété WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemple de code de flux en écriture
L'exemple de code ci-dessous montre toutes les étapes en action.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Canaliser un flux lisible vers un flux enregistrable
Un flux lisible peut être redirigé vers un flux en écriture via la méthode pipeTo()
du flux lisible.
ReadableStream.pipeTo()
redirige le ReadableStream
actuel vers un WritableStream
donné et renvoie une promesse qui est remplie lorsque le processus de pipeage se termine correctement, ou qui est refusée en cas d'erreur.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Créer un flux de transformation
L'interface TransformStream
de l'API Streams représente un ensemble de données transformables. Vous créez un flux de transformation en appelant son constructeur TransformStream()
, qui crée et renvoie un objet de flux de transformation à partir des gestionnaires donnés. Le constructeur TransformStream()
accepte comme premier argument un objet JavaScript facultatif représentant le transformer
. Ces objets peuvent contenir l'une des méthodes suivantes:
Le transformer
start(controller)
: cette méthode est appelée immédiatement lors de la création de l'objet. En règle générale, cette méthode permet d'ajouter des segments de préfixe à la file d'attente à l'aide decontroller.enqueue()
. Ces blocs seront lus à partir du côté lisible, mais ne dépendent d'aucune écriture sur le côté en écriture. Si ce processus initial est asynchrone, par exemple parce qu'il faut un certain effort pour acquérir les segments de préfixe, la fonction peut renvoyer une promesse pour signaler la réussite ou l'échec. Une promesse refusée entraînera une erreur sur le flux. Toutes les exceptions générées seront à nouveau générées par le constructeurTransformStream()
.transform(chunk, controller)
: cette méthode est appelée lorsqu'un nouveau bloc écrit à l'origine sur le côté en écriture est prêt à être transformé. L'implémentation du flux garantit que cette fonction ne sera appelée qu'après la réussite des transformations précédentes, et jamais avant la fin destart()
ou après l'appel deflush()
. Cette fonction effectue le travail de transformation réel du flux de transformation. Il peut mettre en file d'attente les résultats à l'aide decontroller.enqueue()
. Cela permet à un seul bloc écrit sur le côté en écriture de générer zéro ou plusieurs blocs sur le côté lisible, en fonction du nombre d'appels decontroller.enqueue()
. Si le processus de transformation est asynchrone, cette fonction peut renvoyer une promesse pour signaler le succès ou l'échec de la transformation. Une promesse refusée génère une erreur à la fois sur les côtés lisibles et en écriture du flux de transformation. Si aucune méthodetransform()
n'est fournie, la transformation d'identité est utilisée, ce qui met en file d'attente des blocs inchangés du côté en écriture vers le côté en lecture.flush(controller)
: cette méthode est appelée après que tous les segments écrits sur le côté en écriture ont été transformés en passant partransform()
, et que le côté en écriture est sur le point d'être fermé. En général, cette méthode permet d'ajouter des segments de suffixe à la file d'attente du côté lisible, avant que celui-ci ne soit également fermé. Si le processus de vidage est asynchrone, la fonction peut renvoyer une promesse pour signaler un succès ou un échec. Le résultat sera communiqué à l'appelant destream.writable.write()
. De plus, une promesse refusée génère une erreur à la fois sur les côtés lisibles et enregistrables du flux. Le fait de générer une exception est traité de la même manière que le renvoi d'une promesse refusée.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Stratégies de mise en file d'attente writableStrategy
et readableStrategy
Les deuxième et troisième paramètres facultatifs du constructeur TransformStream()
sont des stratégies de mise en file d'attente writableStrategy
et readableStrategy
facultatives. Elles sont définies comme indiqué dans les sections sur les flux lisibles et accessibles en écriture, respectivement.
Exemple de code de flux de transformation
L'exemple de code suivant montre un flux de transformation simple en action.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Canaliser un flux lisible via un flux de transformation
La méthode pipeThrough()
de l'interface ReadableStream
permet de canaliser le flux actuel via un flux de transformation ou toute autre paire en lecture/écriture. Le pipeage d'un flux le verrouille généralement pendant toute la durée du pipeage, ce qui empêche les autres lecteurs de le verrouiller.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
L'exemple de code suivant (un peu artificiel) montre comment implémenter une version "criante" de fetch()
qui met en majuscule tout le texte en consommant la promesse de réponse renvoyée en tant que flux et en mettant en majuscule chaque segment. L'avantage de cette approche est que vous n'avez pas besoin d'attendre que l'intégralité du document soit téléchargée, ce qui peut faire une énorme différence lorsque vous travaillez avec de gros fichiers.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Démo
La démonstration ci-dessous montre les flux lisibles, enregistrables et de transformation en action. Il inclut également des exemples de chaînes de canalisation pipeThrough()
et pipeTo()
, ainsi qu'une démonstration de tee()
. Vous pouvez éventuellement exécuter la démonstration dans une fenêtre distincte ou afficher le code source.
Flux utiles disponibles dans le navigateur
Le navigateur intègre un certain nombre de flux utiles. Vous pouvez facilement créer un ReadableStream
à partir d'un blob. La méthode stream() de l'interface Blob
renvoie un ReadableStream
qui, lors de la lecture, renvoie les données contenues dans le blob. N'oubliez pas non plus qu'un objet File
est un type spécifique de Blob
et qu'il peut être utilisé dans n'importe quel contexte où un blob peut l'être.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Les variantes de streaming de TextDecoder.decode()
et TextEncoder.encode()
sont appelées respectivement TextDecoderStream
et TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
La compression ou la décompression d'un fichier est facile avec les flux de transformation CompressionStream
et DecompressionStream
, respectivement. L'exemple de code ci-dessous montre comment télécharger la spécification Streams, la compresser (gzip) directement dans le navigateur et écrire le fichier compressé directement sur le disque.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
FileSystemWritableFileStream
de l'API File System Access et les flux de requêtes fetch()
expérimentaux sont des exemples de flux en écriture dans la nature.
L'API Serial utilise beaucoup de flux lisibles et enregistrables.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Enfin, l'API WebSocketStream
intègre les flux à l'API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Ressources utiles
- Spécification des flux
- Démonstrations associées
- Polyfill Streams
- 2016 : l'année des flux Web
- Itérateurs et générateurs asynchrones
- Visualiseur de flux
Remerciements
Cet article a été relu par Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley et Adam Rice. Les articles de blog de Jake Archibald m'ont beaucoup aidé à comprendre les flux. Certains des exemples de code sont inspirés des explorations de l'utilisateur GitHub @bellbind, et certaines parties du texte s'appuient fortement sur les documentations Web MDN sur les flux. Les auteurs de la norme Streams ont fait un travail remarquable en rédigeant cette spécification. Image principale par Ryan Lara sur Unsplash.