Transmisiones: la guía definitiva

Aprende a usar transmisiones que se pueden leer, escribir y transformar con la API de Streams.

La API de Streams te permite acceder de manera programática a flujos de datos recibidos a través de la red o se crea a nivel local y procesarlos con JavaScript. La transmisión implica desglosar un recurso que quieres recibir, enviar o transformar. en trozos pequeños y, luego, los procesamos poco a poco. Aunque transmitir es algo de todos modos lo hacen cuando reciben recursos como HTML o videos que se muestran en las páginas web, esto nunca estuvo disponible para JavaScript antes de que se presentara fetch con transmisiones en 2015.

Anteriormente, si querías procesar un recurso de algún tipo (ya sea un video, un archivo de texto, etc.), tendrías que descargar el archivo completo, esperar a que se deserialice a un formato adecuado y, luego, procesarlos. Como las transmisiones están disponibles JavaScript, todo esto cambia. Ahora puedes procesar datos sin procesar con JavaScript de forma progresiva a medida que tan pronto como estén disponibles en el cliente, sin necesidad de generar un búfer, una cadena o un BLOB. Esto desbloquea una serie de casos de uso, algunos de los cuales enumeramos a continuación:

  • Efectos de video: Canaliza una transmisión de video legible a través de una transmisión de transformación que aplica efectos. en tiempo real.
  • (des)compresión de datos: Canalización de una transmisión de archivos a través de una transmisión de transformación que la (des)comprime.
  • Decodificación de imágenes: Canalización de un flujo de respuesta HTTP a través de una transmisión de transformación que decodifica bytes en datos de mapas de bits y, luego, a través de otra transmisión de transformación que traduce mapas de bits a PNG. Si Se instala dentro del controlador fetch de un service worker. Esto te permite aplicar polyfills con transparencia. formatos de imagen nuevos, como AVIF.

Navegadores compatibles

ReadableStream y WritableStream

Navegadores compatibles

  • Chrome: 43.
  • Límite: 14.
  • Firefox: 65.
  • Safari: 10.1.

Origen

TransformStream

Navegadores compatibles

  • Chrome: 67.
  • Borde: 79.
  • Firefox: 102.
  • Safari: 14.1.

Origen

Conceptos básicos

Antes de entrar en detalles sobre los diferentes tipos de transmisiones, permítanme presentar algunos conceptos básicos.

En trozos

Un fragmento es un solo dato que se escribe en un flujo o se lee de él. Puede ser de cualquier el tipo; incluso pueden contener fragmentos de diferentes tipos. La mayoría de las veces, un fragmento no será el más atómico unidad de datos para una transmisión determinada. Por ejemplo, un flujo de bytes puede contener fragmentos de 16 Unidades Uint8Array KiB, en lugar de bytes individuales.

Transmisiones legibles

Un flujo legible representa una fuente de datos desde la que puedes leer. En otras palabras, los datos provienen fuera de un flujo legible. En concreto, una transmisión legible es una instancia del ReadableStream. .

Transmisiones que admiten escritura

Una transmisión grabable representa un destino para los datos en los que se puede escribir. En otras palabras, los datos entra a una transmisión que admite escritura. En concreto, una transmisión grabable es una instancia del Clase WritableStream.

Transmisiones de transformación

Una transmisión de transformación consta de un par de transmisiones: una que admite escritura, conocida como su lado que admite escritura. y un flujo legible, conocido como su lado legible. Una metáfora del mundo real para esto sería una intérprete simultáneo que traduce de un idioma a otro en el momento. De una manera específica para la transmisión de transformación, escribir hacia el lado que admite escritura da como resultado nuevos datos disponibles para leer desde el más legible. De forma concreta, cualquier objeto con una propiedad writable y una propiedad readable puede entregar como una transmisión de transformación. Sin embargo, la clase TransformStream estándar facilita la creación un par que se enrede bien.

Cadenas de tuberías

Las transmisiones se usan principalmente mediante canalización entre ellas. Una transmisión legible se puede canalizar directamente a una transmisión con capacidad de escritura mediante el método pipeTo() de la transmisión legible o se puede canalizar a través de uno o más transmisiones de transformación primero, mediante el método pipeThrough() de la transmisión legible. Un conjunto de canalizadas de esta manera se conoce como cadena de tuberías.

Contrapresión

Una vez que se construye una cadena de tuberías, propaga señales sobre qué tan rápido debe fluir los fragmentos a través de ella. Si algún paso de la cadena aún no puede aceptar fragmentos, propaga una señal hacia atrás a través de la cadena de canalización, hasta que finalmente se le dice a la fuente original que deje de producir fragmentos para rápido. Este proceso de normalización del flujo se llama contrapresión.

T

Una transmisión legible se puede usar para establecer un valor (con el nombre de la forma de una “T” mayúscula) mediante el método tee(). Esto bloqueará la transmisión, es decir, dejará de usarla de forma directa. Sin embargo, se crearán dos nuevos de transmisión, denominadas ramas, que se pueden consumir de forma independiente. Usar Teeing también es importante porque las transmisiones no se pueden retroceder ni reiniciar. Hablaremos más al respecto más adelante.

Diagrama de una cadena de canalizaciones que consiste en una transmisión legible que proviene de una llamada a la API de recuperación y que, luego, se canaliza a través de una transmisión de transformación cuyo resultado se envía al navegador para la primera transmisión legible resultante y a la caché del service worker para la segunda transmisión legible resultante.
Una cadena de tuberías.

La mecánica de una transmisión legible

Una transmisión legible es una fuente de datos representada en JavaScript por un ReadableStream que fluye desde una fuente subyacente. El ReadableStream() Este constructor crea y muestra un objeto de flujo legible a partir de los controladores proporcionados. Existen dos tipos de fuente subyacente:

  • Las fuentes de envío te envían datos constantemente cuando accedes a ellas, y tú decides iniciar, pausar o cancelar el acceso a la transmisión. Entre los ejemplos, se incluyen las transmisiones de video en vivo, los eventos enviados por el servidor, o WebSockets.
  • Las fuentes de extracción requieren que les solicites datos de forma explícita una vez que te conectes. Ejemplos Incluyen operaciones HTTP mediante llamadas fetch() o XMLHttpRequest.

Los datos de transmisión se leen de forma secuencial en partes pequeñas llamadas fragmentos. Se dice que los fragmentos que se colocan en una transmisión están en cola. Esto significa que están esperando en una fila. listo para leer. Una cola interna realiza un seguimiento de los fragmentos que aún no se leyeron.

Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión según el estado de su cola interna. La estrategia de cola asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola a un número específico, conocido como marca de agua alta.

Un lector lee los fragmentos dentro de la transmisión. Este lector recupera los datos un fragmento a la tiempo, lo que te permite hacer cualquier tipo de operación que desees. El lector y el otro que el código de procesamiento que lo acompaña se denomina consumidor.

La siguiente construcción en este contexto se llama controlador. Cada transmisión legible tiene un vínculo controlador de red que, como su nombre lo indica, le permite controlar la transmisión.

Solo un lector puede leer una transmisión a la vez. Cuando se crea un lector y comienza a leer una transmisión (es decir, se convierte en un lector activo), está bloqueado. Si quieres que otro lector se haga cargo cuando lees tu transmisión, por lo general, debes liberar al primer lector antes de hacer cualquier otra acción (aunque puedes tee).

Crea una transmisión legible

Llama a su constructor para crear una transmisión legible ReadableStream() El constructor tiene un argumento opcional underlyingSource, que representa un objeto. con métodos y propiedades que definan cómo se comportará la instancia de transmisión construida.

underlyingSource

Esto puede usar los siguientes métodos opcionales definidos por el desarrollador:

  • start(controller): Se llama de inmediato cuando se construye el objeto. El puede acceder a la fuente de transmisión y hacer cualquier otra cosa necesario para configurar la funcionalidad de transmisión. Si este proceso se debe realizar de forma asíncrona, el método puede devolverá una promesa para indicar el éxito o el fracaso. El parámetro controller que se pasa a este método es pañal ReadableStreamDefaultController
  • pull(controller): Se puede usar para controlar la transmisión a medida que se recuperan más fragmentos. Integra Se llama repetidamente, siempre que la cola interna de fragmentos de la transmisión no esté completa, hasta la cola cuando alcance su marca de agua alta. Si el resultado de la llamada a pull() es una promesa, No se volverá a llamar a pull() hasta que se cumpla esa promesa. Si se rechaza la promesa, se producirá un error en la transmisión.
  • cancel(reason): Se llama cuando el consumidor de la transmisión la cancela.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController admite los siguientes métodos:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

El segundo argumento, también opcional, del constructor ReadableStream() es queuingStrategy. Es un objeto que, de manera opcional, define una estrategia de puesta en cola para la transmisión, que toma dos parámetros:

  • highWaterMark: Es un número no negativo que indica la marca de agua alta de la transmisión con esta estrategia de fila.
  • size(chunk): Es una función que calcula y muestra el tamaño finito no negativo del valor del fragmento dado. El resultado se usa para determinar la contrapresión, que se manifiesta mediante la propiedad ReadableStreamDefaultController.desiredSize adecuada. También determina cuándo se llama al método pull() de la fuente subyacente.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Los métodos getReader() y read()

Para leer desde una transmisión legible, necesitas un lector, que será un ReadableStreamDefaultReader El método getReader() de la interfaz ReadableStream crea un lector y bloquea la transmisión en que la modifica. Mientras la transmisión esté bloqueada, no se podrá adquirir otro lector hasta que se lance este.

La read() de la interfaz ReadableStreamDefaultReader devuelve una promesa que proporciona acceso a la siguiente en la cola interna de la transmisión. Se completa o se rechaza con un resultado según el estado de la transmisión. Las diferentes posibilidades son las siguientes:

  • Si hay un fragmento disponible, la promesa se cumplirá con un objeto con el formato
    . { value: chunk, done: false }
  • Si se cierra la transmisión, la promesa se cumplirá con un objeto con el formato
    . { value: undefined, done: true }
  • Si se produce un error en la transmisión, se rechazará la promesa con el error relevante.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

La propiedad locked

Para comprobar si una transmisión legible está bloqueada, accede a su ReadableStream.locked propiedad.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Muestras de código de transmisión legibles

En la siguiente muestra de código, se muestran todos los pasos en acción. Primero, crea un ReadableStream que en su El argumento underlyingSource (es decir, la clase TimestampSource) define un método start(). Este método le indica al elemento controller de la transmisión que haga lo siguiente: enqueue() una marca de tiempo cada segundo durante diez segundos. Por último, le indica al controlador que haga close() la transmisión. Consumes este producto de transmisión creando un lector a través del método getReader() y llamando a read() hasta que se complete la transmisión. done

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteración asíncrona

Es posible que verificar cada iteración de bucle read() si la transmisión es done no sea la API más conveniente. Por suerte, pronto habrá una mejor forma de hacerlo: la iteración asíncrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Una solución alternativa para usar la iteración asíncrona en la actualidad es implementar el comportamiento con un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Cómo vincular una transmisión legible

El método tee() de la La interfaz ReadableStream prepara la transmisión legible actual y muestra un array de dos elementos. que contiene las dos ramas resultantes como instancias nuevas de ReadableStream. Esto permite dos lectores lean una transmisión simultáneamente. Puedes hacer esto, por ejemplo, en un service worker si Si quieres recuperar una respuesta del servidor y transmitirla al navegador, y transmitirla al en la caché del service worker. Debido a que el cuerpo de una respuesta no se puede consumir más de una vez, necesitas dos copias para hacerlo. Para cancelar la transmisión, debes cancelar las dos ramas resultantes. Transmitir una transmisión en el teletipo por lo general, lo bloqueará durante el tiempo que esté, lo que evitará que otros lectores lo bloqueen.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flujos de bytes legibles

Para las transmisiones que representan bytes, se proporciona una versión extendida de la transmisión legible para controlar bytes de forma eficiente, en particular, minimizando las copias. Las transmisiones de bytes permiten usar el búfer propio (BYOB) que se deben adquirir. La implementación predeterminada puede brindar una variedad de resultados como cadenas o búferes de array en el caso de WebSockets, mientras que las transmisiones de bytes garantizan la salida en bytes. Además, los lectores de BYOB ofrecen estabilidad. Este es porque, si un búfer se desconecta, puede garantizar que no se escriba dos veces en el mismo búfer. por lo que se evitan las condiciones de carrera. Los lectores de BYOB pueden reducir la cantidad de veces que debe ejecutarse el navegador. a la recolección de elementos no utilizados, ya que puede reutilizar búferes.

Crea un flujo de bytes legible

Puedes crear un flujo de bytes legible pasando un parámetro type adicional al ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

La fuente subyacente de un flujo de bytes legible recibe un ReadableByteStreamController para manipular. Su método ReadableByteStreamController.enqueue() toma un argumento chunk cuyo valor es un ArrayBufferView. La propiedad ReadableByteStreamController.byobRequest muestra el valor Solicitud de extracción BYOB o nula si no hay ninguna. Por último, el elemento ReadableByteStreamController.desiredSize La propiedad muestra el tamaño deseado para llenar la cola interna de la transmisión controlada.

queuingStrategy

El segundo argumento, también opcional, del constructor ReadableStream() es queuingStrategy. Es un objeto que, de forma opcional, define una estrategia de puesta en cola para la transmisión, parámetro:

  • highWaterMark: Es una cantidad no negativa de bytes que indica la marca de agua alta de la transmisión con esta estrategia de fila. Se usa para determinar la contrapresión, que se manifiesta a través de la propiedad ReadableByteStreamController.desiredSize adecuada. También determina cuándo se llama al método pull() de la fuente subyacente.

Los métodos getReader() y read()

Luego, puedes acceder a un ReadableStreamBYOBReader configurando el parámetro mode según corresponda: ReadableStream.getReader({ mode: "byob" }) Esto permite un control más preciso sobre el búfer una asignación para evitar las copias. Para leer desde el flujo de bytes, debes llamar ReadableStreamBYOBReader.read(view), donde view es un ArrayBufferView

Muestra de código de flujo de bytes legible

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La siguiente función muestra flujos de bytes legibles que permiten una lectura eficiente de copia cero de un array generado de forma aleatoria. En lugar de usar un tamaño de fragmento predeterminado de 1,024, intenta rellenar el búfer proporcionado por el desarrollador, lo que permite un control total.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

La mecánica de una transmisión que admite escritura

Una transmisión grabable es un destino en el que puedes escribir datos, representados en JavaScript por un Objeto WritableStream. Esta sirve como una abstracción sobre un receptor subyacente, un receptor de E/S de nivel inferior en el que se escriben datos sin procesar.

Los datos se escriben en el flujo mediante un escritor, un fragmento a la vez. Un bloque puede tomar un múltiples formas, como los fragmentos de un lector. Puedes usar el código que quieras producir los fragmentos listos para escribir; el escritor y el código asociado se denominan productor.

Cuando se crea un escritor y comienza a escribir en una transmisión (un escritor activo), se dice que bloqueado en él. Solo un escritor puede escribir en una transmisión con escritura a la vez. Si quieres otro escritor para comenzar a escribir en tu transmisión, por lo general, debes liberarlo, antes de adjuntarlo a otro escritor.

Una cola interna realiza un seguimiento de los fragmentos que se escribieron en la transmisión, pero que aún no lo hicieron. y procesada por el receptor subyacente.

Una estrategia de cola es un objeto que determina cómo una transmisión debe indicar la contrapresión según el estado de su cola interna. La estrategia de cola asigna un tamaño a cada fragmento y compara el tamaño total de todos los fragmentos de la cola a un número específico, conocido como marca de agua alta.

La construcción final se denomina controlador. Cada transmisión con capacidad de escritura tiene un controlador asociado que te permite controlar la transmisión (por ejemplo, anularla).

Cómo crear una transmisión que admite escritura

La interfaz WritableStream de La API de Streams proporciona una abstracción estándar para escribir datos de transmisión en un destino, conocido como receptor. Este objeto viene con contrapresión y puesta en cola integrados. Creas una transmisión con capacidad de escritura llamando a su constructor WritableStream() Tiene un parámetro underlyingSink opcional, que representa un objeto. con métodos y propiedades que definan cómo se comportará la instancia de transmisión construida.

underlyingSink

underlyingSink puede incluir los siguientes métodos opcionales definidos por el desarrollador. El controller que se pasa a algunos métodos es la WritableStreamDefaultController

  • start(controller): Se llama a este método de inmediato cuando se construye el objeto. El El contenido de este método debería tener como objetivo obtener acceso al receptor subyacente. Si este proceso se va a de forma asíncrona, puede mostrar una promesa para indicar el éxito o el fracaso.
  • write(chunk, controller): se llamará a este método cuando un nuevo fragmento de datos (especificado en parámetro chunk) está listo para escribirse en el receptor subyacente. Puede devolver una promesa a indica el éxito o fracaso de la operación de escritura. Se llamará a este método solo después de la acción anterior escrituras correctas, y nunca después de que se cierre o anule la transmisión.
  • close(controller): Se llamará a este método si la app indica que terminó de escribir. bloques al flujo. El contenido debe hacer todo lo necesario para finalizar las operaciones de escritura en el receptor subyacente y liberar acceso a él. Si este proceso es asíncrono, puede mostrar un prometen indicar el éxito o el fracaso. Se llamará a este método solo después de todas las escrituras en cola tuvieron éxito.
  • abort(reason): Se llamará a este método si la app indica que desea cerrarse de forma abrupta. la transmisión y ponerla en un estado de error. Puede limpiar los recursos retenidos, al igual que close(), pero se llamará a abort() incluso si las operaciones de escritura están en cola. Se arrojarán esos fragmentos de distancia. Si este proceso es asíncrono, puede mostrar una promesa para indicar el éxito o el fracaso. El El parámetro reason contiene un DOMString que describe por qué se anuló la transmisión.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

El WritableStreamDefaultController de la API de Streams, que representa un controlador que permite controlar el estado de WritableStream durante la configuración, a medida que se envían más bloques para que se escriban o al final de la escritura. Al construir un WritableStream, el receptor subyacente recibe un WritableStreamDefaultController correspondiente para manipular. WritableStreamDefaultController tiene solo un método: WritableStreamDefaultController.error(), lo que genera errores en todas las interacciones futuras con la transmisión asociada. WritableStreamDefaultController también admite una propiedad signal que muestra una instancia de AbortSignal: lo que permite que se detenga una operación WritableStream si es necesario.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

El segundo argumento, también opcional, del constructor WritableStream() es queuingStrategy. Es un objeto que, de manera opcional, define una estrategia de puesta en cola para la transmisión, que toma dos parámetros:

  • highWaterMark: Es un número no negativo que indica la marca de agua alta de la transmisión con esta estrategia de fila.
  • size(chunk): Es una función que calcula y muestra el tamaño finito no negativo del valor del fragmento dado. El resultado se usa para determinar la contrapresión, que se manifiesta mediante la propiedad WritableStreamDefaultWriter.desiredSize adecuada.

Los métodos getWriter() y write()

Para escribir en una transmisión que admite escritura, necesitas un escritor, que será un WritableStreamDefaultWriter El método getWriter() de la interfaz WritableStream muestra un nueva instancia de WritableStreamDefaultWriter y bloquea la transmisión a esa instancia. Si bien el la transmisión está bloqueada y no se puede adquirir ningún otro escritor hasta que se lance el actual.

La write() método de la WritableStreamDefaultWriter de datos, escribe un fragmento de datos pasado a WritableStream y su receptor subyacente, y luego muestra una promesa que se resuelve para indicar el éxito o el fracaso de la operación de escritura. Ten en cuenta "éxito" depende del receptor subyacente; puede indicar que el bloque ha sido aceptado, y no necesariamente que se guarde de forma segura en su destino final.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

La propiedad locked

Para comprobar si una transmisión grabable está bloqueada, accede a su WritableStream.locked propiedad.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Muestra de código de transmisión que admite escritura

En la siguiente muestra de código, se muestran todos los pasos en acción.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Canaliza una transmisión legible a una que admite escritura

Una transmisión legible se puede canalizar a una transmisión grabable a través del pipeTo(). ReadableStream.pipeTo() canaliza el objeto ReadableStream actual a una WritableStream determinada y muestra un prometida que se cumple cuando el proceso de canalización se completa con éxito o se rechaza si se produjeron errores que encuentran.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Crea una transmisión de transformación

La interfaz TransformStream de la API de Streams representa un conjunto de datos transformables. Tú crea una transmisión de transformación llamando a su constructor TransformStream(), que crea y muestra un objeto de transmisión de transformación a partir de los controladores dados. El constructor TransformStream() acepta como su primer argumento es un objeto de JavaScript opcional que representa el transformer. Estos objetos pueden contener cualquiera de los siguientes métodos:

transformer

  • start(controller): Se llama a este método de inmediato cuando se construye el objeto. Precio habitual se usa para poner en cola fragmentos de prefijo con controller.enqueue(). Esos fragmentos se leerán del lado legible, pero no depende de las operaciones de escritura. Si esta inicial es asíncrono, por ejemplo, porque se necesita un poco de esfuerzo para adquirir los fragmentos de prefijos, la función puede devolver una promesa para indicar el éxito o el fracaso; una promesa rechazada generará un error en tiempo real. El constructor TransformStream() volverá a arrojar cualquier excepción que se genere.
  • transform(chunk, controller): se llama a este método cuando se escribe originalmente un nuevo fragmento en el del lado que admite escritura está listo para transformarse. La implementación de transmisión garantiza que esta función se llamará solo después de que las transformaciones anteriores se hayan completado correctamente, y nunca antes de que start() haya completado o después de que se haya llamado a flush(). Esta función realiza la transformación real trabajo de la transmisión de transformación. Puede poner los resultados en cola con controller.enqueue(). Esta permite que un solo fragmento escrito en el lado que admite escritura resulte en cero o múltiples fragmentos en el legible, según la cantidad de veces que se llame a controller.enqueue() Si el proceso de una transformación es asíncrona, esta función puede mostrar una promesa para indicar el éxito o el fracaso de la transformación. Una promesa rechazada generará errores tanto en el lado legible como en el que admite escritura de la y una transmisión de transformación. Si no se proporciona un método transform(), se usa la transformación de identidad, que pone en cola fragmentos sin cambios desde el lado que admite escritura hacia el lado legible.
  • flush(controller): Se llama a este método después de que se hayan escrito todos los fragmentos escritos en el lado que admite escritura. se transforman pasando con éxito a través de transform(), y el lado que admite escritura está a punto de ser cerrado. Por lo general, se usa para poner en cola fragmentos de sufijos en el lado legible, antes de eso. se cierra. Si el proceso de limpieza es asíncrono, la función puede mostrar una promesa a indicar un éxito o un fracaso; el resultado se comunicará al emisor de stream.writable.write() Además, una promesa rechazada generará errores tanto en lados de escritura de la transmisión. Si se arroja una excepción, se considera igual que si se muestra una solicitud muy prometedores.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Las estrategias de cola writableStrategy y readableStrategy

El segundo y el tercer parámetro opcional del constructor TransformStream() son opcionales Estrategias de cola writableStrategy y readableStrategy. Se definen tal como se describe en el legible y el flujo writable respectivamente.

Muestra de código de transmisión de transformación

En la siguiente muestra de código, se observa una transmisión de transformación simple en acción.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Canaliza una transmisión legible a través de una transmisión de transformación

La pipeThrough() de la interfaz ReadableStream proporciona una manera encadenada de canalizar la transmisión actual. a través de una transmisión de transformación o cualquier otro par de lectura/escritura. Por lo general, si se canaliza una transmisión, se bloqueará mientras dure la canalización, lo que evita que otros lectores la bloqueen.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

La siguiente muestra de código (un poco forzada) muestra cómo podrías implementar un “grito” versión de fetch() que pone en mayúsculas todo el texto consumiendo la promesa de respuesta que se muestra como una transmisión y a mayúsculas y minúsculas fragmento por fragmento. La ventaja de este enfoque es que no debes esperar todo el documento en descargarse, lo que puede marcar una gran diferencia cuando se trata de archivos grandes.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demostración

En la siguiente demostración, se muestran transmisiones legibles, de escritura y de transformación en acción. También se incluyen ejemplos de las cadenas de canalización pipeThrough() y pipeTo(), y también muestra tee(). De manera opcional, puedes ejecutar la demostración en su propia ventana o consulta la código fuente.

Transmisiones útiles disponibles en el navegador

Existen varios flujos útiles integrados directamente en el navegador. Puedes crear fácilmente un ReadableStream de un BLOB. La Blob el método stream() de la interfaz muestra Un objeto ReadableStream que, durante la lectura, muestra los datos contenidos en el BLOB. Recuerda también que un El objeto File es un tipo específico de Blob, y se puede usar en cualquier contexto que un BLOB.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Las variantes de transmisión de TextDecoder.decode() y TextEncoder.encode() se llaman TextDecoderStream y TextEncoderStream respectivamente.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Comprimir o descomprimir un archivo es fácil con el CompressionStream y Transmisiones de transformación DecompressionStream respectivamente. En el siguiente ejemplo de código, se muestra cómo descargar y comprimir (gzip) la especificación de transmisiones directamente en el navegador y escribir el archivo comprimido directamente en el disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Las API de File System Access FileSystemWritableFileStream y las transmisiones de solicitudes fetch() experimentales son ejemplos de flujos escribibles en el exterior.

La API de Serial usa en gran medida las transmisiones legibles y que admiten escritura.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Por último, la API de WebSocketStream integra transmisiones con la API de WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Recursos útiles

Agradecimientos

Este artículo fue revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley y Arroz Adán. Las entradas de blog de Jake Archibald me ayudaron mucho a comprender transmisiones continuas. Algunas de las muestras de código están inspiradas en un usuario de GitHub las exploraciones de @bellbind y partes de la prosa se basan en gran medida en la Documentos web de MDN sobre transmisiones. El Streams Standard autores han hecho un gran trabajo en escribiendo esta especificación. Hero image de Ryan Lara en Quitar salpicaduras.