Transmisiones: la guía definitiva

Aprende a usar transmisiones legibles, de escritura y de transformación con la API de Streams.

La API de Streams te permite acceder de manera programática a flujos de datos recibidos a través de la red o creados por cualquier medio de forma local y procesarlos con JavaScript. La transmisión implica desglosar un recurso que deseas recibir, enviar o transformar en fragmentos pequeños y, luego, procesarlos poco a poco. Si bien la transmisión es algo que los navegadores hacen de todas formas cuando reciben elementos como HTML o videos para mostrar en las páginas web, esta función nunca estuvo disponible para JavaScript antes de que se presentara la fetch con transmisiones en 2015.

Antes, si querías procesar algún tipo de recurso (ya sea un video, un archivo de texto, etc.), tenías que descargar el archivo completo, esperar a que se deserialice en un formato adecuado y, luego, procesarlo. Como las transmisiones están disponibles para JavaScript, todo esto cambia. Ahora puedes procesar datos sin procesar con JavaScript de manera progresiva en cuanto estén disponibles en el cliente, sin necesidad de generar un búfer, una cadena o un BLOB. Esto desbloquea una serie de casos de uso, algunos de los cuales se enumeran a continuación:

  • Efectos de video: Canalización de una transmisión de video legible a través de una transmisión de transformación que aplica efectos en tiempo real
  • Descompresión (des)compresión de datos: Canalización de una transmisión de archivos a través de una transmisión de transformación que la descomprime de forma selectiva.
  • Decodificación de imágenes: Canalización de una transmisión de respuesta HTTP a través de una transmisión de transformación que decodifica bytes en datos de mapa de bits y, luego, a través de otra transmisión de transformación que traduce mapas de bits a PNG. Si se instala dentro del controlador fetch de un service worker, puedes usar polyfills nuevos en formatos de imagen de manera transparente, como AVIF.

Navegadores compatibles

ReadableStream y WritableStream

Navegadores compatibles

  • 43
  • 14
  • 65
  • 10.1

Origen

TransformStream

Navegadores compatibles

  • 67
  • 79
  • 102
  • 14.1

Origen

Conceptos básicos

Antes de entrar en detalles sobre los distintos tipos de transmisiones, presentaré algunos conceptos básicos.

En trozos

Un fragmento es un dato único que se escribe en una transmisión o se lee desde ella. Pueden ser de cualquier tipo; las transmisiones incluso pueden contener fragmentos de diferentes tipos. La mayoría de las veces, un fragmento no será la unidad de datos más atómica de un flujo determinado. Por ejemplo, un flujo de bytes podría contener fragmentos que constan de unidades Uint8Array de 16 KiB, en lugar de bytes individuales.

Transmisiones legibles

Una transmisión legible representa una fuente de datos desde la que puedes leer. En otras palabras, los datos provienen de un flujo legible. En concreto, una transmisión legible es una instancia de la clase ReadableStream.

Transmisiones con capacidad de escritura

Una transmisión con capacidad de escritura representa un destino para datos en los que puedes escribir. En otras palabras, los datos entran a un flujo que admite escritura. En concreto, una transmisión que admite escritura es una instancia de la clase WritableStream.

Transformar transmisiones

Una transmisión de transformación consta de un par de transmisiones: una transmisión que admite escritura, conocida como su lado que admite escritura, y una transmisión legible, conocida como su lado legible. Una metáfora del mundo real para esto sería un intérprete simultáneo que traduce de un idioma a otro sobre la marcha. De una manera específica para la transmisión de transformación, escribir en el lado que admite escritura hace que los datos nuevos estén disponibles para leer desde el lado legible. En concreto, cualquier objeto con una propiedad writable y una propiedad readable puede servir como un flujo de transformaciones. Sin embargo, la clase TransformStream estándar facilita la creación de este par que se enreda correctamente.

Cadenas de tuberías

Principalmente, las transmisiones se canalizan entre sí. Una transmisión legible se puede canalizar directamente a una transmisión con escritura mediante el método pipeTo() de la transmisión legible, o bien se puede canalizar primero a través de una o más transmisiones de transformación con el método pipeThrough() de la transmisión legible. Un conjunto de transmisiones canalizadas juntas de esta manera se denomina cadena de canalización.

Contrapresión

Una vez que se crea una cadena de tuberías, se propagan indicadores respecto de qué tan rápido deben fluir los fragmentos a través de ella. Si algún paso de la cadena aún no puede aceptar fragmentos, propaga una señal hacia atrás a través de la cadena de tubo, hasta que finalmente se le dice a la fuente original que deje de producir fragmentos tan rápido. Este proceso de normalización del flujo se denomina contrapresión.

Golpe inicial

Se puede agregar una transmisión legible (que lleva su nombre según la forma de una "T" mayúscula) con su método tee(). Esto bloqueará la transmisión, es decir, dejará de usarla directamente; sin embargo, creará dos transmisiones nuevas, llamadas ramas, que se pueden consumir de forma independiente. El inicio de sesión también es importante, ya que las transmisiones no se pueden retroceder ni reiniciar. Se trata de un tema más adelante.

Diagrama de una cadena de canalización que consta de una transmisión legible que proviene de una llamada a la API de recuperación que luego se canaliza a través de un flujo de transformación cuyo resultado se envía y, luego, se envía al navegador para la primera transmisión legible resultante y a la caché del service worker para la segunda transmisión legible resultante.
Una cadena de tuberías.

La mecánica de una transmisión legible

Una transmisión legible es una fuente de datos representada en JavaScript por un objeto ReadableStream que fluye desde una fuente subyacente. El constructor ReadableStream() crea y muestra un objeto de transmisión legible de los controladores dados. Hay dos tipos de fuentes subyacentes:

  • Las fuentes de envío te envían datos constantemente cuando accedes a ellas, y depende de ti iniciar, pausar o cancelar el acceso a la transmisión. Los ejemplos incluyen transmisiones de video en vivo, eventos enviados por el servidor o WebSockets.
  • Las fuentes de extracción requieren que solicites datos de forma explícita una vez que estén conectadas. Algunos ejemplos incluyen operaciones HTTP a través de llamadas fetch() o XMLHttpRequest.

Los datos de transmisión se leen de forma secuencial en partes pequeñas llamadas fragmentos. Se dice que los fragmentos que se colocan en una transmisión están en cola. Esto significa que están en una cola lista para leerla. Una cola interna realiza un seguimiento de los fragmentos que aún no se leyeron.

Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como la marca de agua alta.

Un lector lee los fragmentos dentro de la transmisión. Este lector recupera los datos de a un fragmento a la vez, lo que te permite realizar cualquier tipo de operación que desees en ellos. El lector y el otro código de procesamiento que lo acompaña se denominan consumidor.

La siguiente construcción en este contexto se denomina controlador. Cada transmisión legible tiene un controlador asociado que, como su nombre sugiere, te permite controlarla.

Solo un lector puede leer una transmisión a la vez; cuando se crea un lector y comienza a leer una transmisión (es decir, se convierte en un lector activo), se lo bloquea. Si quieres que otro lector se haga cargo de la lectura de tu transmisión, en general, debes liberar el primer lector antes de hacer cualquier otra cosa (aunque puedes aplicar las transmisiones).

Crea una transmisión legible

Para crear una transmisión legible, llama a su constructor ReadableStream(). El constructor tiene un argumento opcional underlyingSource, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de transmisión construida.

underlyingSource

Esto puede usar los siguientes métodos opcionales definidos por el desarrollador:

  • start(controller): Se llama de inmediato cuando se construye el objeto. El método puede acceder a la fuente de la transmisión y realizar cualquier otra cosa necesaria para configurar la funcionalidad de la transmisión. Si este proceso se realiza de forma asíncrona, el método puede mostrar una promesa que indique el éxito o el fracaso. El parámetro controller que se pasa a este método es un ReadableStreamDefaultController.
  • pull(controller): Se puede usar para controlar la transmisión a medida que se recuperan más fragmentos. Se llama repetidamente mientras la cola interna de fragmentos de la transmisión no esté llena, hasta que la cola alcance su marca de agua alta. Si el resultado de la llamada a pull() es una promesa, no se volverá a llamar a pull() hasta que se cumpla esa promesa. Si se rechaza la promesa, se producirá un error en la transmisión.
  • cancel(reason): Se llama cuando el consumidor de la transmisión la cancela.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController admite los siguientes métodos:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

El segundo argumento del constructor ReadableStream(), también opcional, es queuingStrategy. Es un objeto que define de forma opcional una estrategia de fila para la transmisión, que toma dos parámetros:

  • highWaterMark: Es un número no negativo que indica la marca de agua más alta del flujo con esta estrategia de fila.
  • size(chunk): Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento dado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedad ReadableStreamDefaultController.desiredSize adecuada. También controla cuándo se llama al método pull() de la fuente subyacente.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Los métodos getReader() y read()

Para leer de una transmisión legible, necesitas un lector, que será un ReadableStreamDefaultReader. El método getReader() de la interfaz ReadableStream crea un lector y bloquea la transmisión en él. Mientras la transmisión está bloqueada, no se podrá adquirir otro lector hasta que se lance este.

El método read() de la interfaz ReadableStreamDefaultReader muestra una promesa que brinda acceso al siguiente fragmento de la cola interna de la transmisión. Se entrega o rechaza con un resultado según el estado de la transmisión. Las diferentes posibilidades son las siguientes:

  • Si hay un fragmento disponible, la promesa se cumplirá con un objeto del formato
    { value: chunk, done: false }.
  • Si se cierra la transmisión, la promesa se cumplirá con un objeto con el formato
    { value: undefined, done: true }.
  • Si se produce un error en la transmisión, la promesa se rechaza con el error correspondiente.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

La propiedad locked

Puedes verificar si una transmisión legible está bloqueada si accedes a su propiedad ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Muestras de código de transmisión legibles

En la siguiente muestra de código, se muestran todos los pasos en acción. Primero, crea un objeto ReadableStream que, en su argumento underlyingSource (es decir, la clase TimestampSource) defina un método start(). Este método le indica al controller de la transmisión que enqueue() una marca de tiempo cada segundo durante diez segundos. Por último, le indica al controlador que close() la transmisión. Para consumir esta transmisión, crea un lector con el método getReader() y llama a read() hasta que la transmisión sea done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteración asíncrona

Es posible que verificar cada iteración de bucle read() si la transmisión es done no sea la API más conveniente. Afortunadamente, en poco tiempo habrá una mejor manera de hacerlo: la iteración asíncrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Una solución alternativa para usar la iteración asíncrona en la actualidad es implementar el comportamiento con un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Cómo crear un comienzo de una transmisión legible

El método tee() de la interfaz ReadableStream agrega la transmisión legible actual y muestra un array de dos elementos que contiene las dos ramas resultantes como instancias de ReadableStream nuevas. Esto permite que dos lectores lean una transmisión simultáneamente. Puedes hacerlo, por ejemplo, en un service worker si deseas recuperar una respuesta del servidor y transmitirla al navegador, pero también transmitirla a la caché del service worker. Como el cuerpo de una respuesta no se puede consumir más de una vez, necesitas dos copias para hacerlo. Para cancelar la transmisión, debes cancelar las dos ramas resultantes. Por lo general, el inicio de una transmisión continua la bloqueará por el tiempo, lo que evita que otros lectores la bloqueen.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flujos de bytes legibles

En el caso de las transmisiones que representan bytes, se proporciona una versión extendida de la transmisión legible para controlar los bytes de manera eficiente, en particular mediante la minimización de copias. Las transmisiones de bytes permiten adquirir lectores de tipo "trae tu propio búfer" (BYOB). La implementación predeterminada puede proporcionar un rango de resultados diferentes, como strings o búferes de array en el caso de WebSockets, mientras que las transmisiones de bytes garantizan la salida de bytes. Además, los lectores BYOB tienen beneficios de estabilidad. Esto se debe a que, si un búfer se desconecta, puede garantizar que no se escriba dos veces en el mismo búfer, lo que evita condiciones de carrera. Los lectores BYOB pueden reducir la cantidad de veces que el navegador necesita ejecutar la recolección de elementos no utilizados, ya que puede reutilizar búferes.

Cómo crear un flujo de bytes legible

Puedes crear un flujo de bytes legible pasando un parámetro type adicional al constructor ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

La fuente subyacente de un flujo de bytes legible recibe un ReadableByteStreamController para manipular. Su método ReadableByteStreamController.enqueue() toma un argumento chunk cuyo valor es un ArrayBufferView. La propiedad ReadableByteStreamController.byobRequest muestra la solicitud de extracción BYOB actual, o nula si no hay ninguna. Por último, la propiedad ReadableByteStreamController.desiredSize muestra el tamaño deseado para llenar la cola interna de la transmisión controlada.

queuingStrategy

El segundo argumento del constructor ReadableStream(), también opcional, es queuingStrategy. Es un objeto que define de forma opcional una estrategia de cola para la transmisión, que toma un parámetro:

  • highWaterMark: Es una cantidad no negativa de bytes que indica la marca de agua alta del flujo que usa esta estrategia de cola. Se utiliza para determinar la contrapresión, que se manifiesta a través de la propiedad ReadableByteStreamController.desiredSize adecuada. También controla cuándo se llama al método pull() de la fuente subyacente.

Los métodos getReader() y read()

Luego, puedes obtener acceso a un ReadableStreamBYOBReader si configuras el parámetro mode según corresponda: ReadableStream.getReader({ mode: "byob" }). Esto permite un control más preciso sobre la asignación del búfer para evitar copias. Para leer desde el flujo de bytes, debes llamar a ReadableStreamBYOBReader.read(view), donde view es un ArrayBufferView.

Muestra de código de flujo de bytes legible

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La siguiente función muestra flujos de bytes legibles que permiten una lectura eficiente de copia cero de un arreglo generado de forma aleatoria. En lugar de usar un tamaño de fragmento predeterminado de 1,024, intenta llenar el búfer proporcionado por el desarrollador, lo que permite un control total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

La mecánica de una transmisión con escritura

Una transmisión con capacidad de escritura es un destino en el que puedes escribir datos, representado en JavaScript por un objeto WritableStream. Esto sirve como una abstracción sobre un receptor subyacente, que es un receptor de E/S de nivel inferior en el que se escriben datos sin procesar.

Los datos se escriben en el flujo a través de un escritor, un fragmento a la vez. Un fragmento puede tener muchas formas, como los fragmentos en un lector. Puedes usar el código que desees para producir los fragmentos listos para escribir; el escritor y el código asociado se llaman productor.

Cuando se crea un escritor y comienza a escribir en una transmisión (un escritor activo), se dice que está bloqueado a ella. Solo un escritor a la vez puede escribir en una transmisión con capacidad de escritura. Si quieres que otro escritor comience a escribir en tu transmisión, por lo general, debes liberarlo antes de adjuntarle otro.

Una cola interna realiza un seguimiento de los fragmentos que se escribieron en la transmisión, pero que el receptor subyacente todavía no los procesó.

Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión en función del estado de su cola interna. La estrategia de colas asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola con un número especificado, conocido como la marca de agua alta.

La construcción final se denomina controlador. Cada transmisión con capacidad de escritura tiene un controlador asociado que te permite controlarla (por ejemplo, anularla).

Cómo crear una transmisión que admite escritura

La interfaz WritableStream de la API de Streams proporciona una abstracción estándar para escribir datos de transmisión en un destino, conocido como un receptor. Este objeto incluye contrapresión y puesta en cola integradas. Para crear una transmisión con capacidad de escritura, llama a su constructor WritableStream(). Tiene un parámetro underlyingSink opcional, que representa un objeto con métodos y propiedades que definen cómo se comportará la instancia de transmisión construida.

underlyingSink

underlyingSink puede incluir los siguientes métodos opcionales definidos por el desarrollador. El parámetro controller que se pasa a algunos de los métodos es un WritableStreamDefaultController.

  • start(controller): Se llama a este método de inmediato cuando se construye el objeto. El contenido de este método debe apuntar a obtener acceso al receptor subyacente. Si este proceso se realiza de forma asíncrona, puede mostrar una promesa que indica el éxito o el fracaso.
  • write(chunk, controller): Se llamará a este método cuando un fragmento nuevo de datos (especificado en el parámetro chunk) esté listo para escribirse en el receptor subyacente. Puede mostrar una promesa para indicar el éxito o la falla de la operación de escritura. Se llamará a este método solo después de que se hayan realizado correctamente las operaciones de escritura anteriores y nunca después de que se cierre o anule la transmisión.
  • close(controller): Se llamará a este método si la app indica que terminó de escribir fragmentos en la transmisión. El contenido debe hacer lo que sea necesario para finalizar las operaciones de escritura en el receptor subyacente y liberar acceso a él. Si este proceso es asíncrono, puede mostrar una promesa para indicar el éxito o el fracaso. Solo se llamará a este método después de que todas las escrituras en cola se realicen correctamente.
  • abort(reason): Se llamará a este método si la app indica que desea cerrar la transmisión de manera repentina y ponerla en un estado de error. Puede limpiar los recursos retenidos, como close(), pero se llamará a abort() incluso si las operaciones de escritura están en cola. Esos fragmentos se desecharán. Si este proceso es asíncrono, puede mostrar una promesa para indicar el éxito o el fracaso. El parámetro reason contiene un DOMString que describe por qué se anuló la transmisión.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

La interfaz WritableStreamDefaultController de la API de Streams representa un controlador que permite controlar el estado de un WritableStream durante la configuración, a medida que se envían más fragmentos para escribir o al final de la escritura. Cuando se construye un WritableStream, el receptor subyacente recibe una instancia de WritableStreamDefaultController correspondiente para manipular. WritableStreamDefaultController tiene un solo método, WritableStreamDefaultController.error(), que genera un error en cualquier interacción futura con la transmisión asociada. WritableStreamDefaultController también admite una propiedad signal que muestra una instancia de AbortSignal, lo que permite detener una operación WritableStream si es necesario.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

El segundo argumento del constructor WritableStream(), también opcional, es queuingStrategy. Es un objeto que define de forma opcional una estrategia de fila para la transmisión, que toma dos parámetros:

  • highWaterMark: Es un número no negativo que indica la marca de agua más alta del flujo con esta estrategia de fila.
  • size(chunk): Es una función que calcula y muestra el tamaño finito no negativo del valor de fragmento dado. El resultado se usa para determinar la contrapresión, que se manifiesta a través de la propiedad WritableStreamDefaultWriter.desiredSize adecuada.

Los métodos getWriter() y write()

Para escribir en una transmisión que admite escritura, necesitas un escritor, que será un WritableStreamDefaultWriter. El método getWriter() de la interfaz WritableStream muestra una instancia nueva de WritableStreamDefaultWriter y bloquea la transmisión a esa instancia. Mientras la transmisión está bloqueada, no se podrá adquirir otro escritor hasta que se lance el actual.

El método write() de la interfaz WritableStreamDefaultWriter escribe un fragmento de datos pasado a un WritableStream y a su receptor subyacente y, luego, muestra una promesa que se resuelve para indicar el éxito o el fracaso de la operación de escritura. Ten en cuenta que lo que significa “éxito” depende del receptor subyacente; podría indicar que se aceptó el fragmento y no necesariamente que se guardó de forma segura en su destino final.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

La propiedad locked

Para verificar si una transmisión con capacidad de escritura está bloqueada, accede a su propiedad WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Muestra de código de transmisión que admite escritura

En la siguiente muestra de código, se muestran todos los pasos en acción.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Canalizar una transmisión legible a una transmisión con escritura

Se puede canalizar una transmisión legible a través del método pipeTo() de la transmisión legible. ReadableStream.pipeTo() canaliza el ReadableStream actual a un WritableStream determinado y muestra una promesa que se cumple cuando el proceso de canalización se completa correctamente o se rechaza si se encuentran errores.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Crea una transmisión de transformación

La interfaz TransformStream de la API de Streams representa un conjunto de datos transformables. Para crear una transmisión de transformación, llama a su constructor TransformStream(), que crea y muestra un objeto de flujo de transformación de los controladores determinados. El constructor TransformStream() acepta como primer argumento un objeto de JavaScript opcional que representa el transformer. Esos objetos pueden contener cualquiera de los siguientes métodos:

transformer

  • start(controller): Se llama a este método de inmediato cuando se construye el objeto. Por lo general, se usa para poner fragmentos de prefijos en cola mediante controller.enqueue(). Esos fragmentos se leerán desde el lado legible, pero no dependerán de ninguna escritura en el lado que admite escritura. Si este proceso inicial es asíncrono, por ejemplo, porque se requiere cierto esfuerzo para adquirir los fragmentos de prefijo, la función puede mostrar una promesa que indique el éxito o el fracaso; una promesa rechazada generará un error en la transmisión. El constructor TransformStream() volverá a mostrar las excepciones arrojadas.
  • transform(chunk, controller): Se llama a este método cuando un fragmento nuevo escrito originalmente en el lado con capacidad de escritura está listo para transformarse. La implementación de transmisión garantiza que se llamará a esta función solo después de que se hayan realizado correctamente las transformaciones anteriores y nunca antes de que se complete start() o después de que se haya llamado a flush(). Esta función realiza el trabajo de transformación real del flujo de transformación. Puede poner los resultados en cola con controller.enqueue(). Esto permite que un solo fragmento escrito en el lado que admite escritura da como resultado cero o varios fragmentos en el lado legible, según la cantidad de veces que se llame a controller.enqueue(). Si el proceso de transformación es asíncrono, esta función puede mostrar una promesa que indica el éxito o el fracaso de la transformación. Una promesa rechazada generará un error en los lados de lectura y escritura del flujo de transformaciones. Si no se proporciona un método transform(), se usa la transformación de identidad, que pone en cola los fragmentos sin cambios del lado que admite escritura al lado de lectura.
  • flush(controller): Se llama a este método después de que todos los fragmentos escritos en el lado que admiten escritura se transformaron pasando correctamente por transform(), y el lado que admite escritura está a punto de cerrarse. Por lo general, se usa para poner fragmentos de sufijo en cola en el lado legible, antes de que se cierre. Si el proceso de limpieza es asíncrono, la función puede mostrar una promesa para indicar el éxito o el fracaso; el resultado se comunicará al llamador de stream.writable.write(). Además, una promesa rechazada generará un error en los lados de lectura y escritura de la transmisión. Arrojar una excepción se trata de la misma manera que mostrar una promesa rechazada.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Las estrategias de cola writableStrategy y readableStrategy

El segundo y tercer parámetros opcionales del constructor TransformStream() son estrategias de cola opcionales writableStrategy y readableStrategy. Se definen como se describe en las secciones de transmisión legibles y que admiten escritura, respectivamente.

Muestra de código de flujo de transformación

En la siguiente muestra de código, se observa un flujo de transformaciones simple en acción.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Canalización de una transmisión legible a través de una transmisión de transformación

El método pipeThrough() de la interfaz ReadableStream proporciona una forma encadenable de canalizar la transmisión actual a través de una transmisión de transformación o cualquier otro par que admita escritura o lectura. Por lo general, la canalización de una transmisión la bloqueará mientras dure la canalización, lo que evita que otros lectores la bloqueen.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

En la siguiente muestra de código (un poco forzada) se indica cómo implementar una versión “gritando” de fetch() que convierte el texto en mayúsculas consumiendo la promesa de respuesta que se muestra como una transmisión y mayúsculas en fragmento por fragmento. La ventaja de este enfoque es que no necesitas esperar a que se descargue todo el documento, lo que puede suponer una gran diferencia cuando trabajas con archivos grandes.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demostración

En la siguiente demostración, se muestran transmisiones legibles, de escritura y de transformación en acción. También se incluyen ejemplos de cadenas de canalización pipeThrough() y pipeTo(), y se muestra tee(). De manera opcional, puedes ejecutar la demostración en su propia ventana o ver el código fuente.

Transmisiones útiles disponibles en el navegador

Hay una serie de transmisiones útiles integradas en el navegador. Puedes crear un ReadableStream con facilidad a partir de un BLOB. El método stream() de la interfaz Blob muestra un ReadableStream que, al momento de la lectura, muestra los datos contenidos en el BLOB. Recuerda también que un objeto File es un tipo específico de Blob y se puede usar en cualquier contexto que pueda usar un BLOB.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Las variantes de transmisión de TextDecoder.decode() y TextEncoder.encode() se denominan TextDecoderStream y TextEncoderStream, respectivamente.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Comprimir o descomprimir un archivo es fácil con las transmisiones de transformaciones CompressionStream y DecompressionStream, respectivamente. En la siguiente muestra de código, se indica cómo descargar la especificación de flujos, comprimirla (gzip) directamente en el navegador y escribir el archivo comprimido directamente en el disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Las FileSystemWritableFileStream de la API de File System Access y las transmisiones de solicitudes fetch() experimentales son ejemplos de transmisiones en las que se pueden escribir.

La API de Serial hace un uso intensivo de las transmisiones legibles y con capacidad de escritura.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Por último, la API de WebSocketStream integra transmisiones con la API de WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Recursos útiles

Agradecimientos

Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley y Adam Rice revisaron este artículo. Las entradas de blog de Jake Archibald me ayudaron mucho a comprender los flujos. Algunas de las muestras de código están inspiradas en las exploraciones del usuario de GitHub @bellbind y las partes de la prosa se compilan en gran medida en los documentos web de MDN en transmisiones. Los autores de Streams Standard hicieron un gran trabajo al escribir esta especificación. Hero image de Ryan Lara en Unsplash.