Aprende a usar flujos legibles, grabables y de transformación con la API de Streams.
La API de Streams te permite acceder de forma programática a flujos de datos recibidos a través de la red o creados por cualquier medio de forma local, y procesarlos con JavaScript. La transmisión implica dividir un recurso que deseas recibir, enviar o transformar en fragmentos pequeños y, luego, procesar estos fragmentos bit por bit. Si bien la transmisión es algo que los navegadores hacen de todos modos cuando reciben recursos como HTML o videos para mostrar en las páginas web, esta capacidad nunca estuvo disponible para JavaScript antes de que se introdujera fetch
con transmisiones en 2015.
Anteriormente, si querías procesar un recurso de algún tipo (ya sea un video, un archivo de texto, etc.), debías descargar el archivo completo, esperar a que se deserializara en un formato adecuado y, luego, procesarlo. Con la disponibilidad de transmisiones para JavaScript, todo cambia. Ahora puedes procesar datos sin procesar con JavaScript de forma progresiva en cuanto estén disponibles en el cliente, sin necesidad de generar un búfer, una cadena o un BLOB. Esto habilita varios casos de uso, algunos de los cuales se enumeran a continuación:
- Efectos de video: Canalización de una transmisión de video legible a través de una transmisión de transformación que aplica efectos en tiempo real.
- Compresión y descompresión de datos: Canalización de un flujo de archivos a través de un flujo de transformación que lo comprime o descomprime de forma selectiva.
- Decodificación de imágenes: Canalización de un flujo de respuesta HTTP a través de un flujo de transformación que decodifica bytes en datos de mapa de bits y, luego, a través de otro flujo de transformación que traduce mapas de bits en PNGs. Si se instala dentro del controlador
fetch
de un trabajador de servicio, esto te permite realizar un polyfill de forma transparente para los nuevos formatos de imagen, como AVIF.
Navegadores compatibles
ReadableStream y WritableStream
TransformStream
Conceptos básicos
Antes de entrar en detalles sobre los distintos tipos de transmisiones, permítanme presentar algunos conceptos básicos.
Fragmentos
Un fragmento es una unidad de datos que se escribe en un flujo o se lee de él. Puede ser de cualquier tipo, y los flujos pueden incluso contener fragmentos de diferentes tipos. La mayoría de las veces, un fragmento no será la unidad de datos más atómica para una transmisión determinada. Por ejemplo, un flujo de bytes podría contener fragmentos que constan de unidades Uint8Array
de 16 KiB, en lugar de bytes individuales.
Transmisiones legibles
Un flujo legible representa una fuente de datos desde la que puedes leer. En otras palabras, los datos salen de un flujo legible. En concreto, un flujo legible es una instancia de la clase ReadableStream
.
Transmisiones con capacidad de escritura
Una transmisión grabable representa un destino de datos en el que puedes escribir. En otras palabras, los datos ingresan a un flujo de escritura. En concreto, un flujo de escritura es una instancia de la clase WritableStream
.
Transmisiones de transformación
Una transmisión de transformación consta de un par de transmisiones: una transmisión grabable, conocida como su lado grabable, y una transmisión legible, conocida como su lado legible.
Una metáfora del mundo real para esto sería un intérprete simultáneo que traduce de un idioma a otro sobre la marcha.
De una manera específica para el flujo de transformación, escribir en el lado de escritura genera que los datos nuevos estén disponibles para su lectura desde el lado de lectura. En concreto, cualquier objeto con una propiedad writable
y una propiedad readable
puede servir como una transmisión de transformación. Sin embargo, la clase TransformStream
estándar facilita la creación de un par de este tipo que esté correctamente entrelazado.
Cadenas para tuberías
Las transmisiones se usan principalmente canalizándolas entre sí. Se puede canalizar un flujo legible directamente a un flujo de escritura con el método pipeTo()
del flujo legible, o bien se puede canalizar primero a través de uno o más flujos de transformación con el método pipeThrough()
del flujo legible. Un conjunto de transmisiones conectadas de esta manera se denomina cadena de tuberías.
Contrapresión
Una vez que se construye una cadena de tuberías, esta propagará señales sobre la velocidad con la que deben fluir los fragmentos a través de ella. Si algún paso de la cadena aún no puede aceptar fragmentos, propaga una señal hacia atrás a través de la cadena de canalización hasta que, finalmente, se le indica a la fuente original que deje de producir fragmentos tan rápido. Este proceso de flujo de normalización se denomina contrapresión.
Salida
Se puede bifurcar una transmisión legible (el nombre proviene de la forma de una "T" mayúscula) con su método tee()
.
Esto bloqueará la transmisión, es decir, ya no se podrá usar directamente. Sin embargo, se crearán dos transmisiones nuevas, llamadas ramas, que se podrán consumir de forma independiente.
La bifurcación también es importante porque las transmisiones no se pueden rebobinar ni reiniciar. Hablaremos más sobre esto más adelante.
La mecánica de un flujo legible
Un flujo legible es una fuente de datos representada en JavaScript por un objeto ReadableStream
que fluye desde una fuente subyacente. El constructor ReadableStream()
crea y devuelve un objeto de transmisión legible a partir de los controladores proporcionados. Existen dos tipos de fuentes subyacentes:
- Las fuentes de envío te envían datos constantemente cuando accedes a ellas, y depende de ti iniciar, pausar o cancelar el acceso a la transmisión. Algunos ejemplos incluyen transmisiones de video en vivo, eventos enviados por el servidor o WebSockets.
- Las fuentes de extracción requieren que solicites datos de forma explícita una vez que te conectes a ellas. Entre los ejemplos, se incluyen las operaciones HTTP a través de llamadas a
fetch()
oXMLHttpRequest
.
Los datos de transmisión se leen de forma secuencial en partes pequeñas llamadas fragmentos. Se dice que los fragmentos colocados en una transmisión se ponen en cola. Esto significa que están esperando en una fila para ser leídos. Una cola interna realiza un seguimiento de los fragmentos que aún no se leyeron.
Una estrategia de encolamiento es un objeto que determina cómo una transmisión debe indicar contrapresión según el estado de su cola interna. La estrategia de encolamiento asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como marca de nivel alto de agua.
Un lector lee los fragmentos dentro de la transmisión. Este lector recupera los datos de a un fragmento por vez, lo que te permite realizar cualquier tipo de operación que desees. El lector y el otro código de procesamiento que lo acompaña se denominan consumidor.
La siguiente construcción en este contexto se llama controlador. Cada flujo legible tiene un controlador asociado que, como su nombre lo indica, te permite controlar el flujo.
Solo un lector puede leer una transmisión a la vez. Cuando se crea un lector y comienza a leer una transmisión (es decir, se convierte en un lector activo), se bloquea para esa transmisión. Si quieres que otro lector se haga cargo de la lectura de tu transmisión, por lo general, debes liberar el primer lector antes de hacer cualquier otra cosa (aunque puedes bifurcar transmisiones).
Cómo crear un flujo legible
Para crear un flujo legible, llama a su constructor ReadableStream()
.
El constructor tiene un argumento opcional underlyingSource
, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de transmisión creada.
underlyingSource
Puede usar los siguientes métodos opcionales definidos por el desarrollador:
start(controller)
: Se llama inmediatamente cuando se construye el objeto. El método puede acceder a la fuente de transmisión y hacer cualquier otra acción necesaria para configurar la funcionalidad de transmisión. Si este proceso se debe realizar de forma asíncrona, el método puede devolver una promesa para indicar el éxito o el fracaso. El parámetrocontroller
que se pasa a este método es unReadableStreamDefaultController
.pull(controller)
: Se puede usar para controlar la transmisión a medida que se recuperan más fragmentos. Se llama de forma reiterada mientras la cola interna de fragmentos de la transmisión no esté llena, hasta que la cola alcance su marca de nivel alto. Si el resultado de llamar apull()
es una promesa, no se volverá a llamar apull()
hasta que se cumpla dicha promesa. Si se rechaza la promesa, se producirá un error en la transmisión.cancel(reason)
: Se llama cuando el consumidor de la transmisión cancela la transmisión.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
admite los siguientes métodos:
ReadableStreamDefaultController.close()
cierra el flujo asociado.ReadableStreamDefaultController.enqueue()
pone en cola un fragmento determinado en la transmisión asociada.ReadableStreamDefaultController.error()
hace que se produzca un error en cualquier interacción futura con la transmisión asociada.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
El segundo argumento del constructor de ReadableStream()
, que también es opcional, es queuingStrategy
.
Es un objeto que define de forma opcional una estrategia de filas para la transmisión, que toma dos parámetros:
highWaterMark
: Es un número no negativo que indica la marca de agua alta de la transmisión que usa esta estrategia de filas.size(chunk)
: Es una función que calcula y devuelve el tamaño finito no negativo del valor de fragmento determinado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedadReadableStreamDefaultController.desiredSize
adecuada. También rige cuándo se llama al métodopull()
de la fuente subyacente.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Los métodos getReader()
y read()
Para leer desde un flujo legible, necesitas un lector, que será un ReadableStreamDefaultReader
.
El método getReader()
de la interfaz ReadableStream
crea un lector y bloquea el flujo para él. Mientras el flujo esté bloqueado, no se podrá adquirir ningún otro lector hasta que se libere este.
El método read()
de la interfaz ReadableStreamDefaultReader
devuelve una promesa que proporciona acceso al siguiente fragmento de la cola interna de la transmisión. Se completa o rechaza con un resultado según el estado de la transmisión. Las diferentes posibilidades son las siguientes:
- Si hay un fragmento disponible, la promesa se cumplirá con un objeto del formulario
{ value: chunk, done: false }
. - Si se cierra la transmisión, la promesa se cumplirá con un objeto de la forma
{ value: undefined, done: true }
. - Si la transmisión genera un error, la promesa se rechazará con el error correspondiente.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
La propiedad locked
Puedes verificar si un flujo legible está bloqueado accediendo a su propiedad ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Muestras de código de transmisión legible
En el siguiente ejemplo de código, se muestran todos los pasos en acción. Primero, crea un ReadableStream
que, en su argumento underlyingSource
(es decir, la clase TimestampSource
), defina un método start()
.
Este método le indica al controller
de la transmisión que enqueue()
una marca de tiempo cada segundo durante diez segundos.
Por último, le indica al controlador que close()
la transmisión. Para consumir este flujo, crea un lector a través del método getReader()
y llama a read()
hasta que el flujo sea done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteración asíncrona
Comprobar en cada iteración del bucle read()
si la transmisión es done
puede no ser la API más conveniente.
Por suerte, pronto habrá una mejor manera de hacerlo: la iteración asíncrona.
for await (const chunk of stream) {
console.log(chunk);
}
Una solución alternativa para usar la iteración asíncrona hoy es implementar el comportamiento con un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Cómo bifurcar un flujo legible
El método tee()
de la interfaz ReadableStream
bifurca el flujo legible actual y muestra un array de dos elementos que contiene las dos ramas resultantes como nuevas instancias de ReadableStream
. Esto permite que dos lectores lean una transmisión de forma simultánea. Por ejemplo, puedes hacer esto en un service worker si quieres recuperar una respuesta del servidor y transmitirla al navegador, pero también transmitirla a la caché del service worker. Dado que un cuerpo de respuesta no se puede consumir más de una vez, necesitas dos copias para hacerlo. Para cancelar la transmisión, debes cancelar ambas ramas resultantes. Por lo general, la bifurcación de una transmisión la bloqueará durante toda su duración, lo que impedirá que otros lectores la bloqueen.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Transmisiones de bytes legibles
Para los flujos que representan bytes, se proporciona una versión extendida del flujo legible para controlar los bytes de manera eficiente, en particular, minimizando las copias. Los flujos de bytes permiten adquirir lectores de búfer propio (BYOB). La implementación predeterminada puede proporcionar un rango de diferentes resultados, como cadenas o búferes de arrays en el caso de WebSockets, mientras que los flujos de bytes garantizan la salida de bytes. Además, los lectores BYOB tienen beneficios de estabilidad. Esto se debe a que, si se separa un búfer, se puede garantizar que no se escriba en el mismo búfer dos veces, lo que evita las condiciones de carrera. Los lectores BYOB pueden reducir la cantidad de veces que el navegador necesita ejecutar la recolección de elementos no utilizados, ya que pueden reutilizar búferes.
Cómo crear un flujo de bytes legible
Puedes crear un flujo de bytes legible pasando un parámetro type
adicional al constructor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
A la fuente subyacente de un flujo de bytes legible se le proporciona un ReadableByteStreamController
para manipularlo. Su método ReadableByteStreamController.enqueue()
toma un argumento chunk
cuyo valor es un ArrayBufferView
. La propiedad ReadableByteStreamController.byobRequest
devuelve la solicitud de extracción de BYOB actual o null si no hay ninguna. Por último, la propiedad ReadableByteStreamController.desiredSize
devuelve el tamaño deseado para completar la cola interna de la transmisión controlada.
queuingStrategy
El segundo argumento del constructor de ReadableStream()
, que también es opcional, es queuingStrategy
.
Es un objeto que, de manera opcional, define una estrategia de filas para la transmisión, que toma un parámetro:
highWaterMark
: Es un número no negativo de bytes que indica la marca de nivel alto de la transmisión que usa esta estrategia de encolamiento. Se usa para determinar la contrapresión, que se manifiesta a través de la propiedadReadableByteStreamController.desiredSize
adecuada. También rige cuándo se llama al métodopull()
de la fuente subyacente.
Los métodos getReader()
y read()
Luego, puedes acceder a un ReadableStreamBYOBReader
configurando el parámetro mode
de la siguiente manera:
ReadableStream.getReader({ mode: "byob" })
. Esto permite un control más preciso sobre la asignación de búferes para evitar copias. Para leer desde el flujo de bytes, debes llamar a ReadableStreamBYOBReader.read(view)
, donde view
es un ArrayBufferView
.
Muestra de código de transmisión de bytes legible
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La siguiente función devuelve flujos de bytes legibles que permiten una lectura eficiente sin copias de un arreglo generado de forma aleatoria. En lugar de usar un tamaño de fragmento predeterminado de 1,024, intenta llenar el búfer proporcionado por el desarrollador, lo que permite un control total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
La mecánica de un flujo de escritura
Un flujo de escritura es un destino en el que puedes escribir datos, representado en JavaScript por un objeto WritableStream
. Esto sirve como una abstracción sobre un receptor subyacente, un receptor de E/S de nivel inferior en el que se escriben los datos sin procesar.
Los datos se escriben en la transmisión a través de un escritor, de a un fragmento por vez. Un fragmento puede adoptar una multitud de formas, al igual que los fragmentos de un lector. Puedes usar el código que quieras para producir los fragmentos listos para escribir. El escritor y el código asociado se denominan productor.
Cuando se crea un escritor y comienza a escribir en una transmisión (un escritor activo), se dice que está bloqueado para ella. Solo un escritor puede escribir en un flujo grabable a la vez. Si quieres que otro escritor comience a escribir en tu transmisión, por lo general, debes liberarla antes de adjuntar otro escritor a ella.
Una cola interna realiza un seguimiento de los fragmentos que se escribieron en la transmisión, pero que el receptor subyacente aún no procesó.
Una estrategia de encolamiento es un objeto que determina cómo una transmisión debe indicar contrapresión según el estado de su cola interna. La estrategia de encolamiento asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como marca de nivel alto de agua.
La construcción final se denomina controlador. Cada flujo de escritura tiene un controlador asociado que te permite controlar el flujo (por ejemplo, para anularlo).
Cómo crear una transmisión grabable
La interfaz WritableStream
de la API de Streams proporciona una abstracción estándar para escribir datos de transmisión en un destino, conocido como receptor. Este objeto incluye contrapresión y encolamiento integrados. Para crear un flujo de escritura, llama a su constructor WritableStream()
.
Tiene un parámetro underlyingSink
opcional, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de transmisión creada.
underlyingSink
El underlyingSink
puede incluir los siguientes métodos opcionales definidos por el desarrollador. El parámetro controller
que se pasa a algunos de los métodos es un WritableStreamDefaultController
.
start(controller)
: Se llama a este método inmediatamente cuando se construye el objeto. El contenido de este método debe tener como objetivo obtener acceso al receptor subyacente. Si este proceso se realiza de forma asíncrona, puede devolver una promesa para indicar éxito o falla.write(chunk, controller)
: Se llamará a este método cuando un nuevo fragmento de datos (especificado en el parámetrochunk
) esté listo para escribirse en el receptor subyacente. Puede devolver una promesa para indicar el éxito o el error de la operación de escritura. Este método se llamará solo después de que se hayan realizado correctamente las escrituras anteriores y nunca después de que se cierre o anule la transmisión.close(controller)
: Se llamará a este método si la app indica que terminó de escribir fragmentos en la transmisión. El contenido debe hacer lo que sea necesario para finalizar las escrituras en el receptor subyacente y liberar el acceso a él. Si este proceso es asíncrono, puede devolver una promesa para indicar éxito o falla. Este método se llamará solo después de que se hayan realizado correctamente todas las escrituras en cola.abort(reason)
: Se llamará a este método si la app indica que desea cerrar abruptamente el flujo y ponerlo en un estado de error. Puede limpiar cualquier recurso retenido, de forma similar aclose()
, pero se llamará aabort()
incluso si las escrituras están en cola. Esos fragmentos se descartarán. Si este proceso es asíncrono, puede devolver una promesa para indicar éxito o falla. El parámetroreason
contiene unDOMString
que describe por qué se anuló la transmisión.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
La interfaz WritableStreamDefaultController
de la API de Streams representa un controlador que permite controlar el estado de un WritableStream
durante la configuración, a medida que se envían más fragmentos para escribir o al final de la escritura. Cuando se construye un WritableStream
, el receptor subyacente recibe una instancia de WritableStreamDefaultController
correspondiente para manipular. WritableStreamDefaultController
solo tiene un método: WritableStreamDefaultController.error()
, que provoca que cualquier interacción futura con la transmisión asociada genere un error.
WritableStreamDefaultController
también admite una propiedad signal
que devuelve una instancia de AbortSignal
, lo que permite detener una operación WritableStream
si es necesario.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
El segundo argumento del constructor de WritableStream()
, que también es opcional, es queuingStrategy
.
Es un objeto que define de forma opcional una estrategia de filas para la transmisión, que toma dos parámetros:
highWaterMark
: Es un número no negativo que indica la marca de agua alta de la transmisión que usa esta estrategia de filas.size(chunk)
: Es una función que calcula y devuelve el tamaño finito no negativo del valor de fragmento determinado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedadWritableStreamDefaultWriter.desiredSize
adecuada.
Los métodos getWriter()
y write()
Para escribir en un flujo de escritura, necesitas un escritor, que será un WritableStreamDefaultWriter
. El método getWriter()
de la interfaz WritableStream
devuelve una instancia nueva de WritableStreamDefaultWriter
y bloquea la transmisión para esa instancia. Mientras la transmisión está bloqueada, no se puede adquirir ningún otro escritor hasta que se libere el actual.
El método write()
de la interfaz WritableStreamDefaultWriter
escribe un fragmento de datos pasado en un WritableStream
y su receptor subyacente, y, luego, devuelve una promesa que se resuelve para indicar el éxito o el fracaso de la operación de escritura. Ten en cuenta que el significado de "éxito" depende del receptor subyacente. Podría indicar que se aceptó el fragmento y no necesariamente que se guardó de forma segura en su destino final.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
La propiedad locked
Puedes verificar si un flujo de escritura está bloqueado accediendo a su propiedad WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Muestra de código de transmisión grabable
En el siguiente ejemplo de código, se muestran todos los pasos en acción.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Cómo canalizar una transmisión legible a una transmisión grabable
Se puede canalizar un flujo legible a un flujo de escritura a través del método pipeTo()
del flujo legible.
ReadableStream.pipeTo()
canaliza el ReadableStream
actual a un WritableStream
determinado y devuelve una promesa que se cumple cuando el proceso de canalización se completa correctamente o se rechaza si se encontraron errores.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Cómo crear una transmisión de transformación
La interfaz TransformStream
de la API de Streams representa un conjunto de datos transformables. Para crear un flujo de transformación, llama a su constructor TransformStream()
, que crea y devuelve un objeto de flujo de transformación a partir de los controladores proporcionados. El constructor TransformStream()
acepta como primer argumento un objeto JavaScript opcional que representa el transformer
. Estos objetos pueden contener cualquiera de los siguientes métodos:
transformer
start(controller)
: Se llama a este método inmediatamente cuando se construye el objeto. Por lo general, se usa para poner en cola fragmentos de prefijo concontroller.enqueue()
. Esos fragmentos se leerán desde el lado legible, pero no dependen de ninguna escritura en el lado grabable. Si este proceso inicial es asíncrono, por ejemplo, porque requiere cierto esfuerzo adquirir los fragmentos de prefijo, la función puede devolver una promesa para indicar éxito o falla. Una promesa rechazada generará un error en la transmisión. El constructor deTransformStream()
volverá a arrojar cualquier excepción que se haya arrojado.transform(chunk, controller)
: Se llama a este método cuando un nuevo fragmento escrito originalmente en el lado de escritura está listo para transformarse. La implementación de la transmisión garantiza que esta función se llamará solo después de que se hayan realizado correctamente las transformaciones anteriores y nunca antes de que se completestart()
o después de que se llame aflush()
. Esta función realiza el trabajo de transformación real del flujo de transformación. Puede poner en cola los resultados concontroller.enqueue()
. Esto permite que un solo fragmento escrito en el lado de escritura genere cero o varios fragmentos en el lado de lectura, según la cantidad de veces que se llame acontroller.enqueue()
. Si el proceso de transformación es asíncrono, esta función puede devolver una promesa para indicar el éxito o el fracaso de la transformación. Una promesa rechazada generará un error en los lados legibles y grabables de la secuencia de transformación. Si no se proporciona ningún métodotransform()
, se usa la transformación de identidad, que pone en cola fragmentos sin cambios del lado de escritura al lado de lectura.flush(controller)
: Se llama a este método después de que todos los fragmentos escritos en el lado de escritura se transformaron al pasar correctamente portransform()
y el lado de escritura está a punto de cerrarse. Por lo general, se usa para poner en cola fragmentos de sufijo en el lado legible, antes de que este también se cierre. Si el proceso de vaciado es asíncrono, la función puede devolver una promesa para indicar éxito o falla. El resultado se comunicará a la persona que llamó astream.writable.write()
. Además, una promesa rechazada generará un error en los lados de lectura y escritura de la transmisión. Lanzar una excepción se trata de la misma manera que devolver una promesa rechazada.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Las estrategias de filas writableStrategy
y readableStrategy
El segundo y el tercer parámetro opcional del constructor TransformStream()
son las estrategias de encolamiento opcionales writableStrategy
y readableStrategy
. Se definen como se describe en las secciones de transmisión legible y grabable, respectivamente.
Muestra de código de transformación de transmisión
En el siguiente ejemplo de código, se muestra un flujo de transformación simple en acción.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Canalización de una transmisión legible a través de una transmisión de transformación
El método pipeThrough()
de la interfaz ReadableStream
proporciona una forma encadenable de canalizar la transmisión actual a través de una transmisión de transformación o cualquier otro par de lectura/escritura. Por lo general, canalizar un flujo lo bloqueará durante la duración de la canalización, lo que impedirá que otros lectores lo bloqueen.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
En la siguiente muestra de código (un poco artificial), se muestra cómo podrías implementar una versión "gritando" de fetch()
que convierte todo el texto a mayúsculas consumiendo la promesa de respuesta devuelta como una transmisión y convirtiendo a mayúsculas fragmento por fragmento. La ventaja de este enfoque es que no necesitas esperar a que se descargue todo el documento, lo que puede marcar una gran diferencia cuando se trata de archivos grandes.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demostración
En la siguiente demostración, se muestran los flujos de lectura, escritura y transformación en acción. También incluye ejemplos de cadenas de tuberías pipeThrough()
y pipeTo()
, y también demuestra tee()
. De manera opcional, puedes ejecutar la demostración en su propia ventana o ver el código fuente.
Flujos útiles disponibles en el navegador
Hay varios flujos útiles integrados directamente en el navegador. Puedes crear fácilmente un ReadableStream
a partir de un blob. El método stream() de la interfaz Blob
devuelve un ReadableStream
que, al leerse, devuelve los datos contenidos en el BLOB. También recuerda que un objeto File
es un tipo específico de Blob
y se puede usar en cualquier contexto en el que se pueda usar un blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Las variantes de transmisión de TextDecoder.decode()
y TextEncoder.encode()
se llaman TextDecoderStream
y TextEncoderStream
, respectivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Comprimir o descomprimir un archivo es fácil con los flujos de transformación CompressionStream
y DecompressionStream
, respectivamente. En la siguiente muestra de código, se muestra cómo puedes descargar la especificación de Streams, comprimirla (gzip) directamente en el navegador y escribir el archivo comprimido directamente en el disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Los flujos de solicitud FileSystemWritableFileStream
de la API de File System Access y los flujos de solicitud fetch()
experimentales son ejemplos de flujos de escritura en la naturaleza.
La API de Serial usa mucho los flujos legibles y los flujos de escritura.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Por último, la API de WebSocketStream
integra transmisiones con la API de WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Recursos útiles
- Especificación de Streams
- Demostraciones complementarias
- Polyfill de Streams
- 2016: El año de los flujos web
- Iteradores y generadores asíncronos
- Visualizador de transmisiones
Agradecimientos
Este artículo fue revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley y Adam Rice. Las entradas de blog de Jake Archibald me ayudaron mucho a comprender los streams. Algunas de las muestras de código se inspiran en las exploraciones del usuario de GitHub @bellbind, y partes del texto se basan en gran medida en la documentación de MDN Web Docs sobre Streams. Los autores de Streams Standard hicieron un trabajo increíble al escribir esta especificación. La imagen de héroe es de Ryan Lara en Unsplash.