Découvrez comment utiliser les flux lisibles, inscriptibles et de transformation avec l'API Streams.
L'API Streams vous permet d'accéder par programmation aux flux de données reçus sur le réseau ou créés localement par quelque moyen que ce soit, et de les traiter avec JavaScript. Le streaming consiste à décomposer une ressource que vous souhaitez recevoir, envoyer ou transformer en petits blocs, puis à traiter ces blocs bit par bit. Bien que les navigateurs diffusent déjà des éléments tels que le code HTML ou les vidéos à afficher sur les pages Web, cette fonctionnalité n'a jamais été disponible pour JavaScript avant l'introduction des flux fetch
en 2015.
Auparavant, si vous souhaitiez traiter une ressource quelconque (vidéo, fichier texte, etc.), vous deviez télécharger l'intégralité du fichier, attendre qu'il soit désérialisé dans un format approprié, puis le traiter. Avec les flux disponibles pour JavaScript, tout change. Vous pouvez désormais traiter les données brutes avec JavaScript de manière progressive dès qu'elles sont disponibles sur le client, sans avoir à générer de tampon, de chaîne ni de blob. Cela ouvre la voie à de nombreux cas d'utilisation, dont certains sont listés ci-dessous :
- Effets vidéo : en transmettant un flux vidéo lisible via un flux de transformation qui applique des effets en temps réel.
- (Dé)compression des données : canalisation d'un flux de fichiers via un flux de transformation qui le (dé)compresse de manière sélective.
- Décodage d'images : canalisation d'un flux de réponse HTTP via un flux de transformation qui décode les octets en données bitmap, puis via un autre flux de transformation qui traduit les bitmaps en PNG. Si elle est installée dans le gestionnaire
fetch
d'un service worker, cela vous permet de polyfiller de manière transparente de nouveaux formats d'image comme AVIF.
Prise en charge des navigateurs
ReadableStream et WritableStream
TransformStream
Concepts fondamentaux
Avant de vous présenter en détail les différents types de flux, laissez-moi vous expliquer quelques concepts de base.
Morceaux
Un bloc est une donnée unique écrite dans un flux ou lue à partir de celui-ci. Il peut être de n'importe quel type. Les flux peuvent même contenir des blocs de différents types. La plupart du temps, un bloc ne sera pas l'unité de données la plus atomique pour un flux donné. Par exemple, un flux d'octets peut contenir des blocs de 16 Kio Uint8Array
, au lieu d'octets individuels.
Flux lisibles
Un flux lisible représente une source de données que vous pouvez lire. En d'autres termes, les données proviennent d'un flux lisible. Concrètement, un flux lisible est une instance de la classe ReadableStream
.
Flux accessibles en écriture
Un flux accessible en écriture représente une destination pour les données dans laquelle vous pouvez écrire. En d'autres termes, les données sont insérées dans un flux accessible en écriture. Concrètement, un flux accessible en écriture est une instance de la classe WritableStream
.
Transformer des flux
Un flux de transformation se compose d'une paire de flux : un flux accessible en écriture, appelé côté accessible en écriture, et un flux accessible en lecture, appelé côté accessible en lecture.
Une métaphore concrète serait celle d'un interprète simultané qui traduit d'une langue à une autre à la volée.
D'une manière spécifique au flux de transformation, l'écriture sur le côté accessible en écriture entraîne la mise à disposition de nouvelles données pour la lecture à partir du côté accessible en lecture. Concrètement, tout objet comportant une propriété writable
et une propriété readable
peut servir de flux de transformation. Cependant, la classe TransformStream
standard facilite la création d'une telle paire correctement intriquée.
Chaînes de tuyaux
Les flux sont principalement utilisés en les redirigeant les uns vers les autres. Un flux lisible peut être redirigé directement vers un flux inscriptible à l'aide de la méthode pipeTo()
du flux lisible, ou il peut être redirigé d'abord vers un ou plusieurs flux de transformation à l'aide de la méthode pipeThrough()
du flux lisible. Un ensemble de flux regroupés de cette manière est appelé "chaîne de tuyaux".
Contre-pression
Une fois une chaîne de canalisations construite, elle propage des signaux concernant la vitesse à laquelle les blocs doivent y circuler. Si une étape de la chaîne ne peut pas encore accepter de blocs, elle propage un signal en arrière dans la chaîne de canalisations, jusqu'à ce que la source d'origine soit invitée à arrêter de produire des blocs aussi rapidement. Ce processus de flux de normalisation est appelé "rétropression".
Teeing
Un flux lisible peut être dupliqué (d'après la forme d'un "T" majuscule) à l'aide de sa méthode tee()
.
Cela verrouillera le flux, c'est-à-dire qu'il ne sera plus directement utilisable. Toutefois, cela créera deux nouveaux flux, appelés branches, qui pourront être utilisés indépendamment.
La mise en file d'attente est également importante, car les flux ne peuvent pas être rembobinés ni redémarrés. Nous y reviendrons plus tard.
Mécanismes d'un flux lisible
Un flux lisible est une source de données représentée en JavaScript par un objet ReadableStream
qui provient d'une source sous-jacente. Le constructeur ReadableStream()
crée et renvoie un objet de flux lisible à partir des gestionnaires fournis. Il existe deux types de sources sous-jacentes :
- Les sources push vous envoient constamment des données lorsque vous y avez accédé. C'est à vous de démarrer, de mettre en pause ou d'annuler l'accès au flux. Par exemple, les flux vidéo en direct, les événements envoyés par le serveur ou les WebSockets.
- Les sources d'extraction nécessitent que vous demandiez explicitement des données une fois que vous y êtes connecté. Par exemple, les opérations HTTP via des appels
fetch()
ouXMLHttpRequest
.
Les données de flux sont lues de manière séquentielle par petits éléments appelés blocs. Les blocs placés dans un flux sont dits en file d'attente. Cela signifie qu'ils sont en attente dans une file d'attente et prêts à être lus. Une file d'attente interne permet de suivre les blocs qui n'ont pas encore été lus.
Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque bloc et compare la taille totale de tous les blocs de la file d'attente à un nombre spécifié, appelé seuil haut.
Les blocs du flux sont lus par un lecteur. Ce lecteur récupère les données par blocs, ce qui vous permet d'effectuer n'importe quel type d'opération sur celles-ci. Le lecteur et le code de traitement qui l'accompagne sont appelés consommateur.
La prochaine construction dans ce contexte est appelée contrôleur. Chaque flux lisible est associé à un contrôleur qui, comme son nom l'indique, vous permet de contrôler le flux.
Un seul lecteur peut lire un flux à la fois. Lorsqu'un lecteur est créé et commence à lire un flux (c'est-à-dire qu'il devient un lecteur actif), il est verrouillé. Si vous souhaitez qu'un autre lecteur prenne le relais pour lire votre flux, vous devez généralement libérer le premier lecteur avant toute autre action (bien que vous puissiez dupliquer les flux).
Créer un flux lisible
Pour créer un flux lisible, vous devez appeler son constructeur ReadableStream()
.
Le constructeur comporte un argument facultatif underlyingSource
, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.
Le underlyingSource
Il peut utiliser les méthodes facultatives suivantes, définies par le développeur :
start(controller)
: appelé immédiatement lors de la construction de l'objet. La méthode peut accéder à la source du flux et effectuer toute autre opération nécessaire à la configuration de la fonctionnalité de flux. Si ce processus doit être effectué de manière asynchrone, la méthode peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètrecontroller
transmis à cette méthode est unReadableStreamDefaultController
.pull(controller)
: peut être utilisé pour contrôler le flux à mesure que d'autres blocs sont récupérés. Elle est appelée à plusieurs reprises tant que la file d'attente interne des blocs du flux n'est pas pleine, jusqu'à ce que la file d'attente atteigne son seuil haut. Si l'appel depull()
renvoie une promesse,pull()
ne sera pas rappelé tant que cette promesse ne sera pas tenue. Si la promesse est rejetée, le flux génère une erreur.cancel(reason)
: appelé lorsque le consommateur de flux annule le flux.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
accepte les méthodes suivantes :
ReadableStreamDefaultController.close()
ferme le flux associé.ReadableStreamDefaultController.enqueue()
met en file d'attente un bloc donné dans le flux associé.ReadableStreamDefaultController.error()
entraîne une erreur pour toute interaction future avec le flux associé.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
Le queuingStrategy
Le deuxième argument, également facultatif, du constructeur ReadableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres :
highWaterMark
: nombre non négatif indiquant le seuil haut du flux utilisant cette stratégie de mise en file d'attente.size(chunk)
: fonction qui calcule et renvoie la taille finie et non négative de la valeur de bloc donnée. Le résultat est utilisé pour déterminer la contre-pression, qui se manifeste par la propriétéReadableStreamDefaultController.desiredSize
appropriée. Il régit également le moment où la méthodepull()
de la source sous-jacente est appelée.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Les méthodes getReader()
et read()
Pour lire un flux lisible, vous avez besoin d'un lecteur, qui sera un ReadableStreamDefaultReader
.
La méthode getReader()
de l'interface ReadableStream
crée un lecteur et verrouille le flux sur celui-ci. Tant que le flux est verrouillé, aucun autre lecteur ne peut être acquis tant que celui-ci n'est pas libéré.
La méthode read()
de l'interface ReadableStreamDefaultReader
renvoie une promesse donnant accès au prochain bloc de la file d'attente interne du flux. Il est exécuté ou rejeté avec un résultat en fonction de l'état du flux. Voici les différentes possibilités :
- Si un bloc est disponible, la promesse sera tenue avec un objet de la forme
{ value: chunk, done: false }
. - Si le flux est fermé, la promesse sera tenue avec un objet de la forme
{ value: undefined, done: true }
. - Si le flux génère une erreur, la promesse sera rejetée avec l'erreur correspondante.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Propriété locked
Vous pouvez vérifier si un flux lisible est verrouillé en accédant à sa propriété ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemples de code de flux lisibles
L'exemple de code ci-dessous montre toutes les étapes en action. Vous créez d'abord un ReadableStream
qui, dans son argument underlyingSource
(c'est-à-dire la classe TimestampSource
), définit une méthode start()
.
Cette méthode indique au controller
du flux de enqueue()
un code temporel toutes les secondes pendant dix secondes.
Enfin, il indique au contrôleur de close()
le flux. Vous consommez ce flux en créant un lecteur via la méthode getReader()
et en appelant read()
jusqu'à ce que le flux soit done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Itération asynchrone
Vérifier à chaque itération de boucle read()
si le flux est done
n'est peut-être pas l'API la plus pratique.
Heureusement, il existera bientôt une meilleure façon de procéder : l'itération asynchrone.
for await (const chunk of stream) {
console.log(chunk);
}
Pour utiliser l'itération asynchrone aujourd'hui, vous pouvez implémenter le comportement avec un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Redirection d'un flux lisible
La méthode tee()
de l'interface ReadableStream
met en file d'attente le flux lisible actuel, renvoyant un tableau à deux éléments contenant les deux branches résultantes en tant que nouvelles instances ReadableStream
. Cela permet à deux lecteurs de lire un flux simultanément. Par exemple, vous pouvez le faire dans un service worker si vous souhaitez récupérer une réponse du serveur et la diffuser en continu vers le navigateur, mais aussi vers le cache du service worker. Étant donné qu'un corps de réponse ne peut pas être consommé plusieurs fois, vous avez besoin de deux copies pour ce faire. Pour annuler le flux, vous devez ensuite annuler les deux branches résultantes. En général, le fait de mettre un flux en file d'attente le verrouille pour toute sa durée, ce qui empêche les autres lecteurs de le verrouiller.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Flux d'octets lisibles
Pour les flux représentant des octets, une version étendue du flux lisible est fournie pour gérer efficacement les octets, en particulier en minimisant les copies. Les flux d'octets permettent d'acquérir des lecteurs BYOB (Bring Your Own Buffer). L'implémentation par défaut peut générer différentes sorties, telles que des chaînes ou des tampons de tableau dans le cas des WebSockets, tandis que les flux d'octets garantissent une sortie d'octets. De plus, les lecteurs BYOB offrent des avantages en termes de stabilité. En effet, si un tampon se détache, il peut garantir qu'il n'y a pas d'écriture dans le même tampon deux fois, ce qui évite les conditions de concurrence. Les lecteurs BYOB peuvent réduire le nombre de fois où le navigateur doit exécuter la récupération de mémoire, car ils peuvent réutiliser les tampons.
Créer un flux d'octets lisible
Vous pouvez créer un flux d'octets lisible en transmettant un paramètre type
supplémentaire au constructeur ReadableStream()
.
new ReadableStream({ type: 'bytes' });
Le underlyingSource
La source sous-jacente d'un flux d'octets lisible reçoit un ReadableByteStreamController
à manipuler. Sa méthode ReadableByteStreamController.enqueue()
accepte un argument chunk
dont la valeur est un ArrayBufferView
. La propriété ReadableByteStreamController.byobRequest
renvoie la demande d'extraction BYOB actuelle ou la valeur null si aucune n'est disponible. Enfin, la propriété ReadableByteStreamController.desiredSize
renvoie la taille souhaitée pour remplir la file d'attente interne du flux contrôlé.
Le queuingStrategy
Le deuxième argument, également facultatif, du constructeur ReadableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend un paramètre :
highWaterMark
: nombre non négatif d'octets indiquant le seuil maximal du flux utilisant cette stratégie de mise en file d'attente. Cela permet de déterminer la contre-pression, qui se manifeste par le biais de la propriétéReadableByteStreamController.desiredSize
appropriée. Il régit également le moment où la méthodepull()
de la source sous-jacente est appelée.
Les méthodes getReader()
et read()
Vous pouvez ensuite accéder à un ReadableStreamBYOBReader
en définissant le paramètre mode
en conséquence :
ReadableStream.getReader({ mode: "byob" })
. Cela permet de contrôler plus précisément l'allocation des tampons afin d'éviter les copies. Pour lire à partir du flux d'octets, vous devez appeler ReadableStreamBYOBReader.read(view)
, où view
est un ArrayBufferView
.
Exemple de code de flux d'octets lisible
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La fonction suivante renvoie des flux d'octets lisibles qui permettent une lecture efficace sans copie d'un tableau généré de manière aléatoire. Au lieu d'utiliser une taille de bloc prédéterminée de 1 024, il tente de remplir le tampon fourni par le développeur, ce qui permet un contrôle total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mécanismes d'un flux accessible en écriture
Un flux inscriptible est une destination dans laquelle vous pouvez écrire des données, représentée en JavaScript par un objet WritableStream
. Il sert d'abstraction au-dessus d'un récepteur sous-jacent, qui est un récepteur d'E/S de niveau inférieur dans lequel les données brutes sont écrites.
Les données sont écrites dans le flux via un writer, un bloc à la fois. Un bloc peut prendre de nombreuses formes, tout comme les blocs d'un lecteur. Vous pouvez utiliser le code de votre choix pour générer les blocs prêts à être écrits. L'écrivain et le code associé sont appelés producteur.
Lorsqu'un writer est créé et commence à écrire dans un flux (un writer actif), il est dit verrouillé. Un seul rédacteur peut écrire dans un flux accessible en écriture à la fois. Si vous souhaitez qu'un autre rédacteur commence à écrire dans votre flux, vous devez généralement le libérer avant d'y associer un autre rédacteur.
Une file d'attente interne permet de suivre les blocs qui ont été écrits dans le flux, mais qui n'ont pas encore été traités par le récepteur sous-jacent.
Une stratégie de mise en file d'attente est un objet qui détermine comment un flux doit signaler la contre-pression en fonction de l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque bloc et compare la taille totale de tous les blocs de la file d'attente à un nombre spécifié, appelé seuil haut.
La construction finale est appelée contrôleur. Chaque flux inscriptible est associé à un contrôleur qui vous permet de contrôler le flux (par exemple, pour l'annuler).
Créer un flux accessible en écriture
L'interface WritableStream
de l'API Streams fournit une abstraction standard pour écrire des données de flux vers une destination, appelée récepteur. Cet objet est fourni avec une gestion de la contre-pression et une mise en file d'attente intégrées. Pour créer un flux accessible en écriture, vous devez appeler son constructeur WritableStream()
.
Il comporte un paramètre underlyingSink
facultatif, qui représente un objet avec des méthodes et des propriétés qui définissent le comportement de l'instance de flux construite.
Le underlyingSink
underlyingSink
peut inclure les méthodes facultatives définies par le développeur suivantes. Le paramètre controller
transmis à certaines méthodes est un WritableStreamDefaultController
.
start(controller)
: cette méthode est appelée immédiatement lors de la construction de l'objet. Le contenu de cette méthode doit viser à accéder au récepteur sous-jacent. Si ce processus doit être effectué de manière asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec.write(chunk, controller)
: cette méthode est appelée lorsqu'un nouveau bloc de données (spécifié dans le paramètrechunk
) est prêt à être écrit dans le récepteur sous-jacent. Il peut renvoyer une promesse pour signaler la réussite ou l'échec de l'opération d'écriture. Cette méthode ne sera appelée qu'une fois les écritures précédentes effectuées, et jamais après la fermeture ou l'abandon du flux.close(controller)
: cette méthode est appelée si l'application signale qu'elle a terminé d'écrire des blocs dans le flux. Le contenu doit faire tout ce qui est nécessaire pour finaliser les écritures dans le récepteur sous-jacent et libérer l'accès à celui-ci. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Cette méthode ne sera appelée qu'une fois que toutes les écritures mises en file d'attente auront réussi.abort(reason)
: cette méthode est appelée si l'application indique qu'elle souhaite fermer brusquement le flux et le mettre dans un état d'erreur. Il peut nettoyer toutes les ressources retenues, commeclose()
, maisabort()
sera appelé même si les écritures sont mises en file d'attente. Ces blocs seront supprimés. Si ce processus est asynchrone, il peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètrereason
contient unDOMString
décrivant pourquoi le flux a été abandonné.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
L'interface
WritableStreamDefaultController
de l'API Streams représente un contrôleur permettant de contrôler l'état d'un WritableStream
lors de la configuration, à mesure que des blocs sont envoyés pour l'écriture ou à la fin de l'écriture. Lors de la construction d'un WritableStream
, le récepteur sous-jacent reçoit une instance WritableStreamDefaultController
correspondante à manipuler. WritableStreamDefaultController
ne comporte qu'une seule méthode : WritableStreamDefaultController.error()
, qui provoque une erreur pour toute interaction future avec le flux associé.
WritableStreamDefaultController
accepte également une propriété signal
qui renvoie une instance de AbortSignal
, ce qui permet d'arrêter une opération WritableStream
si nécessaire.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
Le queuingStrategy
Le deuxième argument, également facultatif, du constructeur WritableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, qui prend deux paramètres :
highWaterMark
: nombre non négatif indiquant le seuil haut du flux utilisant cette stratégie de mise en file d'attente.size(chunk)
: fonction qui calcule et renvoie la taille finie non négative de la valeur de bloc donnée. Le résultat est utilisé pour déterminer la contre-pression, qui se manifeste par la propriétéWritableStreamDefaultWriter.desiredSize
appropriée.
Les méthodes getWriter()
et write()
Pour écrire dans un flux accessible en écriture, vous avez besoin d'un writer, qui sera un WritableStreamDefaultWriter
. La méthode getWriter()
de l'interface WritableStream
renvoie une nouvelle instance de WritableStreamDefaultWriter
et verrouille le flux sur cette instance. Tant que le flux est verrouillé, aucun autre rédacteur ne peut être acquis tant que celui en cours n'est pas libéré.
La méthode write()
de l'interface WritableStreamDefaultWriter
écrit un bloc de données transmis dans un WritableStream
et son récepteur sous-jacent, puis renvoie une promesse qui se résout pour indiquer la réussite ou l'échec de l'opération d'écriture. Notez que la signification du terme "réussite" dépend du récepteur sous-jacent. Il peut indiquer que le bloc a été accepté, et pas nécessairement qu'il a été enregistré de manière sécurisée dans sa destination finale.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Propriété locked
Vous pouvez vérifier si un flux accessible en écriture est verrouillé en accédant à sa propriété WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemple de code de flux inscriptible
L'exemple de code ci-dessous montre toutes les étapes en action.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Transférer un flux lisible vers un flux inscriptible
Un flux lisible peut être redirigé vers un flux accessible en écriture via la méthode pipeTo()
du flux lisible.
ReadableStream.pipeTo()
transmet le ReadableStream
actuel à un WritableStream
donné et renvoie une promesse qui est tenue lorsque le processus de transmission se termine correctement, ou qui est refusée si des erreurs ont été rencontrées.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Créer un flux de transformation
L'interface TransformStream
de l'API Streams représente un ensemble de données transformables. Vous créez un flux de transformation en appelant son constructeur TransformStream()
, qui crée et renvoie un objet de flux de transformation à partir des gestionnaires fournis. Le constructeur TransformStream()
accepte comme premier argument un objet JavaScript facultatif représentant le transformer
. Ces objets peuvent contenir l'une des méthodes suivantes :
Le transformer
start(controller)
: cette méthode est appelée immédiatement lors de la construction de l'objet. Il est généralement utilisé pour mettre en file d'attente des blocs de préfixe à l'aide decontroller.enqueue()
. Ces blocs seront lus du côté lisible, mais ne dépendent d'aucune écriture du côté accessible en écriture. Si ce processus initial est asynchrone, par exemple parce qu'il faut un certain effort pour acquérir les blocs de préfixes, la fonction peut renvoyer une promesse pour signaler le succès ou l'échec. Une promesse refusée entraînera une erreur dans le flux. Toutes les exceptions générées seront régénérées par le constructeurTransformStream()
.transform(chunk, controller)
: cette méthode est appelée lorsqu'un nouveau bloc initialement écrit sur le côté accessible en écriture est prêt à être transformé. L'implémentation du flux garantit que cette fonction ne sera appelée qu'une fois les transformations précédentes réussies, et jamais avant la fin destart()
ni après l'appel deflush()
. Cette fonction effectue le travail de transformation réel du flux de transformation. Il peut mettre en file d'attente les résultats à l'aide decontroller.enqueue()
. Cela permet à un seul bloc écrit sur le côté accessible en écriture de générer zéro ou plusieurs blocs sur le côté accessible en lecture, selon le nombre de fois oùcontroller.enqueue()
est appelé. Si le processus de transformation est asynchrone, cette fonction peut renvoyer une promesse pour signaler la réussite ou l'échec de la transformation. Une promesse refusée générera une erreur sur les côtés lisibles et accessibles en écriture du flux de transformation. Si aucune méthodetransform()
n'est fournie, la transformation d'identité est utilisée, ce qui met en file d'attente les blocs inchangés du côté accessible en écriture vers le côté accessible en lecture.flush(controller)
: cette méthode est appelée une fois que tous les blocs écrits sur le côté accessible en écriture ont été transformés en passant avec succès partransform()
et que le côté accessible en écriture est sur le point d'être fermé. Cette méthode est généralement utilisée pour mettre en file d'attente des blocs de suffixe du côté lisible, avant que celui-ci ne soit également fermé. Si le processus de vidage est asynchrone, la fonction peut renvoyer une promesse pour signaler la réussite ou l'échec. Le résultat sera communiqué à l'appelant destream.writable.write()
. De plus, une promesse refusée générera une erreur pour les côtés lisibles et accessibles en écriture du flux. Le déclenchement d'une exception est traité de la même manière que le renvoi d'une promesse refusée.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Stratégies de mise en file d'attente writableStrategy
et readableStrategy
Les deuxième et troisième paramètres facultatifs du constructeur TransformStream()
sont les stratégies de mise en file d'attente facultatives writableStrategy
et readableStrategy
. Elles sont définies comme indiqué dans les sections sur les flux lisibles et inscriptibles, respectivement.
Exemple de code de flux de transformation
L'exemple de code suivant montre un simple flux de transformation en action.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Transférer un flux lisible via un flux de transformation
La méthode pipeThrough()
de l'interface ReadableStream
permet de canaliser le flux actuel de manière chaînable via un flux de transformation ou toute autre paire lisible/inscriptible. Le transfert d'un flux le verrouille généralement pendant toute la durée du transfert, ce qui empêche d'autres lecteurs de le verrouiller.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
L'exemple de code suivant (un peu artificiel) montre comment implémenter une version "criée" de fetch()
qui met en majuscules tout le texte en consommant la promesse de réponse renvoyée en tant que flux et en mettant en majuscules bloc par bloc. L'avantage de cette approche est que vous n'avez pas besoin d'attendre le téléchargement complet du document, ce qui peut faire une énorme différence lorsque vous traitez des fichiers volumineux.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Démo
La démonstration ci-dessous montre les flux lisibles, inscriptibles et de transformation en action. Il inclut également des exemples de chaînes de pipes pipeThrough()
et pipeTo()
, et illustre tee()
. Vous pouvez éventuellement exécuter la démonstration dans sa propre fenêtre ou afficher le code source.
Flux utiles disponibles dans le navigateur
Un certain nombre de flux utiles sont intégrés au navigateur. Vous pouvez facilement créer un ReadableStream
à partir d'un blob. La méthode stream() de l'interface Blob
renvoie un ReadableStream
qui, une fois lu, renvoie les données contenues dans le blob. Rappelez-vous également qu'un objet File
est un type spécifique de Blob
et peut être utilisé dans n'importe quel contexte où un blob peut l'être.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Les variantes de flux de TextDecoder.decode()
et TextEncoder.encode()
sont appelées respectivement TextDecoderStream
et TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
La compression ou la décompression d'un fichier est facile avec les flux de transformation CompressionStream
et DecompressionStream
, respectivement. L'exemple de code ci-dessous montre comment télécharger la spécification Streams, la compresser (gzip) directement dans le navigateur et écrire le fichier compressé directement sur le disque.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Les FileSystemWritableFileStream
de l'API File System Access et les flux de requêtes fetch()
expérimentaux sont des exemples de flux inscriptibles.
L'API Serial utilise intensivement les flux lisibles et inscriptibles.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Enfin, l'API WebSocketStream
intègre les flux à l'API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Ressources utiles
- Spécification des flux
- Démonstrations associées
- Polyfill pour les flux
- 2016 : l'année des flux Web
- Itérateurs et générateurs asynchrones
- Visualiseur de flux
Remerciements
Cet article a été examiné par Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley et Adam Rice. Les articles de blog de Jake Archibald m'ont beaucoup aidé à comprendre les flux. Certains exemples de code s'inspirent des explorations de l'utilisateur GitHub @bellbind, et certaines parties du texte s'appuient fortement sur la documentation MDN Web Docs sur les flux. Les auteurs de la spécification Streams Standard ont fait un travail remarquable. Image de couverture par Ryan Lara sur Unsplash.