Découvrez comment utiliser des flux lisibles et accessibles en écriture, et comment transformer des flux avec l'API Streams.
L'API Streams vous permet d'accéder de manière programmatique aux flux de données reçus sur le réseau
ou créées par quelque moyen que ce soit, localement et
et les traiter avec JavaScript. Le traitement par flux implique de décomposer une ressource que vous souhaitez recevoir, envoyer ou transformer
puis les traiter petit à petit. Lorsque le streaming est quelque chose
le navigateur reçoit de toute façon des éléments tels que du code HTML ou des vidéos à afficher sur les pages Web.
n'était jamais disponible pour JavaScript avant l'introduction de fetch
avec les flux en 2015.
Auparavant, si vous vouliez traiter une ressource quelconque (qu'il s'agisse d'une vidéo, d'un fichier texte, etc.), vous devez télécharger le fichier entier, attendre qu'il soit désérialisé dans un format approprié, puis les traiter. Les flux étant accessibles en JavaScript, tout change. Vous pouvez désormais traiter les données brutes avec JavaScript progressivement dès qu'il est disponible sur le client, sans avoir à générer de tampon, de chaîne ou de blob. Cela débloque un certain nombre de cas d'utilisation, dont certains sont listés ci-dessous:
- Effets vidéo:canalisation d'un flux vidéo lisible par un flux de transformation qui applique des effets en temps réel.
- (dé)compression de données:acheminement d'un flux de fichiers via un flux de transformation qui (dé)compresse le fichier.
- Décodage d'image:canalisation d'un flux de réponse HTTP via un flux de transformation qui décode des octets
en données bitmap, puis dans un autre flux de transformation qui convertit les bitmaps en PNG. Si
installé dans le gestionnaire
fetch
d'un service worker, ce qui vous permet d'émuler un polyfill de manière transparente de nouveaux formats d'image comme AVIF.
Prise en charge des navigateurs
ReadableStream et WritableStream
Navigateurs pris en charge
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
TransformStream
Navigateurs pris en charge
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
- <ph type="x-smartling-placeholder">
Concepts fondamentaux
Avant d'entrer dans les détails des différents types de flux, voyons quelques concepts fondamentaux.
Fragments
Un fragment est une donnée unique écrite ou lue dans un flux. Il peut s'agir de n'importe quel
Type les flux peuvent même contenir des
blocs de différents types. La plupart du temps, un fragment ne sera pas le plus atomique
pour un flux donné. Par exemple, un flux d'octets peut contenir des fragments constitués de 16
Unités Kio Uint8Array
, au lieu d'octets uniques.
Flux lisibles
Un flux lisible représente une source de données à partir de laquelle vous pouvez lire des données. En d'autres termes, les données sont
à partir d'un flux lisible. Concrètement, un flux lisible est une instance de ReadableStream
.
.
Flux accessibles en écriture
Un flux accessible en écriture représente une destination de données dans laquelle vous pouvez écrire. En d’autres termes, les données
entre dans un flux accessible en écriture. Concrètement, un flux accessible en écriture est une instance
WritableStream
.
Transformer des flux
Un flux de transformation se compose d'une paire de flux: un flux accessible en écriture (appelé son côté accessible en écriture).
et un flux lisible, appelé "côté lisible".
Une métaphore du monde réel
pour cela serait un
interpréteur simultané
qui traduit d'une langue à une autre à la volée.
D'une manière spécifique au flux de transformation, écrire
vers le côté accessible en écriture, les nouvelles données sont accessibles en lecture
le plus lisible possible. Concrètement, tout objet avec une propriété writable
et une propriété readable
peut être diffusé
en tant que flux de transformation. Cependant, la classe TransformStream
standard facilite la création
une telle paire qui est correctement emmêlée.
Chaînes de tuyaux
Les flux sont principalement utilisés par transmission entre eux. Un flux lisible peut être dirigé directement
vers un flux accessible en écriture, à l'aide de la méthode pipeTo()
du flux lisible, ou vers un flux accessible en écriture.
ou plusieurs flux de transformation en premier, à l'aide de la méthode pipeThrough()
du flux lisible. Un ensemble de
embarqués de cette manière est appelé "chaîne pipe".
Contre-pression
Une fois qu'une chaîne de pipe est construite, elle propage des signaux concernant la vitesse à laquelle les fragments doivent circuler à travers ce réseau. Si une étape de la chaîne ne peut pas encore accepter les fragments, elle propage un signal en arrière. dans la chaîne du pipeline, jusqu'à ce que finalement la source d'origine cesse de produire des fragments rapidement. Ce processus de normalisation du flux est appelé "contre-pression".
Tee
Un flux lisible peut être nommé d'après la forme d'un "T" majuscule à l'aide de sa méthode tee()
.
Le flux est alors verrouillé, c'est-à-dire qu'il ne peut plus être utilisé directement. Cependant, cela créera deux nouvelles
flux, appelés branches, qui peuvent être utilisés indépendamment.
Le Teeing est également important car les flux ne peuvent pas être retournés ou redémarrés. Nous reviendrons sur ce point plus tard.
Fonctionnement d'un flux lisible
Un flux lisible est une source de données représentée en JavaScript par un
Un objet ReadableStream
qui
provenant d'une source sous-jacente. La
ReadableStream()
crée et renvoie un objet de flux lisible à partir des gestionnaires donnés. Il y a deux
types de sources sous-jacentes:
- Les sources push vous transmettent constamment les données lorsque vous y accédez, et c'est à vous de démarrer, suspendre ou annuler l'accès au flux. Il peut s'agir de flux vidéo en direct, d'événements envoyés par le serveur, ou WebSockets.
- Avec les sources pull, vous devez leur demander explicitement des données une fois la connexion établie. Exemples
incluent des opérations HTTP via des appels
fetch()
ouXMLHttpRequest
.
Les données de flux sont lues de manière séquentielle, sous la forme de petits morceaux appelés morceaux. Les fragments placés dans un flux sont considérés comme mis en file d'attente. Cela signifie qu'ils attendent dans une file d'attente prêts à être lus. Une file d'attente interne assure le suivi des fragments qui n'ont pas encore été lus.
Une stratégie de mise en file d'attente est un objet qui détermine la manière dont un flux doit signaler une contre-pression en fonction l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare les la taille totale de tous les fragments de la file d'attente jusqu'à un nombre spécifié, appelé marge haute.
Les fragments du flux sont lus par un lecteur. Ce lecteur récupère les données fragment par fragment ce qui vous permet d'effectuer le type d'opération que vous voulez faire. Le lecteur et l'autre qui accompagne ce code s'appelle un consommateur.
Dans ce contexte, la construction suivante est appelée contrôleur. Chaque flux lisible est associé à qui, comme son nom l'indique, vous permet de contrôler le flux.
Un seul lecteur peut lire un flux à la fois. Lorsqu'un lecteur est créé et commence à lire un flux (autrement dit, devient un lecteur actif), il est verrouillé sur celui-ci. Si vous voulez qu'un autre lecteur prenne le relais lecture de votre flux, vous devez généralement libérer le premier lecteur avant de faire quoi que ce soit d'autre (même si vous pouvez tirer des flux).
Créer un flux lisible
Pour créer un flux lisible, appelez son constructeur.
ReadableStream()
Le constructeur comporte un argument facultatif underlyingSource
, qui représente un objet
avec des méthodes et des propriétés qui définissent
le comportement de l'instance de flux construite.
underlyingSource
Vous pouvez utiliser les méthodes facultatives suivantes définies par le développeur:
start(controller)
: appelé immédiatement lors de la construction de l'objet. La peut accéder à la source du flux et faire toute autre action nécessaires pour configurer le fonctionnement du flux. Si ce processus doit être effectué de manière asynchrone, la méthode peut renvoyer une promesse pour signaler la réussite ou l'échec. Le paramètrecontroller
transmis à cette méthode est unReadableStreamDefaultController
pull(controller)
: permet de contrôler le flux lorsque davantage de fragments sont récupérés. Il est appelé à plusieurs reprises tant que la file d'attente interne de fragments du flux n'est pas pleine, jusqu'à ce que la file d'attente atteint son point culminant. Si le résultat de l'appel depull()
est une promesse,pull()
ne sera pas rappelé tant que cette promesse n'est pas remplie. Si la promesse est refusée, le flux sera erroné.cancel(reason)
: appelé lorsque le client du flux annule le flux.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
accepte les méthodes suivantes:
ReadableStreamDefaultController.close()
ferme le flux associé.ReadableStreamDefaultController.enqueue()
met en file d'attente un fragment donné dans le flux associé.ReadableStreamDefaultController.error()
entraîne l'erreur de toute interaction ultérieure avec le flux associé.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Le deuxième argument, également facultatif, du constructeur ReadableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, ce qui nécessite deux
paramètres:
highWaterMark
: nombre non négatif indiquant la marque des eaux haute du cours d'eau utilisant cette stratégie de mise en file d'attente.size(chunk)
: fonction qui calcule et renvoie la taille non négative finie de la valeur de fragment donnée. Le résultat permet de déterminer la contre-pression, qui se manifeste via la propriétéReadableStreamDefaultController.desiredSize
appropriée. Elle régit également l'appel de la méthodepull()
de la source sous-jacente.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Les méthodes getReader()
et read()
Pour lire à partir d'un flux lisible, vous avez besoin d'un lecteur, qui sera
ReadableStreamDefaultReader
La méthode getReader()
de l'interface ReadableStream
crée un lecteur et verrouille le flux
Lorsque le flux est verrouillé, aucun autre lecteur ne peut être acquis tant que celui-ci n'est pas libéré.
read()
de l'interface ReadableStreamDefaultReader
renvoie une promesse fournissant l'accès au prochain
dans la file d'attente interne du flux. Il traite ou rejette avec
un résultat en fonction de l’état de
le flux. Les différentes possibilités sont les suivantes:
- Si un fragment est disponible, la promesse est tenue avec un objet au format
.{ value: chunk, done: false }
- Si le flux est fermé, la promesse est tenue avec un objet au format
.{ value: undefined, done: true }
- Si le flux comporte une erreur, la promesse est refusée avec l'erreur correspondante.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Propriété locked
Vous pouvez vérifier si un flux lisible est verrouillé en accédant à son
ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemples de code de flux lisibles
L'exemple de code ci-dessous illustre toutes les étapes en action. Vous créez d'abord un ReadableStream
qui, dans son
L'argument underlyingSource
(c'est-à-dire la classe TimestampSource
) définit une méthode start()
.
Cette méthode indique au controller
du flux de
enqueue()
est un code temporel toutes les secondes pendant 10 secondes.
Enfin, il indique au contrôleur d'effectuer une opération close()
sur le flux. Vous consommez ceci
flux en créant un lecteur via la méthode getReader()
et en appelant read()
jusqu'à ce que le flux soit
done
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Itération asynchrone
Vérifier à chaque itération de la boucle read()
si le flux est done
n'est peut-être pas l'API la plus pratique.
Heureusement, il y aura bientôt une meilleure façon de le faire: l'itération asynchrone.
for await (const chunk of stream) {
console.log(chunk);
}
Une solution de contournement pour utiliser l'itération asynchrone aujourd'hui consiste à implémenter le comportement avec un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Intégrer un flux lisible
La méthode tee()
du
L'interface ReadableStream
renvoie le flux lisible actuel et renvoie un tableau à deux éléments.
contenant les deux branches obtenues en tant que nouvelles instances ReadableStream
. Cela permet
deux lecteurs de lire
un flux simultanément. Vous pouvez effectuer cette opération, par exemple, dans un service worker si
vous voulez récupérer une réponse du serveur et la diffuser dans le navigateur, mais aussi dans le
dans le cache du nœud de calcul de service. Étant donné qu'un corps de réponse ne peut pas être consommé plusieurs fois, vous avez besoin de deux copies
pour ce faire. Pour annuler le flux, vous devez ensuite annuler les deux branches générées. Lancer un flux
le verrouillera généralement pendant cette durée, ce qui empêchera les autres lecteurs de le verrouiller.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Flux d'octets lisibles
Pour les flux représentant des octets, une version étendue du flux lisible est fournie pour gérer de manière efficace, en particulier en minimisant le nombre de copies. Les flux d'octets permettent l'utilisation de la mémoire tampon (BYOB) lecteurs à acquérir. L'implémentation par défaut peut générer différentes sorties, telles que en tant que chaînes ou tampons de tableau dans le cas de WebSockets, tandis que les flux d'octets garantissent une sortie en octets. De plus, les lecteurs BYOB offrent des avantages en termes de stabilité. C'est car si un tampon se détache, cela peut garantir qu'on n'écrira pas deux fois dans le même tampon, et d'éviter ainsi les conditions de concurrence. Les lecteurs BYOB permettent de réduire le nombre d'exécutions du navigateur la récupération de mémoire, car elle peut réutiliser les tampons.
Créer un flux d'octets lisible
Vous pouvez créer un flux d'octets lisible en transmettant un paramètre type
supplémentaire à la
ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
La source sous-jacente d'un flux d'octets lisible reçoit un ReadableByteStreamController
pour
manipuler. Sa méthode ReadableByteStreamController.enqueue()
accepte un argument chunk
dont la valeur
est un ArrayBufferView
. La propriété ReadableByteStreamController.byobRequest
renvoie la valeur
Requête d'extraction BYOB, ou valeur "null" en l'absence de requête Enfin, le ReadableByteStreamController.desiredSize
renvoie la taille souhaitée pour remplir la file d'attente interne du flux contrôlé.
queuingStrategy
Le deuxième argument, également facultatif, du constructeur ReadableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, ce qui nécessite
:
highWaterMark
: nombre non négatif d'octets indiquant la marque des hautes eaux du flux utilisant cette stratégie de mise en file d'attente. Cela permet de déterminer la contre-pression, qui se manifeste via la propriétéReadableByteStreamController.desiredSize
appropriée. Elle régit également l'appel de la méthodepull()
de la source sous-jacente.
Les méthodes getReader()
et read()
Vous pouvez ensuite accéder à un ReadableStreamBYOBReader
en définissant le paramètre mode
comme suit:
ReadableStream.getReader({ mode: "byob" })
Cela permet un contrôle plus précis de la mémoire tampon
afin d'éviter les copies. Pour lire à partir du flux d'octets, vous devez appeler
ReadableStreamBYOBReader.read(view)
, où view
est un
ArrayBufferView
Exemple de code de flux d'octets lisible
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La fonction suivante renvoie des flux d'octets lisibles qui permettent de lire efficacement les données sans copie généré de manière aléatoire. Au lieu d'utiliser une taille de fragment prédéterminée de 1 024, il tente de remplir du tampon fourni par le développeur, ce qui permet un contrôle total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Fonctionnement d'un flux accessible en écriture
Un flux accessible en écriture est une destination dans laquelle vous pouvez écrire des données.
objet WritableStream
. Ce
sert d'abstraction au-dessus d'un récepteur sous-jacent, un récepteur d'E/S de niveau inférieur dans lequel
les données brutes sont écrites.
Les données sont écrites dans le flux par le biais d'un rédacteur, fragment par morceau. Un fragment peut prendre sous une multitude de formes, tout comme les fragments d'un lecteur. Vous pouvez utiliser le code de votre choix les fragments prêts à être écrits ; l'auteur et le code associé sont appelés producteurs.
Lorsqu'un auteur est créé et commence à écrire dans un flux (un auteur actif), on dit qu'il est verrouillé dessus. Un seul rédacteur peut écrire dans un flux accessible en écriture à la fois. Si vous voulez une autre pour écrire dans votre flux, vous devez généralement le publier avant de joindre un autre rédacteur.
Une file d'attente interne assure le suivi des fragments qui ont été écrits dans le flux, mais pas encore. traités par le récepteur sous-jacent.
Une stratégie de mise en file d'attente est un objet qui détermine la manière dont un flux doit signaler une contre-pression en fonction l'état de sa file d'attente interne. La stratégie de mise en file d'attente attribue une taille à chaque fragment et compare les la taille totale de tous les fragments de la file d'attente jusqu'à un nombre spécifié, appelé marge haute.
La construction finale est appelée contrôleur. Chaque flux accessible en écriture est associé à un contrôleur vous permet de contrôler le flux (par exemple, de l'annuler).
Créer un flux accessible en écriture
L'interface WritableStream
de
L'API Streams fournit une abstraction standard pour écrire des flux de données dans une destination, connue
en tant que récepteur. Cet objet intègre une contre-pression et une mise en file d'attente. Pour créer un flux accessible en écriture
appeler son constructeur.
WritableStream()
Il comporte un paramètre underlyingSink
facultatif, qui représente un objet
avec des méthodes et des propriétés qui définissent
le comportement de l'instance de flux construite.
underlyingSink
underlyingSink
peut inclure les méthodes facultatives suivantes définies par le développeur. controller
transmis à certaines méthodes est
WritableStreamDefaultController
start(controller)
: cette méthode est appelée immédiatement lors de la construction de l'objet. La le contenu de cette méthode doit avoir pour objectif d'accéder au récepteur sous-jacent. Si ce processus doit être effectuée de manière asynchrone, elle peut renvoyer une promesse pour signaler la réussite ou l'échec.write(chunk, controller)
: cette méthode est appelée lorsqu'un nouveau fragment de données (spécifié dans lechunk
) est prêt à être écrit dans le récepteur sous-jacent. Elle peut renvoyer une promesse signalent la réussite ou l'échec de l'opération d'écriture. Cette méthode ne sera appelée qu'après la les écritures ont réussi, et jamais après la fermeture ou l'annulation du flux.close(controller)
: cette méthode est appelée si l'application indique qu'elle a fini d'écrire des fragments au flux. Le contenu doit faire tout ce qui est nécessaire pour finaliser les écritures sur le récepteur sous-jacent et en libérant l'accès. Si ce processus est asynchrone, il peut renvoyer une de signaler la réussite ou l'échec. Cette méthode ne sera appelée qu'une fois toutes les écritures en file d'attente ont réussi.abort(reason)
: cette méthode est appelée si l'application signale qu'elle souhaite se fermer brusquement. le flux et le mettre dans un état erroné. Il peut nettoyer toutes les ressources conservées,close()
, maisabort()
sera appelé même si les écritures sont en file d'attente. Ces morceaux seront générés à distance. Si ce processus est asynchrone, il peut renvoyer une promesse indiquant la réussite ou l'échec de l'opération. La Le paramètrereason
contient un élémentDOMString
décrivant la raison pour laquelle le flux a été annulé.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
La
WritableStreamDefaultController
interface de l'API Streams représente un contrôleur permettant de contrôler l'état d'une WritableStream
pendant la configuration, lorsque davantage de fragments sont envoyés pour écriture, ou à la fin de l'écriture. Lors de la construction
un WritableStream
, le récepteur sous-jacent reçoit un WritableStreamDefaultController
correspondant ;
une instance à manipuler. WritableStreamDefaultController
n'a qu'une seule méthode:
WritableStreamDefaultController.error()
,
ce qui entraîne une erreur pour toute interaction ultérieure avec le flux associé.
WritableStreamDefaultController
accepte également une propriété signal
qui renvoie une instance de
AbortSignal
permettant d'arrêter une opération WritableStream
si nécessaire.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Le deuxième argument, également facultatif, du constructeur WritableStream()
est queuingStrategy
.
Il s'agit d'un objet qui définit éventuellement une stratégie de mise en file d'attente pour le flux, ce qui nécessite deux
paramètres:
highWaterMark
: nombre non négatif indiquant la marque des eaux haute du cours d'eau utilisant cette stratégie de mise en file d'attente.size(chunk)
: fonction qui calcule et renvoie la taille non négative finie de la valeur de fragment donnée. Le résultat permet de déterminer la contre-pression, qui se manifeste via la propriétéWritableStreamDefaultWriter.desiredSize
appropriée.
Les méthodes getWriter()
et write()
Pour écrire dans un flux accessible en écriture, vous avez besoin d'un rédacteur.
WritableStreamDefaultWriter
La méthode getWriter()
de l'interface WritableStream
renvoie une
nouvelle instance de WritableStreamDefaultWriter
et verrouille le flux sur cette instance. Alors que le
flux est verrouillé, aucun autre scénariste ne peut être acquis tant que le flux actuel n'est pas libéré.
write()
de la classe
WritableStreamDefaultWriter
écrit un fragment de données transmis dans un WritableStream
et son récepteur sous-jacent, puis renvoie
une promesse qui se résout pour indiquer la réussite
ou l'échec de l'opération d'écriture. Notez que ce que
"succès" dépend du récepteur sous-jacent ; cela peut indiquer que
le bloc a été accepté,
et pas nécessairement qu’elle est enregistrée
en toute sécurité dans sa destination finale.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Propriété locked
Vous pouvez vérifier si un flux accessible en écriture est verrouillé en accédant à son
WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemple de code de flux accessible en écriture
L'exemple de code ci-dessous illustre toutes les étapes en action.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Rediriger un flux lisible vers un flux accessible en écriture
Un flux lisible peut être redirigé vers un flux accessible en écriture via la couche
pipeTo()
.
ReadableStream.pipeTo()
dirige le ReadableStream
actuel vers un WritableStream
donné et renvoie un
promesse qui se termine lorsque le processus de piping se termine avec succès, ou qui est rejeté si des erreurs se sont produites
rencontrés.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Créer un flux de transformation
L'interface TransformStream
de l'API Streams représente un ensemble de données transformables. Toi
créer un flux de transformation en appelant son constructeur TransformStream()
, qui crée et renvoie
un objet de flux de transformation à partir des gestionnaires donnés. Le constructeur TransformStream()
accepte comme
son premier argument est un objet JavaScript facultatif représentant la transformer
. De tels objets peuvent
contenant l'une des méthodes suivantes:
transformer
start(controller)
: cette méthode est appelée immédiatement lors de la construction de l'objet. Habituellement Elle permet de mettre en file d'attente des fragments de préfixes à l'aide decontroller.enqueue()
. Ces fragments seront lus du côté lisible, mais ne dépendent d'aucune écriture vers le côté accessible en écriture. Si cette initiale est asynchrone, par exemple parce qu'il faut un certain effort pour acquérir les fragments de préfixe, la fonction peut renvoyer une promesse pour signaler la réussite ou l'échec ; une promesse refusée génère une erreur flux. Toutes les exceptions générées sont renvoyées par le constructeurTransformStream()
.transform(chunk, controller)
: cette méthode est appelée lorsqu'un nouveau fragment écrit initialement dans le accessible en écriture est prêt à être transformé. L'implémentation de flux garantit que cette fonction est appelé uniquement après la réussite des transformations précédentes etstart()
n'a jamais été terminée ou après l'appel deflush()
. Cette fonction effectue la transformation du flux de transformation. Elle peut mettre les résultats en file d'attente à l'aide decontroller.enqueue()
. Ce permet à un seul bloc écrit sur le côté accessible en écriture d'obtenir zéro ou plusieurs fragments sur le du côté lisible, en fonction du nombre de fois oùcontroller.enqueue()
est appelé. Si le processus de est asynchrone, cette fonction peut renvoyer une promesse pour signaler la réussite ou l'échec la transformation. Une promesse refusée générera une erreur au niveau des côtés lisibles et en écriture de la flux de transformation. Si aucune méthodetransform()
n'est fournie, la transformation d'identité est utilisée, ce qui met en file d'attente les fragments inchangés du côté accessible en écriture vers le côté lisible.flush(controller)
: cette méthode est appelée une fois que tous les fragments écrits dans le côté accessible en écriture ont été transformées en passant avec succès viatransform()
, et le côté accessible en écriture est sur le point d'être fermé. Généralement utilisé pour mettre en file d'attente des fragments de suffixe du côté lisible, avant cela devient fermée. Si le processus de vidage est asynchrone, la fonction peut renvoyer une promesse signaler la réussite ou l'échec ; le résultat est communiqué à l'appelantstream.writable.write()
De plus, une promesse refusée générera une erreur au niveau accessibles en écriture du flux. Le traitement d'une exception est le même que pour le renvoi d'une exception prometteurs.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Stratégies de mise en file d'attente writableStrategy
et readableStrategy
Les deuxième et troisième paramètres facultatifs du constructeur TransformStream()
sont facultatifs
Stratégies de mise en file d'attente writableStrategy
et readableStrategy
. Elles sont définies comme indiqué dans les
accessible en lecture et le flux accessible en écriture
respectivement.
Exemple de code de flux de transformation
L'exemple de code suivant montre un flux de transformation simple en action.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Transmettre un flux lisible via un flux de transformation
pipeThrough()
de l'interface ReadableStream
fournit un moyen de chaîner le flux actuel
via un flux de transformation ou toute
autre paire accessible en écriture/lisible. La reprise d'un flux
permet de verrouiller
pendant toute la durée du pipe, ce qui
empêche les autres lecteurs de le verrouiller.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
L'exemple de code suivant (un peu artificiel) montre comment implémenter une requête version de fetch()
qui met tout le texte en majuscules en consommant la promesse de réponse renvoyée
sous forme de flux
et en majuscules, fragment par fragment. L'avantage de cette approche est que vous n'avez pas besoin d'attendre
l'ensemble du document à télécharger, ce qui peut faire une grande différence lors du traitement de fichiers volumineux.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Démo
La démonstration ci-dessous présente des flux lisibles et accessibles en écriture, ainsi que des flux de transformation en action. Il comprend également des exemples
de chaînes de pipeline pipeThrough()
et pipeTo()
, et illustre également tee()
. Vous pouvez aussi exécuter
la démonstration dans sa propre fenêtre ou afficher
code source.
Flux utiles disponibles dans le navigateur
Un certain nombre de flux utiles sont intégrés directement dans le navigateur. Vous pouvez facilement créer
ReadableStream
à partir d'un blob. Blob
la méthode stream() de l'interface renvoie
un ReadableStream
qui, lors de la lecture, renvoie les données contenues dans l'objet blob. N'oubliez pas non plus qu'un
L'objet File
est un type spécifique
Blob
, et peut être utilisé dans n'importe quel contexte qu'un blob peut.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Les variantes de streaming de TextDecoder.decode()
et TextEncoder.encode()
sont appelées
TextDecoderStream
et
TextEncoderStream
, respectivement.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
La compression et la décompression d'un fichier sont faciles
CompressionStream
et
Flux de transformation DecompressionStream
respectivement. L'exemple de code ci-dessous montre comment télécharger la spécification Streams, la compresser (gzip)
directement dans le navigateur et écrivez
le fichier compressé directement sur le disque.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
L'API File System Access
FileSystemWritableFileStream
et les fetch()
flux de requêtes expérimentaux
exemples de flux accessibles en écriture dans la nature.
L'API Serial utilise de manière intensive les flux accessibles en lecture et en écriture.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Enfin, l'API WebSocketStream
intègre les flux avec l'API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Ressources utiles
- Spécifications concernant les flux
- Démonstrations associées
- Polyfills diffusés
- 2016 : l'année des flux Web
- Itérateurs et générateurs asynchrones
- Stream Visualizer
Remerciements
Cet article a été examiné par Jake Archibal, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley et Adam Rice. Les articles de blog de Jake Archibal m'ont beaucoup aidé à comprendre flux. Certains exemples de code sont inspirés par un utilisateur de GitHub les explorations de @bellbind et parties de la prose s'appuient fortement sur Documents Web MDN sur les flux. La Streams Standard auteurs ont accompli un travail remarquable d'écrire cette spécification. Image héros de Ryan Lara sur Unsplash.